Kotlin - Flow

What is Flow?

Flow 是用來處理非同步的資料流的一種方式,它會按照發射 (emit) 的順序來執行。

An asynchronous data stream that sequentially emits values and completes normally or with an exception.

將資料透過 Flow 的方式發送,在資料接收之前,這筆資料都不會被執行、運算,我們甚至可以在執行之前透過一些函式來將這些資料轉化成我們所希望的樣子。

跟 Channel 不太一樣的地方是,Channel 一次取出一個值,而 Flow 取出的是一個流(Stream)。

換句話說,使用 Channel 時,我們必須要呼叫多次的 receive() 來接收 send() 傳送出來的值,而 Flow 只需要使用 collection{} 就可以處理所有在這個資料流。

Simple Flow

Kotlin 提供多種方式建立 Flow ,在這邊我們使用 flow{ ... } 來建立。

fun flow1(): Flow<Int> = flow {
      println("Flow started")
      repeat(10){
        delay(100)
        emit(it)
    }
}

我們可以發現 flow{ ... } 建立出來的函式不是一個 suspend 函式,原因就是 Flow 是等到我們接收的時候才會去執行,所以接收的函式才會是一個 suspend 函式。

取出資料的範例如下:

fun main() = runBlocking {
    val flow = flow1()
    flow.collect { value -> println(value)}
}
Flow started
0
1
2
3
4
5
6
7
8
9

Flow is Cold

如果 Flow 沒有被 Collect 就不會被執行

所以看以下例子,雖然以順序來說是 nums() 先被呼叫到,但因為還沒呼叫到 collect() 所以會先輸出 Hello World 才會輸出 Start Flow

suspend fun nums(): Flow<Int> = flow {
    println("Start Flow")
    for (i in 1..5) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking {
    var n: Flow<Int> = nums()
    println("Hello World")
    n.collect {
        println(it)
    }
}

/*
	Hello World
	Start Flow
	1
	2
	3
	4
	5
*/

Emit 時才會呼叫 Collect

看下面的例子,可以發現會先看到 emit() 上面的 print 被執行才會執行到呼叫 collect 時船進去的 function

suspend fun nums(): Flow<Int> = flow {
    println("Start Flow")
    for (i in 1..5) {
        delay(100)
        println("emit $i")
        emit(i)
    }
}

fun main() = runBlocking {
    var n: Flow<Int> = nums()
    println("Hello World")
    n.collect {
        println(it)
    }
}

/*
	Hello World
	Start Flow
	emit 1
	1
	emit 2
	2
	emit 3
	3
	emit 4
	4
	emit 5
	5
*/

Cold Flow

私人管線,只有一個人可以從管線中拿到東西,需要資料的人都會取得新的 instance ,只有這個 flow 本身可以 emit 資料,而 flow { } block 內的事情做完,這條 flow 也會終止。

// 每個 collector 拿到的 flow 都會是不同的 instance
fun getFlow(): Flow<T> = flow {
    emit(1)
    delay(100)
    emit(2)
}

getFlow.collect() // collector A
getFlow.collect() // collector B

Hot Flow

公有管線,被創建出來之後就會一直保留在記憶體中,只要能取的 reference ,就可以從中訂閱資料,也可以向 Mutable 的 instance emit 資料。不像是 cold flow 事情做完就會結束,有句描述是 “Hot flow never ends”

private val _sharedFlow = MutableSharedFlow<T>() // 可以 emit data
val sharedFlow = _sharedFlow.asSharedFlow() // 開放給別人訂閱

// 訂閱者都會訂閱同一個 SharedFlow
sharedFlow.collect()

// 生產者可以對 MutableSharedFlow emit 資料
_sharedFlow.emit(T)

SharedFlow

SharedFlow 當成是一條專門運送材料的水管,只要接上這條水管的工人 ( subscriber ) ,都能夠拿到材料 ( data ) 。工人透過 .collect() 接上水管,從水管收集材料,並且在收到材料之後做任何想做的事情

SharedFlow 有 replayextraBufferCapacity 以及 onBufferOverflow 等設定,讓資料分享端能夠依需求情境,彈性設定資料分享行為。

public fun <T> MutableSharedFlow(
  replay: Int = 0, 
  extraBufferCapacity: Int = 0, 
  onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND 
): MutableSharedFlow<T>
  • replay:新訂閱者可以拿到多少舊的資料?
  • extraBufferCapacity:有多少額外容量,來放置未處理的新資料?
  • onBufferOverflow:管線實際容量是 replay + extraBufferCapacity。容量爆掉之後,要如何處理裝不下的資料? 可以把機制設定為 SUSPEND 、DROP_LATEST 或 DROP_OLDEST 。

StateFlow

StateFlow 是 SharedFlow 的客製版,其特性又與 LiveData 相似,官方文件提到,建立 MutableStateFlow(initialValue) 其實就等同於對 SharedFlow 做了下面這些設定:

val shared = MutableSharedFlow(
  replay = 1,
  onBufferOverflow = BufferOverflow.DROP_OLDEST
)
shared.tryEmit(initialValue) // emit the initial value
val state = shared.distinctUntilChanged() // get StateFlow-like behavior

StateFlow 一開始就會 emit 預設值,並且設定為 replay = 1,所以 StateFlow 中永遠都存放著一筆資料。

因為 BufferOverflow.DROP_OLDEST 的特性,生產者想要修改資料的話,可以直接做 setValue,而不用在 Coroutine Scope 中做異步的 emit,因為結果而言會是一樣的。

要特別注意的是,StateFlow 額外做了 distinctUntilChanged() 。這意味著如果要更新資料,要注意不能直接修改舊有資料的實體,而是要創建一個新的實體,以避免被忽略掉。

StateFlow 如其名,適合放置「唯一而最新的」的資料。不像 SharedFlow 的資料介於有和沒有之間,StateFlow 一定會給予一個最新的狀態,有可能是 Loading、Success 或 Error,通常可以用 Sealed Class 把所有狀態給包裝起來。


Click here to share this article with your friends on X if you liked it.