RxSwiftの基本となるObservableとHot/Cold変換、Sequenceの分岐を学ぶ

コマンドラインでSwiftを実行する方法が分かったので、せっかくだから簡単に実行できるものを用意してRxSwiftの基礎と、Hot/Cold変換、さらにSequenceの分岐に関する理解を深めるためのコードを書いた。

Swiftってコマンドラインで実行できるんすよ

swiftインタプリタが立ち上がったり、swift filenameで実行できるのは知っていたけど、swift packageコマンドでパッケージを生成してライブラリと組み合わせたりもできるらしいことが分かった。

実際に作るとこんな感じ。

$ mkdir RxSwiftExecutable
# cd RxSwiftExecutable
$ swift package init --type executable

Creating executable package: RxSwiftExecutable
Creating Package.swift
Creating README.md
Creating .gitignore
Creating Sources/
Creating Sources/RxSwiftExecutable/main.swift
Creating Tests/
Creating Tests/LinuxMain.swift
Creating Tests/RxSwiftExecutableTests/
Creating Tests/RxSwiftExecutableTests/RxSwiftExecutableTests.swift
Creating Tests/RxSwiftExecutableTests/XCTestManifests.swift

ディレクトリ名をパッケージ名としてコマンドライン実行に必要なコード群が生成される。

あとは、下記のように実行するだけ。

$ swift build
$ swift run

Hello, world!

サンプルコード書いた

リポジトリはこちら。

github.com

main.swiftに実行したいコードを記述するだけで良い。ただ実行するだけなら、特にfunc mainを実装したりする必要はない。

外部ライブラリを使う際は、package.swiftに手を入れる必要がある。

// swift-tools-version:5.1
// The swift-tools-version declares the minimum version of Swift required to build this package.

import PackageDescription

let package = Package(
    name: "RxSwiftExecutable",
    dependencies: [
        // Dependencies declare other packages that this package depends on.
        // .package(url: /* package url */, from: "1.0.0"),
          // ライブラリのURLとバージョンを明記する
          .package(url: "https://github.com/ReactiveX/RxSwift", from: "5.0.1"),
    ],
    targets: [
        // Targets are the basic building blocks of a package. A target can define a module or a test suite.
        // Targets can depend on other targets in this package, and on products in packages which this package depends on.
        .target(
            name: "RxSwiftExecutable",
            // パッケージ名を配列で与える。ここでいうパッケージ名は import の対象
            dependencies: ["RxSwift", "RxCocoa"]),
        .testTarget(
            name: "RxSwiftExecutableTests",
            dependencies: ["RxSwiftExecutable"]),
    ]
)

GitHubリポジトリURLと、パッケージ名を明記すればOK。

Observable

Observableを正確に説明するのは結構難しくて、そこそこ理解しているつもりだけどまだ知らない機能とかがたくさんあると感じている。

初めて扱う人は、とりあえず以下のことを理解しておけばいいと思う。

Observableは、Subscribeで変更を購読することができる

Observableは、名前の通り観察可能なオブジェクトとして存在している。 そして、その観察可能なオブジェクトに対してsubscribeというメソッドを使って、変更を受け取ることができる。

Observable(観察可能)なオブジェクトは、.subscribeで観察でき、観察する側のことをObserverと呼ぶ。

hogeObservable
  .subscribe(
    onNext: { value in print("next fired") },
    onComplete: { value in print("complete fired") },
    onError: { error in print("error fired, error:\(error)") })

Observableは、onNext, onComplete, onErrorのイベントを発火してくるオブジェクトと考えて良い。 そして、それぞれのイベントが発火した時、対応するクロージャが呼ばれる。

ちなみに、onNext以外のイベントが発生したあとは、そのObservableはそれ以降何のイベントも発行してこない。 理由は色々あるんだけど、最初は「そういうもの」として考えるのが良い。

Observableは、なんらかの型を1つ取る

let observable = Observable<String>()

上記の例では、このObservableはStringという型を取る。 型を取ると何が起こるかというと、onNextで呼ばれるクロージャの引数に、String型のなんらかの値が流れてくることを表す。

observable.subscribe(onNext: { str in
 print(str)
})

といったように、渡ってきた値を使ってなんらかの処理を行うことができる。

Subject

SubjectはObservableの派生で、「Observer側から、Subjectに値を流せる」という特徴を持つ。 なので、購読側から任意のタイミングで任意のイベントを発行できる。

let subject1 = PublishSubject<String>()
let subject2 = PublishSubject<Void>()

subject1.subscribe(onNext: { str in
  print("onNext called with \(str)")
})

subject2.subscribe(onNext: { _ in
  print("onNext called")
})

// acceptで発火できる
subject1.accept("hoge")

// Void型のSubjectにはacceptの引数に()を渡す
subject.accept(())

PublishSubject

上記ですでに登場しているが、PublishSubjectとは、初期値を持たないSubjectのこと。 初期値を持たないので、.subscribeした瞬間には何も起こらない。

let firstObservable = PublishSubject<String?>()

なお、このPublishSubjectはElementとしてString?を取る。?が付いているので、nilが流れてくる可能性があるObservableだ。

BehaviorSubject

BehaviorSubjectとは、初期値を持つSubject。それ以外はPublishSubjectと同じ。 初期値を持つので、subscribeした瞬間に初期値が流れてくる。

let secondObservable = BehaviorSubject<Int>(value: 100)

初期値はコンストラクタにvalue: valという形で与える。この例では100: Intが流れる。

ObservableのHot/Cold

Hot/Coldとは、Observableの性質のこと。 Observableの種類によって、Hot/Coldどちらかの性質のみを持つ。両方を同時に満たすことはない。(たぶん)

Hot

HotなObservableとは、.subscribeしているObserverがいなくても、値が流れ続けるObservableのこと。

Cold

ColdなObservableとは、.subscribeしているObserverが発生するまで、値が流れないObservableのこと。

って言われてもわからん

のでコード書いて動かして見るのが一番早いと思う。

まず、Hot Observableから。

// These are Hot Observables
let firstObservable = PublishRelay<String?>()
let secondObservable = BehaviorRelay<Int>(value: 100)

// Generate Cold Observables from Hot Observables
let coldFirstObservable = firstObservable
  .flatMap(Observable.from(optional:))
  .map { str -> String in
    print("in cold first observable")
    return str + " from cold first observable"
  }

このサンプルコードでは、まず最初にfirstObservablesecondObservalbeを定義している。これらはどちらもRelay。

まず、SubjectやRelayはHotなObservableとして動作する。 Hotであることの定義は、subscribeしているObserverがいなくても値が流れ続けることだった。 ただ、subscribeをしているObserverがいないため、値が流れているのかどうか、ちょっとわかりづらい。

次に、coldFirstObservableを定義している。 これは、firstObservableに対してflatMapmapといったoperatorをチェインしていることがわかる。

一部例外はあるが、Hot Observableに対してoperatorを繋げたものは、Cold Observableとして振る舞う。

ColdなObservableは、subscribeされない限り値が流れないので、このcoldFirstObservableは現時点ではflatMapmapの中身が評価されることはない。

もう少し簡単な例を用意しよう。 以下のようなコードを実行した場合、実際にmapに与えられたクロージャに到達していないことがわかる。

import RxSwift
import RxCocoa
import Foundation

let firstObservable = PublishRelay<String>()

let cold = firstObservable.map { str in
  print("got \(str) in cold observable")
}

firstObservable.subscribe(onNext: { str in
  print(str)
})

firstObservable.accept("hoge")

実行結果は以下の通り。

hoge

次に、Cold ObservableにObserverを追加してみる。

import RxSwift
import RxCocoa
import Foundation

let firstObservable = PublishRelay<String>()

let cold = firstObservable.map { str in
  print("got \(str) in cold observable")
}

firstObservable.subscribe(onNext: { str in
  print(str)
})

firstObservable.accept("hoge")

cold.subscribe(onNext: { _ in
  print("cold observable fired")
})

firstObservable.accept("fuga")

実行結果は以下の通り。

hoge
fuga
got fuga in cold observable
cold observable fired

実行結果から、fugafirstObservableに流れてきたときにcoldにも値が流れていることが確認できる。 hogeでは発火していないのは、値が流れた時点ではObserverが存在しなかったため。

このように、Hot ObservableをCold ObservableにすることをCold変換、逆に、Cold ObservableをHot ObservableにすることをHot変換と呼ぶ。(名称はあんまり重要じゃないけど)

自分も最初は「HotとかColdとか、意識する必要あるのか?」と思っていたけど、subscribeするタイミングによって挙動が変わり、バグに繋がったケースを経験してからHot/Coldを意識するようになった。

Hot / Coldの一歩先へ

さて、これでHot/Coldの一番重要な性質について確認することができた。 しかし、Hot/Coldにはまだいくつか特別な性質がある。

Sequenceの分岐

RxSwiftでは、Observableから流れてくるデータのことをStreamとかSequenceとか呼ぶことが多い。本記事ではSequenceと呼ぶことにする。

このSequenceは、先述のHot / Cold変換で説明したように、流れたり流れなかったりする性質を持つ。 さらに、「どのようにHot / Cold変換したか」でSequenceが増えることがある。

増えるってどういうこと?

Cold Observableに複数のsubscribeを登録すると、Sequenceが複製されてしまう。

以下のコードを見てみよう。

import RxSwift
import RxCocoa
import Foundation

let firstObservable = PublishRelay<String>()

let cold = firstObservable.map { str in
  print("got \(str) in cold observable")
}

cold.subscribe(onNext: { _ in
  print("cold observable fired 1")
})

cold.subscribe(onNext: { _ in
  print("cold observable fired 2")
})

firstObservable.accept("fuga")

これを実行すると、以下のような結果が得られる。

got fuga in cold observable
cold observable fired 1
got fuga in cold observable
cold observable fired 2

一度しかacceptしていないにも関わらず、Cold Observableのmapに渡しているクロージャが2度評価されてしまっている。

これは、Sequenceが複製されてしまっているために起こる。

上記のサンプルコードのHot / Cold変換のイメージは、

Hot --- Cold --- Hot
            \--- Hot

ではなく、実は、

Hot --- Cold --- Hot
Hot --- Cold --- Hot

といった風になる。

覚えるべきポイントは、Cold Observableは、複数のObserverを持てない。持たせようとすると、Sequenceが複製されてしまう。

じゃあ、Hot Observableはどうなの?

結論から書くと、Hot Observableは複数のObserverを持てる。

Hot --- Hot
   \--- Hot
    \-- Hot

なので、「Cold Observableに複数のObserverを割り当てたいけど、Sequenceの複製は防ぎたい」といった場合は、一度Cold ObservableをHotに変換してやる必要がある。

その時に活躍するオペレータが、.publish()

Connectable Observable

publish()は、対象のObservableをConnectable Observableに変換してくれる。

Connectable ObservableはHot Observalbeの派生と考えて良い。特徴として、Hotだけど、.connect()を呼ぶまで値を下流に流さずに貯めて、呼ばれた瞬間にそれまで貯めていた値を全て放流する

extension ObservableType {

    /**
        Returns a connectable observable sequence that shares a single subscription to the underlying sequence.
    
        This operator is a specialization of `multicast` using a `PublishSubject`.
    
        - seealso: [publish operator on reactivex.io](http://reactivex.io/documentation/operators/publish.html)
    
        - returns: A connectable observable sequence that shares a single subscription to the underlying sequence.
        */
    public func publish() -> RxSwift.ConnectableObservable<Self.Element>
}

実際に定義を確認してみると、publish()ConnectableObservableのオブジェクトを返すことがわかる。

このpublish()connect()を使って、「Cold Observableに複数のObserverを割り当てたいけど、Sequenceの複製は防ぎたい」というのを実現してみよう。

import RxSwift
import RxCocoa
import Foundation

let firstObservable = BehaviorRelay<String>(value: "hoge")

let coldPublish = firstObservable.map { str in
  print("got \(str) in cold shared observable")
}.publish()

coldPublish.subscribe(onNext: { _ in
  print("cold sahred observable fired 1")
})

print("before connect")

coldPublish.connect()

print("after connect")

coldPublish.subscribe(onNext: { _ in
  print("cold shard observable fired 2")
})

firstObservable.accept("fuga")

実際にconnectが呼ばれるまで本当に発火していないのかを確かめるために、printを仕込んでおいた。

実行結果は以下の通り。

before connect
got hoge in cold shared observable
cold sahred observable fired 1
after connect
got fuga in cold shared observable
cold sahred observable fired 1
cold shard observable fired 2

まず、firstObservable: BehaviorRelay<String>(value: "hoge")なので、このObservableを直接subscribeした場合は、その瞬間に値が流れる。

今回の例では一度.mapを噛ませたCold Observable経由でsubscribeしているので、発火はしない。

そして、before connectのprintの後に、coldPublish.connect()が呼ばれ、この瞬間にhogeが流れてくる。

見てわかる通り、connect()のあとで追加されているObserverに対しては値が流れていない。これはconnect()が呼ばれた瞬間に値をすべて放流してしまうから。

なので、その後に流れた値fugaはちゃんと両方のObserverに届いており、なおかつCold Observableが持つmapは一度しか評価されていない。

今回のSequenceは以下のようになる。

Hot --- Cold --- Connectable --- Hot
                             \--- Hot

connect()のヒミツ

実は、connect()Disposableオブジェクトを返してくる。

public class ConnectableObservable<Element>
    : Observable<Element>
    , ConnectableObservableType {

    /**
     Connects the observable wrapper to its source. All subscribed observers will receive values from the underlying observable sequence as long as the connection is established.

     - returns: Disposable used to disconnect the observable wrapper from its source, causing subscribed observer to stop receiving values from the underlying observable sequence.
     */
    public func connect() -> Disposable {
        rxAbstractMethod()
    }
}

このDisposableオブジェクトは.connect()を呼んだObservable、つまり上記の例ではcoldPublishに紐づいている。 Disposableオブジェクトはdispose()メソッドを持ち、これを呼ぶと、subscribeを解除することができる。

問題は「どのsubscribeが解除されるのか?」だが、coldPublishが購読しているのはfirstObservable。 なので、firstObservableから値が流れてきてもcoldPublishはその変更を受け取らず、結果としてcoldPublishを購読しているObserverたちにも変更は通知されなくなる。

実際にdisposeしてみると以下のようになる。

import RxSwift
import RxCocoa
import Foundation

let firstObservable = BehaviorRelay<String>(value: "hoge")

let coldPublish = firstObservable.map { str in
  print("got \(str) in cold shared observable")
}.publish()

coldPublish.subscribe(onNext: { _ in
  print("cold sahred observable fired 1")
})

let disposable = coldPublish.connect()

coldPublish.subscribe(onNext: { _ in
  print("cold shard observable fired 2")
})

firstObservable.subscribe(onNext: { str in
  print("first observable fired, value is \(str)")
})

disposable.dispose()
firstObservable.accept("fuga")

実行結果は以下の通り。

got hoge in cold shared observable
cold sahred observable fired 1
first observable fired, value is hoge
first observable fired, value is fuga

coldPublish.dispose()したあとはcoldPublishを購読しているObserverに値は流れていないが、firstObservableを直接購読しているObserverには値が正しく流れていることがわかる。

さらに、Connectable Observableに対してrefCount()というオペレータを使うと、購読しているObserverがいなくなった瞬間にdispose()を自動で行ってくれるようになる。

ちなみに、coldObservable.publish().refCount()のショートハンドがcoldObservable.share()となる。

色々遠回りしたけど

Cold Observableを、Sequenceの複製なしに複数のObserverを持たせたい場合は、share()を使うと良い。

import RxSwift
import RxCocoa
import Foundation

let firstObservable = BehaviorRelay<String>(value: "hoge")

let coldShared = firstObservable.map { str in
  print("got \(str) in cold shared observable")
}.share()

coldShared.subscribe(onNext: { _ in
  print("cold sahred observable fired 1")
})

coldShared.subscribe(onNext: { _ in
  print("cold shard observable fired 2")
})

firstObservable.accept("fuga")
got hoge in cold shared observable
cold sahred observable fired 1
got fuga in cold shared observable
cold sahred observable fired 1
cold shard observable fired 2

おわりに

RxSwiftは強力なツールでコールバック地獄を回避する有効な手段だが、うまく書かないと思い通りに動かないことも多いのでHot / ColdやShareを意識しつつコードを書くのが良いと思う。

参考資料

https://www.slideshare.net/yukitakahashi3139241/hot-cold https://qiita.com/toRisouP/items/f6088963037bfda658d3