observeOn vs. subscribeOn

Cross-thread subscriptions is a question that seems to pop up again and again on the RxSwift Slack channel. The explanation is very simple so I thought it was a good idea to put it in a blog post so whenever needed I can link to it instead of typing everything again and again.

Observable subscriptions

The terminology in regard to subscribing and observing is imho a bit messy so let’s first take this out of the way (do not skip this section!).

Let’s see how an observable subscription works. We can split subscribing into 3 parts:

subscription schema

  1. First you define an Observable, in some cases you provide some code in a closure that performs work and emits elements to any observers. When you create the observable, this code is stored for future use but NOT executed immediately. If there are no observers yet - the observable just sits and waits without doing anything.

  2. When you model your subscription you use some operators like map, filter, etc to process the emitted elements. Adding operators still does not perform any work - you’re just creating a “more specialized” observable out of the original one.

  3. Only when you call the subscribe(...) method on an observable you “turn it on”. Calling subscribe(...) will actually execute the code you provide in the block in part 1 (above).

So in that sense there two main points to make here:

  • the “subscription code” is the code that gets called from your subscribe() and is located in Observable.create { ... }. This is the code that creates your subscription and produces elements.
  • the “observation code” is where you observe for emitted elements - this is the code you provide in onNext: { ... }, onCompleted: {...}, etc. This is where you do your observing.

subscription vs observing

Schedulers

There’s a bunch of predefined schedulers that come with RxSwift that cover all your needs most of the time. There is a short doc on that topic: schedulers at RxSwift repo.

In this article we’ll use two:

  • MainScheduler.instance which schedulers work on the main thread and
  • ConcurrentDispatchQueueScheduler which uses GCD to execute work on a given queue.

Subscribing and subscribeOn

So let’s look at the subscribeOn operator - it allows you to change the scheduler on which the subscription code will be executed.

By default the subscription code will run on the same thread as the code where you call subscribe() except if you change the context by using subscribeOn(...).

For example:

Observable<Int>.create { observer in
    observer.onNext(1)
    sleep(1)
    observer.onNext(2)
    return Disposables.create()
}
.subscribe(onNext: { el in
    print(Thread.isMainThread)
})

If you place this code in a viewDidLoad you will block the main thread because of the usage of sleep in the subscription code.

Your onNext code will print true because you always will stay on the main thread:

[main] subscribe() -> [main] create{ ... } -> [main] onNext { ... }

Now you can change the scheduler of the subscription by inserting a subscribeOn operator:

Observable<Int>.create { observer in
    observer.onNext(1)
    sleep(1)
    observer.onNext(2)
    return Disposables.create()
}
.subscribeOn(ConcurrentDispatchQueueScheduler(qos: .background))
.subscribe(onNext: { el in
    print(Thread.isMainThread)
})

This time you will switch threads while subscribing:

[main] subscribe() -> [background] create{ ... } -> [background] onNext { ... }

And the print in the onNext will output a false.

Observing and observeOn

Now let’s move to observing the elements of the sequence. This part concerns your observation code.

In our earlier example you switched subscription to a background queue because it does some blocking work. But what you in fact want is to run the code in onNext { .. } on the main thread so you can update the app’s UI.

This you can achieve by using the observeOn operator. By the way you can place observeOn and subscribeOn anywhere in your operator chain - the order doesn’t really matter.

RxSwift includes a shared scheduler that uses the main thread MainScheduler.instance so you can use it to observe elements easily like so:

Observable<Int>.create { observer in
    observer.onNext(1)
    sleep(1)
    observer.onNext(2)
    return Disposables.create()
}
.observeOn(MainScheduler.instance)
.subscribeOn(ConcurrentDispatchQueueScheduler(qos: .background))
.subscribe(onNext: { el in
    print(Thread.isMainThread)
})

This will execute the code like so:

[main] subscribe() -> [background] create{ ... } -> [main] onNext { ... }

This is a pattern which you will often use if you do a lot of asynchronous work so the early you get used to it - the better.

I hope this quick post brought some clarity into the naming and usage.

Where to go from here?

To learn more about schedulers, and cross-thread subscriptions check out the RxBook! The book is available at http://raywenderlich.com/store - this is where you can see all updates, discuss in the website forums, etc.

Hope that post was helpful, and if you want to get in touch you can find me here

Share this post:


If you'd like to learn how to create professional production apps with RxSwift, the best resource out there is the RxSwift book written by Florent Pillet, Junior Bontognali, Marin Todorov, & Scott Gardner.

It features 20+ chapters covering the basics, the Rx operators, and advanced topics like testing, error handling, and app architecture.

Available from Ray Wenderlich: » Learn more.
More Reading