Subject
. Чтобы ответить на вопрос, надо понимать, как работает
subscribeOn()
.subscribeOn()
, как и другие операторы RxJava, создает враппер вокруг исходного Observable
. Вызов метода subscribe()
идет вверх по rx-цепочке в текущем треде, пока не доходит до враппер-класса ObservableSubscribeOn
. После этого вызов subscribe()
делегируется на исходный Observable
, но тред меняется на заданный в Scheduler
оператора subscribeOn()
. Далее ситуация различается для
Observable
и Subject
.В случае обычного cold observable вызов
subscribe()
инициализирует рассылку элементов вниз по цепочке. Другими словами, начинаются вызовы onNext()
на подписчике, переданном в методе subscribe()
. При этом поток остается тот же, в котором вызван subscribe()
, а именно заданный оператором subscribeOn()
.В случае
Subject
(hot observable) вызов subscribe()
добавляет подписчика в массив и, в зависимости от вида Subject, рассылает или нет предыдущие элементы. Рассылка кэшированных элементов инициализируется в том же потоке, в котором вызван subscribe()
, в потоке шедулера subscribeOn()
.Когда же на
Subject
вызывается onNext()
после подписки, вызов идет вниз по rx-цепочке без изменения треда, до первого observeOn()
. Т.е. subscribeOn()
больше не имеет эффекта.Итог:
Для
PublishSubject
subscribeOn()
не меняет тред, на котором вызывается onNext()
;В случае
BehaviorSubject
, ReplaySubject
и UnicastSubject
, subscribeOn()
применяется только к элементам, отправленным до вызова subscribe()
.С
AsyncSubject
все совсем запутанно. Если subscribe()
вызван после onComplete()
, то onNext()
получает кэшированный элемент и вызов происходит в потоке шедулера subscribeOn()
. Если же subscribe()
вызывается до onComplete()
, то onNext()
подписчика получает элемент в том же потоке, в котором вызван asyncSubject.onNext()
, т.е. subsrcibeOn()
не имеет эффекта.Подробнее о том, как работает subscribeOn().
Еще подробнее.