RxEventBus

September 25, 2015

I’ve been playing around with Facebook’s Flux architecture and RxJava lately and I wanted an event bus to use as the Dispatcher. Here is what I ended up with:

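import rx.Observable;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;

public final class RxEventBus {
    // SerializedSubject makes it safe to call post() from multiple threads.
    private final Subject<Object, Object> bus = new SerializedSubject<>(PublishSubject.create());

    // Publish an event to all current subscribers.
    public void post(Object o) {
        bus.onNext(o);
    }

    // Emits only the events that are instances of the given class.
    public <T> Observable<T> events(Class<T> klass) {
        return events().ofType(klass);
    }

    // Emits every event; asObservable() hides the Subject so callers cannot post through it.
    public Observable<Object> events() {
        return bus.asObservable();
    }
}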

Now we get all the RxJava goodness with our event bus implementation. Delicious.

Usage

To subscribe to events of a certain class just pass the class in:

rxEventBus.events(Profile.class)
    .subscribe(profile -> doSomethingWithProfile(profile));
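
Filtering by class comes down to an instance check followed by a cast. To make that concrete, here is a rough, dependency-free sketch of the same idea in plain Java. SimpleEventBus and its subscribe() methods are made-up names for illustration, not part of RxJava, and the sketch leaves out everything else an Observable gives you (unsubscribing, operators, threading guarantees).

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical, dependency-free stand-in for RxEventBus; not the RxJava implementation.
final class SimpleEventBus {
    // CopyOnWriteArrayList lets a subscriber be added while an event is being delivered.
    private final List<Consumer<Object>> subscribers = new CopyOnWriteArrayList<>();

    // Deliver the event to every registered subscriber.
    void post(Object event) {
        for (Consumer<Object> subscriber : subscribers) {
            subscriber.accept(event);
        }
    }

    // Receive every event, whatever its type.
    void subscribe(Consumer<Object> subscriber) {
        subscribers.add(subscriber);
    }

    // Receive only events that are instances of klass, cast to that type.
    <T> void subscribe(Class<T> klass, Consumer<? super T> subscriber) {
        subscribers.add(event -> {
            if (klass.isInstance(event)) {
                subscriber.accept(klass.cast(event));
            }
        });
    }
}
```

One consequence of matching with isInstance(): a subscriber registered for a superclass also receives instances of its subclasses, so in this sketch subscribing with Object.class behaves like subscribing to everything.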

Because the event bus is built on RxJava, we can take advantage of its composability. This lets us do more complex things, like calculating rolling averages (using Observable.window()) or combining events (Observable.combineLatest(), Observable.zip(), etc.). Even a simple map() composes nicely:

rxEventBus.events(Profile.class)
    .map(profile -> profile.name())
    .subscribe(name -> print(name));
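
To give a feel for what a rolling average over the last few events computes, here is a small plain-Java sketch. RollingAverage is a hypothetical helper, not an RxJava class; in RxJava 1.x the windowing itself could come from an operator such as window(count, skip) or buffer(count, skip) instead of the hand-rolled deque used here.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical helper: keeps the most recent `size` values and reports their mean.
final class RollingAverage {
    private final int size;
    private final Deque<Double> window = new ArrayDeque<>();
    private double sum;

    RollingAverage(int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        this.size = size;
    }

    // Add a value, evict the oldest one if the window overflows, and return the current mean.
    double add(double value) {
        window.addLast(value);
        sum += value;
        if (window.size() > size) {
            sum -= window.removeFirst();
        }
        return sum / window.size();
    }
}
```

With a window of 3, feeding in 1, 2, 3, 4 yields 1.0, 1.5, 2.0, 3.0: the first two results average a partially filled window, and the last one drops the 1.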

It’s often useful to log everything that goes through the event bus. You can do this by subscribing to the events() Observable:

rxEventBus.events()
    .subscribe(event -> Log.i(TAG, event.toString()));

See


Written by Angus Morton.
I build things.