Flow 非同步資料流#

CH22 學的 suspend 函數和 async 都只能回傳 單一值。但實際開發中,很多資料來源會 隨時間持續產生多個值,例如:

  • 每秒更新一次的股票報價
  • 使用者在搜尋框持續輸入的文字
  • 資料庫資料被修改時發出的通知

這類「多個值的非同步序列」就是 Flow 要解決的問題。


本章三個主題#

主題解決什麼問題
Flow非同步產生並處理多個值的資料流
StateFlow持有「當前狀態」,新訂閱者可立即拿到最新值(適合 UI 狀態)
SharedFlow廣播一次性事件給多個接收者(適合導頁、通知)

1. Flow:非同步資料流#

為什麼不用 suspend 函數就好?#

假設要每隔 0.5 秒產生一個倒數數字,用 suspend 函數可以這樣寫:

1
2
3
4
5
6
7
8
import kotlinx.coroutines.*

suspend fun countDown() {
    for (i in 5 downTo 1) {
        println("倒數:$i")   // 印出的邏輯寫死在函數裡
        delay(500)
    }
}

這樣可以執行,但有個問題:「產生資料」和「如何使用資料」被寫死在一起。呼叫方無法對資料做任何加工——不能篩選、不能轉換、不能只取前幾個。

Flow 把這兩件事分開:函數只負責發出值(emit),呼叫方決定怎麼處理


建立 Flow#

使用 flow { } 建立器,搭配 emit() 發送值;呼叫方用 collect { } 接收。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun countDown(): Flow<Int> = flow {
    for (i in 5 downTo 1) {
        emit(i)      // 只管發值,不管接收方怎麼用
        delay(500)
    }
}

fun main() = runBlocking {
    countDown()
        .filter { it % 2 != 0 }   // 只要奇數
        .collect { value -> println("倒數:$value") }
    // 輸出:
    // 倒數:5
    // 倒數:3
    // 倒數:1
}

countDown() 回傳的是一個 Flow<Int> 物件,本身還沒開始執行。要等到呼叫 collect 時才真正啟動,中間可以接任意操作符。


Flow 是冷流(Cold Stream)#

Flow 只在有人 collect 時才開始執行,每次 collect 都是全新的執行。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun numbers(): Flow<Int> = flow {
    println("Flow 開始執行")
    emit(1); emit(2); emit(3)
}

fun main() = runBlocking {
    val flow = numbers()

    println("第一次收集:")
    flow.collect { println(it) }

    println("第二次收集:")
    flow.collect { println(it) }
    // 輸出:
    // 第一次收集:
    // Flow 開始執行
    // 1 2 3
    // 第二次收集:
    // Flow 開始執行
    // 1 2 3
}

2. Flow 的操作符#

Flow 支援類似集合的操作符,形成資料處理管線。

mapfilter#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    (1..10).asFlow()
        .filter { it % 2 == 0 }    // 只取偶數
        .map { it * it }            // 平方
        .collect { println(it) }
    // 輸出:4 16 36 64 100
}

take:限制數量#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun infiniteNumbers(): Flow<Int> = flow {
    var i = 0
    while (true) emit(i++)
}

fun main() = runBlocking {
    infiniteNumbers()
        .take(5)
        .collect { println(it) }
    // 輸出:0 1 2 3 4
}

onEachonStartonCompletion:生命週期鉤子#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    (1..3).asFlow()
        .onStart { println("開始") }
        .onEach { println("處理:$it") }
        .onCompletion { println("完成") }
        .collect()
    // 輸出:
    // 開始
    // 處理:1
    // 處理:2
    // 處理:3
    // 完成
}

終端操作符#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val flow = (1..5).asFlow()

    println(flow.toList())      // 輸出:[1, 2, 3, 4, 5]
    println(flow.first())       // 輸出:1
    println(flow.count())       // 輸出:5

    val sum = flow.fold(0) { acc, value -> acc + value }
    println("總和:$sum")        // 輸出:總和:15
}

3. StateFlow:狀態管理#

跟 Flow 的差別#

Flow 是 冷流:沒有人 collect 就不執行,每次 collect 都重新跑一遍。這對 UI 狀態不實用——畫面重新組合時不應該重新執行資料來源,也需要立刻拿到「現在的值」是多少。

StateFlow熱流,有兩個關鍵特性:

  1. 持有當前值:任何時候都能用 .value 直接讀取,不需要 collect
  2. 新訂閱者立刻收到最新值:收集者加入時,馬上收到目前的狀態,不會錯過

MutableStateFlow 與 StateFlow#

使用上分成兩層:

  • MutableStateFlow:可讀可寫,只在類別內部使用,外部不應直接修改狀態
  • StateFlow:唯讀,暴露給外部觀察,透過 asStateFlow() 轉換
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

class CounterViewModel {
    private val _count = MutableStateFlow(0)  // 內部可寫
    val count: StateFlow<Int> = _count.asStateFlow()  // 外部唯讀

    fun increment() { _count.value++ }
    fun decrement() { _count.value-- }
}

fun main() = runBlocking {
    val viewModel = CounterViewModel()

    // 用 launch 在背景持續收集狀態變化
    val job = launch {
        viewModel.count.collect { value ->
            println("計數:$value")
        }
    }

    delay(1)                // 讓 collector 先啟動,收到初始值 0
    viewModel.increment()   // _count 從 0 → 1
    delay(1)                // 讓 collector 收到 1
    viewModel.increment()   // _count 從 1 → 2
    delay(1)                // 讓 collector 收到 2
    viewModel.decrement()   // _count 從 2 → 1
    delay(100)
    job.cancel()
    // 輸出:
    // 計數:0   ← 訂閱時立刻收到初始值
    // 計數:1
    // 計數:2
    // 計數:1
}

collect 會持續監聽,每次 .value 改變就觸發一次。兩個注意事項:

  • 連續設定相同的值不觸發StateFlow 只在值真正改變時才通知
  • StateFlow 是衝突流(conflated):若 collector 來不及執行,中間值會被跳過,只收到最新值。上面範例用 delay(1) 讓主協程短暫暫停,collector 才有機會在每次改變後執行,看到完整的 0→1→2→1 過程

4. SharedFlow:事件廣播#

跟 StateFlow 的差別#

StateFlow 適合「狀態」——畫面現在顯示什麼、計數器目前是多少。這類資料有一個特性:新的值會取代舊的值,新訂閱者只需要知道最新狀態。

但有些事件不能被取代,例如「跳頁到登入畫面」或「顯示一次 Toast 通知」——這類事件發生一次就必須被所有訂閱者收到,不能被下一個事件覆蓋。這就是 SharedFlow 的用途。

StateFlowSharedFlow
持有當前值是(新訂閱者拿到最新值)否(訂閱後才能收到新事件)
適用場景UI 狀態(計數、載入中…)一次性事件(跳頁、通知…)
相同值重複發送不觸發每次都觸發

建立 SharedFlow#

同樣分兩層:MutableSharedFlow(內部可寫)和 SharedFlow(外部唯讀)。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

class EventBus {
    private val _events = MutableSharedFlow<String>()   // 內部可寫
    val events: SharedFlow<String> = _events.asSharedFlow()  // 外部唯讀

    suspend fun send(event: String) = _events.emit(event)
}

fun main() = runBlocking {
    val bus = EventBus()

    // 兩個訂閱者同時監聽同一個 SharedFlow
    val subscriber1 = launch {
        bus.events.collect { println("訂閱者 1 收到:$it") }
    }
    val subscriber2 = launch {
        bus.events.collect { println("訂閱者 2 收到:$it") }
    }

    delay(100)  // 等兩個訂閱者都啟動

    // emit() 是 suspend 函數,等所有訂閱者都收到才返回
    // 所以兩次 send() 之間不需要加 delay()
    bus.send("登入成功")
    bus.send("載入完成")
    delay(100)

    subscriber1.cancel()
    subscriber2.cancel()
    // 輸出:
    // 訂閱者 1 收到:登入成功
    // 訂閱者 2 收到:登入成功
    // 訂閱者 1 收到:載入完成
    // 訂閱者 2 收到:載入完成
}

emit() 會暫停,等兩個訂閱者都收到「登入成功」之後才返回,接著再執行第二個 send()。每個事件都保證送達所有訂閱者,不會有遺漏。


5. Flow vs. StateFlow vs. SharedFlow#

比較項目FlowStateFlowSharedFlow
冷流 / 熱流冷流熱流熱流
持有當前值
重播給新訂閱者全部重新執行最後一個值可設定重播數量
適用場景一次性資料轉換UI 狀態一次性事件

Reference#