Mutable shared flow
In Kotlin, a flow represents an asynchronous data stream that sequentially emits values and completes normally or with an exception (see the Flow documentation).
Flows are cold streams
A cold stream is a flow that does not start producing values until a subscriber starts collecting it. Also, the flow is started anew for every subscriber.
In the following example, intFlow does not start producing values until subscriber1 or subscriber2 starts collecting it. The flow is started separately for each subscriber (1 and 2), so each of them receives all three values.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
//sampleStart
fun main() = runBlocking {
    val intFlow = flow {
        repeat(3) {
            println("emitting: $it")
            emit(it)
        }
    }
    launch {
        println("subscriber1 will start collecting now")
        intFlow.collectLatest {
            println("subscriber1: $it")
        }
    }.join()
    launch {
        println("subscriber2 will start collecting now")
        intFlow.collectLatest {
            println("subscriber2: $it")
        }
    }.join()
}
//sampleEnd
SharedFlow
SharedFlow is a hot flow that broadcasts emitted values to all of its subscribers. A shared flow is called hot because its active instance exists independently of the presence of collectors.
A shared flow never completes. However, a subscriber to a shared flow can be cancelled, for example when the scope in which the flow is collected is cancelled.
A mutable shared flow can be created with the MutableSharedFlow(...) constructor function, which allows emitting values to the shared flow. See MutableSharedFlow for more details.
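As a rough illustration of the broadcast behaviour described above, the sketch below collects one shared flow from two subscribers and then cancels them. The helper name broadcastDemo and the 100 ms delays are assumptions made for this example; they are not part of the library.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper (not a library function): returns what each of two
// subscribers received from the same shared flow.
fun broadcastDemo(): Pair<List<Int>, List<Int>> = runBlocking {
    val sharedFlow = MutableSharedFlow<Int>()
    val first = mutableListOf<Int>()
    val second = mutableListOf<Int>()

    // Both subscribers collect the same flow instance.
    val job1 = launch { sharedFlow.collect { first += it } }
    val job2 = launch { sharedFlow.collect { second += it } }

    // Give both subscribers time to start collecting before emitting.
    delay(100)
    repeat(3) { sharedFlow.emit(it) }
    // Give both subscribers time to process the last value.
    delay(100)

    // The shared flow never completes, so the subscribers are cancelled explicitly.
    job1.cancel()
    job2.cancel()
    first to second
}

fun main() {
    val (first, second) = broadcastDemo()
    println("subscriber1: $first")
    println("subscriber2: $second")
}
```

Unlike the cold intFlow earlier, the values are produced once and both subscribers see the same emissions. Without the two cancel() calls, runBlocking would wait on the subscribers forever.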
The MutableSharedFlow constructor function takes three arguments. We'll explore them one by one.
public fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T>
Replay
replay defines the number of values to be cached and replayed to new subscribers.
The replayCache property and the resetReplayCache() method are available on a mutable shared flow to read and reset the replay cache, respectively.
Once resetReplayCache() is called, new subscribers only receive values emitted after the call.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
//sampleStart
fun main() = runBlocking {
    val stringFlow = MutableSharedFlow<String>(
        replay = 2
    )
    // values emitted before flow is collected
    stringFlow.tryEmit("a")
    stringFlow.tryEmit("b")
    stringFlow.tryEmit("c")
    delay(100)
    // print current replay cache
    println("replay cache: ${stringFlow.replayCache}")
    val job = launch {
        stringFlow.collectLatest {
            // only `b` and `c` will be collected. `a` is lost b/c
            // replay cache size is 2
            println("collected value $it")
        }
    }
    delay(100)
    // cancel the job b/c stringFlow will never complete and this
    // will keep runBlocking running forever
    job.cancel()
}
//sampleEnd
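The example above only reads the replay cache. The following sketch, under the same setup, also resets it and then attaches a late subscriber. The helper name lateSubscriberAfterReset is hypothetical, and the @OptIn annotation assumes that resetReplayCache() is marked experimental in the kotlinx.coroutines version in use.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: returns what a subscriber receives when it starts
// collecting after the replay cache has been reset.
// Assumption: resetReplayCache() requires the experimental opt-in.
@OptIn(ExperimentalCoroutinesApi::class)
fun lateSubscriberAfterReset(): List<String> = runBlocking {
    val stringFlow = MutableSharedFlow<String>(replay = 2)
    stringFlow.tryEmit("a")
    stringFlow.tryEmit("b")
    stringFlow.tryEmit("c")
    // replayCache is a property that returns a snapshot of the cached values
    println("before reset: ${stringFlow.replayCache}")
    stringFlow.resetReplayCache()
    println("after reset: ${stringFlow.replayCache}")

    val received = mutableListOf<String>()
    val job = launch { stringFlow.collect { received += it } }
    delay(100)            // let the subscriber start collecting
    stringFlow.emit("d")  // emitted after the reset
    delay(100)            // let the subscriber process the value
    job.cancel()
    received
}

fun main() {
    println("late subscriber received: ${lateSubscriberAfterReset()}")
}
```

The late subscriber should see only `d`, because `b` and `c` were removed from the replay cache before it subscribed.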
extraBufferCapacity and onBufferOverflow
extraBufferCapacity provides extra buffer space in addition to the replay cache, allowing slow subscribers to get values from the buffer without suspending emitters.
Together with extraBufferCapacity, a buffer overflow strategy can be configured via onBufferOverflow, which takes one of the following values:
public enum class BufferOverflow {
    /**
     * Suspend on buffer overflow.
     */
    SUSPEND,
    /**
     * Drop **the oldest** value in the buffer on overflow, add the new value to the buffer, do not suspend.
     */
    DROP_OLDEST,
    /**
     * Drop **the latest** value that is being added to the buffer right now on buffer overflow
     * (so that buffer contents stay the same), do not suspend.
     */
    DROP_LATEST
}
- A buffer overflow can happen only when there is at least one subscriber that is not ready to accept the new value. In the absence of subscribers, only the most recent replay values are stored; the buffer overflow behavior is never triggered and has no effect.
- When replay and extraBufferCapacity are both set to 0, the buffer overflow strategy cannot be anything other than SUSPEND. Setting it to DROP_OLDEST or DROP_LATEST throws an IllegalArgumentException:
Exception in thread "main" java.lang.IllegalArgumentException: replay or extraBufferCapacity must be positive with non-default onBufferOverflow strategy DROP_OLDEST
at kotlinx.coroutines.flow.SharedFlowKt.MutableSharedFlow(SharedFlow.kt:271)
at org.jetbrains.kotlin.idea.scratch.generated.ScratchFileRunnerGenerated$ScratchFileRunnerGenerated$generated_get_instance_res0$1$1.invokeSuspend(tmp.kt:14)
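The constraint behind this exception can be checked with a small sketch like the one below. The helper canCreateUnbuffered is a hypothetical name introduced for illustration; it only wraps the MutableSharedFlow(...) call shown earlier in runCatching.

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow

// Hypothetical helper: reports whether a shared flow without any buffer
// (replay = 0, extraBufferCapacity = 0) can be created with the given strategy.
fun canCreateUnbuffered(strategy: BufferOverflow): Boolean =
    runCatching {
        MutableSharedFlow<String>(
            replay = 0,
            extraBufferCapacity = 0,
            onBufferOverflow = strategy
        )
    }.isSuccess

fun main() {
    // Only SUSPEND is expected to be accepted when there is no buffer at all.
    for (strategy in BufferOverflow.values()) {
        println("$strategy -> ${canCreateUnbuffered(strategy)}")
    }
}
```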
Example
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
//sampleStart
fun main() = runBlocking {
    // total buffer capacity is 3. replay cache + extraBufferCapacity
    val stringFlow = MutableSharedFlow<String>(
        replay = 1,
        extraBufferCapacity = 2,
        // oldest value in the buffer is dropped on overflow
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )
    val job = launch {
        stringFlow.onEach {
            println("collected value $it")
        }.collectLatest {
            // simulate 100ms of work per value; collectLatest cancels
            // this work when a newer value arrives
            delay(100)
        }
    }
    delay(100)
    // emit 3 values, `a`, `b` & `c`, back to back; the subscriber has
    // not run yet, so all three stay in the buffer
    stringFlow.tryEmit("a")
    stringFlow.tryEmit("b")
    stringFlow.tryEmit("c")
    // at this point the buffer is full. emitting `d` makes the flow
    // drop the oldest value (`a`). Remember, this depends on the
    // overflow strategy.
    stringFlow.tryEmit("d")
    delay(400)
    // cancel the job b/c stringFlow will never complete and this
    // will keep runBlocking running forever
    job.cancel()
}
//sampleEnd
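For comparison, here is a sketch of the same scenario with BufferOverflow.DROP_LATEST. The helper name dropLatestDemo is hypothetical, and the result relies on all four tryEmit calls running back to back on the single runBlocking thread, so the subscriber cannot drain the buffer in between.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.*

// Hypothetical helper: returns the result of the overflowing tryEmit call
// together with the values the subscriber actually collected.
fun dropLatestDemo(): Pair<Boolean, List<String>> = runBlocking {
    val stringFlow = MutableSharedFlow<String>(
        replay = 1,
        extraBufferCapacity = 2,
        // the incoming value is dropped on overflow
        onBufferOverflow = BufferOverflow.DROP_LATEST
    )
    val collected = mutableListOf<String>()
    val job = launch { stringFlow.collect { collected += it } }
    delay(100) // let the subscriber start collecting

    // These calls do not suspend, so the subscriber does not run in between.
    stringFlow.tryEmit("a")
    stringFlow.tryEmit("b")
    stringFlow.tryEmit("c")
    // The buffer is full here, so `d` is the value that gets discarded.
    val result = stringFlow.tryEmit("d")

    delay(100) // let the subscriber drain the buffer
    job.cancel()
    result to collected
}

fun main() {
    val (result, collected) = dropLatestDemo()
    println("tryEmit(d) returned $result, collected $collected")
}
```

Note that tryEmit still reports true for `d` here even though the value never reaches the subscriber: with DROP_LATEST the call does not fail, it silently discards the new value.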
emit(…) and tryEmit(…)
Both emit and tryEmit emit values to a shared flow.
- emit is a suspend function. A call to emit suspends only when the buffer overflow strategy is BufferOverflow.SUSPEND, there are subscribers to collect the value, and the buffer is full.
- If there are no subscribers and no replay cache is configured, emit simply drops the value. Otherwise, the replay cache stores the values in drop-oldest fashion.
- tryEmit tries to emit a value to the shared flow without suspending and returns true or false depending on whether the value was emitted. When a flow is configured with BufferOverflow.SUSPEND and there are subscribers to collect the values, tryEmit returns false if a plain call to emit would suspend, and the value is dropped.
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
//sampleStart
fun main() = runBlocking {
    // total buffer capacity is 3. replay cache + extraBufferCapacity
    val stringFlow = MutableSharedFlow<String>(
        replay = 1,
        extraBufferCapacity = 2,
        // suspend on overflow
        onBufferOverflow = BufferOverflow.SUSPEND
    )
    val job = launch {
        stringFlow.onEach {
            println("collected value $it")
        }.collectLatest {
            // simulate 100ms of work per value; collectLatest cancels
            // this work when a newer value arrives
            delay(100)
        }
    }
    delay(100)
    // emit 3 values, `a`, `b` & `c`
    println("emit ${stringFlow.tryEmit("a")}")
    println("emit ${stringFlow.tryEmit("b")}")
    println("emit ${stringFlow.tryEmit("c")}")
    // at this point the buffer is full. with BufferOverflow.SUSPEND
    // tryEmit will return false and the value will be dropped
    println("emit ${stringFlow.tryEmit("d")} -- emit was false")
    delay(400)
    // cancel the job b/c stringFlow will never complete and this
    // will keep runBlocking running forever
    job.cancel()
}
//sampleEnd
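To contrast the two functions, the sketch below records the order of events when tryEmit and emit are both called on a full buffer. The helper name emitVersusTryEmit and the log messages are assumptions made for this example. The ordering also assumes the single-threaded event loop of runBlocking.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: returns a log of what happened, in order.
fun emitVersusTryEmit(): List<String> = runBlocking {
    // total buffer capacity is 3; the overflow strategy defaults to SUSPEND
    val stringFlow = MutableSharedFlow<String>(replay = 1, extraBufferCapacity = 2)
    val log = mutableListOf<String>()
    val job = launch { stringFlow.collect { log += "collected $it" } }
    delay(100) // let the subscriber start collecting

    // These three calls fit into the buffer, so emit returns without suspending.
    stringFlow.emit("a")
    stringFlow.emit("b")
    stringFlow.emit("c")
    log += "buffer full"

    // tryEmit cannot suspend, so it gives up and reports false.
    log += "tryEmit(d) = ${stringFlow.tryEmit("d")}"

    // emit suspends here until the subscriber frees space in the buffer.
    stringFlow.emit("e")
    log += "emit(e) returned"

    delay(100) // let the subscriber finish
    job.cancel()
    log
}

fun main() {
    emitVersusTryEmit().forEach(::println)
}
```

In the log, `d` should never show up as collected, while `e` is delivered because emit waited for room instead of dropping the value.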