Code your next android app using RxJava

RxJava is new hot topic in the world of Android Development. The only problem is that it is very difficult to understand. Especially, Functional reactive programming is very hard to understand when you come from Object Oriented Programming. So, I am developing a series of articles to help you understand the basic concept of Reactive Programming.

In the previous Part 1we get the basic concept of the reactive programming. If you did not read yet, I will encourage you to first go through Part 1 and get the basic idea about reactive programming.

As we discussed in Part 1, Rx is made up of three key component.


Let’s create each component one by one. But, first let’s start with integration.

Integrate RxAndroid in project

RxAndroid is basically an android specific wrapper that provides some functions explicit to android and those are not available in RxJava. So, if you want to integrate Rx into your java project only, you don’t need to implement RxAndroid library into your project.

Here is the gradle dependency of RxJava and RxAndroid.

The version of both the library may change, depends on when you are reading this article. You can find latest version from here.

Marble Diagram

Rx uses marble diagram to explain how any operator works. Marble diagrams are very convenient and easy to understand.

  • The line above the box indicates the raw data. This data needs to be emitted by the observable. Different types of objects are display with different shapes.
  • Box in the middle indicates operator. There are many other operators available, that basically controls when and how the observable should emit the data. We are going to look into those operators in future.
  • The line below the box indicates the data that is emitted by the observable. Observer in Rx receives this emitted data.


Let’s create Observable

As you know observable is nothing but a function that emits the data stream. Below is the observable that emits 1 to 5 one after another.

Here just() is called as an operator. It emits the values provided in arguments. (And that’s why they named as just.)


Sometimes we want to refine specific event only to be emitted by observable. Let’s say in our above example we only want to emit only odd numbers out of the observable. We can achieve this thing using another operator called filter(). As the name suggest filter operator filters items emitted by an Observable.


Creating observer

Observers consume the data stream emitted by the observable. Wheneverthe observable emits the data all the registered observer receives the data.

In RxJava there are three callbacks you are going to receive in an observer.

  • onNext() : This method will be called when there is any new data emitted by the observer. The object that is emitted by the observable can be found in argument parameters of this callback.
  • onError() : You will receive this callback whenever there is any error occurred on observable. (After all the world is not perfect.)
  • onComplete() : Whenever observable is done with emitting the data streams, you will receive this callback. This indicates that there is no more data to emit.

In many cases you don’t care about onCompleted() or onError(). So instead of using Observer<T> we can use a simpler class to define what to do during onNext() using Action1 class.

Here, onCall() is equivalent to onNext() in our first approach.

Manage concurrency using Scheduler

As you know scheduler in the reactive programming manages the concurrency.

In Android the most common operation when dealing with asynchronous tasks is to observe the task’s result or outcome on the main thread because you want to update the UI. Using vanilla Android, this would typically be accomplished with an AsyncTask. But with Rx you can achieve this by using the schedulers.

There are two methods, which controls thread management.

  • subscribeOn() : By using this method you can define on which thread the observable should run.
  • observeOn() : By using this method you can define on which thread the observer should run.

RxJava and RxAndroid library provides some predefined schedulers. Like, indicates the IO thread. While Schedulers.newThread() will create a new thread to run observer/observable. You can find other various types of schedulers here.

Finally, we will use subscribe() to subscribe the observer to receive the data issued by the observable. This will return Subscription object that holds the reference to that specific connection between observer and observable.

Let’s see the output of above program.


We can see from the output that only odd numbers were emitted by the observer. At the end, when all the data are emitted, onComplete() got execute.


If you want to unsubscribe the observer from observable, you can call unsubscribe.

In android this is essential that you call unsubscribe in onDesrtoy() of your activity/fragment to release the connection between observer and observable. Otherwise, it may call memory leak.

If you have multiple subscriptions in your class, then you can use CompositeSubscription to unregister all the subscriptions at once. Below is an example on how you can do that:

In the next part I am going to explain different operators used in RxJava. Meanwhile, If you liked the article, click the 💚 below so more people can see it!



What is Reactive Programming?


Nowadays everybody is talking about Reactive Programming and you’re curious in learning this new thing called Reactive Programming. Maybe you’ve seen it used a few places but you’re still a little confused and would like some clarifications.

In this article, we are going to learn the basic concepts of the Reactive Programming. Starting from the next article we are going to do some real programming and learn how to use RxJava in the Android application development.

So, first let’s understand what are the problem we are facing? Why do we require Reactive Programming? Because if there is no problem, then we don’t need a solution right??

Why do we need Asynchronous work?

The simple answer is we want to improve the user experience. We want to make our application more responsive. We want to deliver a smooth user experience to our users without freezing the main thread, slowing them down and we don’t want to provide the jenky performance to our users.

To keep the main thread free we need to do a lot of heavy and time-consuming work we want to do in the background. We also want to do heavy work and complex calculations on our servers as mobile devices are not very powerful to do the heavy lifting. So we need asynchronous work for network operations.

The evaluation matrix:


Let’s see what do we need from the library that handles all the asynchronous work. You can imagine below 4 points as the evaluation matrix for the asynchronous library.

  • Explicit execution: If we start the execution of a bunch of work on a new thread, we should be able to control it. If you are going to perform some background task, you gather the information and prepare them. As soon as you are ready, you can kick-off the background task.
  • Easy thread managementIn asynchronous work, thread management is the key. We often need to update the UI on the main thread from the background thread in the middle of the task or at the end of the task. For that, we need to pass our work from one thread (background thread) to another thread (here main thread). So you should be able to switch the thread easily and pass the work to another thread when needed.
  • Easily composable: Ideally, It would be great if we can create an asynchronous work and as we start spinning background thread, it just do it’s work without depending any other thread (especially on UI thread) and stays independent from the other thread until it finishes its job. But in the real world, we need to update the UI, make database changes and many more things that make threading interdependent. So the asynchronous library should be easily composable and provide less room for the error.
  • Minimum the side effects: While working with multiple threads, the other thread should experience minimum side effects from the other thread. That makes your code easily readable and understandable to a new person and it also makes error easily traceable.

What is Reactive Programming?

According to wikipedia:

Reactive programming is a programming paradigm oriented around data flows and the propagation of change. This means that it should be possible to express static or dynamic data flows with ease in the programming languages used, and that the underlying execution model will automatically propagate changes through the data flow.

In simple words, In Rx programming data flows emitted by one component and the underlying structure provided by the Rx libraries will propagate those changes to another component those are registered to receive those data changes. Long story short: Rx is made up of three key points.


We are going to discuss these points in detail one by one.

  • Observable: Observable are nothing but the data streams. Observable packs the data that can be passed around from one thread to another thread. They basically emit the data periodically or only once in their life cycle based on their configurations. There are various operators that can help observer to emit some specific data based on certain events, but we will look into them in upcoming parts. For now, you can think observers as suppliers. They process and supply the data to other components.
  • Observers: Observers consumes the data stream emitted by the observable. Observers subscribe to the observable using subscribeOn()method to receive the data emitted by the observable. Whenever the observable emits the data all the registered observer receives the data in onNext() callback. Here they can perform various operations like parsing the JSON response or updating the UI. If there is an error thrown from observable, the observer will receive it in onError().
  • Schedulers: Remember that Rx is for asynchronous programming and we need a thread management. There is where schedules come into the picture. Schedulers are the component in Rx that tells observable and observers, on which thread they should run. You can use observeOn()method to tell observers, on which thread you should observe. Also, you can use scheduleOn() to tell the observable, on which thread you should run. There are main default threads are provided in RxJava like Schedulers.newThread() will create new background that. will execute the code on IO thread.

3 simple steps to use Rx in your application


Let’s look into the basic example. This will explain 3 simple steps to use Reactive programming in your application.

Step-1 Create observable that emits the data:

Here database is an observable which emits the data. In our case, it emits the strings. just() is an operator. Which basically emits the data provided in the argument one by one. (We are going to look into the operators in detail in our upcoming articles. So, don’t worry about them.)

Step -2 Create observer that consumes data:

In above code snippet observer is an observer that consumes the data emitted by the database observable. It processes the data received and also handles error inside it.

Step-3 Manage concurrency :

At the last step, we define our schedulers that manage the concurrency. subscribeOn(Schedulers.newThread()) tells database observable to run on background thread. observeOn(AndroidSchedulers.mainThread()) tells observer to run on the main thread. This is basic code for reactive programming.

So by now you should be able to understand, why we need reactive programming, why we need them and how we can implement them. In the upcoming articles, we are going to learn how to use RxJava and it’s operators in detail.

What next? (Part 2)

Now, visit the next part to start some programming example in RxJava and how to use RxJava in your Android/Java project.

If you liked the article, click the 💚 below so more people can see it!

Twitter Follow.png