Tag

RxJava2

Browsing

In this post, I will show you the best practices for Implementing an Event bus with RxJava. The RxBus that I’m going to demonstrate that very helpful for state propagation. So guys, let’s started.

1. Create an Android Project named RxBus Example

    //RxJava
    implementation 'io.reactivex.rxjava2:rxkotlin:2.3.0'
    implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'

2. Create a simple RX bus that emits the data to the subscribers using Observable

package com.rxbusexample

import io.reactivex.Observable
import io.reactivex.subjects.PublishSubject

/*
 * Singleton instance
 * A simple rx bus that emits the data to the subscribers using Observable
 */
object RxBus {

    private val publisher = PublishSubject.create<Any>()

    fun publish(event: Any) {
        publisher.onNext(event)
    }

    // Listen should return an Observable and not the publisher
    // Using ofType we filter only events that match that class type
    fun <T> listen(eventType: Class<T>): Observable<T> = publisher.ofType(eventType)

}

3. Create a class named is RxBusEvent

package com.rxbusexample

class RxBusEvent {

    data class ProgressEvent(
        val showDialog: Boolean,
        val message: String? = null
    )

    data class LogOut(val logout: Boolean)

    data class MessageEvent(val message: String)
}

4. How to use this RxBus

package com.rxbusexample

import android.os.CountDownTimer
import android.os.Handler
import java.util.*
import java.util.concurrent.TimeUnit

/**
 * @timer is timer value in milliseconds (suppose 5 min = 5*60*1000)
 * @interval is a time interval of one count ideally it should be 1000
 */
class CountDownTimer(timer: Long, interval: Long) :
    CountDownTimer(timer, interval) {

    companion object {
        private const val SECONDS = 60
    }

    override fun onTick(millisUntilFinished: Long) {

        val textToShow = String.format(
            Locale.getDefault(), "%02d min: %02d sec",
            TimeUnit.MILLISECONDS.toMinutes(millisUntilFinished) % SECONDS,
            TimeUnit.MILLISECONDS.toSeconds(millisUntilFinished) % SECONDS
        )
        // Publish MessageEvents
        RxBus.publish(RxBusEvent.MessageEvent(textToShow))
    }

    override fun onFinish() {
        // Publish ProgressEvent for showing progressbar
        RxBus.publish(RxBusEvent.ProgressEvent(true, "Logging out ..."))
        Handler().postDelayed({
            // Publish ProgressEvent for dismissing progressbar
            RxBus.publish(RxBusEvent.ProgressEvent(false, "Log out Successfully"))
            RxBus.publish(RxBusEvent.LogOut(true))
        }, 3000)

    }

}

5. Let’s open the main activity layout file and add below

<?xml version="1.0" encoding="utf-8"?>
<androidx.constraintlayout.widget.ConstraintLayout xmlns:android="http://schemas.android.com/apk/res/android"
    xmlns:app="http://schemas.android.com/apk/res-auto"
    xmlns:tools="http://schemas.android.com/tools"
    android:layout_width="match_parent"
    android:layout_height="match_parent"
    tools:context=".MainActivity">


    <TextView
        android:id="@+id/textViewCounter"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:layout_marginStart="16dp"
        android:layout_marginLeft="16dp"
        android:layout_marginTop="16dp"
        android:layout_marginEnd="16dp"
        android:layout_marginRight="16dp"
        android:layout_marginBottom="16dp"
        android:textSize="20sp"
        app:layout_constraintBottom_toBottomOf="parent"
        app:layout_constraintEnd_toEndOf="parent"
        app:layout_constraintStart_toStartOf="parent"
        app:layout_constraintTop_toTopOf="parent"
        app:layout_constraintVertical_bias="0.29"
        android:text="Start" />

    <TextView
        android:id="@+id/textViewMessage"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:layout_marginTop="16dp"
        app:layout_constraintEnd_toEndOf="parent"
        app:layout_constraintStart_toStartOf="parent"
        app:layout_constraintTop_toBottomOf="@+id/progressBar"
        tools:text="TextView" />

    <ProgressBar
        android:id="@+id/progressBar"
        style="?android:attr/progressBarStyle"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:layout_marginTop="60dp"
        android:visibility="gone"
        app:layout_constraintEnd_toEndOf="parent"
        app:layout_constraintStart_toStartOf="parent"
        app:layout_constraintTop_toBottomOf="@+id/textViewCounter" />

    <Button
        android:id="@+id/buttonStart"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:layout_marginBottom="32dp"
        android:background="@color/colorAccent"
        android:text="Start"
        android:textColor="@color/colorWhite"
        app:layout_constraintBottom_toBottomOf="parent"
        app:layout_constraintEnd_toStartOf="@+id/buttonStop"
        app:layout_constraintHorizontal_bias="0.5"
        app:layout_constraintStart_toStartOf="parent" />

    <Button
        android:id="@+id/buttonStop"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:layout_marginBottom="32dp"
        android:background="@color/colorAccent"
        android:text="Stop"
        android:textColor="@color/colorWhite"
        app:layout_constraintBottom_toBottomOf="parent"
        app:layout_constraintEnd_toEndOf="parent"
        app:layout_constraintHorizontal_bias="0.5"
        app:layout_constraintStart_toEndOf="@+id/buttonStart" />

</androidx.constraintlayout.widget.ConstraintLayout>

6. Finally, open the MainActivity and listen to the Observable with event type filter

package com.rxbusexample

import android.annotation.SuppressLint
import android.os.Bundle
import android.view.View.GONE
import android.view.View.VISIBLE
import android.widget.Toast
import androidx.appcompat.app.AppCompatActivity
import kotlinx.android.synthetic.main.activity_main.*

class MainActivity : AppCompatActivity() {

    companion object {
        const val INTERVAL: Long = 1000
        const val TIMER_TIME: Long = 1 * 60 * 1000
    }

    @SuppressLint("CheckResult")
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        val counter = CountDownTimer(TIMER_TIME, INTERVAL)

        // set click event on start button
        buttonStart.setOnClickListener { counter.start() }
        // set click event on stop button
        buttonStop.setOnClickListener {
            counter.cancel()
            textViewCounter.text = "Stop"
        }


        // Listen for MessageEvents only
        RxBus.listen(RxBusEvent.MessageEvent::class.java)
            .subscribe { textViewCounter.text = it.message }

        // Listen for ProgressEvent only
        RxBus.listen(RxBusEvent.ProgressEvent::class.java)
            .subscribe {
                textViewMessage.text = it.message
                progressBar.visibility = if (it.showDialog) VISIBLE else GONE
            }


        // Listen for LogOut only
        RxBus.listen(RxBusEvent.LogOut::class.java)
            .subscribe {
                Toast.makeText(this, "Logout Done !", Toast.LENGTH_LONG).show()
            }

        /*
          // Listen for String events only
          RxBus.listen(String::class.java).subscribe({
              println("Im a String event $it")
          })

          // Listen for Int events only
          RxBus.listen(Int::class.java).subscribe({
              println("Im an Int event $it")
          })*/

    }
}

Conclusion

In this android app tutorial, we learned Implementing event bus with rxjava. I hope it’s helpful for you, then help me by sharing this post with all your friends who learning android app development.

Get Solution Code

Communication between Fragment and Acitivity using RxJava

What is reactive programming?

The RxJava is a very interesting and new topic in Android development. But problem is that it is not easy to understand. Especially, Functional reactive programming is very hard to understand when you come from Object-Oriented Programming

Reactive is a programming paradigm and the oriented around data flow and propagation of data change. Any things can be stream variables such as user input, properties, and data structure etc. For instance RSS feed, and any click event. We can listen to these streams and reacting accordingly.

Now, what is ? Apps increasingly to complete better interactive experiences to the user. Reactive programming is phenomena that works and build a creating app that full-fill that user expectation

Now comes comes what is streams .?  

The stream is a sequence of ongoing event ordered in time. A stream can emit three different things a Value, an Error, and Completed signal.   We capture these emitted events by defining the specific function by executing each instance.

The listening each stream is called Subscriber. The functions we define are observers. The stream is subject and observers being observed. This is an observer and observable design pattern.

  • Observable => Emits items
  • Subscriber => Consume those items.

Why should use reactive programming

  • Reactive Programming raises the level of abstraction So we can focus on the interdependent of the code the define business logic rather then having to consistent falling large amount of implementation details.
  • The code in Reactive Programming in likely is more concise and simply the ability to chain asynchronous operations.   
  •  Reactive Programming is exposed explicit way to define how concurrent operation should be operates   
  • App have evolve  to more real time. Suppose one form is hold large data and you doing a single change in fields so data is automatically triggers and save to the backend. So same content is reflected the real time all other connected user
  • It help to reduce the maintenance of the state variables.

RxJava in Android

Rx stands for reactive extension. RxJava is a library for composing asynchronous and event-based programming by using observable sequence.

But what does actually means?

In Reactive programming, we received the continuous flow of data streams and we provide the operation apply to stream. The Source of data doesn’t really matter.

One of the most challenges of writing a robust Android app is a dynamic nature of changing input. In traditional programming, we have to set explicitly on variables them to be updated. If one value change then dependent value is not changed without adding line of code.

// init variables 
int a,b,c;
//init input 
a=1;
b=2;
//set the output value 
c=a+b ;
// Now update the dependent value     
b=5;
c=? // what should c be ..?

So if you want to reflect dependent value you should use callback methods. So basically C variable value is relay on callback. Most importantly Reactive Programming address these kinds of issue by providing a framework to describe output to reflect these changing input

How to work RxJava .?

RxJava is an extension library from DotNet enables android app to be built in this style. In this observer design pattern, an Observable emits items and subscribers consume those items. There is pattern in how items are emitted and Observable emits any number of items including 0 items then it terminates either by successfully completing or due to an error. Each subscriber has observable call.

It differs in one key way observable even don’t start emitting items until someone explicitly subscribes to them.

RxJava Basics

Now I will discuss building blocks of RxJava. Basically  Rx is made up of three key points

RX = OBSERVABLE + OBSERVER + SCHEDULERS

Observable

Observable are emitted a stream of data and event, It can be 0 or more. it terminates either by successfully completing or due to an error.

 Observable<String> listObservable = Observable.just("Maroon", "Red", "Orange", "Yellow", "Green", "White", "Black", "Blue", "Navy")

Observer

The observer is class that receives the events or data and acts upon it. In other words, Observers is consumed data stream emitted by observable.

Basically, Observer has 4 interface methods to manage different states of the Observable
  • onSubscribe(): Observers have to subscribe observable using subscribeOn() method to receiving emitted data by observable.
  • onNext(): This method invoked when the observable emits the data all the registered observer receives the data in onNext() callback.
  • onError(): This method invoked when the emission of data is not successfully completed. then an error is thrown from observable, the observer will receive it in onError().
  • onComplete(): This method is invoked when the Observable has successfully completed emitting all items.

Schedulers

As we know RxJava provides a simple way of asynchronous programming. That allows simplifying the asynchronously processing to manage the long-running operation.

In Android development, Thread management is a big nightmare for every developer. Rx programming provides also a simplified way of running different tasks in different threads. Schedulers are the component in Rx that tells observable and observers, on which thread they should run.

Schedulers had 2 methods which decide the (observer and observable) which thread should be run

  • observeOn() – Is tell the observers, on which thread you should observe
    • AndroidSchedulers.mainThread() – Observer will run on main UI thread.
  • subscribeOn() – tell the observable, on which thread you should run.
    • Schedulers.io() – will execute the code on IO thread.
    • Schedulers.newThread() – will create new background

In RxJava you can convert the stream in before received by the observers such as if an API call depends on the call of another API Last but not least, Rx programming reduces the need for state variables, which can be the source of errors.

You have to understand 3 basic steps in RxJava

  1.  Create observable  –  It emits the data
  2. Create an observer  – it consumes data 
  3. Schedulers – It manages concurrency 

How to implement in Android?

Let’s understand how particle implement that, Suppose you have a colorist and want to print each color on Logcat using RxJava. So simple fallow above 3 steps.

package com.wave.example;

import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;

import io.reactivex.Observable;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.observers.DisposableObserver;
import io.reactivex.schedulers.Schedulers;

public class MainActivity extends AppCompatActivity {
    private static final String TAG = "MainActivity";

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        Observable<String> listObservable = Observable.  
                //Observable. This will emit the data
                just("Maroon", "Red", "Orange", "Yellow", "Green", "White", "Black", "Blue", "Navy");  
        //Operator

        listObservable.subscribeOn(Schedulers.newThread())  
                //Observable runs on new background thread.
                .observeOn(AndroidSchedulers.mainThread())  
                //Observer will run on main UI thread.
                .subscribe(new DisposableObserver<String>()
                        //Subscribe the observer
                {
                    @Override
                    public void onNext(String s) {
                        Log.d(TAG, "onNext: New data received:  " + s);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "Error received: " + e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "All data emitted.");
                    }
                });
    }
}

Output is –

D/MainActivity: onNext: New data received:  Maroon
D/MainActivity: onNext: New data received:  Red
D/MainActivity: onNext: New data received:  Orange
D/MainActivity: onNext: New data received:  Yellow
D/MainActivity: onNext: New data received:  Green
D/MainActivity: onNext: New data received:  Black
D/MainActivity: onNext: New data received:  Blue
D/MainActivity: onNext: New data received:  Navy
D/MainActivity: All data emitted.

So by now, you should be able to understand, What is RxJava and why we need RxJava, why we need them and how we can implement them. In the next articles, we are going to learn how to use RxJava and it’s operators in detail. such as concat two APIs details

In this article, I’m going to explain the implementation of MVP architecture using Dagger 2, ButterKnife, Room Persistence, Rxjava 2, RxAndroid, Retrofit, logging and debugging. In this Android tutorial, we will build project that contains Architect Android Apps with MVP, Dagger, Retrofit & Rxjava.

I will share repository that contains all above library implementation following best practices. We create superior design in such way that it could be inherited and maximize the code reuse.

To order to build a clean code in the MVP architecture, some tool and frameworks are needed. These are Dagger2, Retrofit, RxJava, RoomPersisstense It will be good to understand how each of these libraries plays a specific part in the role to build better code. There almost industries standard of build a modern application for Android.

Prerequisite

As this article we are using many libraries, So you need to have knowledge of each one library. because in this sample app all library integrated with depends on one another. I recommend you got through MVP Architect Sample Application Tutorials Series.

MVP Architect Sample Application Tutorials Series

  1. MVP :- https://androidwave.com/android-mvp-architecture-for-beginners-demo-app/
  2. Dagger2 :- https://androidwave.com/dagger2-android-example/
  3. RxJava2:- https://androidwave.com/rxjava-rxandroid-tutorials/
  4. Retrofit:- https://square.github.io/retrofit/
  5. Room Persistence:- https://androidwave.com/working-with-room-persistence-library/
  6. Debugging and Logging:- https://androidwave.com/useful-tools-for-logging-debugging-in-android/
  7. Downloadable Font:- https://developer.android.com/guide/topics/ui/look-and-feel/downloadable-fonts
  8. Globally Error Handling – https://androidwave.com/retrofit-globally-error-handling/

The app has following packages:

  1. data: It contains all the data accessing and manipulating components.
  2. Room Persistence (Local Database )
  3. Networking stuff (REST APIs call)
  4. Shared Preferences.
  5. : Dependency providing classes using Dagger 2.
    1. Components
    2. Module  
  6. ui: View classes along with their corresponding Presenters.
  7. Views
  8. Custom Components
  9. Activity, fragment
  10. : Services for the application.
    1. All service and Job IntentService should be placed here.
  11. utils: Utility classes.
    1. All utility class write in this package
MVP architecture using Dagger 2

How to use this repo

Project Setup

Go to the root directory of workspace where you want place own project and just take a clone from GitHub repo using below command. If you face any issue while taking clone, Read this article

$ git clone https://github.com/droidwave/MVP-Architect-Android-Apps.git

After taking successful clone you have to do following changes

Brief Intro of Repo

In this repo, we have created a separate package for each module or functionality. For all user interface part is placed in UI package. Inside the UI package, we have created separate package for each small app module or activity. I would like to suggest just create a new package for new activity. For example for login module create login, for main activity create main, for profile create a profile package. such as below diagram.

Follow below step for create a new activity
  • Just create a new package name like profile. Now go to file menu and create a new activity named is ProfileActivity.
  • Now extends the BaseActivity instance of AppCompatActivity and override setUp() methods.
package com.androidwave.cleancode.ui.profile;


import android.os.Bundle;

import com.androidwave.cleancode.R;
import com.androidwave.cleancode.ui.base.BaseActivity;

public class ProfileActivity extends BaseActivity {

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_profile);
    }

    @Override
    protected void setUp() {

    }
}
  • Create a new interface name is ProfileMvpView which extends MvpView and declare all UI operation methods here such as updateProfile(UserProfile profile) for as per need
package com.androidwave.cleancode.ui.profile;

import com.androidwave.cleancode.data.network.pojo.UserProfile;
import com.androidwave.cleancode.ui.base.MvpView;

/**
 * Created on : Feb 17, 2019
 */
public interface ProfileMvpView extends MvpView {
    void updateProfile(UserProfile profile);
}
  • Furthermore, create a new interface with names PofileMvpPresenter<V extends ProfileMvpView> which extends MvpPresenter<V>. The view is here type of View. Here you declare all data related methods. For example network operation, fetching data from local DB etc.
package com.androidwave.cleancode.ui.profile;

import com.androidwave.cleancode.ui.base.MvpPresenter;

/**
 * Created on : Feb 17, 2019
 */
public interface ProfileMvpPresenter<V extends ProfileMvpView> extends MvpPresenter<V> {
    void onViewPrepared();
}
  • Create a new presenter with name ProfilePresenter which extends BasePresenter and implementing PofileMvpPresenter. After that override super constructor and annotated with @Inject annotations like below
  • Now open ProfileActivity and implement ProfileMvpView.
package com.androidwave.cleancode.ui.profile;

import com.androidwave.cleancode.data.DataManager;
import com.androidwave.cleancode.ui.base.BasePresenter;
import com.androidwave.cleancode.utils.rx.SchedulerProvider;

import io.reactivex.disposables.CompositeDisposable;

/**
 * Created on : Feb 17, 2019
 * Author     : AndroidWave
 */
public class ProfilePresenter<V extends ProfileMvpView> extends BasePresenter<V>
        implements ProfileMvpPresenter<V> {

    public ProfilePresenter(DataManager manager, SchedulerProvider schedulerProvider, CompositeDisposable compositeDisposable) {
        super(manager, schedulerProvider, compositeDisposable);
    }

    @Override
    public void onViewPrepared() {
        getMvpView().showLoading();
        getCompositeDisposable().add(getDataManager()
                .getUserProfile(String.valueOf(getDataManager().getUserId()))
                .subscribeOn(getSchedulerProvider().io())
                .observeOn(getSchedulerProvider().ui())
                .subscribe(response -> {
                    if (!isViewAttached()) {
                        return;
                    }
                    getMvpView().hideLoading();
                    /**
                     * Update view here
                     */
                    getMvpView().updateProfile(response.getData());
                }, error -> {
                    if (!isViewAttached()) {
                        return;
                    }
                    getMvpView().hideLoading();

                    /**
                     * manage all kind of error in single place
                     */
                    handleApiError(error);
                }));
    }
}
  • After that, you have to write a DI code for providing presenter here > base package => di =>module => ActivityModule
    @Provides
    @PerActivity
    ProfileMvpPresenter<ProfileMvpView> provideProfilePresenter(ProfilePresenter<ProfileMvpView> presenter) {
        return presenter;
    }
  • Finally, Open ProfileActivity and attached presenter with view
package com.androidwave.cleancode.ui.profile;


import android.os.Bundle;

import com.androidwave.cleancode.R;
import com.androidwave.cleancode.data.network.pojo.UserProfile;
import com.androidwave.cleancode.ui.base.BaseActivity;

import javax.inject.Inject;

public class ProfileActivity extends BaseActivity implements ProfileMvpView {
    @Inject
    ProfileMvpPresenter<ProfileMvpView> mPresenter;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_profile);
        getActivityComponent().inject(this);
        mPresenter.onAttach(ProfileActivity.this);
        setUp();
    }

    @Override
    protected void setUp() {

    }

    @Override
    public void updateProfile(UserProfile profile) {

    }
}
  • Open ActivityComponent from di =>components => ActivityComponent and add bellow code
  void inject(ProfileActivity profileActivity);

Finally, Your activity is ready to run, Almost same step you have to follow for fragment.

How to add RestApi

We have to follow some basic steps to add RestAPIs in this repos, as you seeing BaseDataManager class is responsible for handle all type data such as Rest APIs, PreferencesManager and local DB also.

  • Just navigate to base package=> data => network => NetworkService and add retrofit calling methods
package com.androidwave.cleancode.data.network;

import com.androidwave.cleancode.data.network.pojo.FeedItem;
import com.androidwave.cleancode.data.network.pojo.LoginRequest;
import com.androidwave.cleancode.data.network.pojo.UserProfile;
import com.androidwave.cleancode.data.network.pojo.WrapperResponse;

import java.util.List;

import io.reactivex.Single;
import retrofit2.http.Body;
import retrofit2.http.GET;
import retrofit2.http.POST;
import retrofit2.http.Path;

/**
 * Created on : Jan 19, 2019
 * Author     : AndroidWave
 */
public interface NetworkService {
    /**
     * @return Observable feed response
     */
    @GET("feed.json")
    Single<WrapperResponse<List<FeedItem>>> getFeedList();


    @POST("login")
    Single<WrapperResponse<UserProfile>> doLoginApiCall(@Body LoginRequest mRequest);

   // here is api that fetching user details    
    @GET("user/profile/{user_id}")
    Single<WrapperResponse<UserProfile>> getUserProfile(@Path("user_id") String userId);
}
  • Now open RestAPI manager and one methods for dealing getProfile API
package com.androidwave.cleancode.data.network;

import com.androidwave.cleancode.data.network.pojo.FeedItem;
import com.androidwave.cleancode.data.network.pojo.LoginRequest;
import com.androidwave.cleancode.data.network.pojo.UserProfile;
import com.androidwave.cleancode.data.network.pojo.WrapperResponse;

import java.util.List;

import io.reactivex.Single;

public interface RestApiHelper {

    Single<WrapperResponse<UserProfile>> doLoginApiCall(LoginRequest request);

    Single<WrapperResponse<List<FeedItem>>> getFeedList();

    // add this line of code
    Single<WrapperResponse<UserProfile>> getUserProfile(String userId);
}
  • Now override this method in all subclass. Open RestApiManager and add below line
    @Override
    public Single<WrapperResponse<UserProfile>> getUserProfile(String userId) {
        return mService.getUserProfile(userId);
    }
  • Finally, Open base BaseDataManager and add an unimplemented method like below
   @Override
    public Single<WrapperResponse<UserProfile>> getUserProfile(String userId) {
        return mApiHelper.getUserProfile(userId);
    }
Download Architect Android Apps with MVP, Dagger, Retrofit & Rxjava