python-异步执行库asyncio
写工具时遇到需要请求几十次数据的场景, 同步顺序执行速度有点慢, 改成异步并行执行就爽很多了. 同理, 其他涉及到 io 的阻塞操作都可以用异步并行的方式解决, 文件 io 也一样可以异步处理.
使用到的是 asyncio (内置) + aiohttp/aiofiles (需要 pip 安装)
前篇
- 官方
- 使用asyncio+aiohttp爬取数据并拼装返回的数据 - https://blog.csdn.net/cui_yonghua/article/details/106840662
- Python黑魔法 — 异步IO( asyncio) 协程 - https://www.jianshu.com/p/b5e347b3a17c
- 在Python中使用Asyncio系统(3-5)异步的语境管理器 - https://www.toutiao.com/i7029609315057009163
代码
工具类 async_util.py (简单的封装一下)
# -*- coding: utf-8 -*-
import aiofiles
import aiohttp
import asyncio
import json
import sys
import traceback
import threading
from typing import List
from tool import utils
class CReqInfo:
    """Describe one HTTP request to be sent through ``CAsyncHttp``.

    Every constructor argument is an optional keyword, so the historical
    ``CReqInfo()`` followed by attribute assignment keeps working.

    Attributes:
        url: Target URL.
        method: HTTP verb, ``"POST"`` by default.
        data: Request body; ``CAsyncHttp.request`` JSON-encodes it when it
            is a ``dict``.
        extA: Opaque pass-through value copied onto the response object.
    """

    def __init__(self, url=None, method="POST", data=None, extA=None):
        self.url = url
        self.method = method
        self.data = data
        self.extA = extA  # pass-through data
class CRspInfo:
    """Outcome of one HTTP request made through ``CAsyncHttp``.

    Every constructor argument is an optional keyword, so ``CRspInfo()``
    still yields the original defaults.

    Attributes:
        code: HTTP status code; ``CAsyncHttp.request`` stores ``-999`` when
            the request raised.
        text: Response body text, or the caught exception object on failure.
        extA: Opaque pass-through value taken from the request.
    """

    def __init__(self, code: int = 0, text=None, extA=None):
        self.code: int = code
        self.text = text
        self.extA = extA  # pass-through data
class CFileInfo:
    """Describe one file to be read or written asynchronously.

    Args:
        path: File path.
        encoding: Text encoding used to decode/encode ``content``. Pass
            ``None`` to work with raw bytes. Defaults to ``"utf-8"``.
        content: Optional initial content (what a write job will store).
        extA: Optional opaque pass-through value.

    Attributes:
        content: Data read from, or to be written to, the file.
        error: Exception captured by the read/write helpers, else ``None``.
    """

    def __init__(self, path, encoding="utf-8", content=None, extA=None):
        self.path = path
        self.encoding = encoding
        self.content = content
        self.error = None
        self.extA = extA  # pass-through data
class CCmdInfo:
    """Describe one shell command and hold its outcome.

    Args:
        cmd: Command line handed to the shell.
        extA: Optional opaque pass-through value.

    Attributes:
        code: Process return code, filled in by ``CAsyncCmd.run``.
        msg: Decoded stdout (or stderr when stdout is empty), filled in by
            ``CAsyncCmd.run``.
    """

    def __init__(self, cmd, extA=None):
        self.code = 0
        self.msg = None
        self.cmd = cmd
        self.extA = extA  # pass-through data
class CThreadInfo:
    """Job description for a worker thread.

    Attributes:
        target: Callable to invoke.
        args: Positional arguments passed to ``target``.
        result: Starts as ``None``; filled in by the thread that runs the job.
    """

    def __init__(self, target, args=()):
        # Tuple assignment keeps the attribute creation order
        # (target, args, result) unchanged.
        self.target, self.args, self.result = target, args, None
class CInnerThread(threading.Thread):
    """Worker thread that runs one ``CThreadInfo`` job and records its outcome.

    Args:
        autoid: Caller-assigned identifier returned by ``get_result``.
        ti: Job description; its ``target`` is called with ``*args`` and the
            return value (or the raised exception) is stored in ``ti.result``.
    """

    def __init__(self, autoid, ti: "CThreadInfo"):
        super().__init__()
        self.autoid = autoid
        self.target = ti.target
        self.args = ti.args
        self.ti: "CThreadInfo" = ti

    def run(self):
        """Execute the job; never raises for ordinary exceptions."""
        try:
            self.ti.result = self.target(*self.args)
        except Exception as e:
            # The exception object becomes the result so the caller can
            # inspect it after join().
            self.ti.result = e
            # print_exc() reports the traceback of the exception being
            # handled; print_stack() would only show where run() was called.
            traceback.print_exc()

    def get_result(self):
        """Return ``(autoid, ti)``; ``ti.result`` is set once the thread finished."""
        return self.autoid, self.ti
class CAsyncHttp:
    """Asynchronous HTTP helper built on ``aiohttp``."""

    async def request(self, reqInfo: "CReqInfo"):
        """Send one request and return a ``CRspInfo`` describing the outcome.

        Args:
            reqInfo: Request description. A ``dict`` body is sent as a JSON
                string; ``reqInfo`` itself is left unmodified.

        Returns:
            A ``CRspInfo`` whose ``code`` is the HTTP status and ``text`` the
            body. When the request raises an ``Exception``, ``code`` is
            ``-999`` and ``text`` is the exception object. ``extA`` always
            mirrors ``reqInfo.extA``.
        """
        rspInfo = CRspInfo()
        # Copy the pass-through value up front so failed requests can still
        # be matched to the request that produced them.
        rspInfo.extA = reqInfo.extA
        # Serialize into a local variable instead of overwriting
        # reqInfo.data, so the caller's object can be inspected or re-sent.
        body = reqInfo.data
        if isinstance(body, dict):
            body = json.dumps(body)
        try:
            async with aiohttp.request(method=reqInfo.method, url=reqInfo.url, data=body) as response:
                rspInfo.code = int(response.status)
                rspInfo.text = await response.text()
        except Exception as e:
            rspInfo.code = -999
            rspInfo.text = e
        # Returned after the try block rather than from `finally`, so
        # cancellation and KeyboardInterrupt are no longer swallowed.
        return rspInfo

    def doReq(self, *reqArr) -> "List[CRspInfo]":
        """Run all requests concurrently; results are in the order of ``reqArr``."""
        return CAsyncTask().doTask(*[self.request(reqInfo) for reqInfo in reqArr])
class CAsyncFileRead:
    """Asynchronous file reader built on ``aiofiles``."""

    async def read(self, fi: "CFileInfo"):
        """Read ``fi.path`` and store the data in ``fi.content``.

        The file is read in binary mode. With ``fi.encoding`` set, the bytes
        are decoded with ``errors="ignore"``; with ``fi.encoding is None``
        the raw bytes are stored.

        Returns:
            The same ``fi``; any ``Exception`` is stored in ``fi.error``
            instead of being raised.
        """
        try:
            async with aiofiles.open(fi.path, mode="rb") as fd:
                content = await fd.read()
            # An explicit branch replaces `cond and a or b`, which fell
            # through to str(b"", encoding=None) for an empty file and
            # reported a bogus TypeError.
            if fi.encoding is None:
                fi.content = content
            else:
                fi.content = str(content, encoding=fi.encoding, errors="ignore")
        except Exception as e:
            fi.error = e
        # Not returned from `finally`, so cancellation can propagate.
        return fi

    def doRead(self, *fileArr) -> "List[CFileInfo]":
        """Read all files concurrently; results are in the order of ``fileArr``."""
        return CAsyncTask().doTask(*[self.read(fi) for fi in fileArr])
class CAsyncFileWrite:
    """Asynchronous file writer built on ``aiofiles``."""

    async def write(self, fi: "CFileInfo"):
        """Write ``fi.content`` to ``fi.path`` in binary mode.

        With ``fi.encoding`` set, ``fi.content`` is encoded with it first;
        with ``fi.encoding is None`` the content is written as-is.
        ``utils.createDirForFile`` is called on the path before writing.

        Returns:
            The same ``fi``; an ``Exception`` raised while encoding or
            writing is stored in ``fi.error`` instead of being raised.
        """
        utils.createDirForFile(fi.path)
        try:
            # An explicit conditional replaces `cond and a or b`, which
            # called .encode() on empty bytes content when encoding was None.
            if fi.encoding is None:
                bts = fi.content
            else:
                bts = fi.content.encode(encoding=fi.encoding)
            # Encoding happens before opening, so an existing file is not
            # truncated when the content cannot be encoded.
            async with aiofiles.open(fi.path, mode="wb") as fd:
                await fd.write(bts)
        except Exception as e:
            fi.error = e
        # Not returned from `finally`, so cancellation can propagate.
        return fi

    def doWrite(self, *fileArr) -> "List[CFileInfo]":
        """Write all files concurrently; results are in the order of ``fileArr``."""
        return CAsyncTask().doTask(*[self.write(fi) for fi in fileArr])
class CAsyncCmd:
    """Asynchronous shell command runner."""

    async def run(self, ci: CCmdInfo):
        """Run ``ci.cmd`` in a shell and record return code and output on ``ci``.

        stdout is preferred; stderr is used only when stdout is empty. The
        chosen bytes are decoded as UTF-8 with undecodable bytes dropped.
        """
        pipe = asyncio.subprocess.PIPE
        proc = await asyncio.create_subprocess_shell(ci.cmd, stdout=pipe, stderr=pipe)
        out, err = await proc.communicate()
        payload = out if out else err
        ci.code = proc.returncode
        if payload is None:
            # Mirrors the original `bts is not None and ...` expression,
            # which evaluates to False in this case.
            ci.msg = False
        else:
            ci.msg = str(payload, encoding="utf-8", errors="ignore")
        return ci

    def doCmd(self, *cmdArr) -> List[CCmdInfo]:
        """Run all commands concurrently; results are in the order of ``cmdArr``."""
        jobs = [self.run(ci) for ci in cmdArr]
        return CAsyncTask().doTask(*jobs)
class CAsyncTask:
    """Run a batch of awaitables to completion while printing a console spinner."""

    def __init__(self):
        # Set to True to make the spinner coroutine exit its loop.
        self.isStopProgress = False

    async def progress(self):
        """Print a spinner every 0.1s until ``isStopProgress`` becomes true."""
        symbol = ["/", "ᅳ", "\\", "|"]
        total = len(symbol)
        cnt = 0
        while not self.isStopProgress:
            sys.stdout.write(f"------ processing {symbol[cnt % total]}\r")
            sys.stdout.flush()
            await asyncio.sleep(0.1)
            cnt += 1
        print("------ processing 100%")

    async def start(self, *taskArr):
        """Await all ``taskArr`` concurrently and return their results in order.

        The spinner is stopped and awaited even when a task raises, so no
        spinner task is left pending on the event loop.
        """
        self.isStopProgress = False  # allow the instance to be reused
        first = asyncio.gather(*taskArr)
        second = asyncio.create_task(self.progress())
        try:
            return await first
        finally:
            self.isStopProgress = True
            await second

    def doTask(self, *taskArr):
        """Synchronously run ``taskArr`` and return the list of results.

        Uses the thread's current event loop when there is one. Otherwise a
        new loop is created and installed; this covers worker threads and
        calls made after ``asyncio.run()``, where ``get_event_loop()``
        raises ``RuntimeError``.
        """
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = None
        if loop is None or loop.is_closed():
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        # The loop is intentionally left open: closing it caused
        # "RuntimeError: Event loop is closed" with https requests.
        return loop.run_until_complete(self.start(*taskArr))
class CAsyncThread:
    """Thin wrapper that runs several jobs on real threads."""

    def doRun(self, *threadArr) -> "List[CThreadInfo]":
        """Run every ``CThreadInfo`` on its own thread and wait for all of them.

        Returns:
            The ``CThreadInfo`` objects in the order they were passed in;
            each ``result`` holds the return value or the raised exception.
        """
        workers = []
        for autoid, ti in enumerate(threadArr, start=1):
            thd = CInnerThread(autoid=autoid, ti=ti)
            workers.append(thd)
            thd.start()
        # Joining in submission order already yields ordered results; the
        # former dict plus a discarded sorted() call added nothing.
        results = []
        for thd in workers:
            thd.join()
            results.append(thd.get_result()[1])
        return results
# ------------- 对外接口 -------------
def doTask(*taskArr):
    """Run the given awaitables on a fresh ``CAsyncTask`` and return its result."""
    runner = CAsyncTask()
    return runner.doTask(*taskArr)
def doReq(*reqArr):
    """Send the given request descriptions via a fresh ``CAsyncHttp``."""
    client = CAsyncHttp()
    return client.doReq(*reqArr)
def doRead(*fileArr):
    """Read the given file descriptions via a fresh ``CAsyncFileRead``."""
    reader = CAsyncFileRead()
    return reader.doRead(*fileArr)
def doWrite(*fileArr):
    """Write the given file descriptions via a fresh ``CAsyncFileWrite``."""
    writer = CAsyncFileWrite()
    return writer.doWrite(*fileArr)
def doCmd(*cmdArr):
    """Run the given command descriptions via a fresh ``CAsyncCmd``."""
    shell = CAsyncCmd()
    return shell.doCmd(*cmdArr)
def doRun(*threadArr):
return CAsyncThread().doRun(*threadArr)
测试用例
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import sys
import os
import asyncio, aiohttp, aiofiles
import json
from datetime import datetime, timedelta
from time import ctime, sleep
import time
import unittest
from tool import utils, async_util
SelfPath: str = os.path.abspath(os.path.dirname(__file__))
# 相关参考:
# https://blog.csdn.net/cui_yonghua/article/details/106840662
# https://www.jianshu.com/p/b5e347b3a17c
class Test_Async(unittest.TestCase):
    """Exploratory demos for asyncio, aiohttp, aiofiles and ``tool.async_util``.

    Most cases depend on a live test server, on files under the paths
    returned by ``utils.getDesktop`` or on external executables, so they are
    manual experiments rather than hermetic unit tests.
    """

    # Test-server endpoint shared by the HTTP demos (swap in an https URL
    # such as "https://www.baidu.com" to exercise TLS).
    URL = "http://149.129.147.44:8305/hotupdate"
    # JSON payload posted to URL by the HTTP demos.
    PAYLOAD = {
        "Plat": 8,
        "Os": 2,
        "Appid": 3,
        "Uid": '123123',
        "Version": '0.0.0.1',
        "Deviceid": 'wolegequ',
    }

    def setUp(self):
        print("\n\n------------------ test result ------------------")

    @staticmethod
    def _timed(fn):
        """Call ``fn()`` and return the elapsed wall-clock seconds."""
        begin = time.perf_counter()
        fn()
        return time.perf_counter() - begin

    def test_gather(self):
        """``asyncio.gather`` runs coroutines concurrently."""
        async def count(num):
            print(f"One - {num}")
            await asyncio.sleep(1)
            print(f"Two - {num}")

        async def main():
            # gather runs concurrently; results come back in argument order
            await asyncio.gather(count(1), count(2), count(3))

        asyncio.run(main())
        print("--- finished")

    def test_createTask(self):
        """``asyncio.create_task`` schedules a coroutine as soon as it is created."""
        async def count(num):
            print("One")
            await asyncio.sleep(num)
            print("Two")

        async def main():
            first = asyncio.create_task(count(2))  # starts running on creation
            second = asyncio.create_task(count(1))
            await first
            print("finished first")
            await second
            print("finished second")

        asyncio.run(main())
        print("--- finished")

    def test_progress(self):
        """Mix slow sleeps and HTTP requests under ``CAsyncTask``'s spinner."""
        from tool.async_util import CAsyncTask, CRspInfo

        # The job to execute.
        async def reqFn(num):
            rspInfo = CRspInfo()
            try:
                async with aiohttp.request(method="POST", url=self.URL, data=json.dumps(self.PAYLOAD)) as rsp:
                    print(f"--- idx: {num} code: {rsp.status}")
                    rspInfo.code = num
                    rspInfo.text = await rsp.text()
            except Exception:
                # Was a bare `except:` plus `return` in `finally`, which
                # also swallowed cancellation and KeyboardInterrupt.
                rspInfo.code = -999
            return rspInfo

        async def reqFn01():
            print("--- start reqFn01")
            await asyncio.sleep(20)
            return "hello01"

        async def reqFn02():
            print("--- start reqFn02")
            await asyncio.sleep(10)
            return "hello02"

        async def reqFn03():
            print("--- start reqFn03")
            await asyncio.sleep(30)
            return "hello03"

        taskArr = [reqFn(idx) for idx in range(30)]
        res = CAsyncTask().doTask(reqFn01(), reqFn02(), reqFn03(), *taskArr)
        print(f"--- finished, res: {utils.beautyJson(res)}")

    # Async HTTP io
    def test_concurrencyReq(self):
        """Issue five concurrent POSTs, via ``create_task`` or ``gather``."""
        async def reqFn(idx):
            try:
                async with aiohttp.request(method="POST", url=self.URL, data=json.dumps(self.PAYLOAD)) as rsp:
                    print(f"--- idx: {idx} code: {rsp.status}")
                    return await rsp.text()
            except Exception:
                return "--- error"

        # create_task variant (kept as an alternative; not executed below)
        async def main01():
            # Tasks start running as soon as they are created.
            taskArr = [asyncio.create_task(reqFn(idx)) for idx in range(5)]
            resArr = []
            for task in taskArr:  # wait for every request to finish
                resArr.append(await task)
            return resArr

        # gather variant
        async def main02():
            return await asyncio.gather(*[reqFn(idx) for idx in range(5)])

        # An explicit loop is used instead of asyncio.get_event_loop(), which
        # raises once another test has called asyncio.run(). asyncio.run()
        # itself is avoided here because the original author observed
        # "RuntimeError: Event loop is closed" with it.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        resArr = loop.run_until_complete(main02())  # run until the last task ends
        print("--- finished")
        print(f"--- resArr: {utils.beautyJson(resArr)}")

    def test_compare_http(self):
        """Compare sequential ``utils.httpPost`` with ``async_util.doReq``."""
        reqCnt = 1

        def syncFn():
            print("--- syncFn start")
            for _ in range(reqCnt):
                utils.httpPost(self.URL, self.PAYLOAD)
            print("--- syncFn end")

        def asyncFn():
            print("--- asyncFn start")
            reqArr = []
            for idx in range(reqCnt):
                ri = async_util.CReqInfo()
                ri.url = self.URL
                ri.data = self.PAYLOAD  # may be a dict or a JSON string
                ri.method = "POST"
                ri.extA = f"extra data {idx}"
                reqArr.append(ri)
            resArr = async_util.doReq(*reqArr)
            print("--- type: {}, len: {}".format(type(resArr), len(resArr)))
            print("--- asyncFn end")

        # The helpers return nothing, so the elapsed seconds are measured
        # here; previously both prints showed None.
        sync_cc = self._timed(syncFn)
        print("sync: {}".format(sync_cc))
        print()
        async_cc = self._timed(asyncFn)
        print("async: {}".format(async_cc))

    # Async file io
    def test_asyncFile(self):
        """Read a file and then overwrite it through ``aiofiles``."""
        async def dealFile(filePath):
            print("--- dealFile:", filePath)
            async with aiofiles.open(filePath, mode="r") as fd:  # read
                txt = await fd.read()
                print("--- read:", txt)
            async with aiofiles.open(filePath, mode="w") as fd:  # write
                await fd.write("wolegequ")
            return "done!!"

        path = utils.getDesktop("test_io2/aaa.txt")
        res = async_util.doTask(dealFile(path))
        print("--- res:", res)

    # Async file io, line by line
    def test_asyncLine(self):
        """Iterate over a file line by line through ``aiofiles``."""
        async def dealFile(filePath):
            print("--- dealFile:", filePath)
            async with aiofiles.open(filePath, mode="rb") as fd:  # read
                async for line in fd:
                    print("--- line:", str(line, encoding="utf-8", errors="ignore"))

        path = utils.getDesktop("a_temp.lua")
        res = async_util.doTask(dealFile(path))
        print("--- res:", res)

    # Compare sync vs async file reading time
    def test_compare_readFile(self):
        """Compare ``utils.readFileBytes`` in a loop with ``async_util.doRead``."""
        dstDir = utils.getDesktop("test_io")
        fileArr = utils.getFiles(dstDir, ["*.*"])
        print("--- fileArr len: {}".format(len(fileArr)))

        def syncFn():
            print("--- syncFn start")
            for file in fileArr:
                utils.readFileBytes(file)
            print("--- syncFn end")

        def asyncFn():
            print("--- asyncFn start")
            fiArr = [async_util.CFileInfo(file) for file in fileArr]
            async_util.doRead(*fiArr)
            # To copy the files elsewhere, rewrite each fi.path and call
            # async_util.doWrite(*fiArr).
            print("--- asyncFn end")

        sync_cc = self._timed(syncFn)
        print("sync: {}".format(sync_cc))
        print()
        async_cc = self._timed(asyncFn)
        print("async: {}".format(async_cc))

    # Run system commands asynchronously, in parallel
    def test_subprocess(self):
        """Run one shell command through ``asyncio.create_subprocess_shell``."""
        # Official docs: https://docs.python.org/3/library/asyncio-subprocess.html
        async def run(cmd):
            proc = await asyncio.create_subprocess_shell(
                cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE)
            stdout, stderr = await proc.communicate()
            print(f'[{cmd!r} exited with {proc.returncode}]')
            if stdout:
                print(f'[stdout]\n{stdout.decode(errors="ignore")}')
            if stderr:
                print(f'[stderr]\n{stderr.decode(errors="ignore")}')

        cmd = "git status"
        asyncio.run(run(cmd))

    def test_compare_subprocess(self):
        """Compare blocking calls inside coroutines with real async subprocesses."""
        cnt = 5
        # Per the original note, aaa.exe takes about 9s to run.
        cmd = "call {}".format(utils.getDesktop("aaa.exe"))

        def asyncFn():
            cmdArr = []
            for i in range(cnt):
                ci = async_util.CCmdInfo(cmd)
                ci.extA = i
                cmdArr.append(ci)
            async_util.doCmd(*cmdArr)

        def syncFn():
            # utils.cmdToString is a plain (non-awaited) call, so these
            # coroutines run one after another.
            async def run(command):
                return utils.cmdToString(command)
            async_util.doTask(*[run(cmd) for _ in range(cnt)])

        dt1 = self._timed(syncFn)
        # Original author's measurement: 00:00:45, sequential execution.
        print("--- syncFn cost time:", dt1)
        dt2 = self._timed(asyncFn)
        # Original author's measurement: 00:00:09, parallel execution.
        print("--- asyncFn cost time:", dt2)

    # Real multi-threaded parallelism
    def test_multi_thread(self):
        """Run three jobs on real threads through ``async_util.doRun``."""
        def fn001(name):
            # To simulate a failure, raise here (e.g. index an empty list);
            # the exception then shows up as ti.result.
            utils.execute("call {}".format(utils.getDesktop("aaa.exe")))
            return "world-{}".format(name)

        # res holds the results in submission order.
        res = async_util.doRun(*[async_util.CThreadInfo(target=fn001, args=(i,)) for i in range(3)])
        for ti in res:
            print("--- result is error:", utils.isError(ti.result))
if __name__ == "__main__":
    # Run just the multi-thread demo when executed as a script.
    Test_Async().test_multi_thread()
踩坑
证书错误
错误: ssl.SSLCertVerificationError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: certificate has expired
解决办法: connector 指定不校验证书即可: async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) as session: (旧版本 aiohttp 的写法是 verify_ssl=False, 该参数已被废弃)
参考: https://github.com/aio-libs/aiohttp/issues/955
mac 平台 证书错误
错误: [SSLCertVerificationError: (1, '[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1123)')]
解决办法: 双击 Install Certificates.command 文件安装证书
参考: https://blog.csdn.net/u011072037/article/details/102861658
非主线程运行报错
错误: There is no current event loop in thread
原因: 在非主线程中执行 asyncio.get_event_loop() 会报错, 因为非主线程不会自动创建事件循环
解决办法: 在该线程中先执行 asyncio.set_event_loop(asyncio.new_event_loop()), 再获取事件循环