Introduction to Reactive Programming RxJava and RxAndroid
RxJava is out for quite some time and people are hearing about its greater capabilities, but a lot of them haven’t started yet. If you are one of them, you are late, but that’s ok. Few developers say there is no proper guide/document available or they fear to start something new. In this blog, I am aiming to write tutorials covering basics in RxJava and RxAndroid. We’ll start with the basic theoretical explanation of a few terms in reactive programming and later go through a few code samples.
What is Reactive Programming
Reactive Programming is event-based asynchronous programming. Everything is an asynchronous data stream, which can be observed and action will be taken place when it emits values. You can create data stream out of anything like variable changes, click events, HTTP calls, data storage, errors. When it says asynchronous, that means every code blocks of module runs on its own thread thus executing multiple code blocks simultaneously.
An advantage of the asynchronous approach is, every task runs on its own thread, all the task can run simultaneously and amount of time takes complete all the tasks is equivalent to the long task in the list. When we discuss mobile application, all tasks run on a background thread so you can achieve seamless user experience without blocking the main thread.
- Example like x = y + z; where the sum of y & z assigned to x. In reactive programming, when the y value change, the value of x also updates automatically without re-executing code block of x = y + z. This can be achieved by observing the value of y and/or z.
When you use Reactive Programming in your app, the way you design your architecture and the way you write code changes completely. It becomes more powerful when met with Clean Architecture like MVP, MVVM and other design patterns.
Reactive Extensions
Reactive Extensions (ReactiveX or RX) is a library that follows Reactive Programming principles like composing asynchronous and event-based programs by using observable. These libraries provide a set of interfaces and methods which helps developers to write clean and simpler code.
Reactive Extensions are available in multiple languages like Java (RxJava), Kotlin (RxKotlin), Swift (RxSwift), C++ (RxCpp), C# (Rx.NET) etc. We are specifically interested in RxJava and RxAndroid as Android is the focused area.
What is RxJava
RxJava is Java implementation of Reactive Extension. Basically, it is a library that composes asynchronous events by following Observer Pattern. You can compose asynchronous data stream from any thread and transform data and consumed it by an Observer on any thread. This library offers a wide range of operators like map, combine, merge, filter and lot more that can be applied onto data stream.
What is RxAndroid
RxAndroid is specific to Android having wrapper classes of RxJava. Schedulers are introduced in RxAndroid (AndroidSchedulers.mainThread()) which plays a bigger role in supporting multithreading in android apps. Schedulers basically decide the thread on which a particular code block runs whether on background or main thread. Apart from that everything used is from RxJava only.
There are a lot of Schedulers available like Schedulers.io() and AndroidSchedulers.mainThread() are extensively used in android development. Below is the list of schedulers.
- Schedulers.io() – Perform non-CPU-intensive operations like making network calls, reading disc/files, database operations etc., This maintains a pool of threads.
- AndroidSchedulers.mainThread() – Give access to android Main Thread / UI Thread. Generally, operations like updating UI / user interactions happen. It won’t support any intensive operations on this thread and if we do it makes the app glitchy or ANR dialog display.
- Schedulers.newThread() – A new thread will be created each time a task is scheduled. It’s generally suggested not to use scheduler unless there is a very long operation. The threads which are created via newThread() that won’t be reused.
- Schedulers.computation() – This can be used to perform CPU-intensive operations like processing huge data, bitmap processing etc., The number of threads created using this scheduler completely depends on number CPU cores available.
- Schedulers.single() – Executes all the tasks in sequential order. This can be used when synchronous execution is required.
- Schedulers.immediate() – Executes the task immediately in synchronous order by blocking the main thread.
- Schedulers.trampoline() – Executes the tasks in FIFO (First In – First Out) manner. Scheduled tasks will be executed sequentially by limiting the number of background threads to one.
- Schedulers.from() – Allows us to create a scheduler from an executor by limiting the number of threads to be created. When the thread pool is occupied at that time all tasks will be queued.
Now we have the basic concepts which needed. Let’s move to key concepts of RxJava that everyone should aware of it.
RxJava Basics
RxJava is all about two main key components: Observable and Observer. In addition to these, there are other components like Schedulers, Operators and Subscription.
Observable: Observable is a data stream which does some process and emits data.
Observer: Observer is the counterpart of Observable. It receives the data emitted by Observable.
Subscription: The relation between Observable and Observer is called a Subscription. Multiple Observers can be subscribed to a single Observable.
Operator / Transformation: Operators modifies the data which are emitted by Observable before an observer receives them.
Schedulers: Schedulers decides thread whether Observable should emit the data or which Observer should receive the data like background thread, main thread etc.
To get started, you need to add the RxJava and RxAndroid dependencies to your app’s build.gradle and sync.
// RxJava
implementation ‘io.reactivex.rxjava2:rxjava:2.2.6’
// RxAndroid
implementation ‘io.reactivex.rxjava2:rxandroid:2.1.0’
Basic Steps
- Create an Observable which emits data. We have created an Observable emits list of names. Here we will use just() operator to emit names.
Observable<String> namesObservable = Observable.just(“Laura”, “Dalton”, “Louis”, “Aiden”, “Aleeza”);
- Create an Observer that listen to Observable which provides the below interface methods to know the the state of Observable.
onSubscribe(): called when an Observer subscribes to Observable.
onNext(): called when Observable starts emitting the data.
onError(): called in case of any error.
onComplete(): called when an Observable completes the emission of all the items.
Observer<String> namesObserver = getNamesObserver();
private Observer<String> getNamesObserver() {
return new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, “onSubscribe”);
}
@Override
public void onNext(String s) {
Log.d(TAG, “Name: ” + s);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, “onError: ” + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, “All items are emitted!”);
}
};
}
- Observer subscribe to Observable, so that it can start receiving the data.
Here, you noticed two more methods, observeOn() and subscribeOn().
subscribeOn(Schedulers.io()): tells Observable to run task on a background/new thread.
observeOn(AndroidSchedulers.mainThread()): tells Observer to receive the data on android UI thread so that you can do UI related operations.
namesObservable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(namesObserver);
After running program, you can see below output in LogCat
onSubscribe
Name: Laura
Name: Dalton
Name: Louis
Name: Aiden
Name: Aleeza
All items are emitted!
That’s all, this is first RxJava program.
Below is complete code of the above example. Run in activity and check the output.
// observable
Observable<String> namesObservable = getNamesObservable();
// observer
Observer<String> namesObserver = getNamesObserver();
// observer subscribing to observable
namesObservable
.observeOn(Schedulers.io())
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(namesObserver);
private Observer<String> getNamesObserver() {
return new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, “onSubscribe”);
}
@Override
public void onNext(String s) {
Log.d(TAG, “Name: ” + s);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, “onError: ” + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, “All items are emitted!”);
}
};
}
private Observable<String> getNamesObservable() {
return Observable.just(“Laura”, “Dalton”, “Louis”, “Aiden”, “Aleeza”);
}