본문 바로가기
Kotlin

[Kotlin] Kotlin 공식 문서 번역 - 채널 (Channels)

by 노력남자 2023. 10. 1.
반응형

채널 (Channels)

 

채널 기초


채널(Channel)은 개념적으로 BlockingQueue와 매우 유사합니다. 그 중요한 차이점 중 하나는 블로킹 put 연산 대신 중단 send를 가지고 있다는 것이고, 블로킹 take 연산 대신 중단 receive를 가진다는 것입니다.

val channel = Channel<Int>()
launch {
    // 이것은 무거운 CPU 집약적인 계산 또는 비동기 로직 일 수 있으며, 우리는 단순히 다섯 개의 제곱 값을 보내겠습니다.
    for (x in 1..5) channel.send(x * x)
}
// 여기서 받은 다섯 개의 정수를 인쇄합니다.
repeat(5) { println(channel.receive()) }
println("Done!")


이 코드의 출력은 다음과 같습니다:

1
4
9
16
25
Done!


채널 닫기 및 채널에서 반복


큐와는 달리 채널은 더 이상 요소가 오지 않음을 나타내기 위해 닫을 수 있습니다. 수신 측에서 채널에서 요소를 수신하기 위해 일반적인 for 루프를 사용하는 것이 편리합니다.

개념적으로, close는 채널에 특별한 close 토큰을 보내는 것과 같습니다. 이 close 토큰을 수신하면 반복이 멈추므로 close 이전에 보낸 모든 요소가 수신되는 것을 보장합니다.

val channel = Channel<Int>()
launch {
    for (x in 1..5) channel.send(x * x)
    channel.close() // 더 이상 보내지 않습니다.
}
// 여기서 채널에서 값을 받는 `for` 루프를 사용하여 받은 값을 인쇄합니다 (채널이 닫힐 때까지)
for (y in channel) println(y)
println("Done!")


채널 프로듀서 빌드


코루틴이 요소의 시퀀스를 생성하는 패턴은 꽤 일반적입니다. 이것은 동시성 코드에서 자주 발견되는 생산자-소비자 패턴의 일부입니다. 이런 생산자를 채널을 매개변수로 받는 함수로 추상화할 수 있지만, 결과를 함수에서 반환해야 한다는 일반적인 감각에 반하는 것입니다.

생산자 측에서 이렇게 하기 쉽게 만들어주는 편리한 코루틴 빌더인 produce와 소비자 측의 for 루프를 대체하는 consumeEach라는 확장 함수가 있습니다.

val squares = produceSquares()
squares.consumeEach { println(it) }
println("Done!")

 

파이프라인


파이프라인은 하나의 코루틴이 값을 생성하는 패턴으로, 이 값들은 무한한 스트림일 수 있습니다.

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++) // 1부터 시작하는 무한한 정수 스트림을 생성합니다.
}


또 다른 코루틴 또는 여러 코루틴은 그 스트림을 소비하고 일부 처리를 수행하며 다른 결과를 생성합니다. 아래 예제에서는 숫자를 제곱하는 것입니다.

fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (x in numbers) send(x * x)
}


주 코드는 전체 파이프라인을 시작하고 연결합니다.

val numbers = produceNumbers() // 1부터 시작하는 정수를 생성합니다.
val squares = square(numbers) // 정수를 제곱합니다.
repeat(5) {
    println(squares.receive()) // 처음 다섯 개를 출력합니다.
}
println("Done!") // 완료
coroutineContext.cancelChildren() // 자식 코루틴을 취소합니다.


모든 코루틴을 생성하는 함수는 CoroutineScope의 확장 함수로 정의되어 있으므로 구조화된 동시성을 활용하여 응용 프로그램에서 미해결된 전역 코루틴이 없도록 할 수 있습니다.


파이프라인을 사용하여 소수 찾기


파이프라인을 사용하여 소수를 생성하는 예를 통해 파이프라인을 극단으로 이끌어보겠습니다. 먼저 무한한 숫자 시퀀스에서 시작합니다.

fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
    var x = start
    while (true) send(x++) // 시작하는 정수의 무한한 스트림
}


다음 파이프라인 단계는 주어진 소수로 나눌 수 있는 모든 숫자를 제거하여 숫자의 스트림을 필터링합니다.

fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
    for (x in numbers) if (x % prime != 0) send(x)
}


이제 파이프라인을 시작 숫자 2에서 시작하여 현재 채널에서 소수를 가져와서 각 소수마다 새 파이프라인 단계를 시작하는 방식으로 구성합니다.

numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...


다음 예제에서는 처음 10개의 소수를 출력하며 전체 파이프라인을 주 스레드 컨텍스트에서 실행합니다. 모든 코루틴이 주 실행 블록의 범위 내에서 시작되므로 시작한 모든 코루틴을 명시적으로 추적할 필요가 없습니다. 처음 10개의 소수를 출력한 후에는 모든 자식 코루틴을 취소하기 위해 cancelChildren 확장 함수를 사용합니다.

var cur = numbersFrom(2)
repeat(10) {
    val prime = cur.receive()
    println(prime)
    cur = filter(cur, prime)
}
coroutineContext.cancelChildren() // 모든 자식 코루틴을 취소하여 main을 끝내도록 합니다.


이 코드의 출력은 다음과 같습니다:

2
3
5
7
11
13
17
19
23
29


이것은 소수를 찾는 극도로 비실용적인 방법이지만 파이프라인의 개념을 보여주기 위한 것입니다. 실제로 파이프라인은 일부 다른 중단 호출(원

격 서비스로의 비동기 호출과 같은)을 포함하며, 이러한 파이프라인은 완전히 비동기적인 produce와 달리 임의의 중단을 허용하지 않는 sequence/iterator를 사용하여 구축할 수 없습니다.


팬-아웃


여러 코루틴이 동일한 채널에서 수신할 수 있으며 작업을 서로 분산할 수 있습니다. 먼저 초당 10개의 정수를 주기적으로 생성하는 프로듀서 코루틴을 만듭니다.

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1 // 1부터 시작
    while (true) {
        send(x++) // 다음 값을 생성
        delay(100) // 0.1초 대기
    }
}


그런 다음 여러 프로세서 코루틴을 가질 수 있습니다. 이 예제에서는 그들은 자신의 ID와 받은 숫자를 인쇄하는 것입니다.

fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    for (msg in channel) {
        println("Processor #$id received $msg")
    }
}


이제 다섯 개의 프로세서를 시작하고 거의 1초 동안 작업하도록합니다. 결과를 확인해 보겠습니다.

val producer = produceNumbers()
repeat(5) { launchProcessor(it, producer) }
delay(950)
producer.cancel() // 프로듀서 코루틴을 취소하여 모두 종료합니다.


출력은 다음과 같이 됩니다. 특정 정수를 수신하는 프로세서 ID가 다를 수 있습니다.

Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10


프로듀서 코루틴을 취소하면 해당 채널이 닫히고, 프로세서 코루틴이 채널을 처리하고 있는 반복이 종료됩니다.

또한 launchProcessor 코드에서 fan-out을 수행하기 위해 채널을 명시적으로 for 루프로 반복하는 것에 주의하십시오. consumeEach 대신 사용하는 이 for 루프 패턴은 여러 코루틴에서 안전하게 사용할 수 있습니다. 프로세서 코루틴 중 하나가 실패하면 다른 코루틴은 여전히 채널을 처리하고 있습니다. consumeEach를 사용하여 작성된 프로세서는 일반적이거나 비정상적인 완료 시 기본 채널을 소비(취소)하기 때문에 이 패턴을 사용하여 작성된 프로세서는 다른 코루틴에서 안전하게 사용할 수 있습니다.

 

팬-인


여러 개의 코루틴은 동일한 채널에 값을 보낼 수 있습니다. 예를 들어, 문자열 채널과 지정된 문자열을 지정된 지연으로 반복적으로 해당 채널로 보내는 중단 함수가 있다고 가정해 보겠습니다.

suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}


이제 문자열을 보내는 몇 개의 코루틴을 시작하면 어떻게 되는지 살펴보겠습니다 (이 예에서는 주 스레드의 컨텍스트에서 메인 코루틴의 자식으로 시작됩니다).

val channel = Channel<String>()
launch { sendString(channel, "foo", 200L) }
launch { sendString(channel, "BAR!", 500L) }
repeat(6) { // 처음 여섯 개를 수신합니다.
    println(channel.receive())
}
coroutineContext.cancelChildren() // 모든 자식을 취소하여 main을 끝내도록 합니다.


출력은 다음과 같습니다:

foo
foo
BAR!
foo
foo
BAR!


버퍼가 없는 채널


지금까지 본 채널은 버퍼가 없었습니다. 버퍼가 없는 채널은 보내는 쪽과 수신하는 쪽이 서로 만날 때 요소를 전송합니다(랑데부, rendezvous라고도 함). 먼저 send가 호출되면 receive가 호출될 때까지 보류되며, receive가 먼저 호출되면 send가 호출될 때까지 보류됩니다.

Channel() 팩토리 함수와 produce 빌더 모두 버퍼 크기를 지정하는 선택적인 capacity 매개변수를 사용합니다. 버퍼는 보낸 쪽이 보류되기 전에 여러 요소를 보낼 수 있도록 해주며, 용량이 지정된 BlockingQueue와 유사하게 버퍼가 가득 차면 블록됩니다.

다음 코드의 동작을 살펴보겠습니다.

val channel = Channel<Int>(4) // 버퍼가 있는 채널을 생성합니다.
val sender = launch { // 발신자 코루틴을 시작합니다.
    repeat(10) {
        println("Sending $it") // 각 요소를 보내기 전에 출력합니다
        channel.send(it) // 버퍼가 가득 차면 보류됩니다
    }
}
// 아무 것도 수신하지 않고... 그냥 기다립니다....
delay(1000)
sender.cancel() // 발신자 코루틴을 취소합니다.


이것은 용량이 4인 버퍼 채널을 사용하여 "sending"을 다섯 번 출력합니다.

Sending 0
Sending 1
Sending 2
Sending 3
Sending 4


첫 번째 네 개의 요소는 버퍼에 추가되고 다섯 번째 요소를 보내려고 할 때 발신자가 보류됩니다.


채널은 공정합니다


채널로 보내고 받는 작업은 다중 코루틴에서의 호출 순서에 대해 공정합니다. 이들은 FIFO(First-In-First-Out) 순서에 따라 처리되며, 즉 첫 번째 코루틴이 수신을 호출하면 해당 요소를 가져옵니다. 다음 예에서 "ping"과 "pong"이라는 두 개의 코루틴은 공유된 "table" 채널에서 "ball" 객체를 수신합니다.

data class Ball(var hits: Int)

fun main() = runBlocking {
    val table = Channel<Ball>() // 공유 테이블
    launch { player("ping", table) }
    launch { player("pong", table) }
    table.send(Ball(0)) // 볼을 제공
    delay(1000) // 1초 지연
    coroutineContext.cancelChildren() // 게임 종료, 모두 취소
}

suspend fun player(name: String, table: Channel<Ball>) {
    for (ball in table) { // 루프에서 볼을 수신합니다.
        ball.hits++
        println("$name $ball")
        delay(300) // 잠시 기다립니다.
        table.send(ball) // 볼을 다시 보냅니다.
    }
}


"ping" 코루틴이 먼저 시작되므로 볼을 먼저 수신합니다. "ping" 코루틴은 볼을 테이블로 다시 보낸 후 즉시 볼을 다시 받도록 시작하지만 이미 기다리고 있던 "pong" 코루틴이 볼을 받습니다.

ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)


때로는 채널이 사용 중인 실행을 불공정하게 보이게 만들 수 있지만 사용 중인 실행자의 성격 때문에 불공정한 실행을 생성할 수 있습니다. 자세한 내용은 이 문제를 참조하십시오.


Ticker 채널


Ticker 채널은 마지막 소비로부터 주어진 지연 이후마다 Unit을 생성하는 특

별한 rendezvous 채널입니다. 단독으로는 쓸모없어 보일 수 있지만 복잡한 시간 기반 생산 파이프라인 및 윈도우링 및 기타 시간 종속 처리를 수행하는 연산자를 만들기 위한 유용한 기본 블록입니다. Ticker 채널은 select에서 "on tick" 동작을 수행하는 데 사용할 수 있습니다.

이러한 채널을 생성하려면 ticker라는 팩토리 메서드를 사용하십시오. 더 이상 요소가 필요하지 않음을 나타내려면 ReceiveChannel.cancel 메서드를 사용하십시오.

이제 어떻게 동작하는지 실제로 살펴보겠습니다.

fun main() = runBlocking<Unit> {
    val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) // ticker 채널을 생성합니다.
    var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Initial element is available immediately: $nextElement") // 초기 지연이 없습니다

    nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // 모든 후속 요소는 100ms의 지연이 있습니다
    println("Next element is not ready in 50 ms: $nextElement")

    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
    println("Next element is ready in 100 ms: $nextElement")

    // 큰 소비 지연을 에뮬레이트합니다
    println("Consumer pauses for 150ms")
    delay(150)
    // 다음 요소는 즉시 사용 가능합니다
    nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Next element is available immediately after large consumer delay: $nextElement")
    // receive 호출 간의 일시 중지가 고려되며 소비자의 일시 중지 이후 다음 요소가 더 빨리 도착합니다
    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() } 
    println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")
    
    tickerChannel.cancel() // 더 이상 요소가 필요하지 않음을 나타냅니다
}


이렇게 출력됩니다.

 

Initial element is available immediately: kotlin.Unit
Next element is not ready in 50 ms: null
Next element is ready in 100 ms: kotlin.Unit
Consumer pauses for 150ms
Next element is available immediately after large consumer delay: kotlin.Unit
Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit


ticker는 가능한 소비자 일시 중지를 고려하며 기본적으로 일시 중지가 발생하면 다음 생성된 요소의 지연을 조정하여 고정된 요소 생성 속도를 유지하려고 시도합니다.

옵션으로 모드 매개변수를 TickerMode.FIXED_DELAY로 설정하여 요소 간의 고정된 지연을 유지하도록 지정할 수 있습니다.

 

원문

 

https://kotlinlang.org/docs/channels.html

반응형

댓글