Flow 非同步資料流#
CH22 學的 suspend 函數和 async 都只能回傳 單一值。但實際開發中,很多資料來源會 隨時間持續產生多個值,例如:
- 每秒更新一次的股票報價
- 使用者在搜尋框持續輸入的文字
- 資料庫資料被修改時發出的通知
這類「多個值的非同步序列」就是 Flow 要解決的問題。
本章三個主題#
| 主題 | 解決什麼問題 |
|---|
| Flow | 非同步產生並處理多個值的資料流 |
| StateFlow | 持有「當前狀態」,新訂閱者可立即拿到最新值(適合 UI 狀態) |
| SharedFlow | 廣播一次性事件給多個接收者(適合導頁、通知) |
1. Flow:非同步資料流#
為什麼不用 suspend 函數就好?#
假設要每隔 0.5 秒產生一個倒數數字,用 suspend 函數可以這樣寫:
1
2
3
4
5
6
7
8
| import kotlinx.coroutines.*
suspend fun countDown() {
for (i in 5 downTo 1) {
println("倒數:$i") // 印出的邏輯寫死在函數裡
delay(500)
}
}
|
這樣可以執行,但有個問題:「產生資料」和「如何使用資料」被寫死在一起。呼叫方無法對資料做任何加工——不能篩選、不能轉換、不能只取前幾個。
Flow 把這兩件事分開:函數只負責發出值(emit),呼叫方決定怎麼處理。
建立 Flow#
使用 flow { } 建立器,搭配 emit() 發送值;呼叫方用 collect { } 接收。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun countDown(): Flow<Int> = flow {
for (i in 5 downTo 1) {
emit(i) // 只管發值,不管接收方怎麼用
delay(500)
}
}
fun main() = runBlocking {
countDown()
.filter { it % 2 != 0 } // 只要奇數
.collect { value -> println("倒數:$value") }
// 輸出:
// 倒數:5
// 倒數:3
// 倒數:1
}
|
countDown() 回傳的是一個 Flow<Int> 物件,本身還沒開始執行。要等到呼叫 collect 時才真正啟動,中間可以接任意操作符。
Flow 是冷流(Cold Stream)#
Flow 只在有人 collect 時才開始執行,每次 collect 都是全新的執行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun numbers(): Flow<Int> = flow {
println("Flow 開始執行")
emit(1); emit(2); emit(3)
}
fun main() = runBlocking {
val flow = numbers()
println("第一次收集:")
flow.collect { println(it) }
println("第二次收集:")
flow.collect { println(it) }
// 輸出:
// 第一次收集:
// Flow 開始執行
// 1 2 3
// 第二次收集:
// Flow 開始執行
// 1 2 3
}
|
2. Flow 的操作符#
Flow 支援類似集合的操作符,形成資料處理管線。
map、filter#
1
2
3
4
5
6
7
8
9
10
| import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
(1..10).asFlow()
.filter { it % 2 == 0 } // 只取偶數
.map { it * it } // 平方
.collect { println(it) }
// 輸出:4 16 36 64 100
}
|
take:限制數量#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun infiniteNumbers(): Flow<Int> = flow {
var i = 0
while (true) emit(i++)
}
fun main() = runBlocking {
infiniteNumbers()
.take(5)
.collect { println(it) }
// 輸出:0 1 2 3 4
}
|
onEach、onStart、onCompletion:生命週期鉤子#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
(1..3).asFlow()
.onStart { println("開始") }
.onEach { println("處理:$it") }
.onCompletion { println("完成") }
.collect()
// 輸出:
// 開始
// 處理:1
// 處理:2
// 處理:3
// 完成
}
|
終端操作符#
1
2
3
4
5
6
7
8
9
10
11
12
13
| import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
val flow = (1..5).asFlow()
println(flow.toList()) // 輸出:[1, 2, 3, 4, 5]
println(flow.first()) // 輸出:1
println(flow.count()) // 輸出:5
val sum = flow.fold(0) { acc, value -> acc + value }
println("總和:$sum") // 輸出:總和:15
}
|
3. StateFlow:狀態管理#
跟 Flow 的差別#
Flow 是 冷流:沒有人 collect 就不執行,每次 collect 都重新跑一遍。這對 UI 狀態不實用——畫面重新組合時不應該重新執行資料來源,也需要立刻拿到「現在的值」是多少。
StateFlow 是 熱流,有兩個關鍵特性:
- 持有當前值:任何時候都能用
.value 直接讀取,不需要 collect - 新訂閱者立刻收到最新值:收集者加入時,馬上收到目前的狀態,不會錯過
MutableStateFlow 與 StateFlow#
使用上分成兩層:
MutableStateFlow:可讀可寫,只在類別內部使用,外部不應直接修改狀態StateFlow:唯讀,暴露給外部觀察,透過 asStateFlow() 轉換
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
| import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
class CounterViewModel {
private val _count = MutableStateFlow(0) // 內部可寫
val count: StateFlow<Int> = _count.asStateFlow() // 外部唯讀
fun increment() { _count.value++ }
fun decrement() { _count.value-- }
}
fun main() = runBlocking {
val viewModel = CounterViewModel()
// 用 launch 在背景持續收集狀態變化
val job = launch {
viewModel.count.collect { value ->
println("計數:$value")
}
}
delay(1) // 讓 collector 先啟動,收到初始值 0
viewModel.increment() // _count 從 0 → 1
delay(1) // 讓 collector 收到 1
viewModel.increment() // _count 從 1 → 2
delay(1) // 讓 collector 收到 2
viewModel.decrement() // _count 從 2 → 1
delay(100)
job.cancel()
// 輸出:
// 計數:0 ← 訂閱時立刻收到初始值
// 計數:1
// 計數:2
// 計數:1
}
|
collect 會持續監聽,每次 .value 改變就觸發一次。兩個注意事項:
- 連續設定相同的值不觸發:
StateFlow 只在值真正改變時才通知 - StateFlow 是衝突流(conflated):若 collector 來不及執行,中間值會被跳過,只收到最新值。上面範例用
delay(1) 讓主協程短暫暫停,collector 才有機會在每次改變後執行,看到完整的 0→1→2→1 過程
4. SharedFlow:事件廣播#
跟 StateFlow 的差別#
StateFlow 適合「狀態」——畫面現在顯示什麼、計數器目前是多少。這類資料有一個特性:新的值會取代舊的值,新訂閱者只需要知道最新狀態。
但有些事件不能被取代,例如「跳頁到登入畫面」或「顯示一次 Toast 通知」——這類事件發生一次就必須被所有訂閱者收到,不能被下一個事件覆蓋。這就是 SharedFlow 的用途。
| StateFlow | SharedFlow |
|---|
| 持有當前值 | 是(新訂閱者拿到最新值) | 否(訂閱後才能收到新事件) |
| 適用場景 | UI 狀態(計數、載入中…) | 一次性事件(跳頁、通知…) |
| 相同值重複發送 | 不觸發 | 每次都觸發 |
建立 SharedFlow#
同樣分兩層:MutableSharedFlow(內部可寫)和 SharedFlow(外部唯讀)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
| import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
class EventBus {
private val _events = MutableSharedFlow<String>() // 內部可寫
val events: SharedFlow<String> = _events.asSharedFlow() // 外部唯讀
suspend fun send(event: String) = _events.emit(event)
}
fun main() = runBlocking {
val bus = EventBus()
// 兩個訂閱者同時監聽同一個 SharedFlow
val subscriber1 = launch {
bus.events.collect { println("訂閱者 1 收到:$it") }
}
val subscriber2 = launch {
bus.events.collect { println("訂閱者 2 收到:$it") }
}
delay(100) // 等兩個訂閱者都啟動
// emit() 是 suspend 函數,等所有訂閱者都收到才返回
// 所以兩次 send() 之間不需要加 delay()
bus.send("登入成功")
bus.send("載入完成")
delay(100)
subscriber1.cancel()
subscriber2.cancel()
// 輸出:
// 訂閱者 1 收到:登入成功
// 訂閱者 2 收到:登入成功
// 訂閱者 1 收到:載入完成
// 訂閱者 2 收到:載入完成
}
|
emit() 會暫停,等兩個訂閱者都收到「登入成功」之後才返回,接著再執行第二個 send()。每個事件都保證送達所有訂閱者,不會有遺漏。
5. Flow vs. StateFlow vs. SharedFlow#
| 比較項目 | Flow | StateFlow | SharedFlow |
|---|
| 冷流 / 熱流 | 冷流 | 熱流 | 熱流 |
| 持有當前值 | 否 | 是 | 否 |
| 重播給新訂閱者 | 全部重新執行 | 最後一個值 | 可設定重播數量 |
| 適用場景 | 一次性資料轉換 | UI 狀態 | 一次性事件 |
Reference#