Testing with RxBlocking, part 2

Previously, I wrote about the basics of testing with RxBlocking in Testing with RxBlocking, part1.

Since then, I presented 2 times at meetups about MVVM and RxBlocking and I thought I’d post few more examples in a broader context.

But first a question I got several times already when discussing RxBlocking.

When to use RxBlocking vs. RxTest?

RxTest is a great library for testing RxSwift code. It offers everything you need to test your Rx code and in most cases it’ll be everything you need.

RxTest is great in that it offers a class called TestScheduler - it allows you to feed a batch of input values and instantly get the output and compare it to what you expect it ot be. It’s a great tool and I already wrote about testing with RxTest before:

In edge cases in which you cannot control the source of asynchronisity for example you need to have a different approach.

RxBlocking is great when you need to wait until one element is emitted, record it, and then consume another one, etc.

A good example is code that uses RxRealm - RxRealm relies on Realm’s own notification system to produce change notifications and wraps those as observables. In this case you don’t know exactly when the notification will be emitted since that’s up to Realm. Furhter, changes to the database that are committed soone after each other might (or might not) be bundled together as a single notification by Realm depending on the available resources.

Therefore to test correctly RxRealm observables you need to wait for an element before triggering the next one to guarantee the change notifications won’t be bundled together.

RxRealm test suite

In this post I’ll look at the RxRealm test budle which I created some time ago with RxTest (not realizing there was the RxBlocking library to test async code).

The main pain points about testing RxRealm are:

  • completely asynchronous and emitting elements depends only on Realm
  • notifications (and therefore elements) might be bundled up
  • emitted elements might change over time (e.g. are mutable)

Disclaimer: Provided the code depends entirely on the Realm dependency the tests in the suite are more of integration tests than unit tests. That’s also one of the reasons why the suite requires RxBlocking.

Let’s look at how one test was implemented in RxTest. The code is testing whether Results collection emits an element when a change is written to the database via the rx.add() binding:

collection emits when objects added

Here’s the full code of that single test: I won’t go into much detail what is the code doing as I don’t recommend testing asynchronous code like that. I only include it here to show that testing edge cases like the RxRealm code base simly isn’t suitable for RxTest:

    func testRxAddObject() {
        let expectation = self.expectation(description: "Message1")
        let realm = realmInMemory(#function)
        let bag = DisposeBag()
        let events = [
            next(0, Message("1")),
        let rx_add: AnyObserver<Message> = realm.rx.add()
        let scheduler = TestScheduler(initialClock: 0)
        let observer = scheduler.createObserver(Array<Message>.self)
        let observable = scheduler.createHotObservable(events).asObservable()
        let messages$ = Observable.array(from: realm.objects(Message.self)).share(replay: 1)

            .disposed(by: bag)
            .subscribe(onNext: { messages in
                if messages.count == 1 {
            .disposed(by: bag)
            .disposed(by: bag)
        waitForExpectations(timeout: 1, handler: {error in
            XCTAssertNil(error, "Error: \(error!.localizedDescription)")
            XCTAssertTrue(observer.events.count > 0)
            XCTAssertEqual(observer.events.last!.time, 0)

The solution is a bit of a needless mind-bender so you can just skip through as the code is included only for comparisson. Here’s how I rewritten the same test with RxBlocking:

func testRxAddObjectWithSuccess() {
  let realm = realmInMemory(#function)
  let items = Observable.array(from: realm.objects(Message.self))
    .map { $0.map {$0.text} }

  // show all speakers
  DispatchQueue.main.async {
    _ = Observable.just(Message("1")).subscribe(realm.rx.add())

  let result = try! items.skip(1).toBlocking(timeout: 1).first()!
    XCTAssertEqual(result[0], "1")

The .toBlocking() API as explained in the previous part about testing with RxBlocking allows me to simply “wait” for an element to be emitted by the observable and then grab it and compare it to the value I expect. Bam!

Right tool for the job, eh?

Migrating RxRealm from RxTest to RxBlocking

So, how did the migration look like? Since all test with small exceptions follow 100% the same pattern:

  1. test setup
  2. trigger an element via an async task
  3. compared emitted element to expected value

I could realatively quickly churn through RxRealm’s test suite and migrate to RxBlocking. In fact, after the migration I didn’t need RxTest at all.

Here are some stats to illustrate better the change. RxRealm test suite with RxTest was 1570 lines of code. Then:

installing rxblocking

And the current version of the test suite is: 921 lines of code. I did also some general refactoring of the test code but using the right tool really helped save a lot of code in there.

So for the rest of this post, let me include some of the more interesting tests of the RxRealm test suite (which you can read in full at: https://github.com/RxSwiftCommunity/RxRealm/tree/master/Example/RxRealm_Tests)

Testing for an error asynchronously

In this test, I needed to simulate an error while writing objects to the Realm.rx.add() sink.

setup: The setup includes creating a realm configuration pointing to non-existing file and a recordedError variable to log the error.

trigger: I schedule an async task that will write a single object to rx.add() and in the error handler I simply store the error in recordedError.

test: In the test code I subscribe recordedError and wait for the next element - then compare it to the error I expect:

func testRxAddObjectWithError() {
    var conf = Realm.Configuration()
    conf.fileURL = URL(string: "/asdasdasdsad")!

    let recordedError = Variable<Error?>(nil)

    DispatchQueue.main.async {
        _ = Observable.just(Message("0"))
            .subscribe(Realm.rx.add(configuration: conf, update: true, onError: { value, error in
                recordedError.value = error

    let error = try! recordedError.asObservable().skip(1).toBlocking(timeout: 1).first()!
    XCTAssertEqual((error! as NSError).code, 3)

Testing a batch of changes

In this test I want to delete objects from a Realm via the Realm.rx.delete() sink. There are two separate delete() sinks - one as a static property and one as instance property. I’d like to test both from a single test case.

setup: The setup includes observing all persisted objects in the realm in order to record the emitted states.

trigger: I first write 4 objects to the database, then delete 2 via the instance sink, then the other 2 via the static sink property.

test: In the test code I subscribe to the collection of all objects to the database and then compare if the emitted collection states are 4 objects, then 2 objects, then 0:

func testRxDeleteItemsWithSuccess() {
    let realm = realmInMemory(#function)
    let items = Observable.array(from: realm.objects(UniqueObject.self).sorted(byKeyPath: "id"))
        .map { $0.map {$0.id} }

    let object1 = UniqueObject(1)
    let object2 = UniqueObject(2)
    let object3 = UniqueObject(3)
    let object4 = UniqueObject(4)

    try! realm.write {
        realm.add([object1, object2, object3, object4])

    // show all speakers
    DispatchQueue.main.async {
        _ = Observable.just([object1, object2]).subscribe(realm.rx.delete())
        _ = Observable.just([object3, object4]).subscribe(Realm.rx.delete())

    let result = try! items.take(3).toBlocking(timeout: 1).toArray()
    XCTAssertEqual(result[0], [1, 2, 3, 4])
    XCTAssertEqual(result[1], [3, 4])
    XCTAssertEqual(result[2], [])

Test that eventually all commits will be merged

Finally, let’s look into this one weird test case that needed a bit of a deviation from the pattern. RxRealm allows you (to a degree) to easily subscribe or observe on different schedulers.

For example you can create a bunch of Realm objects on the main thread, but write them to Realm on a background thread. Or create the objects in the background but decide to write them on the main thread to the database.

In this test I wanted to cover a number of different setups and run them all at once and test that all commits are written to the database and the final state of the database is what I expect it to be.

The pain point here is that some of the Realm notifications will be bundled and/or delivered in unpredictable order, depending on GCD’s queue that will write the changes. So, the only thing I can and want to test is the final state after all changes have been written to the database.

setup: The setup includes an observable that emits the list of ids of all objects in the database. This is what I’m gonna use at the end of check if all my objects were successfully written.

trigger: The test code includes 6 different async tasks - each creating or writing objects on different queues. All the tasks are scheduled to run immediately.

test: The test code is where it gets interesting. I create an observable called until which will emit once after it detects 6 objects being stored in the database.

Next I use until as the trigger to complete my other observable which emits the collection of object ids, like so:

items.takeUntil(until).toBlocking(timeout: 1)

Now, items will stop emitting whenever until detects there have been 6 objects stored and I can grab the last value and compare it to the list I expect to have in the Realm.

func testRxAddObjectsFromDifferentThreads() {
    let realm = realmInMemory(#function)
    let conf = realm.configuration

    let items = Observable.array(from: realm.objects(UniqueObject.self).sorted(byKeyPath: "id"))
        .map { $0.map {$0.id} }

    // write on current thread
    _ = Observable.just(UniqueObject(1)).subscribe(realm.rx.add())

    // write on background thread
    DispatchQueue.global(qos: .background).async {
        let realm = try! Realm(configuration: conf)
        _ = Observable.just(UniqueObject(2))

    // write on main scheduler
    DispatchQueue.global(qos: .background).async {
        _ = Observable.just(UniqueObject(3))
            .subscribe(Realm.rx.add(configuration: conf))

    // write on bg scheduler
    DispatchQueue.main.async {
        _ = Observable.just(UniqueObject(4))
            .observeOn( ConcurrentDispatchQueueScheduler(
                queue: DispatchQueue.global(qos: .background)))
            .subscribe(Realm.rx.add(configuration: conf))

    // subscribe on main, write in bg
    DispatchQueue.main.async {
        _ = Observable.just([UniqueObject(5), UniqueObject(6)])
            .observeOn( ConcurrentDispatchQueueScheduler(
                queue: DispatchQueue.global(qos: .background)))
            .subscribe( Realm.rx.add(configuration: conf) )

    let until = Observable.array(from: realm.objects(UniqueObject.self))
        .filter {$0.count == 6}
        .delay(0.1, scheduler: MainScheduler.instance)

    let result = try! items.takeUntil(until).toBlocking(timeout: 1).toArray()
    XCTAssertEqual(result.last!, [1, 2, 3, 4, 5, 6])

Where to go from here?

If you’ve missed any of the other test related posts here they are:

To learn more about RxSwift and testing check out the RxBook! The book is available at http://raywenderlich.com/store - this is where you can see all updates, discuss in the website forums, etc.

Hope that post was helpful, and if you want to get in touch you can find me here

Share this post:

If you'd like to learn how to create professional production apps with RxSwift, the best resource out there is the RxSwift book written by Florent Pillet, Junior Bontognali, Marin Todorov, & Scott Gardner.

It features 20+ chapters covering the basics, the Rx operators, and advanced topics like testing, error handling, and app architecture.

Available from Ray Wenderlich: » Learn more.