Sunday, July 23, 2017

Reactor: Reactive Programming Java 8

One of the more interesting features in the upcoming release of Java 9 is the support for Reactive Programming in Java. At a high-level Reactive programming deals with asynchronous data streams. Reactive streams provides a standard for asynchronous stream processing. Java 9 supports Reactive programming implementing the Reactive Streams specification through the Flow API. But before the release of Java 9, there are ways to implement Reactive programming in Java 8. Reactor and RxJava are a couple of APIs to implement Reactive streams in Java 8. Reactor is used internally by the Spring Framework to it's own Reactive support in version 5. In this post, we will look into Reactor library.

Reactive Streams 

Reactive Streams API has two high-level interfaces, Publisher and Subscriber. A Publisher is the source of events, while a Subscriber is the destination for those events. A Subscriber subscribes to a Publisher. When a Subscriber subscribes to a Pulisher, the Publisher invokes the onSubscribe() method on the Subscriber. When the Subscriber is ready to start handling events, it signals this via a request to that Subscription using the request() method. Upon receiving this signal, the Publisher begins to invoke onNext() for each event. This continues until either completion of the stream onComplete() or an error onError(Throwable).

Reactor

While Reactive Streams defines the high-level interfaces, the actual implementation is not provided. The Reactor API provides implementations for these interfaces. The main Publishers provided by Reactor API are Flux and Mono. Flux is a publisher that can publish an unlimited number of events, while Mono can publish at most one event.

Flux

There are many ways to create a Flux. Flux provides a few static methods to create instances.
// Create empty Flux
  Flux flux = Flux.empty();
  
  // Create Flux with elements
  Flux flux = Flux.just("one", "two", "three", "four");
  
  // Create Flux from Iterable
  Flux flux = Flux.fromIterable(Arrays.asList("one", "two", "three", "four"));

Mono 

Following are a few ways to create a Mono
  // Create empty Mono
  Mono mono = Mono.empty();
  
  // Create mono from elements
  Mono mono = Mono.just("one");

Subscriber 

For this example, we will write a simple subscriber which prints the elements. To handle backpressure, this subscriber will read only one element. After processing each element, we will request one more element.
package subscribers;

import org.reactivestreams.Subscription;

import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.SignalType;

public class StringSubscriber extends BaseSubscriber {

 private Subscription subscription;
 @Override
 protected void hookOnSubscribe(Subscription subscription) {
  this.subscription = subscription;
  subscription.request(1);
 }

 @Override
 protected void hookOnNext(String value) {
  System.out.println(Thread.currentThread().getName() + " : " + value);
  // Request for one more event
  subscription.request(1);
 }

 @Override
 protected void hookOnComplete() {
  System.out.println("Complete");
 }

 @Override
 protected void hookOnError(Throwable throwable) {
  throwable.printStackTrace();
 }

 @Override
 protected void hookOnCancel() {
  System.out.println("Cancelled");
 }

 @Override
 protected void hookFinally(SignalType type) {
  System.out.println("Signal Type " + type.name());
 }
 

}
And here is the full code for creating and subscribing to events
package main;

import java.util.Arrays;

import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import subscribers.StringSubscriber;

public class ReactorTest {

 public static void main(String[] args) {
  
  // Flux from Elements
  Flux seq1 = Flux.just("one", "two", "three", "four");
  seq1.subscribe(new StringSubscriber());
  
  // Flux using generate.
  Flux seq2 = Flux.generate(() -> 1, (state, sink) -> {
   sink.next("x " + " = " + state);
   if (state == 3)
    sink.complete();
   return state + 1;
  });


  seq2.subscribe(new StringSubscriber());

  // Flux from List
  Flux seq3 = Flux.fromIterable(Arrays.asList(new String[] {"one", "two", "three", "four"}));
  seq3.subscribe(new StringSubscriber());
  
  
  // Creating events using Emitter
  EmitterProcessor seq4 = EmitterProcessor.create();

  seq4.subscribe(new StringSubscriber());
  seq4.onNext("1");
  seq4.onNext("2");

 }
}
Finally the gradlel build file used for this project
apply plugin:'application'
mainClassName = "main.ReactorTest"
applicationName = 'reactive-tests'

repositories {
    jcenter()
}

dependencies {
 compile group: 'io.projectreactor', name: 'reactor-core', version: '3.0.7.RELEASE'
}

No comments:

Post a Comment

Popular Posts