AI毛毛的blog

在python3中使用异步方法处理并发

随着python 3.7版本的正式发布,async/await终于也变成了关键字,从python 3.5引入这个新语法以来,已经过了3年,python异步编程也逐渐变成了重要的概念。

之前我一般使用线/进程池来处理并发,现在是时候学习一下标准库的异步框架了。

1. 传统的同步方法处理IO问题

由于python中GIL锁的存在,传统的计算密集型事务一般采用多进程模型,而IO密集型则多用相对轻量一点多线程模型。例如常见的一种场景,爬虫大批量下载内容的时候,线程池是一种常用的技术。

下面先用普通的同步方法下载一组图片,这里简单起见使用wallhaven上的图片,一方面是因为它的url定义很简单,另一方面这个站点在我这里连接比较慢,容易体现出各种方法的差异性,简单起见,实现下载的函数使用了requests库。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# -*- coding: utf-8 -*-
import requests


# 演示用代码,简单起见假设路径都已经存在
save_folder = os.path.abspath('./imgs/')


def sync_fetch_img(id):
"""根据图片id下载,使用同步方法,不验证ssl以节省时间."""
ul = f'https://wallpapers.wallhaven.cc/wallpapers/full/wallhaven-{id}.jpg'
img_data = requests.get(ul, verify=False)
if img_data.status_code != 200:
return

img_name = f'sync-wallhaven-{id}.jpg'
img_path = os.path.join(save_folder, img_name)

with open(img_path, 'wb') as f:
f.write(img_data.content)
print(f'Download complete - {id}.jpg')


def sync_fetch_all_imgs():
"""按顺序同步下载所有图片."""
for current_id in range(30000, 30050):
sync_fetch_img(current_id)

在网络环境稳定的情况下,使用这种方法下载了49张图片,多次平均之后花费时间是42.9s。

2. 多线/进程处理IO的并发问题

这里简单起见就直接使用线程池了,当然进程池效率区别也不大,sync_fetch_img函数沿用上面的,主要原理就是在线程池中取出线程并发地发送请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
def concurrent_fetch_all_imgs(modes):
"""使用进/线程池下载所有图片."""
if modes == 'p':
from concurrent.futures import ProcessPoolExecutor as img_executor
elif modes == 't':
from concurrent.futures import ThreadPoolExecutor as img_executor
else:
print('Modes only support "p" or "t".')
return

with img_executor(max_workers=None) as executor:
# 当max_worker设置为None时,线程池的worker数目使用处理器数目乘以5,进程池为处理器数
executor.map(sync_fetch_img, range(30000, 30050))

在5个线程的情况下,下载时间的平均值为22.9s。

3. 基于标准库asyncio的异步处理方法

asyncio实际上是由python中的yield一步一步演化来的,先用asyncio配合aiohttp这个异步网络库试一试,主要就是异步发送请求,无需等待,异步的获取结果这种过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import aiohttp


async def async_fetch_image(loop, id):
"""根据图片id下载,使用异步方法."""
url = f'https://wallpapers.wallhaven.cc/wallpapers/full/wallhaven-{id}.jpg'

# 同理不验证ssl
async with aiohttp.ClientSession(loop=loop) as session:
async with session.get(url, ssl=False) as response:
if response.status != 200:
return

img_name = f'async-wallhaven-{id}.jpg'
img_path = os.path.join(save_folder, img_name)

# 流式下载分块写入相对稳妥一点
with open(img_path, 'wb') as fd:
while True:
chunk = await response.content.read()
if not chunk:
break
fd.write(chunk)
print(f'Download complete - {id}.jpg')

这里实现了一个基本的异步下载函数,关于它的使用方法就是把它放到事件循环里执行,写法相当简洁,如下所示:

1
2
3
4
5
6
7
8
9
10
11
import platform
import asyncio

# 使用uvloop比默认的事件循环效率高一点
if platform.system() != 'Windows':
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

dl_loop = asyncio.get_event_loop()
dl_loop.run_until_complete(
asyncio.gather(*(async_fetch_image(dl_loop, id) for id in range(30000, 30050))))

在这种异步处理的情况下,下载时间的平均值为22.9s。

4. 关于结果的分析

第一种方案里同步获取,图片一张一张地下载,效率较低;第二种方案里实际上启动了5个线程,理论上速度会接近第一种的5倍,但实际上只是2倍左右,查看当时网络统计,发现是因为测试环境的小水管带宽倍跑慢了,制约了速度的上限;同理,第三种也是受到了网络带宽的约束,因此耗时与第二种相同。

下次需要找个更好的网络环境再测一遍。