RSocket Communication Protocol for Reactive Applications

Technology: RSocket is a binary, point-to-point communication protocol that supports Reactive Streams. It provides an alternative to protocols such as HTTP.

It is bi-directional, multiplexed, and message-based, with Reactive Streams back pressure.

It was developed mainly for Reactive-style applications, which are non-blocking. Back pressure means that the publisher does not send messages to the subscriber until the subscriber signals that it is ready and states how many messages it can handle; only then does the publisher send that many messages.
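
The demand-driven flow described above can be sketched with the JDK's own Reactive Streams interfaces (java.util.concurrent.Flow, Java 9+). This is only an illustration under stated assumptions: the class names BackPressureSketch, RangePublisher and CollectingSubscriber are hypothetical, the publisher is synchronous for simplicity, and none of it is RSocket code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class BackPressureSketch {

    /** Hypothetical publisher: emits 1..count, but never more than was requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) {
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private boolean finished;

                @Override
                public void request(long n) {
                    // Emit at most n items: the subscriber's demand is the upper bound.
                    long emitted = 0;
                    while (emitted < n && next <= count && !finished) {
                        subscriber.onNext(next++);
                        emitted++;
                    }
                    // Signal completion once the whole range has been delivered.
                    if (next > count && !finished) {
                        finished = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    finished = true;
                }
            });
        }
    }

    /** Hypothetical subscriber: collects items and lets the caller decide the demand. */
    static final class CollectingSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        boolean completed;
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription; // no demand signalled yet
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
        }

        @Override
        public void onError(Throwable error) {
            throw new IllegalStateException(error);
        }

        @Override
        public void onComplete() {
            completed = true;
        }

        /** Tells the publisher how many more items this subscriber can handle. */
        void request(long n) {
            subscription.request(n);
        }
    }

    public static void main(String[] args) {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        new RangePublisher(5).subscribe(subscriber);
        System.out.println(subscriber.received); // [] : nothing is sent without demand
        subscriber.request(2);
        System.out.println(subscriber.received); // [1, 2]
        subscriber.request(10);
        System.out.println(subscriber.received + " completed=" + subscriber.completed);
    }
}
```

Nothing reaches the subscriber until it calls request, and each call caps how many items the publisher may push.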

One drawback of HTTP is that it is one-directional: the client always initiates the exchange, so when data changes on the server there is no way to tell the client that it was updated. The client also has to handle all retry logic, timeouts, circuit breakers, and so on. An application built on a Reactive architecture can scale more easily and avoid these HTTP failure modes.

Difference between RSocket and HTTP Protocol: where HTTP offers only request/response, RSocket supports four interaction models:

  1. Fire and Forget: used when no response is needed, such as non-critical event logging or sending email.
  2. Request/Response: we send a request and receive a single response, as with HTTP, with the added advantages of multiplexing and asynchronous processing.
  3. Request/Stream: similar to Request/Response, but the response is a collection that is streamed item by item until it completes. It is useful for large data transfers between applications; for example, given a bank account number, the responder streams all transactions for that account.
  4. Channel: a bi-directional stream of messages allowing for arbitrary interaction models.
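
To make the shape of the four models concrete, here is a minimal sketch that uses only JDK types. It is an analogy, not the RSocket API: the real io.rsocket.RSocket interface works with Payload and Reactor's Mono and Flux, while the Responder interface and ToyResponder class below are hypothetical, and java.util.stream.Stream is pull-based rather than reactive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class InteractionModelsSketch {

    /** Hypothetical interface: one method per interaction model, JDK types only. */
    interface Responder {
        void fireAndForget(String message);                        // no response at all
        CompletableFuture<String> requestResponse(String message); // exactly one response
        Stream<String> requestStream(String message);              // one request, many responses
        Stream<String> requestChannel(Stream<String> messages);    // many requests, many responses
    }

    /** Toy in-memory responder, used only to show the four call shapes. */
    static final class ToyResponder implements Responder {
        final List<String> eventLog = new ArrayList<>();

        @Override
        public void fireAndForget(String message) {
            eventLog.add(message); // e.g. non-critical event logging
        }

        @Override
        public CompletableFuture<String> requestResponse(String message) {
            return CompletableFuture.completedFuture("Hello " + message);
        }

        @Override
        public Stream<String> requestStream(String accountNumber) {
            // Stands in for "all transactions of this bank account".
            return Stream.of("tx-1", "tx-2", "tx-3").map(tx -> accountNumber + ":" + tx);
        }

        @Override
        public Stream<String> requestChannel(Stream<String> messages) {
            // Answers every incoming "ping" with "pong" and anything else with "ping".
            return messages.map(m -> m.equalsIgnoreCase("ping") ? "pong" : "ping");
        }
    }

    public static void main(String[] args) {
        ToyResponder responder = new ToyResponder();
        responder.fireAndForget("user logged in");
        System.out.println(responder.requestResponse("sravan").join());
        System.out.println(responder.requestStream("ACC-42").collect(Collectors.toList()));
        System.out.println(responder.requestChannel(Stream.of("ping", "ping"))
                .collect(Collectors.toList()));
    }
}
```

The return types carry the difference: nothing, one value, many values, and many values in exchange for many values.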

RSocket also supports resumption: using the previous connection id, we can resume consuming a stream as long as that stream is still held in memory.

This is particularly useful for mobile-to-server communication, where network connections drop, switch, and reconnect frequently.
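
The idea behind resumption can be sketched as follows. This is a deliberately simplified, hypothetical model and not how rsocket-java implements it: the class ResumptionSketch and its methods are invented for illustration, and messages are counted by index, whereas the protocol itself identifies a session with a resume token and tracks positions in the frame stream.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class ResumptionSketch {

    // Messages sent so far, kept in memory per connection id (hypothetical bookkeeping).
    private final Map<String, List<String>> sentByConnection = new HashMap<>();

    /** Remembers a message that was sent on the given connection. */
    void recordSent(String connectionId, String message) {
        sentByConnection.computeIfAbsent(connectionId, id -> new ArrayList<>()).add(message);
    }

    /**
     * Returns the messages the client has not received yet, or an empty Optional
     * when the stream is no longer in memory (or the count is invalid) and the
     * connection therefore cannot be resumed.
     */
    Optional<List<String>> resume(String connectionId, int receivedCount) {
        List<String> sent = sentByConnection.get(connectionId);
        if (sent == null || receivedCount < 0 || receivedCount > sent.size()) {
            return Optional.empty();
        }
        return Optional.of(new ArrayList<>(sent.subList(receivedCount, sent.size())));
    }

    public static void main(String[] args) {
        ResumptionSketch server = new ResumptionSketch();
        server.recordSent("conn-1", "m1");
        server.recordSent("conn-1", "m2");
        server.recordSent("conn-1", "m3");

        // The client saw only "m1" before the network dropped, then reconnects.
        System.out.println(server.resume("conn-1", 1)); // Optional[[m2, m3]]
        // An unknown connection id has nothing in memory to resume from.
        System.out.println(server.resume("conn-2", 0)); // Optional.empty
    }
}
```

When the state is gone, the only option left in this sketch is to start a fresh connection.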

Maven Dependencies:

Create a Spring Boot application from https://start.spring.io/ and select Lombok and Reactive Web as dependencies.

Then manually add the RSocket dependency below.

<dependency>
    <groupId>io.rsocket</groupId>
    <artifactId>rsocket-transport-netty</artifactId>
    <version>${rsocket.version}</version>
</dependency>

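Also define the rsocket.version property used by this dependency in the properties section of the pom.xml.

As we know, Netty is a popular server for Reactive applications, which is why the Netty transport is used here.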

  1. Request/Stream:

    In the request/stream model, the subscriber sends a single request and the publisher responds with a stream of messages that may continue indefinitely.

    The requestStream method returns a Flux stream.

    Create the below classes in our application:

    @Component
    class Producer implements Ordered, ApplicationListener<ApplicationReadyEvent> {

        private static final Logger log = LogManager.getLogger(Producer.class);

        @Override
        public int getOrder() {
            return Ordered.HIGHEST_PRECEDENCE;
        }

        Flux<String> notifications(String name) {
            return Flux.fromStream(Stream.generate(() -> "Hello " + name + "@" + Instant.now().toString()))
                    .delayElements(Duration.ofSeconds(2));
        }

        @Override
        public void onApplicationEvent(ApplicationReadyEvent event) {
            final SocketAcceptor socketAcceptor = (connectionSetupPayload, sendingSocket) -> {
                final AbstractRSocket abstractRSocket = new AbstractRSocket() {
                    @Override
                    public Flux<Payload> requestStream(Payload payload) {
                        final String name = payload.getDataUtf8();
                        log.info("got request from consumer with payload: " + name);
                        return notifications(name).map(DefaultPayload::create);
                    }
                };
                return Mono.just(abstractRSocket);
            };
            final TcpServerTransport transport = TcpServerTransport.create(7000);
            RSocketFactory.receive().acceptor(socketAcceptor).transport(transport).start().block();
        }
    }

    @Component
    class Consumer implements Ordered, ApplicationListener<ApplicationReadyEvent> {

        private static final Logger log = LogManager.getLogger(Consumer.class);

        @Override
        public int getOrder() {
            return Ordered.LOWEST_PRECEDENCE;
        }

        @Override
        public void onApplicationEvent(ApplicationReadyEvent event) {
            final TcpClientTransport transport = TcpClientTransport.create(7000);
            RSocketFactory.connect().transport(transport).start()
                    .flatMapMany(sender -> sender.requestStream(DefaultPayload.create("sravan")))
                    .map(Payload::getDataUtf8)
                    .subscribe(result -> log.info(" consumed new result " + result));
        }
    }

    SocketAcceptor is an interface provided by RSocket for accepting connections from the Consumer. AbstractRSocket is an abstract implementation of RSocket; because we are using the request/stream model, we override requestStream in this class. notifications is the method that produces a stream of strings with a 2-second delay between elements. Every 2 seconds the publisher sends a message through the RSocket server, and the consumer connects to the same server, which is running on port 7000, to consume the messages. Both classes implement the Ordered interface so that the Publisher creates the RSocket server before the Consumer connects to it.

  2. Request/Channel:

    The channel model provides bi-directional communication: messages flow continuously from consumer to publisher and from publisher to consumer.

    Let's update the ApplicationReadyEvent handler methods as shown below:

    Publisher Event Handler method:

    public void onApplicationEvent(ApplicationReadyEvent event) {
        final SocketAcceptor socketAcceptor = (connectionSetupPayload, sendingSocket) -> {
            final AbstractRSocket abstractRSocket = new AbstractRSocket() {
                @Override
                public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
                    return Flux.from(payloads).map(Payload::getDataUtf8)
                            .doOnNext(str -> log.info("received " + str + " in " + this.getClass().getName()))
                            .map(RequestResponse::reply).map(DefaultPayload::create);
                }
            };
            return Mono.just(abstractRSocket);
        };
        final TcpServerTransport transport = TcpServerTransport.create(7000);
        RSocketFactory.receive().acceptor(socketAcceptor).transport(transport).start().block();
    }

    Consumer EventHandler method:

    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        log.info("started " + this.getClass().getName());
        final TcpClientTransport transport = TcpClientTransport.create(7000);
        RSocketFactory.connect().transport(transport).start()
                .flatMapMany(socket -> socket
                        .requestChannel(Flux.interval(Duration.ofSeconds(2)).map(l -> DefaultPayload.create("ping")))
                        .map(payload -> payload.getDataUtf8())
                        .doOnNext(string -> log.info("Received " + string + " in class " + this.getClass().getName()))
                        .take(10).doFinally(signal -> socket.dispose()))
                .then().subscribe(result -> log.info(" consumed new result " + result));
    }

    The static reply method is as follows:

    static String reply(String message) {
        if (message.equalsIgnoreCase("ping")) {
            return "pong";
        } else if (message.equalsIgnoreCase("pong")) {
            return "ping";
        } else {
            throw new IllegalArgumentException("input must either 'ping' or 'pong'");
        }
    }

    Every 2 seconds the consumer sends “ping” to the publisher, and the publisher responds with a “pong” message.

    We can observe this in the log:

    2019-02-06 19:39:45.829 INFO 21268 --- [actor-tcp-nio-6] org.sravan.rsocket.channel.Pong : received ping in org.sravan.rsocket.channel.Pong$1

    2019-02-06 19:39:45.832 INFO 21268 --- [actor-tcp-nio-4] org.sravan.rsocket.channel.Ping : Received pong in class org.sravan.rsocket.channel.Ping

    2019-02-06 19:39:47.807 INFO 21268 --- [actor-tcp-nio-6] org.sravan.rsocket.channel.Pong : received ping in org.sravan.rsocket.channel.Pong$1

    2019-02-06 19:39:47.812 INFO 21268 --- [actor-tcp-nio-4] org.sravan.rsocket.channel.Ping : Received pong in class org.sravan.rsocket.channel.Ping

    Request/Response: the consumer sends a request to the publisher and waits until it receives a single response message from the publisher.

    Fire-and-Forget: the client sends a message to the server and receives no response from the publisher. This model suits work that the publisher can carry out independently, on another thread.

    Conclusion: RSocket is an alternative to HTTP for faster, asynchronous communication, and it provides several different ways to exchange data between consumer and publisher. It is mainly used in Reactive applications.


Copyright © 2016 - Aegis Infoways All rights reserved