본문 바로가기
Kotlin

[Kotlin] Kotlin 공식 문서 번역 - 코루틴 비동기 Flow (Coroutines Asynchronous Flow)

by 노력남자 2023. 10. 1.
반응형

코루틴 비동기 Flow (Coroutines Asynchronous Flow)

 

서스펜딩 함수는 비동기로 단일 값을 반환하지만, 어떻게 여러 비동기로 계산된 값을 반환할 수 있을까요? Kotlin Flows가 이럴 때 사용됩니다.


여러 값을 표현하기


여러 값을 Kotlin에서는 컬렉션을 사용하여 나타낼 수 있습니다. 예를 들어, 세 개의 숫자를 포함하는 List를 반환하는 간단한 함수를 만들고 forEach를 사용하여 모든 값을 출력할 수 있습니다.

fun simple(): List<Int> = listOf(1, 2, 3)

fun main() {
    simple().forEach { value -> println(value) }
}



이 코드는 다음과 같이 출력됩니다:

1
2
3


시퀀스


만약 CPU를 소모하는 블로킹 코드로 숫자를 계산한다면 (각 계산이 100ms 소요됨), 시퀀스를 사용하여 숫자를 나타낼 수 있습니다.

fun simple(): Sequence<Int> = sequence { // 시퀀스 빌더
    for (i in 1..3) {
        Thread.sleep(100) // 계산 중이라고 가정합니다.
        yield(i) // 다음 값을 생성합니다.
    }
}

fun main() {
    simple().forEach { value -> println(value) }
}


이 코드는 동일한 숫자를 출력하지만, 각각을 출력하기 전에 100ms를 기다립니다.


서스펜딩 함수


그러나 이 계산은 코드를 실행하는 주 스레드를 차단합니다. 이러한 값을 비동기 코드에서 계산할 때 simple 함수에 suspend 수식어를 지정하여 차단 없이 작업을 수행하고 결과를 리스트로 반환할 수 있습니다.

suspend fun simple(): List<Int> {
    delay(1000) // 비동기로 무언가를 하는 것처럼 가정합니다.
    return listOf(1, 2, 3)
}

fun main() = runBlocking<Unit> {
    simple().forEach { value -> println(value) }
}


이 코드는 1초를 기다린 후에 숫자를 출력합니다.


Flows


List<Int> 결과 유형을 사용하면 모든 값을 한꺼번에 반환할 수 있습니다. 비동기로 계산되는 값의 스트림을 나타내려면 Flow<Int> 유형을 사용할 수 있으며 시퀀스<Int> 유형을 동기적으로 계산된 값에 사용하는 것과 마찬가지로 사용할 수 있습니다.

fun simple(): Flow<Int> = flow { // 플로우 빌더
    for (i in 1..3) {
        delay(100) // 유용한 작업을 수행 중이라고 가정합니다.
        emit(i) // 다음 값을 발생시킵니다.
    }
}

fun main() = runBlocking<Unit> {
    // 메인 스레드가 차단되지 않는지 확인하기 위해 별도의 코루틴에서 "I'm not blocked"를 100ms마다 출력합니다.
    launch {
        for (k in 1..3) {
            println("I'm not blocked $k")
            delay(100)
        }
    }
    // 플로우를 수집합니다.
    simple().collect { value -> println(value) }
}


이 코드는 주 스레드를 차단하지 않고 각 숫자를 출력하기 전에 100ms를 기다립니다. 이것은 주 스레드에서 실행 중인 별도의 코루틴에서 매 100ms마다 "I'm not blocked"를 출력함으로써 확인됩니다.

 

I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3


Flow 예제와 이전 예제의 차이점을 주목하세요:

  • Flow 유형의 빌더 함수는 flow로 호출됩니다.
  • flow { ... } 빌더 블록 내부의 코드는 일시 중단될 수 있습니다.
  • simple 함수에 더 이상 suspend 수식어가 표시되지 않습니다.
  • 값을 발생시키기 위해 emit 함수를 사용하여 플로우에서 값을 발생시킵니다.
  • 값을 수집하기 위해 collect 함수를 사용하여 플로우에서 값을 수집합니다.

 

simple의 flow { ... } 본문에서 delay를 Thread.sleep로 대체하면 주 스레드가 차단됨을 볼 수 있습니다.

 

Flows는 콜드다.

 

플로우는 시퀀스와 유사한 콜드 스트림입니다. 즉, 플로우 빌더 내부의 코드는 플로우가 수집될 때까지 실행되지 않습니다. 이것은 다음 예제에서 명확하게 나타납니다.

fun simple(): Flow<Int> = flow { 
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    println("Calling simple function...")
    val flow = simple()
    println("Calling collect...")
    flow.collect { value -> println(value) } 
    println("Calling collect again...")
    flow.collect { value -> println(value) } 
}


이것은 다음과 같이 출력됩니다:

 

Calling simple function...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3


이것이 simple 함수(플로우를 반환하는 함수)가 suspend 수식어로 표시되지 않는 주요 이유 중 하나입니다. simple() 호출 자체는 빠르게 반환되며 아무 것도 기다리지 않습니다. 플로우는 수집될 때마다 처음부터 시작되며 이것이 collect를 다시 호출할 때마다 "Flow started"가 표시되는 이유입니다.


플로우 취소 기본 사항


플로우는 일반적인 코루틴의 협력 취소를 따릅니다. 일반적으로 플로우 수집은 플로우가 취소 가능한 일시 중단 함수(예: delay)에서 중단될 때 취소될 수 있습니다. 다음 예제에서는 플로우가 withTimeoutOrNull 블록에서 실행될 때 타임아웃으로 인해 취소되고 코드가 실행을 멈추는 방법을 보여줍니다.

fun simple(): Flow<Int> = flow { 
    for (i in 1..3) {
        delay(100)          
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    withTimeoutOrNull(250) { // 250ms 후에 타임아웃
        simple().collect { value -> println(value) } 
    }
    println("Done")
}


플로우 내부에서 simple 함수에서는 두 개의 숫자만 발생되므로 다음 출력이 생성됩니다:

 

Emitting 1
1
Emitting 2
2
Done


플로우 빌더


이전 예제의 flow { ... } 빌더는 가장 기본적인 것입니다. 다른 빌더를 사용하여 플로우를 선언할 수도 있습니다.

  • flowOf 빌더는 일정한 값을 발생시키는 플로우를 정의합니다.
  • 다양한 컬렉션 및 시퀀스는 .asFlow() 확장 함수를 사용하여 플로우로 변환할 수 있습니다.

 

예를 들어, 플로우에서 1부터 3까지의 숫자를 출력하는 코드를 다음과 같이 다시 작성할 수 있습니다:

// 정수 범위를 플로우로 변환
(1..3).asFlow().collect { value -> println(value) }

 

중간 플로우 연산자


플로우는 컬렉션 및 시퀀스를 변환하는 방법과 동일한 방식으로 연산자를 사용하여 변환할 수 있습니다. 중간 연산자는 상류 플로우에 적용되고 하류 플로우를 반환합니다. 이러한 연산자도 플로우처럼 콜드(cold)이며, 이러한 연산자 호출은 자체적으로 중단 함수가 아닙니다. 그것은 빠르게 작동하여 새로운 변환된 플로우의 정의를 반환합니다.

기본 연산자들은 map 및 filter와 같이 익숙한 이름을 가지고 있습니다. 이러한 연산자들은 시퀀스와 달리 이러한 연산자 내부의 코드 블록에서 중단 함수를 호출할 수 있다는 중요한 차이가 있습니다.

예를 들어, 들어오는 요청의 플로우를 map 연산자를 사용하여 결과로 매핑할 수 있습니다. 심지어 요청을 수행하는 것이 중단 함수로 구현된 장기 실행 작업인 경우에도 말이죠.

suspend fun performRequest(request: Int): String {
    delay(1000) // 장기 실행되는 비동기 작업을 모방
    return "response $request"
}

fun main() = runBlocking<Unit> {
    (1..3).asFlow() // 요청의 플로우
        .map { request -> performRequest(request) }
        .collect { response -> println(response) }
}


이 코드는 다음과 같이 출력됩니다. 각 줄은 이전 줄에서 1초 후에 나타납니다.

response 1
response 2
response 3


Transform 연산자


플로우 변환 연산자 중에서 가장 일반적인 것은 transform이라고 합니다. 이를 사용하여 map 및 filter와 같은 간단한 변환을 흉내 내는 것뿐만 아니라 더 복잡한 변환을 구현할 수 있습니다. transform 연산자를 사용하여 임의의 값을 임의의 횟수로 발생시킬 수 있습니다.

예를 들어, transform을 사용하여 장기 실행 비동기 요청을 수행하기 전에 문자열을 발생시키고 그 후에 응답을 발생시킬 수 있습니다.

(1..3).asFlow() // 요청의 플로우
    .transform { request ->
        emit("Request $request 시작")
        emit(performRequest(request))
    }
    .collect { response -> println(response) }


이 코드의 출력은 다음과 같습니다.

 

Request 1 시작
response 1
Request 2 시작
response 2
Request 3 시작
response 3


크기 제한 연산자


take와 같은 크기 제한 중간 연산자는 해당 제한에 도달하면 플로우의 실행을 취소합니다. 코루틴에서 취소는 항상 예외를 throw하여 취소됩니다. 이로써 모든 리소스 관리 함수(try { ... } finally { ... } 블록과 같은)가 취소될 경우에도 정상적으로 작동합니다.

fun numbers(): Flow<Int> = flow {
    try {                          
        emit(1)
        emit(2) 
        println("이 줄은 실행되지 않습니다.")
        emit(3)    
    } finally {
        println("numbers 내부의 Finally")
    }
}

fun main() = runBlocking<Unit> {
    numbers() 
        .take(2) // 처음 두 개만 가져오기
        .collect { value -> println(value) }
}


이 코드의 출력은 명확하게 numbers() 함수 내부의 flow { ... } 본문의 실행이 두 번째 숫자를 발생시킨 후에 중단되었음을 보여줍니다.

1
2
Finally in numbers


종단 플로우 연산자


플로우의 종단 연산자는 플로우의 수집을 시작하는 중단 함수입니다. 가장 기본적인 연산자는 collect이지만 더 쉽게 만들 수 있는 다른 종단 연산자도 있습니다.

 

  • toList 및 toSet과 같은 다양한 컬렉션으로의 변환
  • 첫 번째 값을 가져오고 플로우가 단일 값을 발생하도록 보장하는 연산자
  • reduce 및 fold를 사용하여 플로우를 값으로 축소


예를 들어:

val sum = (1..5).asFlow()
    .map { it * it } // 1에서 5까지의 숫자의 제곱
    .reduce { a, b -> a + b } // 합산 (종단 연산자)
println(sum)


단일 숫자를 출력합니다.

55

 

플로우는 연속적입니다.


각 플로우의 개별 컬렉션은 특별한 여러 플로우를 다루는 연산자를 사용하지 않는 한 순차적으로 수행됩니다. 컬렉션은 일반적으로 터미널 연산자를 호출하는 코루틴에서 직접 작동합니다. 기본적으로 새로운 코루틴은 시작되지 않습니다. 각 발행된 값은 상향 및 하향 스트림의 모든 중간 연산자에 의해 처리되고, 그런 다음 터미널 연산자로 전달됩니다.

다음 예제를 참조하세요. 이 예제는 짝수 정수를 필터링하고 문자열로 매핑합니다.

(1..5).asFlow()
    .filter {
        println("Filter $it")
        it % 2 == 0              
    }              
    .map { 
        println("Map $it")
        "string $it"
    }.collect { 
        println("Collect $it")
    }


플로우 컨텍스트


플로우의 컬렉션은 항상 호출하는 코루틴의 컨텍스트에서 수행됩니다. 예를 들어, 간단한 플로우가 있다면, 해당 플로우의 구현 세부 사항과 관계없이 다음 코드는 이 코드의 작성자가 지정한 컨텍스트에서 실행됩니다.

withContext(context) {
    simple().collect { value ->
        println(value) // 지정된 컨텍스트에서 실행
    }
}


이 플로우의 컨텍스트 보존 특성이라고 하는 속성입니다.

그러므로 기본적으로 플로우 { ... } 빌더 내의 코드는 해당 플로우의 수집기의 제공한 컨텍스트에서 실행됩니다. 예를 들어, 세 개의 숫자를 출력하고 호출한 스레드를 출력하는 간단한 함수의 구현을 고려해보세요.

fun simple(): Flow<Int> = flow {
    log("Started simple flow")
    for (i in 1..3) {
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    simple().collect { value -> log("Collected $value") } 
}


이 코드를 실행하면 다음이 생성됩니다.

[main @coroutine#1] Started simple flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3


따라서 simple().collect는 메인 스레드에서 호출되므로 simple 플로우의 본문도 메인 스레드에서 호출됩니다. 이것은 실행 컨텍스트를 신경쓰지 않고 호출자를 차단하지 않는 빠르게 실행되거나 비동기 코드에 대한 완벽한 기본값입니다.


withContext를 사용할 때의 일반적인 함정


그러나 CPU를 많이 사용하는 장기 실행 코드는 Dispatchers.Default 컨텍스트에서 실행되어야 할 수 있으며 UI 업데이트 코드는 Dispatchers.Main 컨텍스트에서 실행되어야 할 수 있습니다. 일반적으로 코틀린 코루틴을 사용하는 코드에서 컨텍스트를 변경하려면 withContext를 사용하지만 flow { ... } 빌더 내의 코드는 컨텍스트 보존 속성을 준수해야 하며 다른 컨텍스트에서 발행할 수 없습니다.

다음 코드를 실행하려고 시도해보세요.

fun simple(): Flow<Int> = flow {
    // 플로우 빌더에서 CPU 집약적인 코드의 컨텍스트를 변경하는 잘못된 방법
    kotlinx.coroutines.withContext(Dispatchers.Default) {
        for (i in 1..3) {
            Thread.sleep(100) // CPU 집약적인 작업을 수행 중인 것처럼 가정
            emit(i) // 다음 값을 발행
        }
    }
}

fun main() = runBlocking<Unit> {
    simple().collect { value -> println(value) } 
}


이 코드는 다음 예외를 발생시킵니다.

Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, Dispatchers.Default].
Please refer to 'flow' documentation or use 'flowOn' instead
at ...


flowOn 연산자


예외는 플로우 발행의 컨텍스트를 변경해야 할 때 사용해야 하는 flowOn 함수를 가리킵니다. 올바른 방법은 다음과 같이 플로우 빌더 내에서 CPU를 많이 사용하는 코드의 컨텍스트를 변경한 예제입니다. 이 예제는 어떻게 모두 작동하는지 보여주기 위해 해당 스레드의 이름을 출력합니다.

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100) // CPU 집약적인 작업을 수행 중인 것처럼 가정
        log("Emitting $i")
        emit(i) // 다음 값을 발행
    }
}.flowOn(Dispatchers.Default) // 플로우 빌더 내에서 CPU 집약적인 코드의 컨텍스트를 변경하는 올바른 방법

fun main() = runBlocking<Unit> {
    simple().collect { value ->
        log("Collected $value") 
    } 
}


flow { ... }가 백그라운드 스레드에서 작동하는 것을 확인하고 수집이 메인 스레드에서 이루어진다는 것을 주목하세요.

여기에서 또 다른 관찰할 점은 flowOn 연산자가 플로우의 기본 순차적 특성을 변경했다는 것입니다. 이제 수집은 하나의 코루틴("coroutine#1")에서 발생하고 발행은 다른 코루틴("coroutine#2")에서 다른 스레드에서 수집 코루틴과 동시에 실행되는 다른 코루틴에서 발생합니다. flowOn 연산자는 컨텍스트의 CoroutineDispatcher를 변경해야 할 때 상향 플로우에 대한 새로운 코루틴을 생성합니다.

 

버퍼링


서로 다른 코루틴에서 플로우의 다른 부분을 실행하는 것은 특히 긴 비동기 작업이 포함된 경우에는 플로우를 수집하는 데 걸리는 전반적인 시간 측면에서 도움이 될 수 있습니다. 예를 들어, 단순한 플로우가 원소를 생성하는 데 100ms가 걸리는 느린 경우와 수집기도 느리게 원소를 처리하는 데 300ms가 걸리는 경우를 생각해보겠습니다. 세 개의 숫자를 포함하는 이러한 플로우를 수집하는 데 얼마나 오래 걸릴지 살펴보겠습니다:

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // 100ms 동안 비동기로 대기 중인 것으로 가정
        emit(i) // 다음 값을 발생시킴
    }
}

fun main() = runBlocking<Unit> {
    val time = measureTimeMillis {
        simple().collect { value ->
            delay(300) // 300ms 동안 처리 중인 것으로 가정
            println(value)
        }
    }
    println("Collected in $time ms")
}


이것은 전체 수집이 약 1200ms(세 개의 숫자, 각각 400ms) 정도 걸린다는 것을 생성합니다:

 

1
2
3
Collected in 1220 ms


플로우에서 버퍼 연산자를 사용하여 간단한 플로우의 발생 코드를 수집 코드와 병렬로 실행할 수 있습니다. 이렇게하면 순차적으로 실행하는 대신 발생 코드를 동시에 실행할 수 있습니다.

val time = measureTimeMillis {
    simple()
        .buffer() // 발생을 버퍼에 넣고 기다리지 않음
        .collect { value ->
            delay(300) // 300ms 동안 처리 중인 것으로 가정
            println(value)
        }
}
println("Collected in $time ms")


이렇게하면 효과적으로 처리 파이프라인을 만들어 첫 번째 숫자를 기다릴 때만 100ms만 기다리고 각 숫자를 처리하는 데 300ms만 소요됩니다. 이렇게하면 약 1000ms가 걸립니다:

1
2
3
Collected in 1071 ms


주의할 점은 flowOn 연산자가 CoroutineDispatcher를 변경해야 할 때 동일한 버퍼링 메커니즘을 사용한다는 것입니다. 하지만 여기서는 명시적으로 실행 컨텍스트를 변경하지 않고도 버퍼링을 요청합니다.


합침


플로우가 작업의 부분적인 결과나 작업 상태 업데이트를 나타낼 때 모든 값을 처리하는 것이 아니라 가장 최근 값을 처리하는 것이 필요할 수 있습니다. 이 경우 conflate 연산자를 사용하여 수집기가 처리하기에 너무 느릴 때 중간 값을 건너뛸 수 있습니다. 이전 예제를 기반으로 살펴보겠습니다:

val time = measureTimeMillis {
    simple()
        .conflate() // 값을 합치고 모든 값을 처리하지 않음
        .collect { value ->
            delay(300) // 300ms 동안 처리 중인 것으로 가정
            println(value)
        }
}
println("Collected in $time ms")


첫 번째 숫자가 아직 처리 중일 때 두 번째와 세 번째가 이미 생성되었기 때문에 두 번째 값은 결합되고 가장 최근 값인 세 번째 값만 수집기에 전달됩니다:

1
3
Collected in 758 ms


최신 값을 처리하는 방법


Conflate는 발생 및 수집이 모두 느릴 때 처리 속도를 높이는 한 가지 방법입니다. 이것은 발생한 값을 삭제하여 수집하는 것입니다. 다른 방법은 느린 수집기를 취소하고 새 값이 발생할 때마다 다시 시작하는 것입니다. 이렇게하려면 xxxLatest 연산자의 패밀리를 사용할 수 있습니다. 이 연산자들은 블록에서 새 값이 발생할 때마다 블록 내의 코드를 취소하고 재시작하는 동일한 기본 논리를 수행합니다. 이전 예제에서 conflate를 collectLatest로 변경해 보겠습니다:

val time = measureTimeMillis {
    simple()
        .collectLatest { value -> // 최신 값에서 취소 및 다시 시작
            println("Collecting $value")
            delay(300) // 300ms 동안 처리 중인 것으로 가정
            println("Done $value")
        }
}
println("Collected in $time ms")


collectLatest의 본문은 300ms가 걸리지만 새 값은 100ms마다 발생하기 때문에 블록이 모든 값을 처리하되 마지막 값에 대해서만 완료됨을 볼 수 있습니다:

Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 741 ms

 

여러 개의 플로우를 조합하는 방법


여러 개의 플로우를 조합하는 다양한 방법이 있습니다.


Zip


일련의 숫자와 문자열을 조합하는 예제를 살펴보겠습니다. Kotlin 표준 라이브러리의 Sequence.zip 확장 함수처럼 플로우에는 두 개의 플로우의 해당 값들을 결합하는 zip 연산자가 있습니다.

val nums = (1..3).asFlow() // 숫자 1..3
val strs = flowOf("one", "two", "three") // 문자열
nums.zip(strs) { a, b -> "$a -> $b" } // 하나의 문자열로 조합
    .collect { println(it) } // 수집하고 출력


이 예제는 다음과 같이 출력됩니다:

1 -> one
2 -> two
3 -> three


Combine


플로우가 변수나 작업의 가장 최근 값을 나타내는 경우 (conflation에 관한 관련 섹션도 참고), 해당 플로우의 가장 최근 값에 의존하는 계산을 수행하고 상위 스트림 플로우 중 하나가 값을 내보낼 때마다 다시 계산해야 할 수 있습니다. 이에 해당하는 연산자 패밀리는 combine입니다.

예를 들어, 이전 예제의 숫자가 300ms마다 업데이트되고 문자열은 400ms마다 업데이트되는 경우 zip 연산자를 사용하여 조합하면 여전히 같은 결과가 생성됩니다. 그러나 결과가 400ms마다 출력됩니다.

이 예제에서는 각 요소를 지연시키기 위해 중간 연산자로 onEach를 사용하여 샘플 플로우를 더 선언적이고 짧게 만들었습니다.

val nums = (1..3).asFlow().onEach { delay(300) } // 300ms마다 숫자 1..3
val strs = flowOf("one", "two", "three").onEach { delay(400) } // 400ms마다 문자열
val startTime = System.currentTimeMillis() // 시작 시간 기억
nums.zip(strs) { a, b -> "$a -> $b" } // "zip"을 사용하여 하나의 문자열을 조합
    .collect { value -> // 수집하고 출력
        println("$value at ${System.currentTimeMillis() - startTime} ms from start")
    }


그러나 여기서 zip 대신 combine 연산자를 사용하면 다른 출력이 나옵니다. 여기서는 nums 또는 strs 플로우가 값을 내보낼 때마다 한 줄이 출력됩니다.

1 -> one at 452 ms from start
2 -> one at 651 ms from start
2 -> two at 854 ms from start
3 -> two at 952 ms from start
3 -> three at 1256 ms from start

 

플로우 펼치기


플로우는 비동기로 수신된 값의 시퀀스를 나타내므로 각 값이 다른 시퀀스의 값을 요청하는 상황이 생기기 쉽습니다. 예를 들어, 다음과 같은 함수가 있다고 가정해보겠습니다. 이 함수는 두 개의 문자열을 500ms 간격으로 발생시키는 플로우를 반환합니다.

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // 500ms 대기
    emit("$i: Second")
}


이제 세 개의 정수로 이루어진 플로우가 있고 다음과 같이 각각에 대해 requestFlow를 호출한다고 가정해봅시다.

(1..3).asFlow().map { requestFlow(it) }


이렇게 하면 단일 플로우로 펼쳐서 추가 처리를 위해 사용해야 하는 플로우의 플로우(Flow<Flow<String>>)가 생성됩니다. 이를 플레이트(Flatten)하기 위한 flatten 및 flatMap 연산자가 있습니다. 그러나 플로우의 비동기적인 성격 때문에 다른 플래팅 모드를 요청하며 따라서 플로우에는 플래팅 연산자 패밀리가 존재합니다.


flatMapConcat


플로우의 플로우를 연결하는 연산은 flatMapConcat 및 flattenConcat 연산자로 제공됩니다. 이들은 해당 시퀀스 연산의 가장 직접적인 대응물입니다. 다음 예제에서 볼 수 있듯이 내부 플로우가 완료될 때까지 다음 플로우를 수집하기를 기다립니다.

val startTime = System.currentTimeMillis() // 시작 시간 기억
(1..3).asFlow().onEach { delay(100) } // 100ms마다 숫자 발생
    .flatMapConcat { requestFlow(it) }
    .collect { value -> // 수집하고 출력
        println("$value at ${System.currentTimeMillis() - startTime} ms from start")
    }


flatMapConcat의 순차적인 특성은 출력에서 명확하게 확인할 수 있습니다.

1: First at 121 ms from start
1: Second at 622 ms from start
2: First at 727 ms from start
2: Second at 1227 ms from start
3: First at 1328 ms from start
3: Second at 1829 ms from start


flatMapMerge


다른 플래팅 연산은 모든 입력 플로우를 동시에 수집하고 그 값을 단일 플로우로 병합하여 값이 가능한 빨리 방출되도록 하는 것입니다. 이것은 flatMapMerge 및 flattenMerge 연산자에 의해 구현됩니다. 이들은 모두 동시에 수집되는 동안 동시 플로우의 수를 제한하는 선택적인 concurrency 매개변수를 수용합니다(기본적으로 DEFAULT_CONCURRENCY와 같음).

val startTime = System.currentTimeMillis() // 시작 시간 기억
(1..3).asFlow().onEach { delay(100) } // 100ms마다 숫자 발생
    .flatMapMerge { requestFlow(it) }
    .collect { value -> // 수집하고 출력
        println("$value at ${System.currentTimeMillis() - startTime} ms from start")
    }


flatMapMerge의 동시성 특성은 명확합니다.

1: First at 136 ms from start
2: First at 231 ms from start
3: First at 333 ms from start
1: Second at 639 ms from start
2: Second at 732 ms from start
3: Second at 833 ms from start


참고로 flatMapMerge는 코드 블록({ requestFlow(it) }이 예제에서)을 순차적으로 호출하지만 결과 플로우를 동시에 수집합니다. 이것은 먼저 순차적인 map { requestFlow(it) }를 수행하고 그 결과에 대해 flattenMerge를 호출하는 것과 동일합니다.


flatMapLatest


"가장 최근 값 처리" 섹션에서 설명한 collectLatest 연산자와 유사하게 이전 플로우의 수집이 새 플로우가 발생하자마자 취소되는 "가장 최근" 플래팅 모드가 있습니다. 이것은 flatMapLatest 연산자로 구현됩니다.

val startTime = System.currentTimeMillis() // 시작 시간 기억
(1..3).asFlow().onEach { delay(100) } // 100ms마다 숫자 발생
    .flatMapLatest { requestFlow(it) }
    .collect { value -> // 수집하고 출력
        println("$value at ${System.currentTimeMillis() - startTime} ms from start")
    }


이 예제에서 출력은 flatMapLatest가 어떻게 작동하는지 잘 보여줍니다.

1: First at 142 ms from start
2: First at 322 ms from start
3: First at 425 ms from start
3: Second at 931 ms from start


flatMapLatest는 새 값이 수신될 때마다 코드 블록({ requestFlow(it) }이 예제에서)의 모든 코드를 취소합니다. 이 특정 예제에서는 requestFlow 호출 자체가 빠르고 중단되지 않으며 취소될 수 없기 때문에 차이가 없습니다. 그러나 requestFlow에서 delay와 같은 중단 함수를 사용하는 경우 출력에서 차이가 나타날 것입니다.

 

플로우 예외 처리


플로우 수집은 발생자 또는 연산자 내부의 코드에서 예외가 발생할 때 예외로 완료될 수 있습니다. 이러한 예외를 처리하는 여러 가지 방법이 있습니다.


수집기에서 try-catch


수집기는 Kotlin의 try/catch 블록을 사용하여 예외를 처리할 수 있습니다.

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i) // 다음 값을 발생시킴
    }
}

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value ->         
            println(value)
            check(value <= 1) { "Collected $value" }
        }
    } catch (e: Throwable) {
        println("Caught $e")
    } 
}


이 코드는 수집의 터미널 연산자에서 예외를 성공적으로 잡으며, 그 뒤에는 더 이상 값이 발생하지 않는 것을 볼 수 있습니다:

Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2


모든 예외를 잡기


이전 예제에서는 사실 발생자나 중간 또는 터미널 연산자에서 발생하는 예외를 모두 잡습니다. 예를 들어, 발생한 값을 문자열로 매핑하지만 해당 코드에서 예외가 발생하도록 코드를 변경하는 경우:

fun simple(): Flow<String> = 
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // 다음 값을 발생시킴
        }
    }
    .map { value ->
        check(value <= 1) { "Crashed on $value" }                 
        "string $value"
    }

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value -> println(value) }
    } catch (e: Throwable) {
        println("Caught $e")
    } 
}


이 예외도 잡히고 수집이 중지됩니다:

Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2

 

예외 투명성


그러나 발생자의 코드가 예외 처리 동작을 캡슐화하는 방법은 무엇일까요?

플로우는 예외에 대해 투명해야 하며, 플로우 { ... } 빌더 내부에서 try/catch 블록 안에서 값을 발생시키는 것은 예외 투명성을 위반하는 것입니다. 이렇게 하면 수집기가 예외를 던지면 이전 예제와 같이 try/catch를 사용하여 항상 예외를 잡을 수 있음이 보장됩니다.

발생자는 예외 투명성을 보존하고 예외 처리를 캡슐화하는 catch 연산자를 사용할 수 있습니다. catch 연산자의 본문은 예외를 분석하고 예외가 어떤 예외인지에 따라 다른 방식으로 반응할 수 있습니다.

  • 예외를 throw를 사용하여 다시 던질 수 있습니다.
  • 예외를 본문의 emit을 사용하여 값으로 변환할 수 있습니다.
  • 예외를 무시하거나 기록하거나 다른 코드에서 처리할 수 있습니다.


예를 들어, 예외를 잡을 때 텍스트를 발생시켜 보겠습니다:

simple()
    .catch { e -> emit("Caught $e") } // 예외 발생 시 발생
    .collect { value -> println(value) }


이 예제의 출력은 같지만 더 이상 코드 주위에 try/catch가 없습니다.


투명한 catch


catch 중간 연산자는 예외 투명성을 준수하여 catch 아래의 예외(즉, catch 위의 모든 연산자에서 발생한 예외)만 잡습니다. collect { ... } 내의 블록이 예외를 throw하면 catch 밖으로 빠져나갑니다.

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    simple()
        .catch { e -> println("Caught $e") } // 하류 예외를 잡지 않음
        .collect { value ->
            check(value <= 1) { "Collected $value" }                 
            println(value) 
        }
}


catch 연산자가 있더라도 "Caught ..." 메시지가 출력되지 않습니다:

Emitting 1
1
Emitting 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
at ...


선언적으로 catch하기


catch 연산자의 선언적 특성을 onEach로 결합하여 모든 예외를 처리하려는 의도와 결합할 수 있습니다. 이 경우 수집 플로우는 매개변수 없이 collect() 호출에 의해 트리거되어야 합니다.

simple()
    .onEach { value ->
        check(value <= 1) { "Collected $value" }                 
        println(value) 
    }
    .catch { e -> println("Caught $e") }
    .collect()


이제 "Caught ..." 메시지가 출력되며 try/catch 블록을 명시적으로 사용하지 않고 모든 예외를 잡을 수 있습니다:

Emitting 1
1
Emitting 2
Caught java.lang.IllegalStateException: Collected 2

 

플로우 완료


플로우 수집이 완료될 때(일반적으로 또는 예외적으로) 작업을 실행해야 할 수 있습니다. 이미 알아차렸겠지만 이를 수행하는 두 가지 방법이 있습니다: 명령적 또는 선언적입니다.


명령적인 finally 블록


try/catch 외에도 수집기는 수집 완료 시 작업을 실행하기 위해 finally 블록을 사용할 수 있습니다.

fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value -> println(value) }
    } finally {
        println("Done")
    }
}


이 코드는 simple 플로우에서 생성된 세 개의 숫자를 출력한 다음 "Done" 문자열을 출력합니다:

1
2
3
Done


선언적 처리


선언적 접근 방식을 위해 플로우에는 플로우가 완전히 수집된 경우에 호출되는 onCompletion 중간 연산자가 있습니다.

이전 예제를 onCompletion 연산자를 사용하여 다시 작성할 수 있으며 동일한 출력을 생성합니다:

simple()
    .onCompletion { println("Done") }
    .collect { value -> println(value) }


onCompletion의 주요 이점은 람다의 nullable Throwable 매개변수로 플로우 수집이 정상적으로 완료되었는지 예외적으로 완료되었는지 여부를 확인할 수 있다는 것입니다. 다음 예제에서 simple 플로우는 숫자 1을 발생시킨 후 예외를 던집니다.

fun simple(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException()
}

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
        .catch { cause -> println("Caught exception") }
        .collect { value -> println(value) }
}


예상했듯이 다음과 같이 출력됩니다:

1
Flow completed exceptionally
Caught exception


onCompletion 연산자는 예외를 처리하지 않으며 위의 예제 코드에서 볼 수 있듯이 예외는 여전히 하류로 흐릅니다. 이 예외는 추가적인 onCompletion 연산자에 전달되고 catch 연산자를 사용하여 처리할 수 있습니다.


성공적인 완료


catch 연산자와 다른 차이점은 onCompletion이 모든 예외를 볼 수 있으며, 상류 플로우가 성공적으로 완료(취소나 실패 없이)되면 null 예외를 받는다는 것입니다.

fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { cause -> println("Flow completed with $cause") }
        .collect { value ->
            check(value <= 1) { "Collected $value" }                 
            println(value) 
        }
}


우리는 완료 원인이 null이 아니라는 것을 볼 수 있습니다. 왜냐하면 하류 예외로 인해 플로우가 중단되었기 때문입니다.

1
Flow completed with java.lang.IllegalStateException: Collected 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2

 

명령적 대 선언적


이제 플로우를 수집하고 완료 및 예외를 처리하는 방법을 명령적 및 선언적 방식으로 알게 되었습니다. 자연스럽게, 어떤 접근 방식이 선호되며 그 이유는 무엇인지에 대한 질문이 생깁니다. 라이브러리로서, 특정한 접근 방식을 주장하지 않으며, 두 가지 옵션 모두 유효하며 자신의 기호와 코드 스타일에 따라 선택해야 한다고 믿습니다.


플로우 시작


플로우를 사용하여 어떤 소스에서 오는 비동기 이벤트를 나타내는 것은 쉽습니다. 이 경우에는 이벤트에 대한 반응을 등록하는 코드 조각과 계속해서 다른 작업을 수행하는 addEventListener 함수의 아날로그가 필요합니다. onEach 연산자는 이 역할을 할 수 있습니다. 그러나 onEach는 중간 연산자입니다. 또한 플로우를 수집하기 위한 종단 연산자가 필요하며, 그렇지 않으면 onEach를 호출해도 효과가 없습니다.

onEach 다음에 collect 종단 연산자를 사용하면 그 이후의 코드가 플로우가 수집될 때까지 대기합니다.

// 이벤트 플로우를 모방합니다.
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .collect() // <--- 플로우 수집 대기
    println("Done")
}


위 코드에서 볼 수 있듯이 다음과 같이 출력됩니다.

Event: 1
Event: 2
Event: 3
Done


여기서 중요한 것은 플로우를 수집하기 위해 collect 종단 연산자를 사용하는 것입니다. 그러나 launchIn을 사용하면 플로우의 수집을 별도의 코루틴에서 시작할 수 있으므로 즉시 다음 코드의 실행이 계속됩니다.

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .launchIn(this) // <--- 별도의 코루틴에서 플로우 시작
    println("Done")
}


이렇게 하면 다음과 같이 출력됩니다.

 

Done
Event: 1
Event: 2
Event: 3


launchIn에 필요한 매개변수는 플로우를 수집하는 코루틴을 시작하는 CoroutineScope를 지정해야 합니다. 위 예제에서 이 스코프는 runBlocking 코루틴 빌더에서 나옵니다. 따라서 플로우가 실행되는 동안이 runBlocking 스코프는 자식 코루틴의 완료를 기다리며 메인 함수의 반환을 막고 이 예제를 종료시킵니다.

실제 응용 프로그램에서 스코프는 제한된 수명을 가진 엔터티에서 나올 것입니다. 이 엔터티의 수명이 종료되면 해당 스코프가 취소되어 해당 플로우의 수집이 취소됩니다. 이렇게하면 onEach { ... }.launchIn(scope) 쌍은 addEventListener처럼 작동하지만 해당 removeEventListener 함수가 필요하지 않습니다. 취소 및 구조화된 동시성이 이 목적을 제공하기 때문입니다.

launchIn은 플로우 수집 코루틴만 취소하고 전체 스코프를 취소하지 않거나 이를 조인하기 위해 사용할 수 있는 Job을 반환합니다.


플로우 취소 확인


편의를 위해 플로우 빌더는 각 발행된 값을위한 추가적인 취소 확인을 수행합니다. 이는 플로우 { ... }에서 값을 발행하는 중요한 루프가 취소 가능한지 확인합니다.

fun foo(): Flow<Int> = flow { 
    for (i in 1..5) {
        println("Emitting $i") 
        emit(i) 
    }
}

fun main() = runBlocking<Unit> {
    foo().collect { value -> 
        if (value == 3) cancel()  
        println(value)
    } 
}


값을 발행하는 중요한 루프를 취소 가능하게 만들기 위해 .onEach { currentCoroutineContext().ensureActive() }를 추가할 수 있지만 이를 수행하는 데 사용할 수있는 준비된-to-use cancellable 연산자가 제공됩니다.

 

fun main() = runBlocking<Unit> {
    (1..5).asFlow().cancellable().collect { value -> 
        if (value == 3) cancel()  
        println(value)
    } 
}


cancellable 연산자를 사용하면 1부터 3까지의 숫자만 수집됩니다.

 

1
2
3
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@5ec0a365


플로우와 리액티브 스트림


리액티브 스트림이나 RxJava와 같은 리액티브 프레임워크에 익숙한 분들에게 플로우의 디자인은 매우 익숙할 것입니다.

사실, 그 설계는 Reactive Streams 및 여러 구현에서 영감을 받았습니다. 그러나 Flow의 주요 목표는 가능한 간단한 디자인을 가지고 있으며 Kotlin 및 중단에 친숙하며 구조적 동시성을 준수하는 것입니다. 이 목표를 달성하기 위해 리액티브 선구자들과 그들의 엄청난 작업 없이는 불가능했을 것입니다. 더 자세한 내용은 Reactive Streams 및 Kotlin Flows 기사에서 확인할 수 있습니다.

개념적으로는 다르지만 Flow는 리액티브 스트림이며 플로우를 리액티브 (스펙 및 TCK 호환) Publisher로 변환하고 그 반대로 변환하는 것이 가능합니다. 이러한 변환기는 kotlinx.coroutines에서 제공되며 해당 리액티브 모듈에서 찾을 수 있습니다 (Reactive Streams에는 kotlinx-coroutines-reactive, Project Reactor에는 kotlinx-coroutines-reactor, RxJava2/RxJava3에는 kotlinx-coroutines-rx2/kotlinx-coroutines-rx3가 있습니다). 통합 모듈에는 Flow로부터 및 Flow로부터의 변환, Reactor의 Context와 함께 사용되는 통합 및 다양한 리액티브 엔터티와 작업하는 중단에 친화적인 방법이 포함되어 있습니다.

 

원문

 

https://kotlinlang.org/docs/flow.html

반응형

댓글