Kotlin协程入门指南(译文)
原文链接:https://github.com/Kotlin/kotlinx.coroutines/blob/master/coroutines-guide.md
这是一份kotlinx.coroutines
核心特性的指南,它包含许多有趣的例子。
引言
Kotlin 作为一种语言在标准库中提供了最低限度的底层 API 以支持其它库使用协程. 不同于其它一些类似功能的语言,async
和 wait
并不是Kotlin中的关键字,甚至不是标准库的一部分。kotlinx.coroutines
是一个包含了本指南所涵盖的许多高级别的协程基础元素的丰富库,包括launch
, async
和其他一些元素,使用协程需要你添加一个依赖 kotlinx-coroutines-core
模块在你的项目中。
目录内容
协程基础
本节将介绍协程的基本概念
你的第一个协程
运行下面的代码:
fun main(args: Array<String>) {
launch { // launch new coroutine in background and continue
delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
println("World!") // print after delay
}
println("Hello,") // main thread continues while coroutine is delayed
Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive
}
你可以从 这里 得到完整的代码
运行这段代码:
Hello,
World!
本质上协程是轻量级的线程,他们被协程launch构造器启动,你可以用 thread { ... }
替代 launch { ... }
和用 Thread.sleep(...)
替代 delay(...)
实现相同的结果,试试看。
如果你开始用 thread
替代 launch
编译器会产生下面的错误:
Error: Kotlin: Suspend functions are only allowed to be called from a coroutine or another suspend function
这是因为 delay 是一种特殊的 suspending function(挂起函数),它不会阻塞线程但是会挂起协程,而且它只能用于协程。
桥接阻塞和非阻塞的世界
上面的第一个例子在同一份代码中混合了非阻塞的 delay(...)
和 阻塞的 Thread.sleep(...)
非常容易混淆哪一个是阻塞的哪一个是非阻塞的,我们使用 runBlocking 协程构造器来明确阻塞:
fun main(args: Array<String>) {
launch { // launch new coroutine in background and continue
delay(1000L)
println("World!")
}
println("Hello,") // main thread continues here immediately
runBlocking { // but this expression blocks the main thread
delay(2000L) // ... while we delay for 2 seconds to keep JVM alive
}
}
你可以从 这里 得到完整的代码
结果是相同的,但是这段代码只使用了非阻塞的 delay.调用 runBlocking
的主线程会一直阻塞直到在 runBlocking
里面的协程运行完成。
这个例子也可以用一种更地道的方式重写,用runBlocking
包装 main 函数的执行:
fun main(args: Array<String>) = runBlocking<Unit> { // start main coroutine
launch { // launch new coroutine in background and continue
delay(1000L)
println("World!")
}
println("Hello,") // main coroutine continues here immediately
delay(2000L) // delaying for 2 seconds to keep JVM alive
}
你可以从 这里 得到完整的代码
这里 runBlocking<Unit> { ... }
作为一个适配器用于启动顶层的 main 协程.我们显示地指定了它的 Unit
返回类型,因为在Kotlin中一个格式良好的 main
函数必须返回 Unit
。
这也是为挂起函数编写单元测试的一种方式:
class MyTest {
@Test
fun testMySuspendingFunction() = runBlocking<Unit> {
// here we can use suspending functions using any assertion style that we like
}
}
等待一个工作
当另一个协程正在工作时去延迟一段时间不是一个好的方法,让我们明确地以一种非阻塞的方式等待直到我们启动的后台任务完成:
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch { // launch new coroutine and keep a reference to its Job
delay(1000L)
println("World!")
}
println("Hello,")
job.join() // wait until child coroutine completes
}
你可以从 这里
现在的结果仍然是一样的,但是 main 协程的代码并没有以任何方式绑定到后台工作的过程中,好多了。
函数提取重构
我们提取 launch { ... }
里面的代码块到一个独立的函数中. 在这段代码上执行“Extract function”提取重构,您将得到一个具有 suspend
修饰符的新函数。这是你的第一个 挂起 函数,挂起函数可以像普通函数一样在协程中使用,但是他们额外的功能是可以使用其它的挂起函数,比如在这个例子中的 delay
函数,来挂起协程的执行:
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch { doWorld() }
println("Hello,")
job.join()
}
// this is your first suspending function
suspend fun doWorld() {
delay(1000L)
println("World!")
}
你可以从 这里 得到完整的代码
协程是轻量级的
运行下面的代码:
fun main(args: Array<String>) = runBlocking<Unit> {
val jobs = List(100_000) { // launch a lot of coroutines and list their jobs
launch {
delay(1000L)
print(".")
}
}
jobs.forEach { it.join() } // wait for all jobs to complete
}
你可以从 这里 得到完整的代码
这段程序启动了10万个协程,一秒后每一个协程打印了一个 “.” ,现在用线程来试一试,会发生什么?(很可能你的代码会产生一些内存溢出的错误)
协程就像守护线程
下面的代码启动了一个每隔500ms打印一次 “I’m sleeping”的长时间运行的协程, 并延迟一段时间后从 main 函数中返回
fun main(args: Array<String>) = runBlocking<Unit> {
launch {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
delay(1300L) // just quit after delay
}
你可以从 这里 得到完整的代码
您可以运行并看到它打印三行后终止了:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
活动的协程并不会保证当前的进程的存活,协程就像守护线程。
取消和超时
本节将讨论协程的取消和超时.
取消协程的执行
在小的应用中 “main” 方法的返回可能像是一个使所有协程隐式地终止的好主意,在一个更大的长时间运行的应用中,你需要更细粒度的控制。launch 函数返回的Job 可以用来取消协程的运行:
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancel() // cancels the job
job.join() // waits for job's completion
println("main: Now I can quit.")
}
你可以从 这里 得到完整的代码
它会产生下面的输出:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.
一旦 main 函数调用了 job.cancel
,我们没有看到其它协程的输出因为它被取消了, 还有一个 Job 扩展函数 cancelAndJoin,它组合了 [cancel] 和 [join] 调用。
取消是合作的
协程取消是合作的,协程代码必须合作才能被取消,所有的在 kotlinx.coroutines
中的挂起函数都是可取消的,他们检查协程是否被取消了,如果已经取消则会抛出 CancellationException 异常,然而一个正工作在计算线程中且没有检查取消的协程就不能被取消,就像下面例子这样:
fun main(args: Array<String>) = runBlocking<Unit> {
val startTime = System.currentTimeMillis()
val job = launch {
var nextPrintTime = startTime
var i = 0
while (i < 5) { // computation loop, just wastes CPU
// print a message twice a second
if (System.currentTimeMillis() >= nextPrintTime) {
println("I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
}
你可以从 这里 得到完整的代码
运行后可以看到它将继续打印 “I’m sleeping” 即使在协程取消后直到完成5次迭代任务完成。
使计算代码可取消的
有两种方式可以使计算代码可以被取消,第一个是周期性调用一个挂起函数检查是否取消,有一个 yield 函数是一个达到此目的的一个很好的选择,另一个是显示地检查取消状态,让我们试试后面一个的方法,将 while (i < 5)
替换为 while (isActive)
并重新运行它:
fun main(args: Array<String>) = runBlocking<Unit> {
val startTime = System.currentTimeMillis()
val job = launch {
var nextPrintTime = startTime
var i = 0
while (isActive) { // cancellable computation loop
// print a message twice a second
if (System.currentTimeMillis() >= nextPrintTime) {
println("I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
}
你可以从 这里 得到完整的代码
如你所见,现在这个循环被取消了。isActive 是一个表示当前协程是否可用的 CoroutineScope 对象的属性。
在最后关闭资源
可以用一切正常的方式来处理可取消的挂起函数上抛出的CancellationException异常 例如, 使用try {...} finally {...}
表达式在协程被取消时正常地执行 finally{} 中的kotlin操作 :
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch {
try {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
} finally {
println("I'm running finally")
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
}
你可以从 这里 得到完整的代码
join 和 cancelAndJoin] 都等待所有最终完成的操作,因此上面的示例产生以下输出:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
I'm running finally
main: Now I can quit.
运行不可取消的代码块
在前面例子中的finally
代码块中去使用挂起函数的任何尝试都将导致CancellationException异常,因为运行这段代码的这个协程已经被取消了,通常这不是一个问题,因为所有良好行为的关闭操作(关闭文件、取消作业或者关闭任何类型的通信通道)通常都是非阻塞的,并且不会调起任何挂起函数,然而在极少数情况下,当你需要在被取消的协程中挂起时,你可以使用 withContext 函数和 NonCancellable 上下文将相应的代码包装在 withContext(NonCancellable) {...}
中,如下例所示:
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch {
try {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
} finally {
withContext(NonCancellable) {
println("I'm running finally")
delay(1000L)
println("And I've just delayed for 1 sec because I'm non-cancellable")
}
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
}
你可以从 这里 得到完整的代码
超时
在实践中取消协程执行最明显的原因是它的执行时间超过一定的时间,虽然你可以手动跟踪相应Job的引用并启动一个单独的协程以便在延迟之后取消对应的Job,但这里有一个准备好的 withTimeout 函数可以轻松做这件事。
看下面的例子:
fun main(args: Array<String>) = runBlocking<Unit> {
withTimeout(1300L) {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
}
你可以从 这里 得到完整的代码
它产生下面的输出:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Exception in thread "main" kotlinx.coroutines.experimental.TimeoutCancellationException: Timed out waiting for 1300 MILLISECONDS
withTimeout抛出的 TimeoutCancellationException
异常是 CancellationException异常的子类。我们还没有看到过控制台打印的堆栈跟踪,这是因为在一个取消了的协程内CancellationException
被认为是协程完成的一个正常的原因。但是,在这个例子中我们已经在 main
函数中使用了 withTimeout
。
因为取消只是一个例外,所有的资源都将以通常的方式关闭。
你可以在 try {...} catch (e: TimeoutCancellationException) {...}
代码块中 用 timeout
来包装代码,或者使用 withTimeoutOrNull 函数,如果你需要对任何类型的超时去做一些额外的操作, withTimeoutOrNull 函数类似于 withTimeout,但是在超时时返回 null
而不是异常
fun main(args: Array<String>) = runBlocking<Unit> {
val result = withTimeoutOrNull(1300L) {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
"Done" // will get cancelled before it produces this result
}
println("Result is $result")
}
你可以从 这里 得到完整的代码
运行此代码时不再有异常:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Result is null
组合挂起函数
本节介绍各种不同挂起函数的组合方法
默认顺序
假设我们在其它地方定义了两个挂起函数,它们可以做一些有用的事情,比如某种远程调用或计算。我们假装它们是有用的,但实际上每一个都只是为了这个例子而延迟一秒钟:
suspend fun doSomethingUsefulOne(): Int {
delay(1000L) // pretend we are doing something useful here
return 13
}
suspend fun doSomethingUsefulTwo(): Int {
delay(1000L) // pretend we are doing something useful here, too
return 29
}
如果需要去顺序调用它们我们该怎么做 – 首先 doSomethingUsefulOne
然后 doSomethingUsefulTwo
并计算它们结果的和?
在实践中我们会使用第一个函数的结果去决定是否我们需要去调用第二个函数或者怎样去调用它,我们只需要使用一个正常的顺序调用,因为协程中的代码和普通代码样默认是顺序执行的,下面的示例通过测量执行两个挂起函数所需要的总时间来演示它:
fun main(args: Array<String>) = runBlocking<Unit> {
val time = measureTimeMillis {
val one = doSomethingUsefulOne()
val two = doSomethingUsefulTwo()
println("The answer is ${one + two}")
}
println("Completed in $time ms")
}
你可以从 这里 得到完整的代码
这段代码会输出这些:
The answer is 42
Completed in 2017 ms
使用异步并发
如果我们想要更快的结果,要求 doSomethingUsefulOne
和 doSomethingUsefulTwo
之间没有依赖,它们是并发执行的,该怎么做?async 可以帮助我们来完成。
从概念上来讲,async 就像 launch。它启动了一个独立的协程,这个协程是一个轻量级的线程,其它所有的协程并发工作。 区别在于 launch
返回一个不带任何结果值的 Job,而 async
返回一个 Deferred – 一个轻量非阻塞的未来,代表了以后提供结果的承诺。你可以在一个 deferred 值上使用 .await()
来得到它最终的结果,Deferred
也是一个 Job
, 所以你也能在有需要时取消它
fun main(args: Array<String>) = runBlocking<Unit> {
val time = measureTimeMillis {
val one = async { doSomethingUsefulOne() }
val two = async { doSomethingUsefulTwo() }
println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")
}
你可以从 这里 得到完整的代码
这段代码会输出这些:
The answer is 42
Completed in 1017 ms
这次快了两倍,因为我们并发执行了两个协程。注意,协程的并发始终是显式的。
懒异步启动
对于 async 有一个 “懒启动” 配置项,在参数 start
中使用 CoroutineStart.LAZY 值。仅仅当它的结果被需要时才会启动执行协程,如调用了 await 或者 start 函数。
运行下面这个示例,与上一个示例不同,这里使用了 “懒启动” 选项
fun main(args: Array<String>) = runBlocking<Unit> {
val time = measureTimeMillis {
val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }
println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")
}
你可以从 这里 得到完整的代码
这段代码会输出这些:
The answer is 42
Completed in 2017 ms
因此我们又回到了顺序执行,因为我们 首先启动并等待 one
然后启动并等待 two
。这不是一个想要的懒用例。懒异步被设计用于当计算值涉及到挂起函数时取代标准的 lazy
函数
异步风格函数
我们可以定义使用协程异步构造器的异步风格函数异步地调用 doSomethingUsefulOne
和 doSomethingUsefulTwo
。用 “Async” 后缀来命名这样的函数是一种很好的方式,它强调了这样一个事实:它们只启动异步计算,并且需要使用产生的延迟值来获得结果。
// The result type of somethingUsefulOneAsync is Deferred<Int>
fun somethingUsefulOneAsync() = async {
doSomethingUsefulOne()
}
// The result type of somethingUsefulTwoAsync is Deferred<Int>
fun somethingUsefulTwoAsync() = async {
doSomethingUsefulTwo()
}
注意,这些 xxxAsync
函数不是挂起函数他们可以在任意地方被使用。然而,他们的使用经常暗示着他们的操作调用代码的执行是异步的
下面的例子展示了他们在协程之外的用法:
// note, that we don't have `runBlocking` to the right of `main` in this example
fun main(args: Array<String>) {
val time = measureTimeMillis {
// we can initiate async actions outside of a coroutine
val one = somethingUsefulOneAsync()
val two = somethingUsefulTwoAsync()
// but waiting for a result must involve either suspending or blocking.
// here we use `runBlocking { ... }` to block the main thread while waiting for the result
runBlocking {
println("The answer is ${one.await() + two.await()}")
}
}
println("Completed in $time ms")
}
你可以从 这里 得到完整的代码
协程上下文和调度器
协程总是执行在某些上下文中,这些上下文由 Kotlin 标准库中定义的 [CoroutineContext] 类型的值来表示的。协程上下文是一个不同元素的集合,主要的元素是协程中的Job,我们之前看到过,本节我们将会介绍它的调度器。
调度器和线程
协程上下文包含协程调度器,协程调度器决定协程执行时的线程或者相应协程使用的线程。协程调度器可以将协程的执行限制为一个特定的线程,将其分派给线程池,或者让它不受限制地运行。
所有的协程构造器如 launch 和 async ,接收一个可选项 CoroutineContext ,这个可选项用来显示地指定新协程和它的其它上下文元素的调度器。
试一试下面的例子:
fun main(args: Array<String>) = runBlocking<Unit> {
val jobs = arrayListOf<Job>()
jobs += launch(Unconfined) { // not confined -- will work with main thread
println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
}
jobs += launch(coroutineContext) { // context of the parent, runBlocking coroutine
println("'coroutineContext': I'm working in thread ${Thread.currentThread().name}")
}
jobs += launch(CommonPool) { // will get dispatched to ForkJoinPool.commonPool (or equivalent)
println(" 'CommonPool': I'm working in thread ${Thread.currentThread().name}")
}
jobs += launch(newSingleThreadContext("MyOwnThread")) { // will get its own new thread
println(" 'newSTC': I'm working in thread ${Thread.currentThread().name}")
}
jobs.forEach { it.join() }
}
你可以从 这里 得到完整的代码
它产生下面的输出(可能是不同的顺序)
'Unconfined': I'm working in thread main
'CommonPool': I'm working in thread ForkJoinPool.commonPool-worker-1
'newSTC': I'm working in thread MyOwnThread
'coroutineContext': I'm working in thread main
我们在前面部分中使用的默认调度器是由 DefaultDispatcher表示的,它在当前实现中等于CommonPool,所以 launch { ... }
、launch(DefaultDispatcher) { ... }
、launch(CommonPool) { ... }
使用的是相同的调度器,父类[coroutineContext]和 Unconfined不限制上下文之间的不同将会在稍后介绍。注意,newSingleThreadContext 创建了一个新的线程,这是一个非常昂贵的资源。
在实际应用程序中必须在不在需要的时候,使用 close 函数释放它,或者将其存储在顶层变量中,并在整个应用程序中重用。
受限和不受限调度器的区别
无限制协程分发器在调用者的线程中启动协程,但直到第一个挂起点,在挂起后它在线程中的恢复完全由被调用的挂起函数决定, 当协程不消耗CPU事件,也不更新限制在特定线程的共享数据(如UI)时,不受限调度器是合适的。
在另一方面,coroutineContext 属性可以通过 CoroutineScope 接口在任何协程代码块中使用,这是对这个特定的协程上下文的引用。
这种方式,父类上下文可以被继承,特别是 runBlocking 协程的默认调度器只受限于调用的线程,因此继承它具有将执行限制到该线程的效果,并具有可预测的FIFO调度。
fun main(args: Array<String>) = runBlocking<Unit> {
val jobs = arrayListOf<Job>()
jobs += launch(Unconfined) { // not confined -- will work with main thread
println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
delay(500)
println(" 'Unconfined': After delay in thread ${Thread.currentThread().name}")
}
jobs += launch(coroutineContext) { // context of the parent, runBlocking coroutine
println("'coroutineContext': I'm working in thread ${Thread.currentThread().name}")
delay(1000)
println("'coroutineContext': After delay in thread ${Thread.currentThread().name}")
}
jobs.forEach { it.join() }
}
你可以从 这里 得到完整的代码
产生的输出:
'Unconfined': I'm working in thread main
'coroutineContext': I'm working in thread main
'Unconfined': After delay in thread kotlinx.coroutines.DefaultExecutor
'coroutineContext': After delay in thread main
因此继承了 runBlocking {...}
中 coroutineContext
的协程继续在 main
线程中执行, 而不受限的协程在延迟delay函数正在使用的默认执行线程中恢复执行。
调试协程和线程
协程可以在一个线程上挂起然后在另一个使用不受限调度器或者一个默认多线程调度器的线程上恢复。即使使用单线程调度器也很难弄清楚 协程 在做什么、在哪里、何时在做。调试使用线程的应用程序的常见方法是在每个日志语句的日志文件中打印线程名,这个特性通常在日志框架中会被支持,当使用协程,单独的线程名并不能提供很多上下文信息,所以 kotlinx.coroutines
中包含调试工具使它更容易。
使用 -Dkotlinx.coroutines.debug
JVM 选项运行下面的代码
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun main(args: Array<String>) = runBlocking<Unit> {
val a = async(coroutineContext) {
log("I'm computing a piece of the answer")
6
}
val b = async(coroutineContext) {
log("I'm computing another piece of the answer")
7
}
log("The answer is ${a.await() * b.await()}")
}
你可以从 这里 得到完整的代码
这儿有三个协程,主协程 runBlocking
,两个协程计算延迟值 a
和 b
。他们都执行在 runBlocking
的上下文中并受限在 main
线程 这段代码的输出:
[main @coroutine#2] I'm computing a piece of the answer
[main @coroutine#3] I'm computing another piece of the answer
[main @coroutine#1] The answer is 42
可以看到日志函数在方括号里面打印了线程的名称,而且当前执行的协程的标识符也添加在其后面。当调试模式打开时,这个标识符会被连续地分配给所有创建的协程。
您可以在newCoroutineContext函数的文档中了解更多关于调试工具的内容。
线程间跳转
使用 -Dkotlinx.coroutines.debug
JVM可选项运行下面的代码
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun main(args: Array<String>) {
newSingleThreadContext("Ctx1").use { ctx1 ->
newSingleThreadContext("Ctx2").use { ctx2 ->
runBlocking(ctx1) {
log("Started in ctx1")
withContext(ctx2) {
log("Working in ctx2")
}
log("Back to ctx1")
}
}
}
}
你可以从 这里 得到完整的代码
它演示了几种新技术。一个是使用带有显示指定上下文的runBlocking,另一个是使用 withContext 函数来更改协程的上下文,同时仍然保持运行在相同的协程中,正如你再下面的输出中看到的那样。
[Ctx1 @coroutine#1] Started in ctx1
[Ctx2 @coroutine#1] Working in ctx2
[Ctx1 @coroutine#1] Back to ctx1
注意,这个示例还使用了Kotlin标准库中的 use
函数,以释放在不再需要时使用newSingleThreadContext 创建的线程。
在上下文中工作
协程的 Job 是其上下文 context 的一部分,协程可以使用 coroutineContext[Job]
表达式从自己的上下文context中检索它:
fun main(args: Array<String>) = runBlocking<Unit> {
println("My job is ${coroutineContext[Job]}")
}
你可以从 这里 得到完整的代码
在调试模式下它会产生像下面这样的代码:
My job is "coroutine#1":BlockingCoroutine{Active}@6d311334
所以 在 CoroutineScope.isActive 中的 isActive 只是 coroutineContext[Job]!!.isActive
的快捷方式
协程的孩子
当协程的 coroutineContext 被用来启动另一个协程时,新协程的 Job 就变成了父协程 Job 的孩子,当父协程取消后所有它的孩子都会被递归的取消。
fun main(args: Array<String>) = runBlocking<Unit> {
// launch a coroutine to process some kind of incoming request
val request = launch {
// it spawns two other jobs, one with its separate context
val job1 = launch {
println("job1: I have my own context and execute independently!")
delay(1000)
println("job1: I am not affected by cancellation of the request")
}
// and the other inherits the parent context
val job2 = launch(coroutineContext) {
delay(100)
println("job2: I am a child of the request coroutine")
delay(1000)
println("job2: I will not execute this line if my parent request is cancelled")
}
// request completes when both its sub-jobs complete:
job1.join()
job2.join()
}
delay(500)
request.cancel() // cancel processing of the request
delay(1000) // delay a second to see what happens
println("main: Who has survived request cancellation?")
}
你可以从 这里 得到完整的代码
输出如下:
job1: I have my own context and execute independently!
job2: I am a child of the request coroutine
job1: I am not affected by cancellation of the request
main: Who has survived request cancellation?
组合上下文
协程的上下文可以使用 +
运算符组合。右边的上下文替换了左边上下文的相关条目,例如 父类协程的 Job 可以被继承,而它的调度器被替换:
fun main(args: Array<String>) = runBlocking<Unit> {
// start a coroutine to process some kind of incoming request
val request = launch(coroutineContext) { // use the context of `runBlocking`
// spawns CPU-intensive child job in CommonPool !!!
val job = launch(coroutineContext + CommonPool) {
println("job: I am a child of the request coroutine, but with a different dispatcher")
delay(1000)
println("job: I will not execute this line if my parent request is cancelled")
}
job.join() // request completes when its sub-job completes
}
delay(500)
request.cancel() // cancel processing of the request
delay(1000) // delay a second to see what happens
println("main: Who has survived request cancellation?")
}
你可以从 这里 得到完整的代码
该代码的预期结果是:
job: I am a child of the request coroutine, but with a different dispatcher
main: Who has survived request cancellation?
父类的责任
父类总是等待所有它的孩子的完成。父类不必显示地跟踪它启动的所有孩子而且也不必在最后使用 Job.join 来等待他们:
fun main(args: Array<String>) = runBlocking<Unit> {
// launch a coroutine to process some kind of incoming request
val request = launch {
repeat(3) { i -> // launch a few children jobs
launch(coroutineContext) {
delay((i + 1) * 200L) // variable delay 200ms, 400ms, 600ms
println("Coroutine $i is done")
}
}
println("request: I'm done and I don't explicitly join my children that are still active")
}
request.join() // wait for completion of the request, including all its children
println("Now processing of the request is complete")
}
你可以从 这里 得到完整的代码
结果将是:
request: I'm done and I don't explicitly join my children that are still active
Coroutine 0 is done
Coroutine 1 is done
Coroutine 2 is done
Now processing of the request is complete
为调试命名协程
当协程日志频繁时自动分配的id很好,你只需要将来自相同协程的日志记录关联起来。然而当协程被绑定到特定请求的处理或做一些特定的后台任务时,最好将其显示地命名用于调试目的。CoroutineName 上下文元素的功能与线程名称相同。当调试模式打开时正在执行的这个协程会显示在线程名称中。
下面的例子演示了这个概念
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun main(args: Array<String>) = runBlocking(CoroutineName("main")) {
log("Started main coroutine")
// run two background value computations
val v1 = async(CoroutineName("v1coroutine")) {
delay(500)
log("Computing v1")
252
}
val v2 = async(CoroutineName("v2coroutine")) {
delay(1000)
log("Computing v2")
6
}
log("The answer for v1 / v2 = ${v1.await() / v2.await()}")
}
你可以从 这里 得到完整的代码
类似地配置 JVM 选项 -Dkotlinx.coroutines.debug
将会产生下面输出
[main @main#1] Started main coroutine
[ForkJoinPool.commonPool-worker-1 @v1coroutine#2] Computing v1
[ForkJoinPool.commonPool-worker-2 @v2coroutine#3] Computing v2
[main @main#1] The answer for v1 / v2 = 42
工作的显式取消
让我们把对协程的context
、children
、和job
的认知放在一起。假设我们的应用程序有一个具有生命周期的对象,但这个对象不是一个协程。比如,我们正在编写一个 Android 应用程序并在 Android 的 Activity 的上下文中启动了多种协程去执行获取、更新数据、动画等的异步操作。为了避免一些内存泄漏所有的这些协程必须在 Activity 销毁的时候取消。
我们可以通过创建一个绑定到我们 Activity 生命周期的 Job 实例对象来管理协程的生命周期。如下面例子展示的,为了方便起见,使用 Job() 工厂函数来创建一个 Job 实例对象而不是使用 launch(coroutineContext + job)
表达式创建。我们可以编写 launch(coroutineContext, parent = job)
来明确表示正在使用父类 Job 的事实。
现在,一个单独的 Job.cancel 的调用就可以取消所有我们启动的孩子 Job。此外 Job.join 等待所有的孩子工作的完成,所以我们可以在这个例子中使用 cancelAndJoin:
fun main(args: Array<String>) = runBlocking<Unit> {
val job = Job() // create a job object to manage our lifecycle
// now launch ten coroutines for a demo, each working for a different time
val coroutines = List(10) { i ->
// they are all children of our job object
launch(coroutineContext, parent = job) { // we use the context of main runBlocking thread, but with our parent job
delay((i + 1) * 200L) // variable delay 200ms, 400ms, ... etc
println("Coroutine $i is done")
}
}
println("Launched ${coroutines.size} coroutines")
delay(500L) // delay for half a second
println("Cancelling the job!")
job.cancelAndJoin() // cancel all our coroutines and wait for all of them to complete
}
你可以从 [这里] 得到完整的代码(https://github.com/ethanhua/kotlinx.coroutines/blob/master/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-context-10.kt)
这个例子的输出:
Launched 10 coroutines
Coroutine 0 is done
Coroutine 1 is done
Cancelling the job!
正如你看到的,只是前三个协程打印了信息,而其他的协程都被一个 job.cancelAndJoin()
调用取消了。因此在我们假设的 Android 应用程序中,我们需要做的就是在创建 Activity 时创建一个父类 Job 并将其应用于子 Job ,并在 Activity 销毁的时候取消它,我们不能在 Android 生命周期中 join
他们。因为他们是同步的。但是可以在构建后台服务以确保有界的资源使用时这种加入连接能力是有用的。
通道
Deferred 延迟值提供一个方便的途径去传递协程间的单值。Channels 通道提供了一种传递值流的方法。
通道基础
[Channel] 在概念上非常类似于阻塞队列 BlockingQueue
。一个关键的区别是 它有一个挂起的[send]而不是一个阻塞的 put
操作,一个挂起的 receive 而不是一个阻塞的 take
操作:
fun main(args: Array<String>) = runBlocking<Unit> {
val channel = Channel<Int>()
launch {
// this might be heavy CPU-consuming computation or async logic, we'll just send five squares
for (x in 1..5) channel.send(x * x)
}
// here we print five received integers:
repeat(5) { println(channel.receive()) }
println("Done!")
}
你可以从 这里 得到完整的代码
输出的代码:
1
4
9
16
25
Done!
通道关闭和迭代
和队列不同,通道可以被关闭意味着没有更多的元素可以进来。在接收端使用一个常规的 for
循环来接收从通道中来的元素是很方便的。概念上,close 像是发送一个特殊的关闭凭证到通道。一旦接收到了这个关闭凭证迭代就会停止。所以可以保证在 close 调用之前发送的元素都可以被接收到:
fun main(args: Array<String>) = runBlocking<Unit> {
val channel = Channel<Int>()
launch {
for (x in 1..5) channel.send(x * x)
channel.close() // we're done sending
}
// here we print received values using `for` loop (until the channel is closed)
for (y in channel) println(y)
println("Done!")
}
你可以从 这里 得到完整的代码
构建通道生产者
这种协程产生序列元素的模式是很常见的。它是在并发代码中经常看到的生产者-消费者模式的一部分。你可以将这样的生产者抽象为以通道为参数的函数,但这违背了必须从函数返回结果的常识。这里有一个名为 produce 的协程构造器可以在生产者端很容易地做这件事,并且有一个扩展函数 consumeEach 取代消费者端的 for
循环。
fun produceSquares() = produce<Int> {
for (x in 1..5) send(x * x)
}
fun main(args: Array<String>) = runBlocking<Unit> {
val squares = produceSquares()
squares.consumeEach { println(it) }
println("Done!")
}
你可以从 这里 得到完整的代码
管道
管道是协程产生的一个可能无限的值流。
fun produceNumbers() = produce<Int> {
var x = 1
while (true) send(x++) // infinite stream of integers starting from 1
}
另一个协程或一些协程正在消费这条值流,做一些处理并产生一些其它的结果。在下面的例子中,数字仅仅被平方:
fun square(numbers: ReceiveChannel<Int>) = produce<Int> {
for (x in numbers) send(x * x)
}
主函数启动并连接整个管道:
fun main(args: Array<String>) = runBlocking<Unit> {
val numbers = produceNumbers() // produces integers from 1 and on
val squares = square(numbers) // squares integers
for (i in 1..5) println(squares.receive()) // print first five
println("Done!") // we are done
squares.cancel() // need to cancel these coroutines in a larger app
numbers.cancel()
}
你可以从 这里 得到完整的代码
在这个示例应用程序中我们不必取消这些协程,因为协程是守护线程,但是在大的应用程序中我们需要在不再需要的使用去停止管道。另外我们可以像下面的例子中演示的那样,将管道协程作为主协程的孩子运行。
素数与管道
让我们以管道为例来生成素数。我们以一个无穷数列开始,这次我们引入了一个显式的 context
参数,并将其传递给 produce 构建器,以便调用方可以控制我们协程运行的位置。
fun numbersFrom(context: CoroutineContext, start: Int) = produce<Int>(context) {
var x = start
while (true) send(x++) // infinite stream of integers from start
}
下面的管道阶段过滤一个输入数字流,移除掉所有能被给定素数整除的数字:
fun filter(context: CoroutineContext, numbers: ReceiveChannel<Int>, prime: Int) = produce<Int>(context) {
for (x in numbers) if (x % prime != 0) send(x)
}
现在我们启动一个以数字2开始的数字流来构建我们的管道,从当前的通道中拿出一个素数,并为每一个发现的素数启动一个新的管道阶段。
numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...
下面的例子打印前十个素数,运行在主线程中的上下文中的整个管道,所有的协程作为主协程的孩子被启动,我们不必去维持一个显式的启动过的所有协程的列表,我们使用 cancelChildren 扩展函数去取消所有的子协程。
fun main(args: Array<String>) = runBlocking<Unit> {
var cur = numbersFrom(coroutineContext, 2)
for (i in 1..10) {
val prime = cur.receive()
println(prime)
cur = filter(coroutineContext, cur, prime)
}
coroutineContext.cancelChildren() // cancel all children to let main finish
}
你可以从 这里 得到完整的代码
输出代码:
2
3
5
7
11
13
17
19
23
29
注意,你可以用标准库中的buildIterator
构建器来构建相同的管道。用buildIterator
替换produce
,yield
替换send
,next
替换 receive
,Iterator
替换 ReceiveChannel
,并删除上下文。你也不会再需要runBlocking
。然而,使用上述通道的管道的好处是,如果在CommonPool上下文中运行,它实际上可以使用多个CPU内核。无论如何,这是一种非常不切实际的方法来找到素数。在实践中,管道涉及一些其他挂起调用(如异步调用远程服务),这些管道不能使用buildSeqeunce
/ buildIterator
,因为他们不允许任意挂起,与produce
不同,这是完全异步的。
扇出
多个协程可以从同一通道中接收数据并进行分配工作处理。让我们从一个定期产生数据(1秒产生10个数字)的生产者协程开始:
fun produceNumbers() = produce<Int> {
var x = 1 // start from 1
while (true) {
send(x++) // produce next
delay(100) // wait 0.1s
}
}
接下来,我们可以有一些处理协程,在这个例子中,他们仅打印出他们接收到的数字:
fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
channel.consumeEach {
println("Processor #$id received $it")
}
}
现在我们启动5个处理器并让他们工作接近1秒的时间,看看会发生什么
fun main(args: Array<String>) = runBlocking<Unit> {
val producer = produceNumbers()
repeat(5) { launchProcessor(it, producer) }
delay(950)
producer.cancel() // cancel producer coroutine and thus kill them all
}
你可以从 这里 得到完整的代码
会产生类似于下面的输出,尽管接收每个特定整数的处理器id可能不同。
Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10
注意,取消生产者协程会关闭它的通道,因此,最终会终止处理器协程工作的通道上的迭代。
扇入
多个协程可以发送数据到相同的通道,例如,我们有一个字符串通道以及一个挂起函数,它会以指定的延迟发送指定的字符串到这个通道:
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
while (true) {
delay(time)
channel.send(s)
}
}
现在让我们来看一看会发生什么,如果我们启动了一对发送字符串的协程(在这个例子中我们启动他们在主线程的上下文中,作为主协程的孩子)
fun main(args: Array<String>) = runBlocking<Unit> {
val channel = Channel<String>()
launch(coroutineContext) { sendString(channel, "foo", 200L) }
launch(coroutineContext) { sendString(channel, "BAR!", 500L) }
repeat(6) { // receive first six
println(channel.receive())
}
coroutineContext.cancelChildren() // cancel all children to let main finish
}
你可以从 这里 得到完整的代码
输出:
foo
foo
BAR!
foo
foo
BAR!
缓冲通道
到目前为止我们看到的都是没有缓冲区的通道。无缓冲的通道传递元素在发送者和接受者相遇时。如果先发射,那么它会挂起直到接收被调用,如果先接收,它会挂起直到发射被调用。 Channel() 工厂函数和 produce 构造器都使用可选项 capacity
参数指定缓存大小,缓存允许发射者发送多个元素在通道被挂起前,类似于有特定容量的BlockingQueue
,它会阻塞当缓存满的时候。
看一看下面代码的行为:
fun main(args: Array<String>) = runBlocking<Unit> {
val channel = Channel<Int>(4) // create buffered channel
val sender = launch(coroutineContext) { // launch sender coroutine
repeat(10) {
println("Sending $it") // print before sending each element
channel.send(it) // will suspend when buffer is full
}
}
// don't receive anything... just wait....
delay(1000)
sender.cancel() // cancel sender coroutine
}
你可以从 这里 得到完整的代码
使用一个有4个容量的缓冲通道将会打印 “sending” 5次:
Sending 0
Sending 1
Sending 2
Sending 3
Sending 4
前4个元素被添加进缓存中并且当尝试去发送第5个元素时发送器将会挂起。
通道是公平的
向通道发送和接收操作是公平的,因为他们遵循协程们间的调用顺序。他们是先进先出的顺序,例如 第一个调用 receive
的协程第一个得到元素、在下面的例子中,两个协程 “ping” 和 “pong” 从共享的 “table” 通道接收 “ball” 对象。
data class Ball(var hits: Int)
fun main(args: Array<String>) = runBlocking<Unit> {
val table = Channel<Ball>() // a shared table
launch(coroutineContext) { player("ping", table) }
launch(coroutineContext) { player("pong", table) }
table.send(Ball(0)) // serve the ball
delay(1000) // delay 1 second
coroutineContext.cancelChildren() // game over, cancel them
}
suspend fun player(name: String, table: Channel<Ball>) {
for (ball in table) { // receive the ball in a loop
ball.hits++
println("$name $ball")
delay(300) // wait a bit
table.send(ball) // send the ball back
}
}
你可以从 这里 得到完整的代码
ping
协程首先启动,所以它第一个接收到球。即使 ping
协程立刻把球收到并发出去,但是球会被 pong
协程收到,因为它已经在等待了。
ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)
注意,有时候通道可能会产生看起来不公平的执行,是因为正在使用的执行器的本性,详情请看this issue
共享可变状态和并发
可以使用如 CommonPool 的多线程分派器并发执行协程。它呈现了所有常见的并发问题。主要问题是访问共享可变状态的同步。在这一问题的解决方案中,与多线程世界的解决方案相似,但其他的解决方案是独特的。
问题
我们启动一千个协程,每个协程都做同样的动作一千次(总共执行一百万次).我们测量它们的完成时间,以便进行进一步的比较:
suspend fun massiveRun(context: CoroutineContext, action: suspend () -> Unit) {
val n = 1000 // number of coroutines to launch
val k = 1000 // times an action is repeated by each coroutine
val time = measureTimeMillis {
val jobs = List(n) {
launch(context) {
repeat(k) { action() }
}
}
jobs.forEach { it.join() }
}
println("Completed ${n * k} actions in $time ms")
}
我们使用多线程 CommonPool 上下文启动一个增加一个共享可变变量的简单动作。
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) {
counter++
}
println("Counter = $counter")
}
你可以从 这里 得到完整的代码
最后会打印出什么?大概率不会打印出 “Counter = 1000000”,因为这一千个协程并发地增加 counter
,他们之间没有同步数据状态。
注意:如果您有一个具有2个或更少cpu的旧系统,那么您将始终看到1000000,因为在这个示例中,CommonPool只运行一个线程。为了重现这个问题,你需要做出以下改变:
val mtContext = newFixedThreadPoolContext(2, "mtPool") // explicitly define context with two threads
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(mtContext) { // use it instead of CommonPool in this sample and below
counter++
}
println("Counter = $counter")
}
你可以从 这里 得到完整的代码
Volatiles没有用
有一种常见的误解,就是使变量 volatile
解决了并发问题。让我们试一试:
@Volatile // in Kotlin `volatile` is an annotation
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) {
counter++
}
println("Counter = $counter")
}
你可以从 这里 得到完整的代码
这段代码运行很慢,且在最后我们仍然得不到 “Counter = 1000000”,因为 volatile 变量保证线性化(这是 “atomic” 原子的技术术语)读取和写入变量,但是不支持更大量动作的原子性(我们这个例子中的)
线程安全的数据结构
上诉问题的通用解决方案是使用线程安全的(即同步的、线性化的或原子的)数据结构,为需要在共享状态上执行的相应操作提供所有必要的同步。对于一个简单的计数器,我们可以使用 AtomicInteger
类,它具有原子性的incrementAndGet
操作:
var counter = AtomicInteger()
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) {
counter.incrementAndGet()
}
println("Counter = ${counter.get()}")
}
你可以从 这里 得到完整的代码
这是解决这个问题的最快方法。它适用于普通计数器、集合、队列和其他标准数据结构和基本操作。但是,它不容易扩展到复杂的状态或复杂的操作,这些操作没有现成的线程安全实现。
线程约束细粒度
线程约束是一个解决共享可变状态问题的方法,所有访问特定共享状态仅限于单个线程。它通常用于UI应用程序,所有UI状态被限制到一个单独的事件调度/应用程序线程,在协程中很容易应用,就是使用一个单独的线程上下文:
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) { // run each coroutine in CommonPool
withContext(counterContext) { // but confine each increment to the single-threaded context
counter++
}
}
println("Counter = $counter")
}
你可以从 这里 得到完整的代码
这段代码运行很慢,因为它的执行受线程颗粒度限制。每次的增加需要从多线程的 CommonPool
切换到使用withContext块的单线程上下文中。
线程约束粗粒度
实际上,线程限制是在大的块中执行的,例如 状态更新业务逻辑的大片段只局限于单个线程。下面的例子是这样做的,在单线程的上下文中运行每个协程。
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(counterContext) { // run each coroutine in the single-threaded context
counter++
}
println("Counter = $counter")
}
你可以从 这里 得到完整的代码
现在运行起来快多了且产生正确的结果
互斥
互斥解决方案可以保护所有对关键部分的共享状态的修改从不并发执行。在阻塞的世界你通常会使用 synchronized
或者 ReentrantLock
。协程中的替代叫做Mutex。它拥有 lock and unlock函数去限定关键的部分。关键的不同是 Mutex.lock
是一个挂起函数,它并不阻塞线程。还有一个 withLock 扩展函数方便的代表 mutex.lock(); try { ... } finally { mutex.unlock() }
模式:
val mutex = Mutex()
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) {
mutex.withLock {
counter++
}
}
println("Counter = $counter")
}
你可以从 这里 得到完整的代码
这个例子的锁定是细粒度的,所以它代价昂贵,然而,在你必须需要定期修改一些共享状态但是没有这个状态被限制的原始线程时这个方案是一种很好的选择。
Actors行动者
actor 是协程的一种组合,一种被封装进协程的受限的状态,一个与其它协程交流的通道。一个简单的 actor 可以写成一个函数,但是一个具有复杂状态的 actor 更适合写成一个类。
有一个actor 协程构建器可以方便地地组合 actor 的邮箱通道以接收消息,并结合发送通道到一个结果工作对象中,这样就可以将单个引用作为它的句柄来执行。
使用actor的第一步是定义一个参与者将要处理的消息类。Kotlin的密封类 sealed classes 非常适合这个用途。我们用IncCounter
消息来定义CounterMsg
密封类,以增加计数器和GetCounter
消息以获取其值。稍后需要发送响应。一个 CompletableDeferred 通信原语表示一个将来会被知道的值,在这里被用于这个目的。
// Message types for counterActor
sealed class CounterMsg
object IncCounter : CounterMsg() // one-way message to increment counter
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply
我们定义一个函数去使用 actor 协程构建器启动一个 actor:
// This function launches a new counter actor
fun counterActor() = actor<CounterMsg> {
var counter = 0 // actor state
for (msg in channel) { // iterate over incoming messages
when (msg) {
is IncCounter -> counter++
is GetCounter -> msg.response.complete(counter)
}
}
}
主要代码很简单:
fun main(args: Array<String>) = runBlocking<Unit> {
val counter = counterActor() // create the actor
massiveRun(CommonPool) {
counter.send(IncCounter)
}
// send a message to get a counter value from an actor
val response = CompletableDeferred<Int>()
counter.send(GetCounter(response))
println("Counter = ${response.await()}")
counter.close() // shutdown the actor
}
你可以从 这里 得到完整的代码
actor 本身被执行的上下文是什么不重要。一个 actor 是一个顺序执行的协程,所以将状态限制到特定的协程可以作为一个解决共享可变状态问题的解决方案。Actor 比锁在负载下更有效,因为在这种情况下 actor 总是在工作且不需要切换到不同的上下文。
注意,一个 actor 协程构建器是一个二元的 produce 协程构建器。一个 actor 与它接收消息的通道相关联,而一个 [producer]则与它发送元素的通道相关联。
选择表达式
选择表达式可以同时等待多个挂起函数,并且 select 第一个可用的。
从通道中选择
我们有2个分别产生字符串fizz
和 buzz
的生产者。fizz
生产者每300ms产生一个 “Fizz”:
fun fizz(context: CoroutineContext) = produce<String>(context) {
while (true) { // sends "Fizz" every 300 ms
delay(300)
send("Fizz")
}
}
buzz
每500ms产生一个 “Buzz!”:
fun buzz(context: CoroutineContext) = produce<String>(context) {
while (true) { // sends "Buzz!" every 500 ms
delay(500)
send("Buzz!")
}
}
使用 receive 挂起函数,我们可以从一个通道或另一个通道接收数据。但是 select 表达式允许我们同时使用它的 onReceive 子句来接收:
suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) {
select<Unit> { // <Unit> means that this select expression does not produce any result
fizz.onReceive { value -> // this is the first select clause
println("fizz -> '$value'")
}
buzz.onReceive { value -> // this is the second select clause
println("buzz -> '$value'")
}
}
}
我们运行它七次:
fun main(args: Array<String>) = runBlocking<Unit> {
val fizz = fizz(coroutineContext)
val buzz = buzz(coroutineContext)
repeat(7) {
selectFizzBuzz(fizz, buzz)
}
coroutineContext.cancelChildren() // cancel fizz & buzz coroutines
}
你可以从 这里 得到完整的代码
结果如下:
fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
buzz -> 'Buzz!'
关闭选择
当通道关闭时,在 select
中 onReceive 子句失败,相应的 select
抛出异常。当通道关闭时,我们可以使用 onReceiveOrNull 子句执行特定的操作。下面的示例展示 select
是返回其所选子句的结果的表达式:
suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String =
select<String> {
a.onReceiveOrNull { value ->
if (value == null)
"Channel 'a' is closed"
else
"a -> '$value'"
}
b.onReceiveOrNull { value ->
if (value == null)
"Channel 'b' is closed"
else
"b -> '$value'"
}
}
我们使用通道a
产生 “Hello” 4次,通道b
产生 “World” 4次:
fun main(args: Array<String>) = runBlocking<Unit> {
// we are using the context of the main thread in this example for predictability ...
val a = produce<String>(coroutineContext) {
repeat(4) { send("Hello $it") }
}
val b = produce<String>(coroutineContext) {
repeat(4) { send("World $it") }
}
repeat(8) { // print first eight results
println(selectAorB(a, b))
}
coroutineContext.cancelChildren()
}
你可以从 这里 得到完整的代码
结果非常有趣,我们来详细分析一下:
a -> 'Hello 0'
a -> 'Hello 1'
b -> 'World 0'
a -> 'Hello 2'
a -> 'Hello 3'
b -> 'World 1'
Channel 'a' is closed
Channel 'a' is closed
有两个观察结果可以证明这一点
第一点,select
偏向于第一个子句,当多个子句同时可选时,第一个子句优先被选中。在这里,两个通道都在不断地产生字符串,所以“a”通道,作为select
的第一个子句会获胜。然而由于我们使用的是无缓冲通道,因此“a”会不时地被挂起在它的 send 调用上,并给 “b” 一个发送的机会。
第二点,当通道已经关闭时,onReceiveOrNull 会立即被选中。
选择发送
选择表达式有一个 onSend 子句可以很好的结合选择的偏好性质来使用,让我们来写一个整数生产者的例子,当消费者在其主频道上无法跟上它的时候,将其值发送到side
频道:
fun produceNumbers(context: CoroutineContext, side: SendChannel<Int>) = produce<Int>(context) {
for (num in 1..10) { // produce 10 numbers from 1 to 10
delay(100) // every 100 ms
select<Unit> {
onSend(num) {} // Send to the primary channel
side.onSend(num) {} // or to the side channel
}
}
}
消费者将会非常缓慢,每个数字需要250毫秒的时间来处理:
fun main(args: Array<String>) = runBlocking<Unit> {
val side = Channel<Int>() // allocate side channel
launch(coroutineContext) { // this is a very fast consumer for the side channel
side.consumeEach { println("Side channel has $it") }
}
produceNumbers(coroutineContext, side).consumeEach {
println("Consuming $it")
delay(250) // let us digest the consumed number properly, do not hurry
}
println("Done consuming")
coroutineContext.cancelChildren()
}
你可以从 这里 得到完整的代码
我们来看看会发生什么:
Consuming 1
Side channel has 2
Side channel has 3
Consuming 4
Side channel has 5
Side channel has 6
Consuming 7
Side channel has 8
Side channel has 9
Consuming 10
Done consuming
选择延迟值
延迟值可以使用 onAwait 子句来被选择。我们启动一个在一个随机延迟后返回一个延迟的字符串值的异步函数
fun asyncString(time: Int) = async {
delay(time.toLong())
"Waited for $time ms"
}
让我们以随机的延迟来启动12个上面的异步函数
fun asyncStringsList(): List<Deferred<String>> {
val random = Random(3)
return List(12) { asyncString(random.nextInt(1000)) }
}
现在,主函数等待第一个函数完成并计算仍然处于活动状态的递延值的数量。注意,我们在这里使用的select
表达式事实上是Kotlin DSL,因此我们可以使用任意代码为它提供子句。在本例中,我们迭代了一个递延值列表,为每个递延值提供 onwait 子句。
fun main(args: Array<String>) = runBlocking<Unit> {
val list = asyncStringsList()
val result = select<String> {
list.withIndex().forEach { (index, deferred) ->
deferred.onAwait { answer ->
"Deferred $index produced answer '$answer'"
}
}
}
println(result)
val countActive = list.count { it.isActive }
println("$countActive coroutines are still active")
}
你可以从 这里 得到完整的代码
输出是:
Deferred 4 produced answer 'Waited for 128 ms'
11 coroutines are still active
切换延迟值的通道
让我们编写一个通道生成器函数,它使用延迟字符串值的通道,等待每个接收到的延迟值直到下一个递延值出现或通道关闭。这个示例将 onReceiveOrNull和 onAwait 子句放在同一个 select
中:
fun switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = produce<String> {
var current = input.receive() // start with first received deferred value
while (isActive) { // loop while not cancelled/closed
val next = select<Deferred<String>?> { // return next deferred value from this select or null
input.onReceiveOrNull { update ->
update // replaces next value to wait
}
current.onAwait { value ->
send(value) // send value that current deferred has produced
input.receiveOrNull() // and use the next deferred from the input channel
}
}
if (next == null) {
println("Channel was closed")
break // out of loop
} else {
current = next
}
}
}
为了测试它,我们将使用一个简单的异步函数,它在指定的时间后返回指定的字符串:
fun asyncString(str: String, time: Long) = async {
delay(time)
str
}
主函数启动一个协程打印switchMapDeferreds
的结果,并将一些测试数据发送给它:
fun main(args: Array<String>) = runBlocking<Unit> {
val chan = Channel<Deferred<String>>() // the channel for test
launch(coroutineContext) { // launch printing coroutine
for (s in switchMapDeferreds(chan))
println(s) // print each received string
}
chan.send(asyncString("BEGIN", 100))
delay(200) // enough time for "BEGIN" to be produced
chan.send(asyncString("Slow", 500))
delay(100) // not enough time to produce slow
chan.send(asyncString("Replace", 100))
delay(500) // give it time before the last one
chan.send(asyncString("END", 500))
delay(1000) // give it time to process
chan.close() // close the channel ...
delay(500) // and wait some time to let it finish
}
你可以从 这里 得到完整的代码
代码运行结果:
BEGIN
Replace
END
Channel was closed