Reactive Programming in Java: How, Why, and Is It Worth Doing? Part II. Implementing and subscribing to an observer

Implementing and subscribing to an observer

Java 9 does not ship a full implementation of reactive streams, only a specification (the java.util.concurrent.Flow interfaces). Several libraries do implement the reactive approach, however. In this example, we use the RxJava library. We subscribe to a data stream and define several handlers, that is, methods that run at the start of stream processing (onSubscribe), on receiving each new message (onNext), when an error occurs (onError), and when the stream completes (onComplete):

implementing and subscribing to an observer.png

Look at the last line:

locations.map(String::length).filter(l -> l >= 5).subscribe(observer);

We use the map and filter operators. If you have worked with Java 8 streams, you already know map and filter, and here they work the same way. The difference is that in reactive programming the values can arrive gradually. Every time a new value arrives, it passes through all the transformations. Thus, String::length replaces each string with its length.
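To make the comparison with Java 8 streams concrete, here is a minimal sketch of the same map and filter chain applied to an ordinary in-memory list rather than to a reactive stream. The class name LengthPipeline and the method lengthsAtLeastFive are hypothetical, and the input is assumed to be the five city names used in this example.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper class, not part of RxJava or of the original example.
class LengthPipeline {

    // Maps each name to its length, then keeps only lengths of at least 5.
    static List<Integer> lengthsAtLeastFive(List<String> names) {
        return names.stream()
                .map(String::length)
                .filter(l -> l >= 5)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> locations = List.of("Minsk", "Krakow", "Moscow", "Kiev", "Sofia");
        System.out.println(lengthsAtLeastFive(locations)); // prints [5, 6, 6, 5]
    }
}
```

Unlike the reactive version, this pipeline needs the whole list up front and returns only after every element has been processed.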

In this case, we get 5 (Minsk), 6 (Krakow), 6 (Moscow), 4 (Kiev), 5 (Sofia). The filter keeps only the values that are at least 5, so we get a stream of string lengths of 5 or more (Kiev is out). Finally, we subscribe to the resulting stream. The observer is then called to react to each value in this final stream. With each new value, it prints the length:

public void onNext(Integer value) {
    System.out.println("Length: " + value);
}

First "Length: 5" appears, then "Length: 6", and so on. Once the stream is complete, onComplete is called, and "Done." appears at the end:

public void onComplete() {
    System.out.println("Done.");
}

Not every stream completes, but some do. For example, if we are reading from a file, the stream completes when the file ends.

If an error occurs, we can react to it:

public void onError(Throwable e) {
    e.printStackTrace();
}

So we can react to different events in different ways: to new values, to stream completion, and to errors.
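The same three kinds of reaction can be sketched without RxJava, using only the java.util.concurrent.Flow interfaces from the JDK. This is not the RxJava code from the example above: ListPublisher, RecordingSubscriber, and FlowDemo are hypothetical names, the publisher is a deliberately simplified synchronous one that ignores the requested demand, and the subscriber records events in a list instead of printing them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical subscriber: records every callback as a line of text.
class RecordingSubscriber implements Flow.Subscriber<Integer> {
    final List<String> events = new ArrayList<>();

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        events.add("subscribed");
        subscription.request(Long.MAX_VALUE); // ask for everything at once
    }

    @Override
    public void onNext(Integer value) { events.add("Length: " + value); }

    @Override
    public void onError(Throwable e) { events.add("Error: " + e.getMessage()); }

    @Override
    public void onComplete() { events.add("Done."); }
}

// Hypothetical, simplified publisher: emits a fixed list synchronously.
// Assumption: demand (the argument of request) is ignored for brevity.
class ListPublisher implements Flow.Publisher<Integer> {
    private final List<Integer> items;

    ListPublisher(List<Integer> items) { this.items = items; }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private boolean started;
            private boolean cancelled;

            @Override
            public void request(long n) {
                if (started || cancelled) return;
                started = true;
                try {
                    for (Integer item : items) {
                        if (cancelled) return;
                        if (item == null) throw new NullPointerException("null item");
                        subscriber.onNext(item);
                    }
                } catch (RuntimeException e) {
                    subscriber.onError(e); // the stream ends with an error
                    return;
                }
                subscriber.onComplete(); // the stream ends normally
            }

            @Override
            public void cancel() { cancelled = true; }
        });
    }
}

class FlowDemo {
    // Runs the publisher against a fresh subscriber and returns what it saw.
    static List<String> run(List<Integer> items) {
        RecordingSubscriber subscriber = new RecordingSubscriber();
        new ListPublisher(items).subscribe(subscriber);
        return subscriber.events;
    }

    public static void main(String[] args) {
        run(List.of(5, 6, 6, 5)).forEach(System.out::println);
    }
}
```

A stream either completes or fails, never both: in this sketch a null element triggers onError and onComplete is then skipped.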

 
