10.10 【并发编程】深入异步IO框架:asyncio 中篇 ============================================== 今天的内容其实还挺多的,我准备了三天,到今天才整理完毕。希望大家看完,有所收获的,能给小明一个赞。这就是对小明最大的鼓励了。 为了更好地衔接这一节,我们先来回顾一下上一节的内容。 上一节,我们首先介绍了,如何创建一个协程对象. 主要有两种方法 - 通过\ ``async``\ 关键字, - 通过\ ``@asyncio.coroutine`` 装饰函数。 然后有了协程对象,就需要一个事件循环容器来运行我们的协程。其主要的步骤有如下几点: - 将协程对象转为task任务对象 - 定义一个事件循环对象容器用来存放task - 将task任务扔进事件循环对象中并触发 为了让大家,对生成器和协程有一个更加清晰的认识,我还介绍了\ ``yield``\ 和\ ``async/await``\ 的区别。 最后,我们还讲了,如何给一个协程添加回调函数。 好了,用个形象的比喻,上一节,其实就只是讲了协程中的\ ``单任务``\ 。哈哈,是不是还挺难的?希望大家一定要多看几遍,多敲代码,不要光看。 那么这一节,我们就来看下,协程中的\ ``多任务``\ 。 1. 协程中的并发 --------------- 协程的并发,和线程一样。举个例子来说,就好像 一个人同时吃三个馒头,咬了第一个馒头一口,就得等这口咽下去,才能去啃第其他两个馒头。就这样交替换着吃。 ``asyncio``\ 实现并发,就需要多个协程来完成任务,每当有任务阻塞的时候就await,然后其他协程继续工作。 第一步,当然是创建多个协程的列表。 .. code:: python # 协程函数 async def do_some_work(x): print('Waiting: ', x) await asyncio.sleep(x) return 'Done after {}s'.format(x) # 协程对象 coroutine1 = do_some_work(1) coroutine2 = do_some_work(2) coroutine3 = do_some_work(4) # 将协程转成task,并组成list tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3) ] 第二步,如何将这些协程注册到事件循环中呢。 有两种方法,至于这两种方法什么区别,稍后会介绍。 - 使用\ ``asyncio.wait()`` .. code:: python loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) - 使用\ ``asyncio.gather()`` .. code:: python # 千万注意,这里的 「*」 不能省略 loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.gather(*tasks)) 最后,return的结果,可以用\ ``task.result()``\ 查看。 .. code:: python for task in tasks: print('Task ret: ', task.result()) 完整代码如下 .. code:: python import asyncio # 协程函数 async def do_some_work(x): print('Waiting: ', x) await asyncio.sleep(x) return 'Done after {}s'.format(x) # 协程对象 coroutine1 = do_some_work(1) coroutine2 = do_some_work(2) coroutine3 = do_some_work(4) # 将协程转成task,并组成list tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3) ] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) for task in tasks: print('Task ret: ', task.result()) 输出结果 .. code:: python Waiting: 1 Waiting: 2 Waiting: 4 Task ret: Done after 1s Task ret: Done after 2s Task ret: Done after 4s 2. 协程中的嵌套 --------------- 使用async可以定义协程,协程用于耗时的io操作,我们也可以封装更多的io操作过程,这样就实现了嵌套的协程,即一个协程中await了另外一个协程,如此连接起来。 来看个例子。 .. code:: python import asyncio # 用于内部的协程函数 async def do_some_work(x): print('Waiting: ', x) await asyncio.sleep(x) return 'Done after {}s'.format(x) # 外部的协程函数 async def main(): # 创建三个协程对象 coroutine1 = do_some_work(1) coroutine2 = do_some_work(2) coroutine3 = do_some_work(4) # 将协程转为task,并组成list tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3) ] # 【重点】:await 一个task列表(协程) # dones:表示已经完成的任务 # pendings:表示未完成的任务 dones, pendings = await asyncio.wait(tasks) for task in dones: print('Task ret: ', task.result()) loop = asyncio.get_event_loop() loop.run_until_complete(main()) 如果这边,使用的是\ ``asyncio.gather()``\ ,是这么用的 .. code:: python # 注意这边返回结果,与await不一样 results = await asyncio.gather(*tasks) for result in results: print('Task ret: ', result) 输出还是一样的。 :: Waiting: 1 Waiting: 2 Waiting: 4 Task ret: Done after 1s Task ret: Done after 2s Task ret: Done after 4s 仔细查看,可以发现这个例子完全是由 上面「\ ``协程中的并发``\ 」例子改编而来。结果完全一样。只是把创建协程对象,转换task任务,封装成在一个协程函数里而已。外部的协程,嵌套了一个内部的协程。 其实你如果去看下\ ``asyncio.await()``\ 的源码的话,你会发现下面这种写法 .. code:: python loop.run_until_complete(asyncio.wait(tasks)) 看似没有嵌套,实际上内部也是嵌套的。 这里也把源码,贴出来,有兴趣可以看下,没兴趣,可以直接跳过。 .. code:: python # 内部协程函数 async def _wait(fs, timeout, return_when, loop): assert fs, 'Set of Futures is empty.' waiter = loop.create_future() timeout_handle = None if timeout is not None: timeout_handle = loop.call_later(timeout, _release_waiter, waiter) counter = len(fs) def _on_completion(f): nonlocal counter counter -= 1 if (counter <= 0 or return_when == FIRST_COMPLETED or return_when == FIRST_EXCEPTION and (not f.cancelled() and f.exception() is not None)): if timeout_handle is not None: timeout_handle.cancel() if not waiter.done(): waiter.set_result(None) for f in fs: f.add_done_callback(_on_completion) try: await waiter finally: if timeout_handle is not None: timeout_handle.cancel() done, pending = set(), set() for f in fs: f.remove_done_callback(_on_completion) if f.done(): done.add(f) else: pending.add(f) return done, pending # 外部协程函数 async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED): if futures.isfuture(fs) or coroutines.iscoroutine(fs): raise TypeError(f"expect a list of futures, not {type(fs).__name__}") if not fs: raise ValueError('Set of coroutines/Futures is empty.') if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED): raise ValueError(f'Invalid return_when value: {return_when}') if loop is None: loop = events.get_event_loop() fs = {ensure_future(f, loop=loop) for f in set(fs)} # 【重点】:await一个内部协程 return await _wait(fs, timeout, return_when, loop) 3. 协程中的状态 --------------- 还记得我们在讲生成器的时候,有提及过生成器的状态。同样,在协程这里,我们也了解一下协程(准确的说,应该是Future对象,或者Task任务)有哪些状态。 ``Pending``\ :创建future,还未执行 ``Running``\ :事件循环正在调用执行任务 ``Done``\ :任务执行完毕 ``Cancelled``\ :Task被取消后的状态 可手工 ``python3 xx.py`` 执行这段代码, .. code:: python import asyncio import threading import time async def hello(): print("Running in the loop...") flag = 0 while flag < 1000: with open("F:\\test.txt", "a") as f: f.write("------") flag += 1 print("Stop the loop") if __name__ == '__main__': coroutine = hello() loop = asyncio.get_event_loop() task = loop.create_task(coroutine) # Pending:未执行状态 print(task) try: t1 = threading.Thread(target=loop.run_until_complete, args=(task,)) # t1.daemon = True t1.start() # Running:运行中状态 time.sleep(1) print(task) t1.join() except KeyboardInterrupt as e: # 取消任务 task.cancel() # Cacelled:取消任务 print(task) finally: print(task) 顺利执行的话,将会打印 ``Pending`` -> ``Pending:Runing`` -> ``Finished`` 的状态变化 假如,执行后 立马按下 Ctrl+C,则会触发task取消,就会打印 ``Pending`` -> ``Cancelling`` -> ``Cancelling`` 的状态变化。 4. gather与wait --------------- 还记得上面我说,把多个协程注册进一个事件循环中有两种方法吗? - 使用\ ``asyncio.wait()`` .. code:: python loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) - 使用\ ``asyncio.gather()`` .. code:: python # 千万注意,这里的 「*」 不能省略 loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.gather(*tasks)) ``asyncio.gather`` 和 ``asyncio.wait`` 在asyncio中用得的比较广泛,这里有必要好好研究下这两货。 还是照例用例子来说明,先定义一个协程函数 .. code:: python import asyncio async def factorial(name, number): f = 1 for i in range(2, number+1): print("Task %s: Compute factorial(%s)..." % (name, i)) await asyncio.sleep(1) f *= i print("Task %s: factorial(%s) = %s" % (name, number, f)) 5. 接收参数方式 --------------- asyncio.wait ~~~~~~~~~~~~ 接收的tasks,必须是一个list对象,这个list对象里,存放多个的task。 它可以这样,用\ ``asyncio.ensure_future``\ 转为task对象 .. code:: python tasks=[ asyncio.ensure_future(factorial("A", 2)), asyncio.ensure_future(factorial("B", 3)), asyncio.ensure_future(factorial("C", 4)) ] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) 也可以这样,不转为task对象。 .. code:: python loop = asyncio.get_event_loop() tasks=[ factorial("A", 2), factorial("B", 3), factorial("C", 4) ] loop.run_until_complete(asyncio.wait(tasks)) asyncio.gather ~~~~~~~~~~~~~~ 接收的就比较广泛了,他可以接收list对象,但是 ``*`` 不能省略 .. code:: python tasks=[ asyncio.ensure_future(factorial("A", 2)), asyncio.ensure_future(factorial("B", 3)), asyncio.ensure_future(factorial("C", 4)) ] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.gather(*tasks)) 还可以这样,和上面的 ``*`` 作用一致,这是因为\ ``asyncio.gather()``\ 的第一个参数是 ``*coros_or_futures``\ ,它叫 ``非命名键值可变长参数列表``\ ,可以集合所有没有命名的变量。 .. code:: python loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.gather( factorial("A", 2), factorial("B", 3), factorial("C", 4), )) 甚至还可以这样 .. code:: python loop = asyncio.get_event_loop() group1 = asyncio.gather(*[factorial("A" ,i) for i in range(1, 3)]) group2 = asyncio.gather(*[factorial("B", i) for i in range(1, 5)]) group3 = asyncio.gather(*[factorial("B", i) for i in range(1, 7)]) loop.run_until_complete(asyncio.gather(group1, group2, group3)) 6. 返回结果不同 --------------- .. _asyncio.wait-1: asyncio.wait ~~~~~~~~~~~~ ``asyncio.wait`` 返回\ ``dones``\ 和\ ``pendings`` - ``dones``\ :表示已经完成的任务 - ``pendings``\ :表示未完成的任务 如果我们需要获取,运行结果,需要手工去收集获取。 .. code:: python dones, pendings = await asyncio.wait(tasks) for task in dones: print('Task ret: ', task.result()) .. _asyncio.gather-1: asyncio.gather ~~~~~~~~~~~~~~ ``asyncio.gather`` 它会把值直接返回给我们,不需要手工去收集。 .. code:: python results = await asyncio.gather(*tasks) for result in results: print('Task ret: ', result) 7. wait有控制功能 ----------------- .. code:: python import asyncio import random async def coro(tag): await asyncio.sleep(random.uniform(0.5, 5)) loop = asyncio.get_event_loop() tasks = [coro(i) for i in range(1, 11)] # 【控制运行任务数】:运行第一个任务就返回 # FIRST_COMPLETED :第一个任务完全返回 # FIRST_EXCEPTION:产生第一个异常返回 # ALL_COMPLETED:所有任务完成返回 (默认选项) dones, pendings = loop.run_until_complete( asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)) print("第一次完成的任务数:", len(dones)) # 【控制时间】:运行一秒后,就返回 dones2, pendings2 = loop.run_until_complete( asyncio.wait(pendings, timeout=1)) print("第二次完成的任务数:", len(dones2)) # 【默认】:所有任务完成后返回 dones3, pendings3 = loop.run_until_complete(asyncio.wait(pendings2)) print("第三次完成的任务数:", len(dones3)) loop.close() 输出结果 .. code:: python 第一次完成的任务数: 1 第二次完成的任务数: 4 第三次完成的任务数: 5