JavaScript Required

We're sorry, but we doesn't work properly without JavaScript enabled.

Looking for an Expert Development Team? Take two weeks Trial! Try Now

RSocket Communication Protocol for Reactive Applications

Technology: RSocket is a binary, point-to-point communication protocol that support Reactive Streams. It provides an alternative to other protocols like HTTP.

It is bi-directional, multiplexed, message-based on Reactive back pressure.

It is mainly developed to work with Reactive style applications, which are non-blocking applications. The meaning of Reactive Pressure is Publisher will not send the message to subscriber, unless subscriber will send the message, indicating it is ready and how to many messages that it can handle, then publisher will send the messages to subscriber.

One of the drawbacks of HTTP Protocol is its one directional, as our data is modified there is no way to communicate to the client that the data is updated, at the client end we need to handle all retry logic, timeouts, circuit breakers etc. If the application is built using Reactive Architecture can easily scale, avoid the above HTTP failures.

Difference between RSocket and HTTP Protocol:

  1. Fire and Forget: When the response is not needed, such as non-critical event logging, sending email, etc.
  2. Request/Response: When we send the Request, and receive the response like HTTP, it also adds advantages over HTTP, through multiplexing and asynchronous.
  3. Request/Stream: Similar to Request/Response, returning the collection, the collection will stream for next results until it completes. It will be used in large data transfer between applications, like by giving the bank account number, it will respond with all bank account transactions.
  4. Channel: a bi-directional stream of messages allowing for arbitrary interaction models.

Rscocket will also supports resumption, using the previous connection id, if the stream in memory we can resume the consumption of the stream.

This is particularly useful for mobile‹–›server communication when network connections drop, switch, and reconnect frequently.

Maven Dependencies:

Create a Spring boot application from https://start.spring.io/ and select Lombok and ReactiveWeb as dependencies.

And manually add the below dependencies related to RSocket.

<dependency> <groupId>io.rsocket</groupId> <artifactId>rsocket-transport-netty</artifactId> <version>${rsocket.version}</version> </dependency>

And add the below property in properties section:

<rsocket.version>0.11.14</rsocket.version>

As we know Netty is the popular server for Reactive applications

  1. Request/Stream:

    Request/stream model, subscriber will send a request, and publisher will start sending the response for un-limited time.

    The requestStream method returns a Flux stream.

    Create the below classes in our application:

    @Component class Producer implements Ordered, ApplicationListener<ApplicationReadyEvent> { privatestaticfinal Logger log = LogManager.getLogger(Producer.class); @Override publicintgetOrder() { returnOrdered.HIGHEST_PRECEDENCE; } Flux<String> notifications(String name) { returnFlux.fromStream(Stream.generate(() ->"Hello " + name + "@" + Instant.now().toString())) .delayElements(Duration.ofSeconds(2)); } @Override publicvoidonApplicationEvent(ApplicationReadyEventevent) { finalSocketAcceptorsocketAcceptor = (connectionSetupPayload, sendingSocket) -> { finalAbstractRSocketabstractRSocket = newAbstractRSocket() { @Override public Flux<Payload> requestStream(Payload payload) { final String name = payload.getDataUtf8(); log.info("got request from consumer with payload: " + name); return notifications(name).map(DefaultPayload::create); } }; returnMono.just(abstractRSocket); }; finalTcpServerTransporttransport = TcpServerTransport.create(7000); RSocketFactory.receive().acceptor(socketAcceptor).transport(transport).start().block(); } } @Component class Consumer implements Ordered, ApplicationListener<ApplicationReadyEvent> { privatestaticfinal Logger log = LogManager.getLogger(Consumer.class); @Override publicintgetOrder() { returnOrdered.LOWEST_PRECEDENCE; } @Override publicvoidonApplicationEvent(ApplicationReadyEventevent) { finalTcpClientTransporttransport = TcpClientTransport.create(7000); RSocketFactory.connect().transport(transport).start() .flatMapMany(sender ->sender.requestStream(DefaultPayload.create("sravan"))).map(Payload::getDataUtf8) .subscribe(result ->log.info(" consumed new result " + result)); } }

    SocketAcceptor is one of the class provided by RSocket to accept the requests from Consumer. AbstractRsocket is the abstract implementation of RSocket. As we are implementing requestStream will implement the same in this class. And notifications is the class which produces stream of the string with 2 seconds delay. For each 2 seconds, the publisher will send the messages to Rsocket server, and consumer is connecting the same server which is running on port 7000, to consume the message. We used Order interface, so that Publisher will create RSocket server before Consumer connect to the RSocket server.

  2. Request/Channel:

    The channel provides bi-directional communication, messages will flow continuously from consumer to publisher then publisher to consumer and it is going on…

    Lets the update the ApplicationReadyEvent handler method with below:

    Publisher Event Handler method:

    publicvoidonApplicationEvent(ApplicationReadyEventevent) { finalSocketAcceptorsocketAcceptor = (connectionSetupPayload, sendingSocket) -> { finalAbstractRSocketabstractRSocket = newAbstractRSocket() { @Override public Flux<Payload> requestChannel(Publisher<Payload> payloads) { returnFlux.from(payloads).map(Payload::getDataUtf8) .doOnNext(str ->log.info("received " + str + " in " + this.getClass().getName())) .map(RequestResponse::reply).map(DefaultPayload::create); } }; returnMono.just(abstractRSocket); }; finalTcpServerTransporttransport = TcpServerTransport.create(7000); RSocketFactory.receive().acceptor(socketAcceptor).transport(transport).start().block(); }

    Consumer EventHandler method:

    @Override publicvoidonApplicationEvent(ApplicationReadyEventevent) { log.info("started " + this.getClass().getName()); finalTcpClientTransporttransport = TcpClientTransport.create(7000); RSocketFactory.connect().transport(transport).start() .flatMapMany(socket ->socket .requestChannel(Flux.interval(Duration.ofSeconds(2)).map(l ->DefaultPayload.create("ping"))) .map(payload ->payload.getDataUtf8()) .doOnNext(string ->log.info("Received " + string + " in class " + this.getClass().getName())) .take(10).doFinally(signal ->socket.dispose())) .then().subscribe(result ->log.info(" consumed new result " + result)); }

    And static reply method as follows:

    static String reply(String message) { if (message.equalsIgnoreCase("ping")) { return"pong"; } elseif (message.equalsIgnoreCase("pong")) { return"ping"; } else { thrownewIllegalArgumentException("input must either 'ping' or 'pong'"); } }

    For each 2 seconds consumer will send the “ping” to publisher and publisher will respond with “pong” message.

    We can observe the same in log:

    2019-02-06 19:39:45.829 INFO 21268 --- [actor-tcp-nio-6] org.sravan.rsocket.channel.Pong : received ping in org.sravan.rsocket.channel.Pong$1 2019-02-06 19:39:45.832 INFO 21268 --- [actor-tcp-nio-4] org.sravan.rsocket.channel.Ping : Received pong in class org.sravan.rsocket.channel.Ping 2019-02-06 19:39:47.807 INFO 21268 --- [actor-tcp-nio-6] org.sravan.rsocket.channel.Pong : received ping in org.sravan.rsocket.channel.Pong$1 2019-02-06 19:39:47.812 INFO 21268 --- [actor-tcp-nio-4] org.sravan.rsocket.channel.Ping : Received pong in class org.sravan.rsocket.channel.Ping

    Request/Response: the consumer will request the publisher to send the message, and client will block the thread unless it receives the thread from publisher.

    Fire-and-Forget: the client will ping the server and it will not receive any response from publisher, if we want to do things independently with another thread on Publisher then we can use this method.

Conclusion:

RSocket is one of the protocols on top HTTP, for faster commutation in Asynchronous way, and it also provides different ways to communicate the data from consumer to publisher. It mainly used Reactive based applications.

To discuss more on this blog please contact our Java Developers.