Mutable shared flow

In Kotlin, a flow represents an asynchronous data stream that sequentially emits values and completes normally or with an exception.

Flows are cold streams

A cold stream means that a flow does not start producing values until a subscriber starts collecting it. The flow is also started anew for every subscriber.

In the following example, intFlow does not start producing values until subscriber1 or subscriber2 starts collecting it. The flow is started separately for each subscriber, so each of them triggers its own run in which all three values are emitted.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun main() = runBlocking {
    val intFlow = flow {
        repeat(3) {
            println("emitting: $it")
            emit(it)
        }
    }

    launch {
        println("subscriber1 will start collecting now")
        intFlow.collectLatest {
            println("subscriber1: $it")
        }
    }.join()

    launch {
        println("subscriber2 will start collecting now")
        intFlow.collectLatest {
            println("subscriber2: $it")
        }
    }.join()
}
//sampleEnd

SharedFlow

SharedFlow is a hot flow which broadcasts the emitted values among all its subscribers. A shared flow is called hot because its active instance exists independently of the presence of collectors.

A shared flow never completes. Its subscribers can be cancelled, however, for example when the scope in which the flow is collected is cancelled.

A mutable shared flow can be created with the MutableSharedFlow(...) constructor function and allows emitting values to the shared flow. See MutableSharedFlow for more details.
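The point that a shared flow never completes can be illustrated with a minimal sketch. The helper name collectUntilCancelled is hypothetical and not part of the library. The collector below receives everything that was emitted, yet collect does not return until the job is cancelled.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: returns the collected values and whether the
// collector was still active after it had received all of them.
fun collectUntilCancelled(): Pair<List<Int>, Boolean> = runBlocking {
    val sharedFlow = MutableSharedFlow<Int>(replay = 3)

    // no subscribers yet, so the values land in the replay cache
    repeat(3) { sharedFlow.tryEmit(it) }

    val received = mutableListOf<Int>()
    val job = launch {
        // collect on a shared flow never returns normally
        sharedFlow.collect { received.add(it) }
    }

    delay(100)
    // all three values were replayed, but the collector is still waiting
    val stillActive = job.isActive
    job.cancelAndJoin()
    received.toList() to stillActive
}

fun main() {
    val (received, stillActive) = collectUntilCancelled()
    println("received=$received, stillActive=$stillActive")
    // prints: received=[0, 1, 2], stillActive=true
}
```

Without the cancelAndJoin() call, runBlocking would wait for the collector forever.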

The MutableSharedFlow constructor function takes three arguments. We’ll explore them one by one.

public fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T>

Replay

replay defines the number of values to be cached and replayed to new subscribers.

The replayCache property and the resetReplayCache() method are available on a mutable shared flow to read and reset the replay cache, respectively.

Once resetReplayCache() is called, new subscribers will only receive values emitted after this method was called.
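The effect of resetReplayCache() can be sketched as follows. The helper name resetReplayDemo is hypothetical. The opt-in annotation is there on the assumption that resetReplayCache() is still marked experimental in the kotlinx.coroutines version in use.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: returns the replay cache before the reset, the
// replay cache after the reset, and what a late subscriber receives.
@OptIn(ExperimentalCoroutinesApi::class)
fun resetReplayDemo(): Triple<List<String>, List<String>, List<String>> = runBlocking {
    val stringFlow = MutableSharedFlow<String>(replay = 2)
    stringFlow.tryEmit("a")
    stringFlow.tryEmit("b")
    val before = stringFlow.replayCache   // snapshot: [a, b]

    stringFlow.resetReplayCache()
    val after = stringFlow.replayCache    // snapshot: []

    val received = mutableListOf<String>()
    val job = launch { stringFlow.collect { received.add(it) } }
    delay(100)                            // let the subscriber register

    stringFlow.emit("c")                  // emitted after the reset
    delay(100)
    job.cancelAndJoin()
    Triple(before, after, received.toList())
}

fun main() {
    val (before, after, received) = resetReplayDemo()
    println("before=$before after=$after received=$received")
    // prints: before=[a, b] after=[] received=[c]
}
```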

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun main() = runBlocking {
    val stringFlow = MutableSharedFlow<String>(
        replay = 2
    )

    // values emitted before flow is collected
    stringFlow.tryEmit("a")
    stringFlow.tryEmit("b")
    stringFlow.tryEmit("c")

    delay(100)

    // print current replay cache
    println("replay cache: ${stringFlow.replayCache}")
    
    val job = launch {
        stringFlow.collectLatest {
            // only `b` and `c` will be collected. `a` is lost b/c
            // replay cache size is 2
            println("collected value $it")
        }
    }

    delay(100)

    // cancel the job b/c stringFlow will never complete and this
    // will keep runBlocking running forever
    job.cancel()
}
//sampleEnd

extraBufferCapacity and onBufferOverflow

extraBufferCapacity provides extra buffer space in addition to the replay cache, allowing slow subscribers to get values from the buffer without suspending emitters.

Together with extraBufferCapacity, a buffer overflow strategy can be configured. It takes one of the following values:

public enum class BufferOverflow {
    /**
     * Suspend on buffer overflow.
     */
    SUSPEND,

    /**
     * Drop **the oldest** value in the buffer on overflow, add the new value to the buffer, do not suspend.
     */
    DROP_OLDEST,

    /**
     * Drop **the latest** value that is being added to the buffer right now on buffer overflow
     * (so that buffer contents stay the same), do not suspend.
     */
    DROP_LATEST
}
  • A buffer overflow can happen only when there is at least one subscriber that is not ready to accept the new value. In the absence of subscribers, only the most recent replay values are stored, and the buffer overflow strategy is never triggered and has no effect.

  • When replay and extraBufferCapacity are both 0, the only allowed buffer overflow strategy is SUSPEND. Setting it to DROP_OLDEST or DROP_LATEST throws an IllegalArgumentException:

Exception in thread "main" java.lang.IllegalArgumentException: replay or extraBufferCapacity must be positive with non-default onBufferOverflow strategy DROP_OLDEST
	at kotlinx.coroutines.flow.SharedFlowKt.MutableSharedFlow(SharedFlow.kt:271)
	at org.jetbrains.kotlin.idea.scratch.generated.ScratchFileRunnerGenerated$ScratchFileRunnerGenerated$generated_get_instance_res0$1$1.invokeSuspend(tmp.kt:14)
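Both points above can be checked with a small sketch. The function names replayWithoutSubscribers and zeroBufferWithDropOldest are hypothetical helpers introduced only for illustration.

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow

// Without subscribers the overflow strategy has no effect: only the
// most recent `replay` values are kept, even with DROP_LATEST.
fun replayWithoutSubscribers(): List<String> {
    val flow = MutableSharedFlow<String>(
        replay = 1,
        extraBufferCapacity = 2,
        onBufferOverflow = BufferOverflow.DROP_LATEST
    )
    listOf("a", "b", "c", "d").forEach { flow.tryEmit(it) }
    return flow.replayCache
}

// A non-default strategy with no buffer at all is rejected when the
// flow is created; the exception is captured instead of thrown.
fun zeroBufferWithDropOldest(): Throwable? =
    runCatching {
        MutableSharedFlow<String>(onBufferOverflow = BufferOverflow.DROP_OLDEST)
    }.exceptionOrNull()

fun main() {
    println(replayWithoutSubscribers())
    // prints: [d]
    println(zeroBufferWithDropOldest() is IllegalArgumentException)
    // prints: true
}
```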

Example

import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

//sampleStart
fun main() = runBlocking {

    // total buffer capacity is 3. replay cache + extraBufferCapacity
    val stringFlow = MutableSharedFlow<String>(
        replay = 1,
        extraBufferCapacity = 2,
        // the oldest value in the buffer is dropped on overflow
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )

    val job = launch {
        stringFlow.collect {
            println("collected value $it")
            // simulate a slow subscriber: while it waits here,
            // newly emitted values stay in the buffer
            delay(100)
        }
    }
    delay(100)

    // emit 3 values, `a`, `b` & `c`
    stringFlow.tryEmit("a")
    stringFlow.tryEmit("b")
    stringFlow.tryEmit("c")

    // at this point the buffer is full. Emitting `d` makes the flow
    // drop the oldest value (`a`). Remember, this depends on the
    // overflow strategy.
    stringFlow.tryEmit("d")


    delay(400)

    // cancel the job b/c stringFlow will never complete and this
    // will keep runBlocking running forever
    job.cancel()
}
//sampleEnd
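For contrast with the DROP_OLDEST example above, here is a minimal sketch of the same setup with DROP_LATEST. The helper name dropLatestDemo is hypothetical. The buffer again holds three values, but this time the incoming value `d` is discarded and the buffered ones survive.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.*

// Hypothetical helper: returns the values seen by a slow subscriber.
fun dropLatestDemo(): List<String> = runBlocking {
    val stringFlow = MutableSharedFlow<String>(
        replay = 1,
        extraBufferCapacity = 2,
        onBufferOverflow = BufferOverflow.DROP_LATEST
    )

    val received = mutableListOf<String>()
    val job = launch {
        stringFlow.collect {
            received.add(it)
            delay(100)   // slow subscriber
        }
    }
    delay(100)           // let the subscriber register

    // `a`, `b` and `c` fill the buffer; `d` arrives while it is
    // still full and is dropped (tryEmit still returns true)
    listOf("a", "b", "c", "d").forEach { stringFlow.tryEmit(it) }

    delay(500)
    job.cancelAndJoin()
    received.toList()
}

fun main() {
    println(dropLatestDemo())
    // prints: [a, b, c]
}
```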

emit(…) and tryEmit(…)

Both emit and tryEmit emit values to a shared flow.

  • emit is a suspend function. It suspends only when the buffer overflow strategy is BufferOverflow.SUSPEND, there are subscribers collecting the flow, and the buffer is full because at least one of them has not yet consumed the buffered values.

  • If there are no subscribers and replay is 0, emit simply drops the value. If replay is positive, the value is stored in the replay cache, which evicts its oldest entry when full.

  • tryEmit tries to emit a value to the shared flow without suspending and returns true if the value was emitted and false otherwise. When a flow is configured with BufferOverflow.SUSPEND and there are subscribers collecting it, tryEmit returns false whenever a plain call to emit would suspend; in that case the value is not emitted.

import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

//sampleStart
fun main() = runBlocking {

    // total buffer capacity is 3. replay cache + extraBufferCapacity
    val stringFlow = MutableSharedFlow<String>(
        replay = 1,
        extraBufferCapacity = 2,
        // suspend on overflow
        onBufferOverflow = BufferOverflow.SUSPEND
    )

    val job = launch {
        stringFlow.collect {
            println("collected value $it")
            // simulate a slow subscriber: while it waits here,
            // newly emitted values stay in the buffer
            delay(100)
        }
    }
    delay(100)

    // emit 3 values, `a`, `b` & `c`
    println("emit ${stringFlow.tryEmit("a")}")
    println("emit ${stringFlow.tryEmit("b")}")
    println("emit ${stringFlow.tryEmit("c")}")

    // at this point buffer is full, with BufferOverflow.SUSPEND
    // tryEmit will return false and value will be dropped
    println("emit ${stringFlow.tryEmit("d")} -- emit was false")


    delay(400)

    // cancel the job b/c stringFlow will never complete and this
    // will keep runBlocking running forever
    job.cancel()
}
//sampleEnd
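The difference between the two functions is most visible on a flow created with the default arguments, which has no buffer at all. The following sketch, with the hypothetical helper name emitVsTryEmit, assumes one active subscriber: tryEmit cannot hand over the value without suspending and returns false, while emit suspends until the subscriber takes it.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: returns the result of tryEmit and the values
// that actually reached the subscriber.
fun emitVsTryEmit(): Pair<Boolean, List<String>> = runBlocking {
    // defaults: replay = 0, extraBufferCapacity = 0, SUSPEND
    val stringFlow = MutableSharedFlow<String>()

    val received = mutableListOf<String>()
    val job = launch { stringFlow.collect { received.add(it) } }
    delay(100)                               // let the subscriber register

    // no buffer space, so emit would have to suspend: `x` is not emitted
    val accepted = stringFlow.tryEmit("x")

    // suspends until the subscriber has taken `y`
    stringFlow.emit("y")

    delay(100)
    job.cancelAndJoin()
    accepted to received.toList()
}

fun main() {
    val (accepted, received) = emitVsTryEmit()
    println("tryEmit accepted=$accepted, received=$received")
    // prints: tryEmit accepted=false, received=[y]
}
```

If tryEmit has to be used from non-suspending code, giving the flow a positive extraBufferCapacity is one way to let it succeed while subscribers are busy.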
