RxJava is a hot topic in the world of Android development. The only problem is that it can be difficult to understand. Functional reactive programming is especially hard to grasp when you come from an object-oriented background. So, I am writing a series of articles to help you understand the basic concepts of reactive programming.
In Part 1, we covered the basic concepts of reactive programming. If you have not read it yet, I encourage you to go through Part 1 first to get the basic idea.
As we discussed in Part 1, Rx is made up of three key components.
RX = OBSERVABLE + OBSERVER + SCHEDULERS
Let’s create each component one by one. But, first let’s start with integration.
Integrate RxAndroid into your project
RxAndroid is an Android-specific wrapper that provides some Android-only functionality that is not available in RxJava. So, if you only want to use Rx in a plain Java project, you don’t need to add the RxAndroid library.
Here are the Gradle dependencies for RxJava and RxAndroid.
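As a rough sketch, the dependency block in your module's build.gradle could look like the following. The version numbers are assumptions (they are the last 1.x releases I am aware of), and `implementation` assumes a recent Gradle plugin; older projects would use `compile` instead.

```groovy
dependencies {
    // RxJava 1.x core library (version is an assumption, check for the latest)
    implementation 'io.reactivex:rxjava:1.3.8'
    // Android-specific bindings such as AndroidSchedulers.mainThread()
    implementation 'io.reactivex:rxandroid:1.2.1'
}
```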
The versions of both libraries may have changed, depending on when you are reading this article. You can find the latest versions here.
Rx uses marble diagrams to explain how an operator works. Marble diagrams are convenient and easy to understand.
- The line above the box represents the raw data that the observable needs to emit. Different types of objects are displayed with different shapes.
- The box in the middle represents the operator. There are many operators available, and they control when and how the observable emits data. We will look into those operators in future parts.
- The line below the box represents the data emitted by the observable. The observer in Rx receives this emitted data.
Let’s create an Observable
As you know, an observable is simply a source that emits a stream of data. Below is an observable that emits the numbers 1 to 5, one after another.
Here, just() is called an operator. It emits just the values passed as its arguments, which is where the name comes from.
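A minimal sketch of such an observable, assuming RxJava 1.x is on the classpath. The class name `JustExample` and the helper `numbers()` are made up for illustration; `toList().toBlocking().single()` is used only to collect the emissions so they can be printed.

```java
import java.util.List;

import rx.Observable;

final class JustExample {

    // Builds an observable that emits 1, 2, 3, 4, 5 in order and then completes.
    static Observable<Integer> numbers() {
        return Observable.just(1, 2, 3, 4, 5);
    }

    public static void main(String[] args) {
        // Collect every emitted item into a list and block until the stream completes.
        List<Integer> emitted = numbers().toList().toBlocking().single();
        System.out.println(emitted); // prints [1, 2, 3, 4, 5]
    }
}
```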
Sometimes we want the observable to emit only specific events. Let’s say that in the example above we want to emit only the odd numbers. We can achieve this using another operator called filter(). As the name suggests, the filter operator emits only those items from an Observable that pass a given test.
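The odd-number case could be sketched like this, again assuming RxJava 1.x. `FilterExample` and `oddNumbers()` are illustrative names; the predicate is written as an anonymous `Func1` so it also compiles without Java 8 lambdas.

```java
import java.util.List;

import rx.Observable;
import rx.functions.Func1;

final class FilterExample {

    // Emits 1..5 but lets only the odd values through.
    static Observable<Integer> oddNumbers() {
        return Observable.just(1, 2, 3, 4, 5)
                .filter(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer value) {
                        // Returning true keeps the item, false drops it.
                        return value % 2 != 0;
                    }
                });
    }

    public static void main(String[] args) {
        List<Integer> emitted = oddNumbers().toList().toBlocking().single();
        System.out.println(emitted); // prints [1, 3, 5]
    }
}
```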
Observers consume the data stream emitted by the observable. Whenever the observable emits data, all the registered observers receive it.
In RxJava, there are three callbacks you will receive in an observer.
- onNext() : This method is called whenever the observable emits new data. The emitted object is passed as the argument of this callback.
- onError() : You receive this callback whenever an error occurs in the observable. (After all, the world is not perfect.)
- onCompleted() : You receive this callback when the observable has finished emitting the data stream. It indicates that there is no more data to emit.
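The three callbacks can be sketched as follows, assuming RxJava 1.x, where the completion callback is named `onCompleted()`. `ObserverExample` and `observe()` are hypothetical names; the observer simply records each callback as a string so the order is easy to see. No scheduler is involved, so everything runs synchronously on the calling thread.

```java
import java.util.ArrayList;
import java.util.List;

import rx.Observable;
import rx.Observer;

final class ObserverExample {

    // Subscribes an observer to the source and returns the callbacks it received, in order.
    static List<String> observe(Observable<Integer> source) {
        final List<String> events = new ArrayList<String>();
        source.subscribe(new Observer<Integer>() {
            @Override
            public void onNext(Integer value) {
                events.add("onNext: " + value);
            }

            @Override
            public void onError(Throwable error) {
                events.add("onError: " + error.getMessage());
            }

            @Override
            public void onCompleted() {
                events.add("onCompleted");
            }
        });
        return events;
    }

    public static void main(String[] args) {
        for (String event : observe(Observable.just(1, 2, 3))) {
            System.out.println(event);
        }
    }
}
```

With `Observable.just(1, 2, 3)` as the source, this records three `onNext` events followed by `onCompleted`. A source that fails delivers `onError` instead of `onCompleted`.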
In many cases you don’t care about onCompleted() or onError(). So instead of using Observer<T>, we can use the simpler Action1 interface to define only what happens on onNext().
Here, call() is equivalent to onNext() in our first approach.
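A short sketch of the Action1 approach, assuming RxJava 1.x, where the single method of `Action1` is `call()`. `ActionExample` and `collect()` are illustrative names. Keep in mind that with this overload an error from the source is not handled by your code and is thrown instead, so it is best suited to sources that cannot fail.

```java
import java.util.ArrayList;
import java.util.List;

import rx.Observable;
import rx.functions.Action1;

final class ActionExample {

    // Subscribes with only an onNext handler and returns the values it received.
    static List<Integer> collect(Observable<Integer> source) {
        final List<Integer> received = new ArrayList<Integer>();
        source.subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer value) {
                // Plays the role of onNext(): invoked once per emitted item.
                received.add(value);
            }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(collect(Observable.just(1, 2, 3, 4, 5))); // prints [1, 2, 3, 4, 5]
    }
}
```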
Manage concurrency using Scheduler
As you know, schedulers manage concurrency in reactive programming.
In Android, the most common need when dealing with asynchronous tasks is to observe the task’s result on the main thread, because you want to update the UI. In vanilla Android, this would typically be accomplished with an AsyncTask. With Rx, you can achieve it using schedulers.
There are two methods that control thread management.
- subscribeOn() : Defines the thread on which the observable should run.
- observeOn() : Defines the thread on which the observer should run.
The RxJava and RxAndroid libraries provide some predefined schedulers. For example, Schedulers.io() runs work on a pool of threads intended for I/O operations, while Schedulers.newThread() creates a new thread to run the observer or observable. You can find the other types of schedulers here.
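To see the two methods at work, here is a sketch that reports which thread the observer runs on. It assumes RxJava 1.x on a plain JVM, so `Schedulers.newThread()` stands in for the Android main thread; in an Android app you would typically pass `AndroidSchedulers.mainThread()` from RxAndroid to `observeOn()` instead. `SchedulerExample` and `observerThreadName()` are hypothetical names, and the latch is there only so the caller can wait for the asynchronous stream to finish.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import rx.Observable;
import rx.Observer;
import rx.schedulers.Schedulers;

final class SchedulerExample {

    // Returns the name of the thread on which the observer received its items.
    static String observerThreadName() {
        final AtomicReference<String> threadName = new AtomicReference<String>();
        final CountDownLatch done = new CountDownLatch(1);

        Observable.just(1, 2, 3, 4, 5)
                .subscribeOn(Schedulers.io())        // the observable does its work on an I/O thread
                .observeOn(Schedulers.newThread())   // the observer is called on a separate new thread
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onNext(Integer value) {
                        threadName.set(Thread.currentThread().getName());
                    }

                    @Override
                    public void onError(Throwable error) {
                        done.countDown();
                    }

                    @Override
                    public void onCompleted() {
                        done.countDown();
                    }
                });

        try {
            // Wait for the stream to finish, since it no longer runs on the calling thread.
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return threadName.get();
    }

    public static void main(String[] args) {
        System.out.println("caller thread:   " + Thread.currentThread().getName());
        System.out.println("observer thread: " + observerThreadName());
    }
}
```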
Finally, we use subscribe() to subscribe the observer so that it receives the data emitted by the observable. This returns a Subscription object that holds a reference to that specific connection between the observer and the observable.
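Putting the pieces together, the whole program might look roughly like this. It is a sketch assuming RxJava 1.x on a plain JVM: `Schedulers.computation()` stands in for `AndroidSchedulers.mainThread()`, and `SubscribeExample` and `run()` are made-up names. The observer records its callbacks so the result can be inspected afterwards.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.functions.Func1;
import rx.schedulers.Schedulers;

final class SubscribeExample {

    // Runs the full pipeline and returns the callbacks the observer received, in order.
    static List<String> run() {
        final List<String> events = new CopyOnWriteArrayList<String>();
        final CountDownLatch done = new CountDownLatch(1);

        // subscribe() connects the observer to the observable and returns the Subscription.
        Subscription subscription = Observable.just(1, 2, 3, 4, 5)
                .filter(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer value) {
                        return value % 2 != 0; // keep odd numbers only
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.computation())
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onNext(Integer value) {
                        events.add("onNext: " + value);
                    }

                    @Override
                    public void onError(Throwable error) {
                        events.add("onError: " + error.getMessage());
                        done.countDown();
                    }

                    @Override
                    public void onCompleted() {
                        events.add("onCompleted");
                        done.countDown();
                    }
                });

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // The stream has finished; unsubscribing here is a harmless clean-up step.
        subscription.unsubscribe();
        return events;
    }

    public static void main(String[] args) {
        for (String event : run()) {
            System.out.println(event);
        }
    }
}
```

Running `main` prints `onNext: 1`, `onNext: 3`, `onNext: 5` and finally `onCompleted`, one per line.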
Let’s see the output of the above program.
We can see from the output that only odd numbers were emitted by the observable. At the end, when all the data has been emitted, onCompleted() is executed.
If you want to unsubscribe the observer from the observable, you can call unsubscribe().
In Android, it is essential to call unsubscribe() in onDestroy() of your activity/fragment to release the connection between the observer and the observable. Otherwise, it may cause a memory leak.
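The effect of unsubscribing can be sketched on a plain JVM as below, assuming RxJava 1.x. `UnsubscribeExample` and `run()` are illustrative names, and `Observable.interval()` is used only because it never completes on its own, much like a long-lived stream in an activity. In an Android app, the `unsubscribe()` call would go into `onDestroy()`.

```java
import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.Subscription;
import rx.functions.Action1;

final class UnsubscribeExample {

    // Returns { isUnsubscribed before unsubscribe(), isUnsubscribed after unsubscribe() }.
    static boolean[] run() {
        // interval() keeps emitting once per second until somebody unsubscribes.
        Subscription subscription = Observable.interval(1, TimeUnit.SECONDS)
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long tick) {
                        System.out.println("tick " + tick);
                    }
                });

        boolean before = subscription.isUnsubscribed();
        // Releases the connection so the observer no longer receives items.
        subscription.unsubscribe();
        boolean after = subscription.isUnsubscribed();
        return new boolean[] {before, after};
    }

    public static void main(String[] args) {
        boolean[] state = run();
        System.out.println("before: " + state[0] + ", after: " + state[1]); // prints before: false, after: true
    }
}
```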
If you have multiple subscriptions in your class, you can use CompositeSubscription to unsubscribe all of them at once. Below is an example of how you can do that:
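This is a minimal sketch, assuming RxJava 1.x. `CompositeExample` is a hypothetical stand-in for an activity or fragment: its `onCreate()` and `onDestroy()` methods only mimic the Android lifecycle names so that the example runs on a plain JVM.

```java
import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.functions.Action1;
import rx.subscriptions.CompositeSubscription;

final class CompositeExample {

    // Holds every subscription created by this class.
    private final CompositeSubscription subscriptions = new CompositeSubscription();

    // Mimics Activity.onCreate(): start the streams and remember their subscriptions.
    void onCreate() {
        subscriptions.add(Observable.interval(1, TimeUnit.SECONDS).subscribe(printer("first")));
        subscriptions.add(Observable.interval(2, TimeUnit.SECONDS).subscribe(printer("second")));
    }

    // Mimics Activity.onDestroy(): one call unsubscribes everything that was added.
    void onDestroy() {
        subscriptions.unsubscribe();
    }

    boolean isReleased() {
        return subscriptions.isUnsubscribed();
    }

    private static Action1<Long> printer(final String label) {
        return new Action1<Long>() {
            @Override
            public void call(Long tick) {
                System.out.println(label + ": " + tick);
            }
        };
    }

    public static void main(String[] args) {
        CompositeExample screen = new CompositeExample();
        screen.onCreate();
        screen.onDestroy();
        System.out.println("released: " + screen.isReleased()); // prints released: true
    }
}
```

Note that once `unsubscribe()` has been called, the same CompositeSubscription cannot be reused: anything added afterwards is unsubscribed immediately. If you need to reuse it, `clear()` unsubscribes the current contents and keeps the container usable.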
In the next part, I am going to explain the different operators used in RxJava.