Application protocol providing Reactive Streams semantics
Why RSocket?
RSocket provides a protocol for Reactive Streams semantics between client-server, and server-server communication.
MotivationsWhat is RSocket?
RSocket is a binary protocol for use on byte stream transports such as TCP, WebSockets, and Aeron.
Protocol SpecHow do I use RSocket?
RSocket is intended to be consumed via one of the various implementation libraries which implement the RSocket Protocol.
Browse LibrariesImplementations
Drivers are assumed to implement all core features defined in the Protocol document.
Basic Examples
- Java
- Kotlin
- C++
- TypeScript
- Python
Server Example
RSocketServer.create(new PingHandler())
.payloadDecoder(PayloadDecoder.ZERO_COPY)
.bind(TcpServerTransport.create(7878))
.block()
.onClose();
Client Example
Mono<RSocket> client =
RSocketConnector.create()
.payloadDecoder(PayloadDecoder.ZERO_COPY)
.connect(TcpClientTransport.create(7878));
PingClient pingClient = new PingClient(client);
Recorder recorder = pingClient.startTracker(Duration.ofSeconds(1));
int count = 1_000;
pingClient
.requestResponsePingPong(count, recorder)
.doOnTerminate(() -> System.out.println("Sent " + count + " messages."))
.blockLast();
Server Example
embeddedServer(CIO, port = 9000) { // create and configure ktor server and start it on localhost:9000
install(WebSockets)
install(RSocketSupport)
routing {
rSocket("rsocket") { // configure route 'localhost:9000/rsocket'
RSocketRequestHandler { // create simple request handler
requestStream { request: Payload -> // register request/stream handler
println("Received request: '${request.data.readText()}'")
flow {
repeat(10) { i -> emit(buildPayload { data("data: $i") }) }
}
}
}
}
}
}.start(wait = true)
Client Example
val client = HttpClient { //create and configure ktor client
install(WebSockets)
install(RSocketSupport)
}
// connect to 'localhost:9000/rsocket'
val rSocket: RSocket = client.rSocket(path = "rsocket", port = 9000)
// request stream
val stream: Flow<Payload> = rSocket.requestStream(buildPayload { data("Hello") })
// collect stream
stream.collect { payload: Payload ->
println("Received payload: '${payload.data.readText()}'")
}
Server Example
// RSocket server accepting on TCP
auto rs = RSocket::createServer(TcpConnectionAcceptor::create(FLAGS_port));
// global request handler
auto handler = std::make_shared<HelloStreamRequestHandler>();
// start accepting connections
rs->startAndPark([handler](auto r) { return handler; });
Client Example
auto rsf = RSocket::createClient(TcpConnectionFactory::create(host, port));
auto s = std::make_shared<ExampleSubscriber>(5, 6);
auto rs = rsf->connect().get();
rs->requestStream(Payload("Jane"), s);
Server Example
import { OnExtensionSubscriber, OnNextSubscriber, OnTerminalSubscriber, Payload, RSocketServer } from "rsocket-core";
import { TcpServerTransport } from "rsocket-tcp-server";
const transport = new TcpServerTransport({
listenOptions: {
host: "127.0.0.1",
port: 9090,
},
});
const server = new RSocketServer({
transport,
acceptor: {
accept: async () => {
return {
requestResponse: (
payload: Payload,
responderStream: OnTerminalSubscriber &
OnNextSubscriber &
OnExtensionSubscriber
) => {
const timeout = setTimeout(
() => {
return responderStream.onNext(
{
data: Buffer.concat([Buffer.from("Echo: "), payload.data]),
},
true
);
},
1000
);
return {
cancel: () => {
clearTimeout(timeout);
console.log("cancelled");
},
onExtension: () => { },
};
},
};
},
},
});
await server.bind();
Client Example
import { RSocketConnector } from "rsocket-core";
import { TcpClientTransport } from "rsocket-tcp-client";
const connector = new RSocketConnector({
transport: new TcpClientTransport({
connectionOptions: {
host: "127.0.0.1",
port: 9090,
},
}),
});
const rsocket = await connector.connect();
rsocket.requestResponse(
{
data: Buffer.from("Hello World"),
},
{
onError: (e) => {
console.error(e);
},
onNext: (payload, isComplete) => {
console.log(
`payload[data: ${payload.data}; metadata: ${payload.metadata}]|${isComplete}`
);
},
onComplete: () => { },
onExtension: () => { },
}
);
Server Example
import asyncio
import logging
from rsocket.helpers import create_future
from rsocket.local_typing import Awaitable
from rsocket.payload import Payload
from rsocket.request_handler import BaseRequestHandler
from rsocket.rsocket_server import RSocketServer
from rsocket.transports.tcp import TransportTCP
class Handler(BaseRequestHandler):
async def request_response(self, payload: Payload) -> Awaitable[Payload]:
logging.info(payload.data)
return create_future(Payload(b'Echo: ' + payload.data))
async def run_server():
def session(*connection):
RSocketServer(TransportTCP(*connection), handler_factory=Handler)
server = await asyncio.start_server(session, 'localhost', 7878)
async with server:
await server.serve_forever()
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
asyncio.run(run_server())
Client Example
import asyncio
import logging
from rsocket.helpers import single_transport_provider
from rsocket.payload import Payload
from rsocket.rsocket_client import RSocketClient
from rsocket.rx_support.rx_rsocket import RxRSocket
from rsocket.transports.tcp import TransportTCP
async def main():
connection = await asyncio.open_connection('localhost', 7878)
async with RSocketClient(single_transport_provider(TransportTCP(*connection))) as client:
rx_client = RxRSocket(client)
payload = Payload(b'Hello World')
result = await rx_client.request_response(payload).pipe()
logging.info(result.data)
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
asyncio.run(main())