RxSwiftの基本となるObservableとHot/Cold変換、Sequenceの分岐を学ぶ
コマンドラインでSwiftを実行する方法が分かったので、せっかくだから簡単に実行できるものを用意してRxSwiftの基礎と、Hot/Cold変換、さらにSequenceの分岐に関する理解を深めるためのコードを書いた。
Swiftってコマンドラインで実行できるんすよ
swift
でインタプリタが立ち上がったり、swift filename
で実行できるのは知っていたけど、swift package
コマンドでパッケージを生成してライブラリと組み合わせたりもできるらしいことが分かった。
実際に作るとこんな感じ。
$ mkdir RxSwiftExecutable # cd RxSwiftExecutable $ swift package init --type executable Creating executable package: RxSwiftExecutable Creating Package.swift Creating README.md Creating .gitignore Creating Sources/ Creating Sources/RxSwiftExecutable/main.swift Creating Tests/ Creating Tests/LinuxMain.swift Creating Tests/RxSwiftExecutableTests/ Creating Tests/RxSwiftExecutableTests/RxSwiftExecutableTests.swift Creating Tests/RxSwiftExecutableTests/XCTestManifests.swift
ディレクトリ名をパッケージ名としてコマンドライン実行に必要なコード群が生成される。
あとは、下記のように実行するだけ。
$ swift build $ swift run Hello, world!
サンプルコード書いた
リポジトリはこちら。
main.swift
に実行したいコードを記述するだけで良い。ただ実行するだけなら、特にfunc main
を実装したりする必要はない。
外部ライブラリを使う際は、package.swift
に手を入れる必要がある。
// swift-tools-version:5.1 // The swift-tools-version declares the minimum version of Swift required to build this package. import PackageDescription let package = Package( name: "RxSwiftExecutable", dependencies: [ // Dependencies declare other packages that this package depends on. // .package(url: /* package url */, from: "1.0.0"), // ライブラリのURLとバージョンを明記する .package(url: "https://github.com/ReactiveX/RxSwift", from: "5.0.1"), ], targets: [ // Targets are the basic building blocks of a package. A target can define a module or a test suite. // Targets can depend on other targets in this package, and on products in packages which this package depends on. .target( name: "RxSwiftExecutable", // パッケージ名を配列で与える。ここでいうパッケージ名は import の対象 dependencies: ["RxSwift", "RxCocoa"]), .testTarget( name: "RxSwiftExecutableTests", dependencies: ["RxSwiftExecutable"]), ] )
GitHubのリポジトリURLと、パッケージ名を明記すればOK。
Observable
Observableを正確に説明するのは結構難しくて、そこそこ理解しているつもりだけどまだ知らない機能とかがたくさんあると感じている。
初めて扱う人は、とりあえず以下のことを理解しておけばいいと思う。
Observableは、Subscribeで変更を購読することができる
Observableは、名前の通り観察可能なオブジェクトとして存在している。
そして、その観察可能なオブジェクトに対してsubscribe
というメソッドを使って、変更を受け取ることができる。
Observable(観察可能)なオブジェクトは、.subscribe
で観察でき、観察する側のことをObserverと呼ぶ。
hogeObservable .subscribe( onNext: { value in print("next fired") }, onComplete: { value in print("complete fired") }, onError: { error in print("error fired, error:\(error)") })
Observableは、onNext, onComplete, onErrorのイベントを発火してくるオブジェクトと考えて良い。 そして、それぞれのイベントが発火した時、対応するクロージャが呼ばれる。
ちなみに、onNext
以外のイベントが発生したあとは、そのObservableはそれ以降何のイベントも発行してこない。
理由は色々あるんだけど、最初は「そういうもの」として考えるのが良い。
Observableは、なんらかの型を1つ取る
let observable = Observable<String>()
上記の例では、このObservableはStringという型を取る。
型を取ると何が起こるかというと、onNext
で呼ばれるクロージャの引数に、String型のなんらかの値が流れてくることを表す。
observable.subscribe(onNext: { str in print(str) })
といったように、渡ってきた値を使ってなんらかの処理を行うことができる。
Subject
SubjectはObservableの派生で、「Observer側から、Subjectに値を流せる」という特徴を持つ。 なので、購読側から任意のタイミングで任意のイベントを発行できる。
let subject1 = PublishSubject<String>() let subject2 = PublishSubject<Void>() subject1.subscribe(onNext: { str in print("onNext called with \(str)") }) subject2.subscribe(onNext: { _ in print("onNext called") }) // acceptで発火できる subject1.accept("hoge") // Void型のSubjectにはacceptの引数に()を渡す subject.accept(())
PublishSubject
上記ですでに登場しているが、PublishSubject
とは、初期値を持たないSubjectのこと。
初期値を持たないので、.subscribe
した瞬間には何も起こらない。
let firstObservable = PublishSubject<String?>()
なお、このPublishSubjectはElementとしてString?
を取る。?
が付いているので、nil
が流れてくる可能性があるObservableだ。
BehaviorSubject
BehaviorSubject
とは、初期値を持つSubject。それ以外はPublishSubject
と同じ。
初期値を持つので、subscribe
した瞬間に初期値が流れてくる。
let secondObservable = BehaviorSubject<Int>(value: 100)
初期値はコンストラクタにvalue: val
という形で与える。この例では100: Int
が流れる。
ObservableのHot/Cold
Hot/Coldとは、Observableの性質のこと。 Observableの種類によって、Hot/Coldどちらかの性質のみを持つ。両方を同時に満たすことはない。(たぶん)
Hot
HotなObservableとは、.subscribe
しているObserverがいなくても、値が流れ続けるObservableのこと。
Cold
ColdなObservableとは、.subscribe
しているObserverが発生するまで、値が流れないObservableのこと。
って言われてもわからん
のでコード書いて動かして見るのが一番早いと思う。
まず、Hot Observableから。
// These are Hot Observables let firstObservable = PublishRelay<String?>() let secondObservable = BehaviorRelay<Int>(value: 100) // Generate Cold Observables from Hot Observables let coldFirstObservable = firstObservable .flatMap(Observable.from(optional:)) .map { str -> String in print("in cold first observable") return str + " from cold first observable" }
このサンプルコードでは、まず最初にfirstObservable
とsecondObservalbe
を定義している。これらはどちらもRelay。
まず、SubjectやRelayはHotなObservableとして動作する。
Hotであることの定義は、subscribe
しているObserverがいなくても値が流れ続けることだった。
ただ、subscribe
をしているObserverがいないため、値が流れているのかどうか、ちょっとわかりづらい。
次に、coldFirstObservable
を定義している。
これは、firstObservable
に対してflatMap
やmap
といったoperatorをチェインしていることがわかる。
一部例外はあるが、Hot Observableに対してoperatorを繋げたものは、Cold Observableとして振る舞う。
ColdなObservableは、subscribe
されない限り値が流れないので、このcoldFirstObservable
は現時点ではflatMap
やmap
の中身が評価されることはない。
もう少し簡単な例を用意しよう。
以下のようなコードを実行した場合、実際にmap
に与えられたクロージャに到達していないことがわかる。
import RxSwift import RxCocoa import Foundation let firstObservable = PublishRelay<String>() let cold = firstObservable.map { str in print("got \(str) in cold observable") } firstObservable.subscribe(onNext: { str in print(str) }) firstObservable.accept("hoge")
実行結果は以下の通り。
hoge
次に、Cold ObservableにObserverを追加してみる。
import RxSwift import RxCocoa import Foundation let firstObservable = PublishRelay<String>() let cold = firstObservable.map { str in print("got \(str) in cold observable") } firstObservable.subscribe(onNext: { str in print(str) }) firstObservable.accept("hoge") cold.subscribe(onNext: { _ in print("cold observable fired") }) firstObservable.accept("fuga")
実行結果は以下の通り。
hoge
fuga
got fuga in cold observable
cold observable fired
実行結果から、fuga
がfirstObservable
に流れてきたときにcold
にも値が流れていることが確認できる。
hoge
では発火していないのは、値が流れた時点ではObserverが存在しなかったため。
このように、Hot ObservableをCold ObservableにすることをCold変換、逆に、Cold ObservableをHot ObservableにすることをHot変換と呼ぶ。(名称はあんまり重要じゃないけど)
自分も最初は「HotとかColdとか、意識する必要あるのか?」と思っていたけど、subscribe
するタイミングによって挙動が変わり、バグに繋がったケースを経験してからHot/Coldを意識するようになった。
Hot / Coldの一歩先へ
さて、これでHot/Coldの一番重要な性質について確認することができた。 しかし、Hot/Coldにはまだいくつか特別な性質がある。
Sequenceの分岐
RxSwiftでは、Observableから流れてくるデータのことをStream
とかSequence
とか呼ぶことが多い。本記事ではSequenceと呼ぶことにする。
このSequenceは、先述のHot / Cold変換で説明したように、流れたり流れなかったりする性質を持つ。 さらに、「どのようにHot / Cold変換したか」でSequenceが増えることがある。
増えるってどういうこと?
Cold Observableに複数のsubscribe
を登録すると、Sequenceが複製されてしまう。
以下のコードを見てみよう。
import RxSwift import RxCocoa import Foundation let firstObservable = PublishRelay<String>() let cold = firstObservable.map { str in print("got \(str) in cold observable") } cold.subscribe(onNext: { _ in print("cold observable fired 1") }) cold.subscribe(onNext: { _ in print("cold observable fired 2") }) firstObservable.accept("fuga")
これを実行すると、以下のような結果が得られる。
got fuga in cold observable cold observable fired 1 got fuga in cold observable cold observable fired 2
一度しかaccept
していないにも関わらず、Cold Observableのmap
に渡しているクロージャが2度評価されてしまっている。
これは、Sequenceが複製されてしまっているために起こる。
上記のサンプルコードのHot / Cold変換のイメージは、
Hot --- Cold --- Hot \--- Hot
ではなく、実は、
Hot --- Cold --- Hot Hot --- Cold --- Hot
といった風になる。
覚えるべきポイントは、Cold Observableは、複数のObserverを持てない。持たせようとすると、Sequenceが複製されてしまう。
じゃあ、Hot Observableはどうなの?
結論から書くと、Hot Observableは複数のObserverを持てる。
Hot --- Hot \--- Hot \-- Hot
なので、「Cold Observableに複数のObserverを割り当てたいけど、Sequenceの複製は防ぎたい」といった場合は、一度Cold ObservableをHotに変換してやる必要がある。
その時に活躍するオペレータが、.publish()
。
Connectable Observable
publish()
は、対象のObservableをConnectable Observable
に変換してくれる。
Connectable ObservableはHot Observalbeの派生と考えて良い。特徴として、Hotだけど、.connect()を呼ぶまで値を下流に流さずに貯めて、呼ばれた瞬間にそれまで貯めていた値を全て放流する
。
extension ObservableType { /** Returns a connectable observable sequence that shares a single subscription to the underlying sequence. This operator is a specialization of `multicast` using a `PublishSubject`. - seealso: [publish operator on reactivex.io](http://reactivex.io/documentation/operators/publish.html) - returns: A connectable observable sequence that shares a single subscription to the underlying sequence. */ public func publish() -> RxSwift.ConnectableObservable<Self.Element> }
実際に定義を確認してみると、publish()
はConnectableObservable
のオブジェクトを返すことがわかる。
このpublish()
とconnect()
を使って、「Cold Observableに複数のObserverを割り当てたいけど、Sequenceの複製は防ぎたい」というのを実現してみよう。
import RxSwift import RxCocoa import Foundation let firstObservable = BehaviorRelay<String>(value: "hoge") let coldPublish = firstObservable.map { str in print("got \(str) in cold shared observable") }.publish() coldPublish.subscribe(onNext: { _ in print("cold sahred observable fired 1") }) print("before connect") coldPublish.connect() print("after connect") coldPublish.subscribe(onNext: { _ in print("cold shard observable fired 2") }) firstObservable.accept("fuga")
実際にconnect
が呼ばれるまで本当に発火していないのかを確かめるために、print
を仕込んでおいた。
実行結果は以下の通り。
before connect got hoge in cold shared observable cold sahred observable fired 1 after connect got fuga in cold shared observable cold sahred observable fired 1 cold shard observable fired 2
まず、firstObservable: BehaviorRelay<String>(value: "hoge")
なので、このObservableを直接subscribe
した場合は、その瞬間に値が流れる。
今回の例では一度.map
を噛ませたCold Observable経由でsubscribe
しているので、発火はしない。
そして、before connect
のprintの後に、coldPublish.connect()
が呼ばれ、この瞬間にhoge
が流れてくる。
見てわかる通り、connect()
のあとで追加されているObserverに対しては値が流れていない。これはconnect()
が呼ばれた瞬間に値をすべて放流してしまうから。
なので、その後に流れた値fuga
はちゃんと両方のObserverに届いており、なおかつCold Observableが持つmap
は一度しか評価されていない。
今回のSequenceは以下のようになる。
Hot --- Cold --- Connectable --- Hot \--- Hot
connect()のヒミツ
実は、connect()
はDisposable
オブジェクトを返してくる。
public class ConnectableObservable<Element> : Observable<Element> , ConnectableObservableType { /** Connects the observable wrapper to its source. All subscribed observers will receive values from the underlying observable sequence as long as the connection is established. - returns: Disposable used to disconnect the observable wrapper from its source, causing subscribed observer to stop receiving values from the underlying observable sequence. */ public func connect() -> Disposable { rxAbstractMethod() } }
このDisposableオブジェクトは.connect()
を呼んだObservable、つまり上記の例ではcoldPublish
に紐づいている。
Disposableオブジェクトはdispose()
メソッドを持ち、これを呼ぶと、subscribe
を解除することができる。
問題は「どのsubscribe
が解除されるのか?」だが、coldPublish
が購読しているのはfirstObservable
。
なので、firstObservable
から値が流れてきてもcoldPublish
はその変更を受け取らず、結果としてcoldPublish
を購読しているObserverたちにも変更は通知されなくなる。
実際にdispose
してみると以下のようになる。
import RxSwift import RxCocoa import Foundation let firstObservable = BehaviorRelay<String>(value: "hoge") let coldPublish = firstObservable.map { str in print("got \(str) in cold shared observable") }.publish() coldPublish.subscribe(onNext: { _ in print("cold sahred observable fired 1") }) let disposable = coldPublish.connect() coldPublish.subscribe(onNext: { _ in print("cold shard observable fired 2") }) firstObservable.subscribe(onNext: { str in print("first observable fired, value is \(str)") }) disposable.dispose() firstObservable.accept("fuga")
実行結果は以下の通り。
got hoge in cold shared observable cold sahred observable fired 1 first observable fired, value is hoge first observable fired, value is fuga
coldPublish.dispose()
したあとはcoldPublish
を購読しているObserverに値は流れていないが、firstObservable
を直接購読しているObserverには値が正しく流れていることがわかる。
さらに、Connectable Observableに対してrefCount()
というオペレータを使うと、購読しているObserverがいなくなった瞬間にdispose()
を自動で行ってくれるようになる。
ちなみに、coldObservable.publish().refCount()
のショートハンドがcoldObservable.share()
となる。
色々遠回りしたけど
Cold Observableを、Sequenceの複製なしに複数のObserverを持たせたい場合は、share()
を使うと良い。
import RxSwift import RxCocoa import Foundation let firstObservable = BehaviorRelay<String>(value: "hoge") let coldShared = firstObservable.map { str in print("got \(str) in cold shared observable") }.share() coldShared.subscribe(onNext: { _ in print("cold sahred observable fired 1") }) coldShared.subscribe(onNext: { _ in print("cold shard observable fired 2") }) firstObservable.accept("fuga")
got hoge in cold shared observable cold sahred observable fired 1 got fuga in cold shared observable cold sahred observable fired 1 cold shard observable fired 2
おわりに
RxSwiftは強力なツールでコールバック地獄を回避する有効な手段だが、うまく書かないと思い通りに動かないことも多いのでHot / ColdやShareを意識しつつコードを書くのが良いと思う。
参考資料
https://www.slideshare.net/yukitakahashi3139241/hot-cold https://qiita.com/toRisouP/items/f6088963037bfda658d3