코루틴 공유 가능한 가변 상태와 동시성 (Coroutines Shared mutable state and concurrency)
코루틴은 Dispatchers.Default와 같은 다중 스레드 디스패처를 사용하여 병렬로 실행될 수 있습니다. 이는 모든 일반적인 병렬화 문제를 제시합니다. 주요 문제는 공유 가능한 가변 상태에 대한 액세스 동기화입니다. 코루틴에서 이 문제에 대한 일부 해결책은 다중 스레드 세계의 해결책과 유사하지만, 다른 것도 있습니다.
문제
동일한 작업을 천 번 수행하는 백 개의 코루틴을 시작해 보겠습니다. 또한 이들의 완료 시간을 추가 비교를 위해 측정할 것입니다.
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 시작할 코루틴 수
val k = 1000 // 각 코루틴이 작업을 반복할 횟수
val time = measureTimeMillis {
coroutineScope { // 코루틴 스코프
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("$time ms 내에 ${n * k} 작업 완료")
}
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter++
}
}
println("Counter = $counter")
}
끝에 무엇이 출력될까요? "Counter = 100000"을 출력하는 것은 매우 불가능합니다. 왜냐하면 백 개의 코루틴이 동시에 여러 스레드에서 카운터를 증가시키기 때문에 어떠한 동기화도 이루어지지 않기 때문입니다.
보통 변수를 `volatile`로 만들면 도움이 되는 일반적인 오해가 있습니다. 이렇게 해보겠습니다.
@Volatile // Kotlin에서 'volatile'은 주석입니다.
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter++
}
}
println("Counter = $counter")
}
이 코드는 더 느리게 작동하지만 종종 끝에 "Counter = 100000"을 얻지 못합니다. 왜냐하면 volatile 변수는 해당 변수에 대한 선형화 가능한(이것은 "원자적"의 기술적인 용어입니다) 읽기 및 쓰기를 보장하지만 더 큰 작업(우리의 경우 증가)에 대한 원자성을 제공하지 않기 때문입니다.
스레드 안전한 데이터 구조를 사용하는 것이 스레드 및 코루틴 양쪽에서 작동하는 일반적인 해결책입니다. 이러한 작업에 필요한 모든 동기화를 제공하는 스레드 안전(또는 동기화된, 선형화 가능한 또는 원자적인) 데이터 구조를 사용할 수 있습니다. 간단한 카운터의 경우에는 AtomicInteger 클래스를 사용할 수 있으며 이 클래스에는 원자적인 incrementAndGet 작업이 포함되어 있습니다.
val counter = AtomicInteger()
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter.incrementAndGet()
}
}
println("Counter = $counter")
}
이것은 특정 문제에 대한 가장 빠른 솔루션입니다. 이것은 일반적인 카운터, 컬렉션, 큐 및 기타 표준 데이터 구조 및 그들에 대한 기본 작업에 대해 작동합니다. 그러나 이것은 복잡한 상태나 준비된 스레드 안전 구현이 없는 복잡한 작업에 쉽게 확장되지 않을 수 있습니다.
스레드 제한을 세밀하게 적용
스레드 confinement은 공유 가능한 가변 상태 문제에 대한 접근 방식 중 하나로, 특정 공유 상태에 대한 모든 액세스가 단일 스레드로 제한됩니다. 이는 일반적으로 UI 응용 프로그램에서 사용되며 모든 UI 상태가 단일 이벤트 디스패치/응용 프로그램 스레드로 제한됩니다. 이를 코루틴과 함께 사용하려면 단일 스레드 컨텍스트를 사용하면 쉽게 적용할 수 있습니다.
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
// 각 증가 작업을 단일 스레드 컨텍스트로 제한
withContext(counterContext) {
counter++
}
}
}
println("Counter = $counter")
}
이 코드는 매우 느리게 작동합니다. 각 개별적인 증가 작업은 `withContext(counterContext)` 블록을 사용하여 다중 스레드 Dispatchers.Default 컨텍스트에서 단일 스레드 컨텍스트로 전환하기 때문입니다.
스레드 제한을 대략적으로 적용
실제로 스레드 confinement는 대규모 덩어리로 수행되며, 예를 들어 상태 업데이트 비즈니스 로직의 큰 부분이 단일 스레드로 제한됩니다. 다음 예제는 이렇게 수행하며, 처음에는 각 코루틴을 단일 스레드 컨텍스트에서 실행합니다.
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main() = runBlocking {
// 모든 것을 단일 스레드 컨텍스트로 제한
withContext(counterContext) {
massiveRun {
counter++
}
}
println("Counter = $counter")
}
이제 이것은 훨씬 더 빠르게 작동하며 올바른 결과를 생성합니다.
상호 배제
문제에 대한 상호 배제 해결책은 모든 공유 상태의 수정을 동시에 실행되지 않는 크리티컬 섹션으로 보호하는 것입니다. 블로킹 환경에서는 일반적으로 synchronized 또는 ReentrantLock을 사용합니다. 코루틴의 대안은 Mutex라고 합니다. 이것은 크리티컬 섹션을 제한하기 위한 lock 및 unlock 함수가 있습니다. 중요한 차이점은 Mutex.lock()이 중단 함수임입니다. 이 함수는 스레드를 차단하지 않습니다.
또한 mutex.lock(); try { ... } finally { mutex.unlock() } 패턴을 편리하게 나타내는 withLock 확장 함수가 있습니다.
val mutex = Mutex()
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
// 각 증가 작업을 락으로 보호
mutex.withLock {
counter++
}
}
}
println("Counter = $counter")
}
이 예제에서 락은 미세한 단계로 분류되어 있으므로 비용이 듭니다. 그러나 이것은 상태를 절대로 제한할 자연스러운 스레드가 없는 일부 상황에 대한 좋은 선택입니다.
원문
https://kotlinlang.org/docs/shared-mutable-state-and-concurrency.html
댓글