随着python 3.7版本的正式发布,async/await终于也变成了关键字,从python 3.5引入这个新语法以来,已经过了3年,python异步编程也逐渐变成了重要的概念。
之前我一般使用线/进程池来处理并发,现在是时候学习一下标准库的异步框架了。
1. 传统的同步方法处理IO问题 由于python中GIL锁的存在,传统的计算密集型事务一般采用多进程模型,而IO密集型则多用相对轻量一点多线程模型。例如常见的一种场景,爬虫大批量下载内容的时候,线程池是一种常用的技术。
下面先用普通的同步方法下载一组图片,这里简单起见使用wallhaven上的图片,一方面是因为它的url定义很简单,另一方面这个站点在我这里连接比较慢,容易体现出各种方法的差异性,简单起见,实现下载的函数使用了requests库。(注:2020年,wallhaven已经更改了链接样式,请使用其它站点或方法测试。)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import requestssave_folder = os.path.abspath('./imgs/' ) def sync_fetch_img (id ): """根据图片id下载,使用同步方法,不验证ssl以节省时间.""" ul = f'https://wallpapers.wallhaven.cc/wallpapers/full/wallhaven-{id } .jpg' img_data = requests.get(ul, verify=False ) if img_data.status_code != 200 : return img_name = f'sync-wallhaven-{id } .jpg' img_path = os.path.join(save_folder, img_name) with open (img_path, 'wb' ) as f: f.write(img_data.content) print (f'Download complete - {id } .jpg' ) def sync_fetch_all_imgs (): """按顺序同步下载所有图片.""" for current_id in range (30000 , 30050 ): sync_fetch_img(current_id)
在网络环境稳定的情况下,使用这种方法下载了49张图片,多次平均之后花费时间是42.9s。
2. 多线/进程处理IO的并发问题 这里简单起见就直接使用线程池了,当然进程池效率区别也不大,sync_fetch_img函数沿用上面的,主要原理就是在线程池中取出线程并发地发送请求。
1 2 3 4 5 6 7 8 9 10 11 12 13 def concurrent_fetch_all_imgs (modes ): """使用进/线程池下载所有图片.""" if modes == 'p' : from concurrent.futures import ProcessPoolExecutor as img_executor elif modes == 't' : from concurrent.futures import ThreadPoolExecutor as img_executor else : print ('Modes only support "p" or "t".' ) return with img_executor(max_workers=None ) as executor: executor.map (sync_fetch_img, range (30000 , 30050 ))
在5个线程的情况下,下载时间的平均值为22.9s。
3. 基于标准库asyncio的异步处理方法 asyncio实际上是由python中的yield一步一步演化来的,先用asyncio配合aiohttp这个异步网络库试一试,主要就是异步发送请求,无需等待,异步的获取结果这种过程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import aiohttpasync def async_fetch_image (loop, id ): """根据图片id下载,使用异步方法.""" url = f'https://wallpapers.wallhaven.cc/wallpapers/full/wallhaven-{id } .jpg' async with aiohttp.ClientSession(loop=loop) as session: async with session.get(url, ssl=False ) as response: if response.status != 200 : return img_name = f'async-wallhaven-{id } .jpg' img_path = os.path.join(save_folder, img_name) with open (img_path, 'wb' ) as fd: while True : chunk = await response.content.read() if not chunk: break fd.write(chunk) print (f'Download complete - {id } .jpg' )
这里实现了一个基本的异步下载函数,关于它的使用方法就是把它放到事件循环里执行,写法相当简洁,如下所示:
1 2 3 4 5 6 7 8 9 10 11 import platformimport asyncioif platform.system() != 'Windows' : import uvloop asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) dl_loop = asyncio.get_event_loop() dl_loop.run_until_complete( asyncio.gather(*(async_fetch_image(dl_loop, id ) for id in range (30000 , 30050 ))))
在这种异步处理的情况下,下载时间的平均值为22.9s。
4. 关于结果的分析 第一种方案里同步获取,图片一张一张地下载,效率较低;第二种方案里实际上启动了5个线程,理论上速度会接近第一种的5倍,但实际上只是2倍左右,查看当时网络统计,发现是因为测试环境的小水管带宽倍跑慢了,制约了速度的上限;同理,第三种也是受到了网络带宽的约束,因此耗时与第二种相同。
下次需要找个更好的网络环境再测一遍。