코루틴과 채널 - 튜토리얼 (Coroutines and channels − tutorial)
이 튜토리얼에서는 IntelliJ IDEA에서 코루틴을 사용하여 네트워크 요청을 수행하는 방법을 배우게 됩니다. 이 과정에서 기본 스레드를 차단하지 않거나 콜백을 사용합니다.
코루틴에 대한 사전 지식은 필요하지 않지만 기본적인 Kotlin 구문에 익숙해야 합니다.
다음을 배울 것입니다:
- 네트워크 요청을 수행하기 위해 중단 함수를 사용하는 이유와 방법.
- 코루틴을 사용하여 동시에 요청을 보내는 방법.
- 채널을 사용하여 다른 코루틴 간에 정보를 공유하는 방법.
네트워크 요청에 대해서는 Retrofit 라이브러리가 필요하지만, 이 튜토리얼에서 보여주는 방법은 코루틴을 지원하는 다른 라이브러리에 대해서도 유사하게 작동합니다.
모든 작업에 대한 해결책은 해당 프로젝트 리포지토리의 "solutions" 브랜치에서 찾을 수 있습니다.
시작하기 전에
1. IntelliJ IDEA의 최신 버전을 다운로드하고 설치합니다.
2. 다음 명령을 사용하여 프로젝트 템플릿을 복제합니다.
git clone https://github.com/kotlin-hands-on/intro-coroutines
깃허브 개발자 토큰 생성하기
프로젝트에서 GitHub API를 사용할 것입니다. 액세스를 얻으려면 GitHub 계정 이름과 비밀번호 또는 토큰을 제공해야 합니다. 이중 인증이 활성화되어 있는 경우 토큰이 필요합니다.
GitHub API를 사용하기 위해 GitHub 토큰을 생성합니다:
1. 토큰의 이름을 지정합니다. 예를 들어 "coroutines-tutorial"로 지정합니다.
2. 어떤 스코프도 선택하지 않습니다. 페이지 하단의 "토큰 생성"을 클릭합니다.
3. 생성된 토큰을 복사합니다.
코드 실행하기
프로그램은 주어진 기관(기본적으로 "kotlin"이라고 함) 아래의 모든 저장소에 대한 기여자를 로드합니다. 나중에 기여자를 기여한 횟수에 따라 정렬하는 로직을 추가할 것입니다.
1. src/contributors/main.kt 파일을 열고 main() 함수를 실행합니다. 다음 창이 표시됩니다.
2. 폰트 크기가 너무 작으면 main() 함수에서 setDefaultFontSize(18f) 값을 변경하여 조정합니다.
3. GitHub 사용자 이름과 토큰(또는 비밀번호)을 해당 필드에 제공합니다.
4. Variant 드롭다운 메뉴에서 BLOCKING 옵션이 선택되어 있는지 확인합니다.
5. "Load contributors"를 클릭합니다. UI는 잠시 동안 정지한 다음 기여자 목록을 표시해야 합니다.
6. 데이터가 로드되었는지 확인하기 위해 프로그램 출력을 엽니다. 성공적인 요청 후 기여자 목록이 기록됩니다.
이 로직을 구현하는 다양한 방법이 있습니다: 블로킹 요청 또는 콜백을 사용하는 방법 등. 이러한 솔루션들을 코루틴을 사용하는 솔루션과 비교하고 채널을 사용하여 다른 코루틴 간에 정보를 공유하는 방법을 살펴볼 것입니다.
Blocking requests
Retrofit 라이브러리를 사용하여 GitHub에 HTTP 요청을 수행합니다. 이를 통해 지정된 조직 아래의 저장소 목록과 각 저장소의 기여자 목록을 요청할 수 있습니다.
interface GitHubService {
@GET("orgs/{org}/repos?per_page=100")
fun getOrgReposCall(
@Path("org") org: String
): Call<List<Repo>>
@GET("repos/{owner}/{repo}/contributors?per_page=100")
fun getRepoContributorsCall(
@Path("owner") owner: String,
@Path("repo") repo: String
): Call<List<User>>
}
이 API는 주어진 조직의 기여자 목록을 가져오기 위해 loadContributorsBlocking() 함수에서 사용됩니다.
1. loadContributorsBlocking() 함수의 구현을 보려면 src/tasks/Request1Blocking.kt 파일을 엽니다:
fun loadContributorsBlocking(service: GitHubService, req: RequestData): List<User> {
val repos = service
.getOrgReposCall(req.org) // #1
.execute() // #2
.also { logRepos(req, it) } // #3
.body() ?: emptyList() // #4
return repos.flatMap { repo ->
service
.getRepoContributorsCall(req.org, repo.name) // #1
.execute() // #2
.also { logUsers(repo, it) } // #3
.bodyList() // #4
}.aggregate()
}
- 먼저 주어진 조직 아래의 저장소 목록을 가져와 repos 목록에 저장합니다. 그런 다음 각 저장소에 대해 기여자 목록을 요청하고 모든 목록을 하나의 최종 기여자 목록으로 병합합니다.
- getOrgReposCall() 및 getRepoContributorsCall() 모두 *Call 클래스의 인스턴스를 반환합니다 (#1). 이 시점에서 아직 요청이 전송되지 않습니다.
- 그런 다음 요청을 수행하기 위해 *Call.execute()를 호출합니다 (#2). execute()는 기본 스레드를 차단하는 동기 호출입니다.
- 응답을 받으면 결과는 특정한 logRepos() 및 logUsers() 함수를 호출하여 기록됩니다 (#3). HTTP 응답에 오류가 포함되어 있으면 여기에서 해당 오류가 기록됩니다.
- 마지막으로 데이터를 포함하는 응답의 본문을 가져옵니다. 이 튜토리얼에서는 오류가 발생한 경우 결과로 빈 목록을 사용하고 해당 오류를 로그에 기록합니다 (#4).
2. .body() ?: emptyList()를 반복해서 사용하지 않도록, extension 함수 bodyList()가 선언되어 있습니다:
fun <T> Response<List<T>>.bodyList(): List<T> {
return body() ?: emptyList()
}
3. 프로그램을 다시 실행하고 IntelliJ IDEA의 시스템 출력을 살펴보세요. 다음과 같은 내용이 있어야 합니다.
1770 [AWT-EventQueue-0] INFO Contributors - kotlin: loaded 40 repos
2025 [AWT-EventQueue-0] INFO Contributors - kotlin-examples: loaded 23 contributors
2229 [AWT-EventQueue-0] INFO Contributors - kotlin-koans: loaded 45 contributors
...
- 각 줄의 첫 번째 항목은 프로그램 시작 후 경과한 밀리초 수이며, 그 다음은 대괄호 안의 스레드 이름입니다. 요청을 호출한 스레드가 어디에서 왔는지 확인할 수 있습니다.
- 각 줄의 마지막 항목은 실제 메시지입니다. 로드된 저장소 또는 기여자의 수가 표시됩니다.
이 로그 출력은 모든 결과가 메인 스레드에서 기록되었음을 보여줍니다. BLOCKING 옵션을 사용하여 코드를 실행하면 창이 정지하고 로드가 완료될 때까지 입력에 반응하지 않습니다. 모든 요청은 loadContributorsBlocking()에서 호출되는 스레드와 동일한 스레드에서 실행됩니다. 이 스레드는 주요 UI 스레드입니다 (Swing에서는 AWT 이벤트 디스패칭 스레드입니다). 주요 스레드가 차단되고 UI가 정지되는 이유입니다.
기여자 목록이 로드된 후 결과가 업데이트됩니다.
4. src/contributors/Contributors.kt에서 기여자를 로드하는 loadContributors() 함수를 찾아보고 loadContributorsBlocking()이 어떻게 호출되는지 확인하세요:
when (getSelectedVariant()) {
BLOCKING -> { // Blocking UI thread
val users = loadContributorsBlocking(service, req)
updateResults(users, startTime)
}
}
- loadContributorsBlocking() 호출 바로 다음에 updateResults() 호출이 있습니다.
- updateResults()는 UI를 업데이트하므로 항상 UI 스레드에서 호출해야 합니다.
- loadContributorsBlocking()도 UI 스레드에서 호출되므로 UI 스레드가 차단되고 UI가 정지됩니다.
Task 1
첫 번째 작업은 작업 도메인에 익숙해지도록 도와줍니다. 현재 각 기여자의 이름은 그들이 참여한 각 프로젝트마다 여러 번 반복됩니다. 사용자를 한 번만 추가하도록 사용자를 결합하는 aggregate() 함수를 구현하세요. User.contributions 속성은 해당 사용자의 모든 프로젝트에 대한 총 기여 횟수를 포함해야 합니다. 결과 목록은 기여 횟수를 기준으로 내림차순으로 정렬되어야 합니다.
src/tasks/Aggregation.kt 파일을 열고 List<User>.aggregate() 함수를 구현하세요. 사용자는 기여 횟수의 총합에 따라 정렬되어야 합니다.
해당하는 테스트 파일 test/tasks/AggregationKtTest.kt에서 예상 결과의 예제를 확인할 수 있습니다.
IntelliJ IDEA의 단축키 Ctrl+Shift+T / ⇧ ⌘ T를 사용하여 소스 코드와 테스트 클래스 사이를 자동으로 이동할 수 있습니다.
이 작업을 구현한 후 "kotlin" 조직을 위한 결과 목록은 다음과 유사해야 합니다:
Task 1의 해결책
1. 사용자를 로그인별로 그룹화하려면 groupBy()를 사용하십시오. 이것은 로그인에서 다른 저장소에서 이 로그인을 가진 사용자의 모든 발생에 대한 맵을 반환합니다.
2. 맵 항목마다 각 사용자의 총 기여 횟수를 계산하고 주어진 이름과 기여 횟수로 User 클래스의 새 인스턴스를 만듭니다.
3. 결과 목록을 내림차순으로 정렬하세요:
fun List<User>.aggregate(): List<User> =
groupBy { it.login }
.map { (login, group) -> User(login, group.sumOf { it.contributions }) }
.sortedByDescending { it.contributions }
대안으로 groupBy() 대신 groupingBy() 함수를 사용할 수도 있습니다.
Callbacks
이전 솔루션은 작동하지만 스레드를 차단하고 따라서 UI를 정지시킵니다. 이를 피하기 위한 전통적인 접근 방식은 콜백을 사용하는 것입니다.
작업이 완료된 직후 호출해야 하는 코드를 대신 콜백(일반적으로 람다)으로 추출하고 이 콜백을 호출자에게 전달하여 나중에 호출하도록 합니다.
UI를 응답성 있게 만들려면 전체 계산을 별도의 스레드로 이동하거나, 차단 호출 대신 콜백을 사용하는 Retrofit API로 전환할 수 있습니다.
백그라운드 스레드 사용
1. src/tasks/Request2Background.kt 파일을 열어 구현을 살펴보세요. 먼저 전체 계산이 다른 스레드로 이동됩니다. thread() 함수는 새 스레드를 시작합니다.
thread {
loadContributorsBlocking(service, req)
}
이제 모든 로딩이 별도의 스레드로 이동했으므로 주 메인 스레드는 놀고 있어 다른 작업에 할당될 수 있습니다.
2. loadContributorsBackground() 함수의 시그니처가 변경되었습니다. 모든 로딩이 완료된 후 호출할 updateResults() 콜백을 마지막 인수로 가져옵니다.
fun loadContributorsBackground(
service: GitHubService, req: RequestData,
updateResults: (List<User>) -> Unit
)
3. 이제 loadContributorsBackground()이 호출될 때 loadContributorsBackground() 호출 직후가 아니라 콜백 내부에서 updateResults() 호출이 수행됩니다.
loadContributorsBackground(service, req) { users ->
SwingUtilities.invokeLater {
updateResults(users, startTime)
}
}
SwingUtilities.invokeLater를 호출함으로써 결과를 업데이트하는 updateResults() 호출이 주요 UI 스레드 (AWT 이벤트 디스패칭 스레드)에서 수행되도록 보장합니다.
그러나 BACKGROUND 옵션으로 기여자를로드하려고하면 목록이 업데이트되지만 아무것도 변경되지 않음을 볼 수 있습니다.
Task 2
src/tasks/Request2Background.kt 파일의 loadContributorsBackground() 함수를 수정하여 결과 목록이 UI에 표시되도록 하세요.
task 2의 해결책
Contributors를 로드하려고 시도하면 로그에서 Contributors가 로드되지만 결과가 표시되지 않습니다. 이를 해결하려면 사용자 목록의 결과에 대해 updateResults()를 호출하십시오:
thread {
updateResults(loadContributorsBlocking(service, req))
}
콜백에 전달된 로직을 명시적으로 호출하십시오. 그렇지 않으면 아무 일도 일어나지 않습니다.
Retrofit 콜백 API 사용하기
이전 솔루션에서 전체 로딩 로직은 백그라운드 스레드로 이동되었지만, 여전히 리소스의 최상의 사용은 아닙니다. 모든 로딩 요청이 순차적으로 진행되며 스레드는 로딩 결과를 기다리는 동안 차단되며 다른 작업에 차지되지 못합니다. 구체적으로, 스레드는 더 빨리 전체 결과를 받기 위해 다른 요청을 시작할 수 있었을 것입니다.
각 리포지토리의 데이터 처리를 두 부분으로 나눠야 합니다: 로딩 및 결과 응답 처리. 두 번째 처리 부분은 콜백으로 추출되어야 합니다.
그런 다음 이전 리포지토리의 결과가 수신되기 전(및 해당 콜백이 호출되기 전)에 각 리포지토리의 로딩을 시작할 수 있습니다.
Retrofit 콜백 API를 사용하면 이를 달성할 수 있습니다. Call.enqueue() 함수는 HTTP 요청을 시작하고 콜백을 인수로 사용합니다. 이 콜백에서 각 요청 후에 수행해야 할 작업을 지정해야 합니다.
src/tasks/Request3Callbacks.kt를 열어서 이 API를 사용하는 loadContributorsCallbacks() 구현을 확인하십시오.
fun loadContributorsCallbacks(
service: GitHubService, req: RequestData,
updateResults: (List<User>) -> Unit
) {
service.getOrgReposCall(req.org).onResponse { responseRepos -> // #1
logRepos(req, responseRepos)
val repos = responseRepos.bodyList()
val allUsers = mutableListOf<User>()
for (repo in repos) {
service.getRepoContributorsCall(req.org, repo.name)
.onResponse { responseUsers -> // #2
logUsers(repo, responseUsers)
val users = responseUsers.bodyList()
allUsers += users
}
}
}
// TODO: 이 코드는 왜 작동하지 않을까요? 어떻게 고칠 수 있을까요?
updateResults(allUsers.aggregate())
}
}
- 편의를 위해 이 코드 단편은 동일한 파일에 선언된 onResponse() 확장 함수를 사용합니다. 이것은 객체 표현식이 아닌 람다를 인수로 사용합니다.
- 응답 처리를 위한 로직은 콜백으로 추출되었으며, 해당 람다는 #1과 #2에서 시작합니다.
그러나 제공된 솔루션은 작동하지 않습니다. 프로그램을 실행하고 CALLBACKS 옵션을 선택하여 기여자를 로드하면 아무것도 표시되지 않습니다. 그러나 즉시 결과를 반환하는 테스트는 통과합니다.
왜 제공된 코드가 기대한 대로 작동하지 않는지 생각해보고 수정하려 하거나 아래의 솔루션을 확인하십시오.
Task 3 (선택 사항)
src/tasks/Request3Callbacks.kt 파일의 코드를 수정하여 로드된 기여자 목록이 표시되도록 하십시오.
첫 번째 시도된 Task 3 솔루션
현재 솔루션에서는 많은 요청이 동시에 시작되어 총 로딩 시간이 감소합니다. 그러나 결과가 로드되지 않습니다. 이는 모든 로딩 요청이 시작된 직후에 updateResults() 콜백이 호출되기 때문입니다. allUsers 목록이 데이터로 채워질 때까지 기다리지 않습니다.
다음과 같은 변경으로 이 문제를 해결해 볼 수 있습니다.
val allUsers = mutableListOf<User>()
for ((index, repo) in repos.withIndex()) { // #1
service.getRepoContributorsCall(req.org, repo.name)
.onResponse { responseUsers ->
logUsers(repo, responseUsers)
val users = responseUsers.bodyList()
allUsers += users
if (index == repos.lastIndex) { // #2
updateResults(allUsers.aggregate())
}
}
}
- 먼저 인덱스와 함께 리포지토리 목록을 반복합니다 (#1).
- 그런 다음 각 콜백에서 현재 반복이 마지막인지 확인합니다 (#2).
- 그럼 그렇다면 결과가 업데이트됩니다.
그러나 이 코드도 목표를 달성하지 못합니다. 이를 해결하려면 스스로 답을 찾아보거나 아래의 솔루션을 확인하십시오.
태스크 3의 두 번째 시도된 솔루션
로딩 요청이 동시에 시작되므로 마지막으로 처리된 요청이 마지막에 완료될 것이라는 보장이 없습니다. 결과는 어떤 순서로든 도착할 수 있습니다.
따라서 현재 인덱스와 lastIndex를 비교하여 완료 조건으로 사용한다면 일부 리포지토리의 결과를 잃을 위험이 있습니다.
마지막 리포지토리를 처리하는 요청이 더 빨리 반환되면(이것이 발생할 가능성이 높음) 더 많은 시간이 걸리는 이전 요청들의 결과가 모두 손실됩니다.
이를 해결하는 한 가지 방법은 인덱스를 도입하고 모든 리포지토리가 이미 처리되었는지 확인하는 것입니다.
val allUsers = Collections.synchronizedList(mutableListOf<User>())
val numberOfProcessed = AtomicInteger()
for (repo in repos) {
service.getRepoContributorsCall(req.org, repo.name)
.onResponse { responseUsers ->
logUsers(repo, responseUsers)
val users = responseUsers.bodyList()
allUsers += users
if (numberOfProcessed.incrementAndGet() == repos.size) {
updateResults(allUsers.aggregate())
}
}
}
이 코드는 동기화된 버전의 목록과 AtomicInteger()를 사용합니다. 일반적으로 getRepoContributors() 요청을 처리하는 서로 다른 콜백이 항상 동일한 스레드에서 호출되지 않을 수 있으므로 필요합니다.
태스크 3의 세 번째 시도된 솔루션
더 나은 솔루션은 CountDownLatch 클래스를 사용하는 것입니다. 이 클래스는 리포지토리 수로 초기화된 카운터를 저장합니다. 이 카운터는 각 리포지토리 처리 후에 감소됩니다. 그런 다음 카운터가 제로로 카운트 다운될 때까지 기다린 다음 결과를 업데이트합니다.
val countDownLatch = CountDownLatch(repos.size)
for (repo in repos) {
service.getRepoContributorsCall(req.org, repo.name)
.onResponse { responseUsers ->
// 리포지토리 처리
countDownLatch.countDown()
}
}
countDownLatch.await()
updateResults(allUsers.aggregate())
그런 다음 결과는 주 스레드에서 업데이트됩니다. 이것은 자식 스레드에 로직을 위임하는 것보다 더 직접적입니다.
이 세 가지 시도한 솔루션을 검토한 후에 콜백을 사용하여 올바른 코드를 작성하는 것이 비트릭하며 오류 가능성이 높다는 것을 알 수 있습니다, 특히 여러 하위 스레드 및 동기화가 발생하는 경우.
추가 연습으로 RxJava 라이브러리를 사용하여 리액티브 접근 방식으로 동일한 로직을 구현할 수 있습니다. RxJava를 사용하는 데 필요한 모든 종속성과 RxJava를 사용한 솔루션은 별도의 rx 브랜치에서 찾을 수 있습니다. 적절한 비교를 위해 이 튜토리얼을 완료하고 제안된 Rx 버전을 구현하거나 확인할 수도 있습니다.
서스펜딩 함수
동일한 로직을 서스펜딩 함수를 사용하여 구현할 수 있습니다. Call<List<Repo>>를 반환하는 대신 API 호출을 다음과 같이 서스펜딩 함수로 정의하세요:
interface GitHubService {
@GET("orgs/{org}/repos?per_page=100")
suspend fun getOrgRepos(
@Path("org") org: String
): List<Repo>
}
- getOrgRepos()는 서스펜딩 함수로 정의되어 있습니다. 서스펜딩 함수를 사용하여 요청을 수행할 때 백그라운드 스레드가 차단되지 않습니다. 이 작동 방식에 대한 자세한 내용은 나중 섹션에서 설명됩니다.
- getOrgRepos()는 Call을 반환하는 대신 결과를 직접 반환합니다. 결과가 성공하지 못하면 예외가 throw됩니다.
또한 Retrofit은 결과를 Response로 래핑하여 반환할 수 있습니다. 이 경우 결과 본문이 제공되며 에러를 수동으로 확인할 수 있습니다. 이 튜토리얼은 Response를 반환하는 버전을 사용합니다.
src/contributors/GitHubService.kt에 다음 선언을 GitHubService 인터페이스에 추가하세요:
interface GitHubService {
// getOrgReposCall 및 getRepoContributorsCall 선언
@GET("orgs/{org}/repos?per_page=100")
suspend fun getOrgRepos(
@Path("org") org: String
): Response<List<Repo>>
@GET("repos/{owner}/{repo}/contributors?per_page=100")
suspend fun getRepoContributors(
@Path("owner") owner: String,
@Path("repo") repo: String
): Response<List<User>>
}
Task 4
loadContributors 함수의 코드를 변경하여 두 개의 새로운 서스펜딩 함수인 getOrgRepos() 및 getRepoContributors()를 사용하도록 변경하세요. 새로운 loadContributorsSuspend() 함수는 새 API를 사용하기 위해 suspend로 표시됩니다.
서스펜딩 함수는 어디에서나 호출할 수 없습니다. loadContributorsBlocking()에서 서스펜딩 함수를 호출하면 'Suspend function 'getOrgRepos' should be called only from a coroutine or another suspend function'라는 메시지와 함께 오류가 발생합니다.
- src/tasks/Request1Blocking.kt에 정의된 loadContributorsBlocking()의 구현을 src/tasks/Request4Suspend.kt에 정의된 loadContributorsSuspend()로 복사하세요.
- 코드를 수정하여 Call을 반환하는 대신 호출 결과를 반환하는 새로운 서스펜딩 함수를 사용하도록 변경하세요.
- 프로그램을 실행하고 SUSPEND 옵션을 선택하여 GitHub 요청이 수행되는 동안 UI가 여전히 반응하는지 확인하세요.
Task 4의 솔루션
.getOrgReposCall(req.org).execute()를 .getOrgRepos(req.org)로 대체하고 두 번째 "contributors" 요청에 대해서도 동일한 대체 작업을 반복하세요:
suspend fun loadContributorsSuspend(service: GitHubService, req: RequestData): List<User> {
val repos = service
.getOrgRepos(req.org)
.also { logRepos(req, it) }
.bodyList()
return repos.flatMap { repo ->
service.getRepoContributors(req.org, repo.name)
.also { logUsers(repo, it) }
.bodyList()
}.aggregate()
}
- loadContributorsSuspend()는 서스펜딩 함수로 정의되어야 합니다.
- 이제 Response를 반환하는 대신 직접 Response를 반환하도록 API 함수가 되었으므로 더 이상 execute를 호출할 필요가 없습니다. 이 세부 정보는 Retrofit 라이브러리에 특정한 것이며 다른 라이브러리를 사용할 경우 API가 다를 수 있지만 개념은 동일합니다.
코루틴
서스펜딩 함수를 사용한 코드는 "블로킹" 버전과 유사합니다. 블로킹 버전과의 주요 차이점은 스레드를 차단하는 대신 코루틴이 일시 중단된다는 것입니다:
블록 -> 서스펜드
스레드 -> 코루틴
코루틴은 종종 가벼운 스레드라고 불립니다. 코루틴에서 코드를 실행하는 방식은 스레드에서 코드를 실행하는 방식과 유사합니다. 이전에는 차단되어야 했던 작업들이 이제 코루틴을 일시 중단할 수 있게 됩니다.
새로운 코루틴 시작
loadContributorsSuspend() 함수가 src/contributors/Contributors.kt에서 어떻게 사용되는지 살펴보면, 이 함수가 launch 내부에서 호출된 것을 볼 수 있습니다. launch는 람다를 인수로 받는 라이브러리 함수입니다:
launch {
val users = loadContributorsSuspend(req)
updateResults(users, startTime)
}
여기서 launch는 데이터를 로드하고 결과를 표시하는 데 책임을지는 새로운 계산을 시작합니다. 이 계산은 중단 가능하며, 네트워크 요청을 수행하는 동안 중단되고 기본 스레드를 해제합니다. 네트워크 요청이 결과를 반환하면 계산이 재개됩니다.
이와 같은 중단 가능한 계산을 코루틴이라고 합니다. 따라서 이 경우 launch는 데이터를 로드하고 결과를 표시하는 데 책임을지는 새로운 코루틴을 시작합니다.
코루틴은 스레드 위에서 실행되며 중단될 수 있습니다. 코루틴이 중단되면 해당 계산은 일시 중단되고 스레드에서 제거되어 메모리에 저장됩니다. 한편, 스레드는 다른 작업에 차지됩니다:
계산이 다시 시작될 준비가 되면 해당 계산이 스레드로 반환되며(반드시 동일한 스레드일 필요는 없음) 응답을 기다립니다.
loadContributorsSuspend() 예제에서 각 "contributors" 요청은 이제 중단 메커니즘을 사용하여 결과를 기다립니다. 먼저 새 요청이 전송되고 응답을 기다리는 동안 launch 함수에 의해 시작된 "load contributors" 코루틴 전체가 일시 중단됩니다.
응답이 수신되길 기다리는 동안 스레드는 다른 작업에 사용할 수 있습니다:
응답을 수신하기를 기다리는 동안, UI는 모든 요청이 메인 UI 스레드에서 수행되더라도 반응적인 상태를 유지합니다.
1. SUSPEND 옵션을 선택하여 프로그램을 실행하십시오. 로그에서 모든 요청이 메인 UI 스레드로 전송되었음을 확인할 수 있습니다:
2538 [AWT-EventQueue-0 @coroutine#1] INFO Contributors - kotlin: loaded 30 repos
2729 [AWT-EventQueue-0 @coroutine#1] INFO Contributors - ts2kt: loaded 11 contributors
3029 [AWT-EventQueue-0 @coroutine#1] INFO Contributors - kotlin-koans: loaded 45 contributors
...
11252 [AWT-EventQueue-0 @coroutine#1] INFO Contributors - kotlin-coroutines-workshop: loaded 1 contributors
2. 로그에서 어떤 코루틴이 해당 코드를 어떤 스레드에서 실행하는지 확인할 수 있습니다. 이를 활성화하려면 Run | Edit configurations를 열고 -Dkotlinx.coroutines.debug VM 옵션을 추가하세요:
VM 옵션을 사용하여 main()이 이 옵션을 함께 실행되도록 하면 코루틴 이름이 스레드 이름에 추가됩니다. 또한 모든 Kotlin 파일을 실행하는 템플릿을 수정하여 기본적으로 이 옵션을 활성화할 수 있습니다.
이제 모든 코드가 하나의 코루틴(@coroutine#1로 표시됨)에서 실행됩니다. 결과를 기다리는 동안 스레드를 다른 요청을 보내기 위해 재사용해서는 안됩니다. 왜냐하면 코드가 순차적으로 작성되었기 때문입니다. 이전 결과가 수신되었을 때만 새로운 요청이 전송됩니다.
서스펜딩 함수는 스레드를 공정하게 처리하고 "대기"를 위해 스레드를 차단하지 않습니다. 그러나 아직까지 이로 인해 병렬성이 도입되는 것은 아닙니다.
동시성
코틀린 코루틴은 스레드보다 리소스 소모가 훨씬 적습니다. 비동기적으로 새로운 계산을 시작하려면 새로운 코루틴을 생성할 수 있습니다.
새로운 코루틴을 시작하려면 주요 코루틴 빌더 중 하나인 launch, async 또는 runBlocking을 사용합니다. 다른 라이브러리는 추가 코루틴 빌더를 정의할 수 있습니다.
async는 새로운 코루틴을 시작하고 Deferred 객체를 반환합니다. Deferred는 미래에 얻을 최종 결과를 지연시키는 개념을 나타냅니다. 즉, 미래의 어느 시점에 결과를 약속합니다.
async와 launch의 주요 차이점은 launch는 특정 결과를 반환하지 않을 것으로 예상되는 계산을 시작하는 데 사용됩니다. launch는 코루틴을 나타내는 Job을 반환합니다. Job은 코루틴을 나타내며 이를 호출하여 완료될 때까지 대기할 수 있습니다.
Deferred는 제네릭 타입으로 Job을 확장한 것입니다. async 호출은 람다에서 반환하는 내용에 따라 Deferred<Int> 또는 Deferred<CustomType>을 반환할 수 있습니다(람다 내의 마지막 표현식이 결과입니다).
코루틴 결과를 얻으려면 Deferred 인스턴스에서 await()를 호출할 수 있습니다. 결과를 기다리는 동안에는 이 await()를 호출한 코루틴이 일시 중단됩니다:
import kotlinx.coroutines.*
fun main() = runBlocking {
val deferred: Deferred<Int> = async {
loadData()
}
println("waiting...")
println(deferred.await())
}
suspend fun loadData(): Int {
println("loading...")
delay(1000L)
println("loaded!")
return 42
}
runBlocking은 일반 함수와 서스펜딩 함수, 블로킹과 비블로킹 세계 사이에서 브릿지 역할을 하는 것으로 사용됩니다. 이것은 주로 main() 함수와 테스트에서 사용되도록 의도되었습니다.
Deferred 객체 목록이 있는 경우 awaitAll()을 호출하여 모든 결과를 기다릴 수 있습니다:
import kotlinx.coroutines.*
fun main() = runBlocking {
val deferreds: List<Deferred<Int>> = (1..3).map {
async {
delay(1000L * it)
println("Loading $it")
it
}
}
val sum = deferreds.awaitAll().sum()
println("$sum")
}
각 "contributors" 요청이 새로운 코루틴 내에서 시작될 때, 모든 요청은 비동기적으로 시작됩니다. 이전 요청의 결과를 받기 전에 새로운 요청을 보낼 수 있습니다.
총 로딩 시간은 CALLBACKS 버전과 대략 동일하지만 콜백이 필요하지 않습니다. 더욱이 async는 코드에서 어떤 부분이 동시에 실행되는지 명시적으로 강조합니다.
Task 5
Request5Concurrent.kt 파일에서 loadContributorsConcurrent() 함수를 이전 loadContributorsSuspend() 함수를 사용하여 구현하십시오.
힌트
코루틴을 시작하려면 코루틴 스코프 내에서만 새 코루틴을 시작할 수 있습니다. loadContributorsSuspend()의 내용을 coroutineScope 호출로 복사하여 async 함수를 호출할 수 있도록 하세요.
suspend fun loadContributorsConcurrent(
service: GitHubService,
req: RequestData
): List<User> = coroutineScope {
// ...
}
다음 스키마를 기반으로 솔루션을 작성하십시오:
val deferreds: List<Deferred<List<User>>> = repos.map { repo ->
async {
// load contributors for each repo
}
}
deferreds.awaitAll() // List<List<User>>
Task 5 솔루션
각 "contributors" 요청을 async로 래핑하여 저장소 수만큼의 코루틴을 생성하세요. async는 Deferred<List<User>>를 반환합니다. 이는 많은 코루틴을 생성하는 데 큰 자원을 소모하지 않기 때문에 필요한 만큼 생성할 수 있습니다.
1. 이제 flatMap을 사용할 수 없으므로 map 결과는 이제 목록의 목록이 아닌 Deferred 객체 목록입니다. awaitAll()은 List<List<User>>를 반환하므로 flatten().aggregate()를 호출하여 결과를 얻을 수 있습니다:
suspend fun loadContributorsConcurrent(
service: GitHubService,
req: RequestData
): List<User> = coroutineScope {
val repos = service
.getOrgRepos(req.org)
.also { logRepos(req, it) }
.bodyList()
val deferreds: List<Deferred<List<User>>> = repos.map { repo ->
async {
service.getRepoContributors(req.org, repo.name)
.also { logUsers(repo, it) }
.bodyList()
}
}
deferreds.awaitAll().flatten().aggregate()
}
2. 코드를 실행하고 로그를 확인하세요. 모든 코루틴은 여전히 메인 UI 스레드에서 실행되지만 병렬로 코루틴을 실행하는 이점을 이미 확인할 수 있습니다.
3. 이 코드를 수정하여 "contributors" 코루틴을 일반 스레드가 아닌 공통 스레드 풀의 다른 스레드에서 시작하도록 변경하려면 async 함수의 context 인수로 Dispatchers.Default를 지정하세요:
async(Dispatchers.Default) { }
- CoroutineDispatcher는 해당 코루틴이 실행될 스레드 또는 스레드를 결정합니다. 인수로 지정하지 않으면 async는 외부 스코프의 디스패처를 사용합니다.
- Dispatchers.Default는 JVM에서 공유 스레드 풀을 나타내며 병렬 실행을 위한 수단을 제공합니다. 사용 가능한 CPU 코어의 수만큼 스레드로 구성되지만 코어가 하나만 있는 경우에도 두 개의 스레드가 있을 것입니다.
4. loadContributorsConcurrent() 함수의 코드를 수정하여 공통 스레드 풀의 다른 스레드에서 새로운 코루틴을 시작하도록 변경하십시오. 또한 요청을 보내기 전에 추가적인 로깅을 추가하세요:
async(Dispatchers.Default) {
log("starting loading for ${repo.name}")
service.getRepoContributors(req.org, repo.name)
.also { logUsers(repo, it) }
.bodyList()
}
5. 프로그램을 다시 실행하십시오. 로그에서 각 코루틴이 스레드 풀의 하나의 스레드에서 시작하고 다른 스레드에서 재개될 수 있음을 볼 수 있습니다:
1946 [DefaultDispatcher-worker-2 @coroutine#4] INFO Contributors - starting loading for kotlin-koans
1946 [DefaultDispatcher-worker-3 @coroutine#5] INFO Contributors - starting loading for dokka
1946 [DefaultDispatcher-worker-1 @coroutine#3] INFO Contributors - starting loading for ts2kt
...
2178 [DefaultDispatcher-worker-1 @coroutine#4] INFO Contributors - kotlin-koans: loaded 45 contributors
2569 [DefaultDispatcher-worker-1 @coroutine#5] INFO Contributors - dokka: loaded 36 contributors
2821 [DefaultDispatcher-worker-2 @coroutine#3] INFO Contributors - ts2kt: loaded 11 contributors
예를 들어, 이 로그에서 coroutine#4는 worker-2 스레드에서 시작되고 worker-1 스레드에서 계속 진행됩니다.
src/contributors/Contributors.kt에서 CONCURRENT 옵션의 구현을 확인하세요:
1. 코루틴을 오직 메인 UI 스레드에서만 실행하려면 Dispatchers.Main을 인수로 지정하세요:
launch(Dispatchers.Main) {
updateResults()
}
- 메인 스레드가 새로운 코루틴을 시작할 때 메인 스레드가 사용 중이라면 해당 코루틴은 중지되고이 스레드에서 실행될 예정입니다. 코루틴은 해당 스레드가 무료 상태가 될 때까지만 재개됩니다.
- 일반적으로 각 끝점에서 인수로 dispatcher를 명시적으로 지정하는 대신 외부 스코프에서 dispatcher를 사용하는 것이 좋은 습관입니다. Dispatchers.Default를 인수로 전달하지 않고 loadContributorsConcurrent()를 정의하면 Default 디스패처, 메인 UI 스레드 또는 사용자 정의 디스패처와 함께 이 함수를 호출할 수 있습니다.
- 나중에 테스트에서 loadContributorsConcurrent()를 호출할 때 테스트 디스패처인 TestDispatcher를 사용할 수 있으므로 이 솔루션은 훨씬 유연해집니다.
2. 호출자 쪽에서 디스패처를 지정하려면 다음 변경을 적용하여 loadContributorsConcurrent()에서 상속된 컨텍스트에서 새 코루틴을 시작하도록 하세요:
launch(Dispatchers.Default) {
val users = loadContributorsConcurrent(service, req)
withContext(Dispatchers.Main) {
updateResults(users, startTime)
}
}
- updateResults()는 메인 UI 스레드에서 호출되어야 하므로 Dispatchers.Main의 컨텍스트로 호출합니다.
- withContext()는 지정된 코루틴 컨텍스트에서 주어진 코드를 호출하고 완료될 때까지 일시 중단되며 결과를 반환합니다. 이를 표현하는 또 다른 방법은 새로운 코루틴을 시작하고 (일시 중단하여) 완료될 때까지 기다리는 것입니다: launch(context) { ... }.join().
3. 코드를 실행하고 코루틴이 스레드 풀의 스레드에서 실행되는지 확인하십시오.
구조화된 동시성
- 코루틴 스코프는 서로 다른 코루틴 간의 구조와 부모-자식 관계를 담당합니다. 일반적으로 새로운 코루틴은 스코프 내에서 시작되어야 합니다.
- 코루틴 컨텍스트는 주어진 코루틴을 실행하기 위해 사용되는 추가적인 기술적 정보를 저장합니다. 이 정보에는 코루틴의 사용자 정의 이름이나 코루틴이 예약될 스레드를 지정하는 디스패처(dispatcher)와 같은 내용이 포함됩니다.
launch, async 또는 runBlocking을 사용하여 새로운 코루틴을 시작할 때 자동으로 해당 스코프가 생성됩니다. 이러한 모든 함수는 수신자(receiver)를 가진 람다를 인수로 취하며, CoroutineScope가 암시적 수신자 유형입니다.
launch { /* this: CoroutineScope */ }
- 새로운 코루틴은 반드시 스코프 내에서 시작해야 합니다.
- launch와 async는 CoroutineScope의 확장 함수로 선언되어 있으므로 호출할 때 항상 암시적 또는 명시적 수신자를 전달해야 합니다.
- runBlocking은 최상위 함수로 정의되어 있으므로 단일 예외입니다. 그러나 현재 스레드를 차단하기 때문에 주로 main() 함수와 테스트에서 브릿지 역할로 사용됩니다.
runBlocking, launch 또는 async 내에서 새 코루틴을 시작하면 자동으로 해당 스코프 내에서 시작됩니다.
import kotlinx.coroutines.*
fun main() = runBlocking { /* this: CoroutineScope */
launch { /* ... */ }
// 위와 동일함:
this.launch { /* ... */ }
}
runBlocking 내에서 launch를 호출할 때는 CoroutineScope 유형의 암시적 수신자에 대한 확장 함수로 호출됩니다. 또 다른 방법으로는 명시적으로 `this.launch`를 작성하는 것입니다.
이 예제에서 launch 내에서 시작된 중첩 코루틴(부모 코루틴)은 runBlocking에서 시작된 외부 코루틴(부모 코루틴)의 스코프에서 시작됩니다. 이러한 "부모-자식" 관계는 스코프를 통해 작동하며, 자식 코루틴은 부모 코루틴에 해당하는 스코프에서 시작됩니다.
coroutineScope 함수를 사용하여 별도의 코루틴을 시작하지 않고도 새로운 스코프를 만들 수 있습니다. 스코프 내에서 외부 스코프에 액세스하지 않고도 suspend 함수 내에서 구조화된 방식으로 새로운 코루틴을 시작하려면, suspend 함수가 호출된 외부 스코프의 하위 스코프가 자동으로 생성되도록 할 수 있습니다. 이는 `loadContributorsConcurrent()` 함수를 잘 보여줍니다.
또한 GlobalScope를 사용하여 전역 스코프에서 새 코루틴을 시작할 수 있습니다. 이렇게 하면 최상위 "독립적인" 코루틴이 생성됩니다.
구조화된 동시성(structured concurrency)의 메커니즘은 다음과 같은 이점을 제공합니다.
- 스코프는 일반적으로 자식 코루틴을 담당하며, 자식 코루틴의 수명은 해당 스코프의 수명에 연결됩니다.
- 스코프는 오류가 발생하거나 사용자가 작업을 취소하려고 결정할 때 자동으로 자식 코루틴을 취소할 수 있습니다.
- 스코프는 모든 자식 코루틴의 완료를 자동으로 기다립니다. 따라서 스코프가 코루틴에 해당하는 경우, 부모 코루틴은 해당 스코프에서 시작된 모든 코루틴이 완료될 때까지 완료되지 않습니다.
GlobalScope.async을 사용할 때는 여러 코루틴을 더 작은 스코프에 묶는 구조가 없습니다. 전역 스코프에서 시작된 코루틴은 모두 독립적입니다. 그 수명은 전체 응용 프로그램의 수명에만 영향을 받습니다. 전역 스코프에서 시작된 코루틴의 참조를 저장하고 명시적으로 완료를 기다리거나 명시적으로 취소할 수는 있지만, 구조화된 동시성(structured concurrency)처럼 자동으로 이루어지지는 않습니다.
기여자 로딩 취소하기
기여자 목록을로드하는 함수의 두 가지 버전을 만들어서 부모 코루틴을 취소할 때 어떻게 동작하는지 비교합니다. 첫 번째 버전은 모든 자식 코루틴을 시작하기 위해 coroutineScope를 사용하고, 두 번째 버전은 GlobalScope를 사용합니다.
1. Request5Concurrent.kt에 loadContributorsConcurrent() 함수에 3초 지연을 추가합니다.
suspend fun loadContributorsConcurrent(
service: GitHubService,
req: RequestData
): List<User> = coroutineScope {
// ...
async {
log("starting loading for ${repo.name}")
delay(3000)
// load repo contributors
}
// ...
}
이 지연은 모든 요청을 시작한 후에 요청이 보내기 전에 로딩을 취소할 충분한 시간을 확보하기 위해 모든 코루틴에 영향을 줍니다.
2. 로딩 함수의 두 번째 버전을 만듭니다. loadContributorsConcurrent()의 구현을 Request5NotCancellable.kt에 loadContributorsNotCancellable()로 복사한 다음 새로운 coroutineScope를 만듭니다.
3. async 호출은 이제 해결되지 않으므로 GlobalScope.async를 사용하여 시작합니다.
suspend fun loadContributorsNotCancellable(
service: GitHubService,
req: RequestData
): List<User> {
// ...
GlobalScope.async {
log("starting loading for ${repo.name}")
// load repo contributors
}
// ...
return deferreds.awaitAll().flatten().aggregate()
}
- 함수는 이제 결과를 직접 반환하며 람다 내에서의 마지막 표현식으로 반환하지 않습니다.
- 모든 "contributors" 코루틴은 GlobalScope 내에서 시작되므로 코루틴 스코프(부모 코루틴)의 자식이 아닙니다.
4. 프로그램을 실행하고 CONTRIBUTORS 옵션을 선택하여 기여자를로드합니다.
5. 모든 "contributors" 코루틴이 시작될 때까지 기다린 다음 "취소"를 클릭합니다. 로그에는 새로운 결과가 표시되지 않으므로 모든 요청이 실제로 취소되었음을 의미합니다.
2896 [AWT-EventQueue-0 @coroutine#1] INFO Contributors - kotlin: loaded 40 repos
2901 [DefaultDispatcher-worker-2 @coroutine#4] INFO Contributors - starting loading for kotlin-koans
...
2909 [DefaultDispatcher-worker-5 @coroutine#36] INFO Contributors - starting loading for mpp-example
/* click on 'cancel' */
/* no requests are sent */
6. 두 번째로 NOT_CANCELLABLE 옵션을 선택하여 다시 시도합니다.
2570 [AWT-EventQueue-0 @coroutine#1] INFO Contributors - kotlin: loaded 30 repos
2579 [DefaultDispatcher-worker-1 @coroutine#4] INFO Contributors - starting loading for kotlin-koans
...
2586 [DefaultDispatcher-worker-6 @coroutine#36] INFO Contributors - starting loading for mpp-example
/* click on 'cancel' */
/* but all the requests are still sent: */
6402 [DefaultDispatcher-worker-5 @coroutine#4] INFO Contributors - kotlin-koans: loaded 45 contributors
...
9555 [DefaultDispatcher-worker-8 @coroutine#36] INFO Contributors - mpp-example: loaded 8 contributors
기다린 다음 "취소"를 클릭하면 모든 요청이 여전히 전송되는 것을 확인할 수 있습니다.
7. "contributors" 프로그램에서 취소가 어떻게 트리거되는지 확인해보겠습니다. "취소" 버튼이 클릭되면 주요 "로딩" 코루틴이 명시적으로 취소되며 하위 코루틴도 자동으로 취소됩니다:
interface Contributors {
fun loadContributors() {
// ...
when (getSelectedVariant()) {
CONCURRENT -> {
launch {
val users = loadContributorsConcurrent(service, req)
updateResults(users, startTime)
}.setUpCancellation() // #1
}
}
}
private fun Job.setUpCancellation() {
val loadingJob = this // #2
// '취소' 버튼이 클릭되면 로딩 작업을 취소합니다.
val listener = ActionListener {
loadingJob.cancel() // #3
updateLoadingStatus(CANCELED)
}
// '취소' 버튼에 리스너를 추가합니다.
addCancelListener(listener)
// 로딩 작업이 완료된 후에 상태를 업데이트하고 리스너를 제거합니다.
}
}
launch 함수는 Job 인스턴스를 반환합니다. Job은 "로딩 코루틴"을 참조하며 데이터를로드하고 결과를 업데이트합니다. 이를 위해 Job 인스턴스를 수신자로 사용하여 setUpCancellation() 확장 함수를 호출할 수 있습니다(#1 라인).
다른 방법으로는 다음과 같이 명시적으로 작성할 수 있습니다:
val job = launch { }
job.setUpCancellation()
- 가독성을 위해 setUpCancellation() 함수 수신자를 함수 내에서 새로운 loadingJob 변수로 참조할 수 있습니다(#2 라인).
- 그런 다음 "취소" 버튼에 리스너를 추가하여 클릭될 때 loadingJob이 취소되도록합니다(#3 라인).
구조화된 동시성을 사용하면 부모 코루틴만 취소하면 됩니다. 자식 코루틴으로 취소가 자동으로 전파됩니다.
외부 스코프의 컨텍스트 사용하기
주어진 스코프 내에서 새로운 코루틴을 시작할 때, 모든 코루틴이 동일한 컨텍스트에서 실행되도록 보장하는 것이 훨씬 쉽습니다. 또한 필요한 경우 컨텍스트를 쉽게 교체할 수 있습니다.
이제 외부 스코프에서 디스패처(dispatcher)를 사용하는 방법을 배우는 시간입니다. coroutineScope 또는 coroutine 빌더에 의해 생성된 새로운 스코프는 항상 외부 스코프에서 컨텍스트를 상속합니다. 이 경우 외부 스코프는 suspend loadContributorsConcurrent() 함수가 호출된 스코프입니다.
launch(Dispatchers.Default) { // 외부 스코프
val users = loadContributorsConcurrent(service, req)
// ...
}
모든 중첩된 코루틴은 상속된 컨텍스트로 자동으로 시작됩니다. 디스패처(dispatcher)는 이 컨텍스트의 일부입니다. 따라서 async로 시작된 모든 코루틴은 기본 디스패처(default dispatcher)의 컨텍스트에서 시작됩니다.
suspend fun loadContributorsConcurrent(
service: GitHubService, req: RequestData
): List<User> = coroutineScope {
// 이 스코프는 외부 스코프에서 컨텍스트를 상속받습니다.
// ...
async { // 상속된 컨텍스트에서 시작된 중첩 코루틴
// ...
}
// ...
}
구조화된 동시성을 사용하면 최상위 코루틴을 생성할 때 주요 컨텍스트 요소(디스패처와 같은)를 한 번만 지정할 수 있습니다. 그런 다음 모든 중첩 코루틴은 컨텍스트를 상속하고 필요한 경우에만 수정합니다.
코루틴을 사용하여 UI 응용 프로그램을 작성할 때, 예를 들어 Android 앱의 경우 최상위 코루틴에 대한 기본값으로 CoroutineDispatchers.Main을 사용하고 코드를 다른 스레드에서 실행해야 할 때 필요한 경우에만 다른 디스패처(dispatcher)를 명시적으로 지정하는 것이 일반적인 관행입니다.
진행 상황 표시
일부 저장소의 정보가 비교적 빨리 로드되더라도 사용자는 모든 데이터가 로드될 때까지 결과 목록만 보게 됩니다. 그때까지 로더 아이콘이 진행 상황을 보여주지만 현재 상태나 이미 로드된 기여자에 대한 정보는 없습니다.
데이터를 로드한 후 각 저장소에 대한 데이터가 로드될 때마다 중간 상태로 업데이트되는 콜백으로 UI를 업데이트할 수 있도록 로직을 전달해야 합니다.
진행 상황을 표시하기 위해 중간 상태를 보여주는 loadContributorsProgress() 함수를 구현해야 합니다. 이 함수는 다음과 같이 선언됩니다:
suspend fun loadContributorsProgress(
service: GitHubService,
req: RequestData,
updateResults: suspend (List<User>, completed: Boolean) -> Unit
) {
// 데이터 로딩
// 중간 상태에서 `updateResults()` 호출
}
Contributors.kt 파일에서 해당 콜백은 PROGRESS 옵션을 위해 Main 스레드에서 결과를 업데이트하기 위해 전달됩니다:
launch(Dispatchers.Default) {
loadContributorsProgress(service, req) { users, completed ->
withContext(Dispatchers.Main) {
updateResults(users, startTime, completed)
}
}
}
- loadContributorsProgress()에서 updateResults() 매개변수는 suspend로 선언되어 있습니다. 따라서 해당 람다 인수 내에서 suspend 함수인 withContext를 호출해야 합니다.
- updateResults() 콜백은 로딩이 완료되고 결과가 최종인지 여부를 나타내는 추가 Boolean 매개변수를 인수로 받습니다.
Task 6
Request6Progress.kt 파일에서 loadContributorsProgress() 함수를 구현하십시오. 이 함수는 중간 진행 상태를 표시하는 기능을 합니다. Request4Suspend.kt의 loadContributorsSuspend() 함수를 기반으로 만들어야 합니다.
- 간단한 버전을 사용하며 병렬 처리는 나중에 다음 섹션에서 추가할 것입니다.
- 중간 기여자 목록은 "통합" 상태로 표시되어야 하며 각 저장소마다 로드된 사용자 목록만이 아니어야 합니다.
- 각 사용자의 총 기여 횟수는 각 새 저장소의 데이터가 로드될 때마다 증가해야 합니다.
Task 6 해결책
"통합" 상태의 중간로드된 기여자 목록을 저장하려면 모든 사용자를 저장하는 allUsers 변수를 정의한 다음 새 저장소의 기여자가 로드된 후에 업데이트해야 합니다:
suspend fun loadContributorsProgress(
service: GitHubService,
req: RequestData,
updateResults: suspend (List<User>, completed: Boolean) -> Unit
) {
val repos = service
.getOrgRepos(req.org)
.also { logRepos(req, it) }
.bodyList()
var allUsers = emptyList<User>()
for ((index, repo) in repos.withIndex()) {
val users = service.getRepoContributors(req.org, repo.name)
.also { logUsers(repo, it) }
.bodyList()
allUsers = (allUsers + users).aggregate()
updateResults(allUsers, index == repos.lastIndex)
}
}
연속 대 병렬
updateResults() 콜백은 각 요청이 완료된 후에 호출됩니다.
이 코드에는 병렬 처리가 포함되어 있지 않습니다. 이 코드는 순차적이므로 동기화가 필요하지 않습니다.
가장 좋은 옵션은 요청을 병렬로 보내고 각 저장소에 대한 응답을 받은 후에 중간 결과를 업데이트하는 것입니다.
병렬 처리를 추가하려면 채널을 사용하세요.
채널
공유 가능한 가변 상태로 코드를 작성하는 것은 꽤 어렵고 오류가 발생하기 쉽습니다(콜백을 사용한 솔루션에서와 같이). 더 간단한 방법은 공통 가변 상태를 사용하는 대신 통신을 통해 정보를 공유하는 것입니다. 코루틴은 채널을 통해 서로 통신할 수 있습니다.
채널은 코루틴 간에 데이터를 전달할 수 있는 통신 기본 요소입니다. 하나의 코루틴은 채널로 정보를 보낼 수 있으며 다른 코루틴은 채널에서 해당 정보를 받을 수 있습니다.
정보를 보내는 코루틴은 종종 프로듀서(producer)라고 하며 정보를 받는 코루틴은 컨슈머(consumer)라고 합니다. 하나 이상의 코루틴이 동일한 채널로 정보를 보낼 수 있으며 하나 이상의 코루틴이 그 정보를 받을 수 있습니다.
많은 코루틴이 동일한 채널에서 정보를 받을 때, 각 요소는 소비자 중 하나에 의해 한 번만 처리됩니다. 요소가 처리되면 즉시 채널에서 제거됩니다.
채널을 요소의 컬렉션 또는 더 정확히는 큐와 유사한 것으로 생각할 수 있습니다. 요소는 한쪽 끝에 추가되고 다른 쪽에서 받게됩니다. 그러나 중요한 차이점이 있습니다: 컬렉션과 달리 심지어 동기화된 버전에서도 채널은 send()와 receive() 작업을 일시 중단할 수 있습니다. 이것은 채널이 비어 있거나 가득 찬 경우에 발생합니다. 채널 크기에 상한값이 있는 경우 채널이 가득 찰 수 있습니다.
채널은 세 가지 다른 인터페이스로 표현됩니다: SendChannel, ReceiveChannel 및 Channel이며, 후자는 첫 두 개를 확장합니다. 일반적으로 채널을 생성하고 프로듀서에게 SendChannel 인스턴스로 제공하여 그들만이 채널로 정보를 보낼 수 있도록 합니다. 컨슈머에게는 ReceiveChannel 인스턴스로 채널을 제공하여 그들만이 정보를 받을 수 있도록 합니다. send와 receive 메서드 모두 suspend로 선언됩니다.
interface SendChannel<in E> {
suspend fun send(element: E)
fun close(): Boolean
}
interface ReceiveChannel<out E> {
suspend fun receive(): E
}
interface Channel<E> : SendChannel<E>, ReceiveChannel<E>
프로듀서는 더 이상 요소가 오지 않음을 나타내기 위해 채널을 닫을 수 있습니다.
라이브러리에서 여러 유형의 채널이 정의되어 있습니다. 내부적으로 얼마나 많은 요소를 저장할 수 있는지와 send() 호출이 일시 중단될 수 있는지 여부에 따라 다릅니다. 모든 채널 유형에 대해 receive() 호출은 유사하게 작동합니다: 채널이 비어 있지 않은 경우 요소를 받고, 그렇지 않으면 일시 중단됩니다.
무제한 채널 (Unlimited channel)
무제한 채널은 큐에 가장 가까운 유사물입니다. 프로듀서는 이 채널로 요소를 보낼 수 있고 무한히 계속 성장합니다. send() 호출은 절대로 일시 중단되지 않습니다. 프로그램이 메모리 부족 상태가 되면 OutOfMemoryException이 발생합니다. 무제한 채널과 큐의 차이점은 컨슈머가 비어 있는 채널에서 수신하려고 시도할 때입니다. 이 경우, 컨슈머는 새로운 요소가 전송될 때까지 일시 중단됩니다.
버퍼링된 채널 (Buffered channel)
버퍼링된 채널의 크기는 지정된 개수에 따라 제한됩니다. 프로듀서는 이 채널로 요소를 보낼 수 있으며 크기 제한에 도달할 때까지 계속됩니다. 모든 요소는 내부적으로 저장됩니다. 채널이 가득 찬 경우 다음 send 호출은 더 많은 여유 공간이 생길 때까지 일시 중단됩니다.
만남 채널 (Rendezvous channel)
"만남" 채널은 버퍼가 없는 채널로, 크기가 0인 버퍼링된 채널과 동일합니다. 두 함수 중 하나(send() 또는 receive())는 항상 일시 중단됩니다.
send() 함수가 호출되고 처리할 요소를 처리할 준비가 된 중지된 receive 호출이 없으면 send()가 일시 중단됩니다. 마찬가지로 receive 함수가 호출되고 채널이 비어있거나, 다시 말해 중지된 send() 호출이 요소를 보낼 준비가 되지 않은 경우 receive() 호출이 일시 중단됩니다.
"만남"이라는 이름("약속된 시간과 장소에서의 만남")은 send()와 receive()가 "시간에 맞춰 만나야 한다"는 사실을 나타냅니다.
콘플레이트 채널 (Conflated channel)
콘플레이트 채널에 보낸 새로운 요소는 이전에 보낸 요소를 덮어씁니다. 따라서 수신자는 항상 최신 요소만 받습니다. send() 호출은 절대로 일시 중단되지 않습니다.
채널을 생성할 때 유형이나 버퍼 크기(버퍼링된 경우)를 지정합니다.
val rendezvousChannel = Channel<String>()
val bufferedChannel = Channel<String>(10)
val conflatedChannel = Channel<String>(CONFLATED)
val unlimitedChannel = Channel<String>(UNLIMITED)
기본적으로 "만남" 채널이 생성됩니다.
다음 작업에서는 "만남" 채널, 두 개의 프로듀서 코루틴 및 컨슈머 코루틴을 만들 것입니다:
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.*
fun main() = runBlocking<Unit> {
val channel = Channel<String>()
launch {
channel.send("A1")
channel.send("A2")
log("A done")
}
launch {
channel.send("B1")
log("B done")
}
launch {
repeat(3) {
val x = channel.receive()
log(x)
}
}
}
fun log(message: Any?) {
println("[${Thread.currentThread().name}] $message")
}
이제 "만남" 채널로 기여자를 로드하고 결과를 볼 수 있도록 프로그램을 실행하고 CHANNELS 옵션을 선택할 수 있습니다.
코루틴과 채널은 병렬 처리로 인해 발생하는 복잡성을 완전히 제거하지는 않지만 어떤 일이 일어나고 있는지 이해하는 데 도움이 되는 도구입니다.
태스크 7
src/tasks/Request7Channels.kt에서 loadContributorsChannels() 함수를 구현하십시오. 이 함수는 모든 GitHub 기여자를 동시에 요청하고 동시에 중간 진행 상태를 표시해야 합니다.
이전 함수인 Request5Concurrent.kt의 loadContributorsConcurrent() 및 Request6Progress.kt의 loadContributorsProgress()를 사용하십시오.
태스크 7 해결책 팁
서로 다른 저장소에 대한 기여자 목록을 동시에 수신하는 여러 코루틴은 모두 동일한 채널로 받은 모든 결과를 보낼 수 있습니다.
val channel = Channel<List<User>>()
for (repo in repos) {
launch {
val users = TODO()
// ...
channel.send(users)
}
}
그런 다음 이 채널에서 요소를 하나씩 받아서 처리할 수 있습니다.
repeat(repos.size) {
val users = channel.receive()
// ...
}
receive() 호출이 순차적으로 이루어지므로 추가 동기화가 필요하지 않습니다.
태스크 7 해결책
loadContributorsProgress() 함수와 마찬가지로 "모든 기여자" 목록의 중간 상태를 저장하는 allUsers 변수를 생성하여 "모든 사용자" 목록의 중간 상태를 저장할 수 있습니다. 채널에서 받은 각 새 목록은 모든 사용자 목록에 추가됩니다. 그런 다음 결과를 집계하고 updateResults 콜백을 사용하여 상태를 업데이트합니다.
suspend fun loadContributorsChannels(
service: GitHubService,
req: RequestData,
updateResults: suspend (List<User>, completed: Boolean) -> Unit
) = coroutineScope {
val repos = service
.getOrgRepos(req.org)
.also { logRepos(req, it) }
.bodyList()
val channel = Channel<List<User>>()
for (repo in repos) {
launch {
val users = service.getRepoContributors(req.org, repo.name)
.also { logUsers(repo, it) }
.bodyList()
channel.send(users)
}
}
var allUsers = emptyList<User>()
repeat(repos.size) {
val users = channel.receive()
allUsers = (allUsers + users).aggregate()
updateResults(allUsers, it == repos.lastIndex)
}
}
- 다른 저장소에 대한 결과는 준비되자마자 채널로 추가되므로 처음에는 모든 요청이 전송되고 데이터가 수신되지 않는 상태에서 receive() 호출이 일시 중단됩니다. 이 경우 "기여자 로드" 코루틴 전체가 일시 중단됩니다.
- 그런 다음 사용자 목록이 채널로 전송되면 "기여자 로드" 코루틴이 재개되고 receive() 호출이 이 목록을 반환하며 결과가 즉시 업데이트됩니다.
이제 프로그램을 실행하고 CHANNELS 옵션을 선택하여 기여자를 로드하고 결과를 확인할 수 있습니다.
코루틴 및 채널은 병렬 처리로 인한 복잡성을 완전히 제거하지는 않지만 어떤 일이 발생하는지 이해하는 데 도움이 되는 도구입니다.
코루틴 테스트
이제 모든 해결책을 테스트하여 동시 코루틴을 사용한 해결책이 일시 중단 함수를 사용한 해결책보다 더 빠른지 확인하고 채널을 사용한 해결책이 간단한 "진행" 해결책보다 빠른지 확인해보겠습니다.
다음 작업에서는 각 해결책의 총 실행 시간을 비교할 것입니다. GitHub 서비스를 모방하고 이 서비스가 주어진 대기 시간 후에 결과를 반환하도록 만들겠습니다.
repos 요청 - 1000 ms 지연 후에 답변 반환
repo-1 - 1000 ms 지연
repo-2 - 1200 ms 지연
repo-3 - 800 ms 지연
일시 중단 함수를 사용한 순차적인 해결책은 약 4000 ms가 걸릴 것으로 예상됩니다 (4000 = 1000 + (1000 + 1200 + 800)). 동시 해결책은 약 2200 ms가 걸릴 것으로 예상됩니다 (2200 = 1000 + max(1000, 1200, 800)).
진행 상황을 보여주는 해결책의 경우 중간 결과를 타임스탬프와 함께 확인할 수도 있습니다.
해당 테스트 데이터는 test/contributors/testData.kt에 정의되어 있으며, 파일 Request4SuspendKtTest, Request7ChannelsKtTest 등에는 모의 서비스 호출을 사용하는 간단한 테스트가 포함되어 있습니다.
그러나 여기에는 두 가지 문제가 있습니다:
- 이러한 테스트를 실행하는 데 시간이 너무 오래 걸립니다. 각 테스트는 약 2에서 4초가 걸리며 결과를 기다려야 합니다. 효율적이지 않습니다.
- 해결한 코드를 준비하고 실행하는 데 추가 시간이 소요되므로 정확한 실행 시간에 의존할 수 없습니다. 일정한 값을 추가할 수 있지만, 그렇게 하면 시간이 기계마다 다를 것입니다. 모의 서비스 지연 시간은 이 상수보다 높아야 차이를 볼 수 있습니다. 상수가 0.5초라면, 지연 시간을 0.1초로 설정해도 충분하지 않습니다.
더 나은 방법은 동일한 코드를 여러 번 실행하면서 실행 시간을 테스트하는 특수한 프레임워크를 사용하는 것입니다. 그러나 이것은 복잡하게 배우고 설정하기 어려운 부분입니다.
이러한 문제를 해결하고 제공된 테스트 지연 시간을 사용한 해결책이 예상대로 동작하고 다른 것보다 빠르게 동작하도록 하려면 특수한 테스트 디스패처를 사용하여 가상 시간을 활용하면 됩니다. 이 디스패처는 시작부터 경과한 가상 시간을 추적하고 모든 것을 실제 시간에서 즉시 실행합니다. 이 디스패처에서 코루틴을 실행할 때 지연은 실제 시간이 아닌 가상 시간으로 진행됩니다.
이 메커니즘을 사용하는 테스트는 빠르게 실행되지만 가상 시간의 다른 순간에 어떤 일이 일어나는지 확인할 수 있습니다. 총 실행 시간이 크게 줄어듭니다.
가상 시간을 사용하려면 runBlocking 호출을 runTest로 바꾸고 TestScope의 확장 람다로 전달하면 됩니다. 이 특별한 스코프 내의 일시 중단 함수에서 delay를 호출하면 실제 시간이 아닌 가상 시간이 증가합니다.
@Test
fun testDelayInSuspend() = runTest {
val realStartTime = System.currentTimeMillis()
val virtualStartTime = currentTime
foo()
println("${System.currentTimeMillis() - realStartTime} ms") // ~ 6 ms
println("${currentTime - virtualStartTime} ms") // 1000 ms
}
suspend fun foo() {
delay(1000) // 실제 지연 없이 자동으로 진행됨
println("foo") // foo()가 호출될 때 eager하게 실행됨
}
TestScope의 currentTime 속성을 사용하여 현재 가상 시간을 확인할 수 있습니다.
이 예제에서 실제 실행 시간은 몇 밀리초이며, 가상 시간은 지연 인수인 1000 밀리초와 동일합니다.
가상 시간의 "가상" 지연을 자식 코루틴에서 완전한 효과를 얻으려면 모든 자식 코루틴을 TestDispatcher로 시작하십시오. 그렇지 않으면 작동하지 않습니다. 이 디스패처는 다른 TestScope에서 상속되며 다른 디스패처를 지정하지 않는 한 자동으로 상속됩니다.
@Test
fun testDelayInLaunch() = runTest {
val realStartTime = System.currentTimeMillis()
val virtualStartTime = currentTime
bar()
println("${System.currentTimeMillis() - realStartTime} ms") // ~ 11 ms
println("${currentTime - virtualStartTime} ms") // 1000 ms
}
suspend fun bar() = coroutineScope {
launch {
delay(1000) // 실제 지연 없이 자동으로 진행됨
println("bar") // bar()가 호출될 때 eager하게 실행됨
}
}
위의 예제에서 launch가 Dispatchers.Default의 컨텍스트로 호출되면 테스트가 실패합니다. 작업이 아직 완료되지 않았다는 예외가 발생합니다.
함수를 정의할 때가 아니라 호출할 때 디스패처와 같은 컨텍스트 요소를 지정할 수 있으므로 더 유연하게 테스트할 수 있습니다.
가상 시간을 지원하는 테스트 API는 실험적이며 나중에 변경될 수 있습니다.
기본적으로 실험적인 테스트 API를 사용하면 컴파일러에서 경고를 표시합니다. 이러한 경고를 억제하려면 테스트 함수나 테스트를 포함하는 클래스에 @OptIn(ExperimentalCoroutinesApi::class)를 추가하십시오. 컴파일러가 실험적 API를 사용하고 있음을 지시하는 컴파일러 인수를 추가하십시오.
compileTestKotlin {
kotlinOptions {
freeCompilerArgs += "-Xuse-experimental=kotlin.Experimental"
}
}
이 자습서에 해당하는 프로젝트에는 Gradle 스크립트에 이미 컴파일러 인수가 추가되어 있습니다.
작업 8
다음 테스트를 실제 시간 대신 가상 시간을 사용하도록 리팩터링하십시오:
- Request4SuspendKtTest.kt
- Request5ConcurrentKtTest.kt
- Request6ProgressKtTest.kt
- Request7ChannelsKtTest.kt
리팩터링을 적용하기 전후에 총 실행 시간을 비교하십시오.
작업 8을 위한 팁
1. runBlocking 호출을 runTest로 바꾸고 System.currentTimeMillis()를 currentTime으로 바꾸십시오.
@Test
fun test() = runTest {
val startTime = currentTime
// 작업 수행
val totalTime = currentTime - startTime
// 결과 확인
}
2. 정확한 가상 시간을 확인하는 단언을 주석 처리 해제하십시오.
3. @UseExperimental(ExperimentalCoroutinesApi::class)를 추가하는 것을 잊지 마십시오.
작업 8의 솔루션
다음은 동시 및 채널 케이스에 대한 솔루션입니다:
fun testConcurrent() = runTest {
val startTime = currentTime
val result = loadContributorsConcurrent(MockGithubService, testRequestData)
Assert.assertEquals("loadContributorsConcurrent에 대한 잘못된 결과", expectedConcurrentResults.users, result)
val totalTime = currentTime - startTime
Assert.assertEquals(
"호출이 동시에 실행되므로 총 가상 시간은 2200 ms여야 합니다: " +
"리포 요청에 대한 1000 및 동시 기여자 요청에 대한 max(1000, 1200, 800) = 1200",
expectedConcurrentResults.timeFromStart, totalTime
)
}
먼저 결과가 예상 가상 시간에 정확하게 사용 가능한지 확인한 다음 결과 자체를 확인하십시오.
fun testChannels() = runTest {
val startTime = currentTime
var index = 0
loadContributorsChannels(MockGithubService, testRequestData) { users, _ ->
val expected = concurrentProgressResults[index++]
val time = currentTime - startTime
Assert.assertEquals(
"${expected.timeFromStart} ms 후에 중간 결과가 예상됩니다:",
expected.timeFromStart, time
)
Assert.assertEquals("$time 후에 잘못된 중간 결과:", expected.users, users)
}
}
채널을 사용한 최종 버전에서 첫 번째 중간 결과는 진행 버전보다 더 빨리 사용 가능하며, 가상 시간을 사용하는 테스트에서 그 차이를 확인할 수 있습니다.
남은 "suspend" 및 "progress" 작업에 대한 테스트는 매우 유사하며 프로젝트의 솔루션 브랜치에서 찾을 수 있습니다.
원문
'Kotlin' 카테고리의 다른 글
[Kotlin] Kotlin 공식 문서 번역 - 코루틴 중단 가능한 함수 조합 (Composing suspending functions) (0) | 2023.10.01 |
---|---|
[Kotlin] Kotlin 공식 문서 번역 - 코루틴 취소와 타임아웃 (Coroutines Cancellation and timeouts) (0) | 2023.10.01 |
[Kotlin] Kotlin 공식 문서 번역 - 코루틴 기초 (Coroutines basics) (73) | 2023.10.01 |
[Kotlin] Kotlin 공식 문서 번역 - 코루틴 가이드 (Coroutines guide) (1) | 2023.10.01 |
[Kotlin] Kotlin 공식 문서 번역 - 시퀀스 (Sequence) (73) | 2023.09.26 |
댓글