10.11 【并发编程】实战异步IO框架:asyncio 下篇 ============================================== 1. 动态添加协程 --------------- 在实战之前,我们要先了解下在asyncio中如何将协程态添加到事件循环中的。这是前提。 如何实现呢,有两种方法: - 主线程是同步的 .. code:: python import time import asyncio from queue import Queue from threading import Thread def start_loop(loop): # 一个在后台永远运行的事件循环 asyncio.set_event_loop(loop) loop.run_forever() def do_sleep(x, queue, msg=""): time.sleep(x) queue.put(msg) queue = Queue() new_loop = asyncio.new_event_loop() # 定义一个线程,并传入一个事件循环对象 t = Thread(target=start_loop, args=(new_loop,)) t.start() print(time.ctime()) # 动态添加两个协程 # 这种方法,在主线程是同步的 new_loop.call_soon_threadsafe(do_sleep, 6, queue, "第一个") new_loop.call_soon_threadsafe(do_sleep, 3, queue, "第二个") while True: msg = queue.get() print("{} 协程运行完..".format(msg)) print(time.ctime()) 由于是同步的,所以总共耗时6+3=9秒. 输出结果 .. code:: python Thu May 31 22:11:16 2018 第一个 协程运行完.. Thu May 31 22:11:22 2018 第二个 协程运行完.. Thu May 31 22:11:25 2018 - 主线程是异步的,这是重点,一定要掌握。。 .. code:: python import time import asyncio from queue import Queue from threading import Thread def start_loop(loop): # 一个在后台永远运行的事件循环 asyncio.set_event_loop(loop) loop.run_forever() async def do_sleep(x, queue, msg=""): await asyncio.sleep(x) queue.put(msg) queue = Queue() new_loop = asyncio.new_event_loop() # 定义一个线程,并传入一个事件循环对象 t = Thread(target=start_loop, args=(new_loop,)) t.start() print(time.ctime()) # 动态添加两个协程 # 这种方法,在主线程是异步的 asyncio.run_coroutine_threadsafe(do_sleep(6, queue, "第一个"), new_loop) asyncio.run_coroutine_threadsafe(do_sleep(3, queue, "第二个"), new_loop) while True: msg = queue.get() print("{} 协程运行完..".format(msg)) print(time.ctime()) 输出结果 由于是异步的,所以总共耗时max(6, 3)=\ ``6``\ 秒 .. code:: python Thu May 31 22:23:35 2018 第二个 协程运行完.. Thu May 31 22:23:38 2018 第一个 协程运行完.. Thu May 31 22:23:41 2018 2. 利用redis实现动态添加任务 ---------------------------- 对于并发任务,通常是用生成消费模型,对队列的处理可以使用类似master-worker的方式,master主要用户获取队列的msg,worker用户处理消息。 为了简单起见,并且协程更适合单线程的方式,我们的主线程用来监听队列,子线程用于处理队列。这里使用redis的队列。主线程中有一个是无限循环,用户消费队列。 先安装Redis 到 https://github.com/MicrosoftArchive/redis/releases 下载 |image1| 解压到你的路径。 |image2| 然后,在当前路径运行cmd,运行redis的服务端。 |image3| 服务开启后,我们就可以运行我们的客户端了。 并依次输入key=queue,value=5,3,1的消息。 |image4| 一切准备就绪之后,我们就可以运行我们的代码了。 .. code:: python import time import redis import asyncio from queue import Queue from threading import Thread def start_loop(loop): # 一个在后台永远运行的事件循环 asyncio.set_event_loop(loop) loop.run_forever() async def do_sleep(x, queue): await asyncio.sleep(x) queue.put("ok") def get_redis(): connection_pool = redis.ConnectionPool(host='127.0.0.1', db=0) return redis.Redis(connection_pool=connection_pool) def consumer(): while True: task = rcon.rpop("queue") if not task: time.sleep(1) continue asyncio.run_coroutine_threadsafe(do_sleep(int(task), queue), new_loop) if __name__ == '__main__': print(time.ctime()) new_loop = asyncio.new_event_loop() # 定义一个线程,运行一个事件循环对象,用于实时接收新任务 loop_thread = Thread(target=start_loop, args=(new_loop,)) loop_thread.setDaemon(True) loop_thread.start() # 创建redis连接 rcon = get_redis() queue = Queue() # 子线程:用于消费队列消息,并实时往事件对象容器中添加新任务 consumer_thread = Thread(target=consumer) consumer_thread.setDaemon(True) consumer_thread.start() while True: msg = queue.get() print("协程运行完..") print("当前时间:", time.ctime()) 稍微讲下代码 ``loop_thread``\ :单独的线程,运行着一个事件对象容器,用于实时接收新任务。 ``consumer_thread``\ :单独的线程,实时接收来自Redis的消息队列,并实时往事件对象容器中添加新任务。 输出结果 .. code:: python Thu May 31 23:42:48 2018 协程运行完.. 当前时间: Thu May 31 23:42:49 2018 协程运行完.. 当前时间: Thu May 31 23:42:51 2018 协程运行完.. 当前时间: Thu May 31 23:42:53 2018 我们在Redis,分别发起了5s,3s,1s的任务。 从结果来看,这三个任务,确实是并发执行的,1s的任务最先结束,三个任务完成总耗时5s 运行后,程序是一直运行在后台的,我们每一次在Redis中输入新值,都会触发新任务的执行。。 .. |image1| image:: https://i.loli.net/2018/06/03/5b13ba8525bcf.png .. |image2| image:: https://i.loli.net/2018/06/03/5b13ba9f66baa.png .. |image3| image:: https://i.loli.net/2018/06/03/5b13bab682a32.png .. |image4| image:: https://i.loli.net/2018/06/03/5b13bad79f5ce.png