
本文深入探讨了在Python asyncio中调度嵌套异步函数时遇到的并发挑战。通过分析传统`await`操作的阻塞特性,揭示了其在复杂流处理场景中的局限性。文章提出并详细阐述了基于`asyncio.Queue`和`asyncio.Event`的生产者-消费者模式,作为实现任务间解耦和真正并发执行的有效策略,从而显著提升异步应用的响应性和效率。
在异步编程中,我们经常需要处理数据流,其中一个任务负责生成数据,另一个任务负责处理数据。Python的asyncio库提供了强大的工具来构建并发应用程序,但在调度嵌套异步函数时,如果不正确地理解await关键字的行为,可能会导致程序并非按预期并发执行,而是串行阻塞。
考虑一个常见的场景:我们有一个字符流生成器,它逐步产生字符;一个句子生成器,它从字符流中收集字符并形成完整的句子;以及一个句子处理器,它对每个句子执行耗时操作。
以下是初始实现的代码结构:
import asyncio
async def stream():
char_string = "Hi. Hello. Hello."
for char in char_string:
await asyncio.sleep(0.1) # 模拟耗时操作
print("got char:", char)
yield char
async def sentences_generator():
sentence = ""
async for char in stream():
sentence += char
if char in [".", "!", "?"]:
print("got sentence: ", sentence)
yield sentence
sentence = ""
async def process_sentence(sentence: str):
print("waiting for processing sentence: ", sentence)
await asyncio.sleep(len(sentence)*0.1) # 模拟耗时操作
print("sentence processed!")
async def main():
i = 0
async for sentence in sentences_generator():
print("processing sentence: ", i)
await process_sentence(sentence) # 这里的await是关键
i += 1
asyncio.run(main())运行上述代码,其输出大致如下:
got char: H got char: i got char: . got sentence: Hi. processing sentence: 0 waiting for processing sentence: Hi. sentence processed! got char: got char: H got char: e got char: l got char: l got char: o got char: . got sentence: Hello. processing sentence: 1 waiting for processing sentence: Hello. sentence processed!
从输出可以看出,当process_sentence函数被await时,main协程会暂停,直到process_sentence完全执行完毕。这意味着在process_sentence处理第一个句子期间,stream和sentences_generator无法继续生成新的字符和句子。这并非我们期望的并发行为,我们希望在处理一个句子的同时,上游的字符流能够继续生成,从而提高整体吞吐量。
造成这种现象的根本原因在于await关键字的语义。当一个协程await另一个协程时,它会暂停自身的执行,并将控制权交给被await的协程。只有当被await的协程完成或自身也await了其他操作时,控制权才可能回到调用者。因此,在上述例子中,main函数中的await process_sentence(sentence)会完全阻塞main函数,直到当前句子处理完毕,才能继续从sentences_generator中获取下一个句子。
为了实现真正的并发,即在process_sentence处理句子的同时,sentences_generator和stream能够继续生成数据,我们可以采用经典的生产者-消费者模式。
在这种模式中:
asyncio提供了asyncio.Queue来实现这种异步安全的共享队列。此外,为了优雅地处理生产者完成后的消费者关闭问题,我们还可以引入asyncio.Event来发出生产者完成的信号。
Sitekick
一个AI登陆页面自动构建器
121
查看详情
以下是使用生产者-消费者模式重构后的代码:
import asyncio
async def stream():
char_string = "Hi. Hello. Thank you." # 更改了字符串以展示更长的流
for char in char_string:
await asyncio.sleep(0.1)
print("got char:", char)
yield char
async def sentences_generator(q: asyncio.Queue[str], flag: asyncio.Event):
"""
生产者:从字符流生成句子,并放入队列。
当字符流结束时,设置Event标志通知消费者。
"""
sentence = ""
async for char in stream():
sentence += char
if char in [".", "!", "?"]:
print("got sentence: ", sentence)
await q.put(sentence) # 将生成的句子放入队列
sentence = ""
flag.set() # 生产者完成所有句子的生成,设置Event
async def process_sentence(q: asyncio.Queue[str], flag: asyncio.Event):
"""
消费者:从队列中取出句子进行处理。
当队列为空且生产者已完成时,消费者停止。
"""
global i # 用于计数处理的句子
while True:
# 检查是否应该停止:队列为空且生产者已完成
if q.empty() and flag.is_set():
break
try:
# 尝试从队列获取项,如果队列为空,会等待
item = await asyncio.wait_for(q.get(), timeout=1.0) # 增加超时,避免无限等待
except asyncio.TimeoutError:
# 如果超时且生产者已完成,则退出
if flag.is_set():
break
continue # 否则继续等待
print("processing sentence: ", i)
print("waiting for processing sentence: ", item)
await asyncio.sleep(len(item) * 0.1)
print("sentence processed!")
i += 1
async def main():
global i
i = 1 # 初始化句子计数器
event = asyncio.Event() # 用于生产者通知消费者完成
queue = asyncio.Queue[str]() # 共享队列
# 创建生产者和消费者任务
producer_task = sentences_generator(queue, event)
consumer_task = process_sentence(queue, event)
# 并发运行生产者和消费者任务
await asyncio.gather(producer_task, consumer_task)
asyncio.run(main())代码解析:
sentences_generator (生产者):
process_sentence (消费者):
main 函数:
预期输出(部分):
got char: H got char: i got char: . got sentence: Hi. got char: got char: H got char: e got char: l got char: l got char: o got char: . got sentence: Hello. processing sentence: 1 waiting for processing sentence: Hi. got char: got char: T got char: h got char: a got char: n got char: k got char: got char: y got char: o got char: u got char: . got sentence: Thank you. sentence processed! processing sentence: 2 waiting for processing sentence: Hello. sentence processed! processing sentence: 3 waiting for processing sentence: Thank you. sentence processed!
从新的输出中可以看到,当process_sentence正在处理"Hi."时,stream和sentences_generator已经继续生成了"Hello."甚至"Thank you."。这种交错的输出表明生产者和消费者正在并发地工作,显著提高了程序的效率和响应性。
通过采用生产者-消费者模式并结合asyncio.Queue和asyncio.Event,我们可以有效地管理异步任务间的依赖关系,实现更高效、更具响应性的并发数据流处理。这对于构建复杂的异步系统,如网络服务、数据管道等,是至关重要的技术。
以上就是优化Asyncio嵌套函数调度:使用生产者-消费者模式实现并发流处理的详细内容,更多请关注其它相关文章!
# 我们可以
# 石家庄网站搜索优化服务
# seo优化排名官方版
# 浦东seo优化报价
# 新乡搜狗关键词排名技术
# 投资公司网站建设流程
# 上海营销推广企业排名榜
# seo实战进展
# 罗湖手机网站优化效果好
# 宿松网站建设制作外包
# 卖车位营销推广关键词
# 的是
# 几种
# 并将
# python
# 它是
# 会在
# 重构
# 浮点
# 多个
# 为空
# 异步任务
# stream
# ai
# 工具
# 处理器
# go
相关栏目:
【
Google疑问12 】
【
Facebook疑问10 】
【
优化推广96088 】
【
技术知识133117 】
【
IDC资讯59369 】
【
网络运营7196 】
【
IT资讯61894 】
相关推荐:
晓晓优选app支付宝绑定方法
Microsoft Edge网页字体太淡看不清怎么办_Microsoft Edge字体渲染优化技巧
快递查询,一键速查
《随手记》备份数据方法
Python实战:高效处理实时数据流中的最小/最大值
抖音作品被限流怎么办 抖音内容优化与流量恢复方法
《飞猪旅行》购买汽车票方法
植物大战僵尸95版游戏版下载_植物大战僵尸95版游戏版安装指南
mysql怎么查询数据_mysql基础查询语句使用教程
《王者荣耀世界》英雄获取攻略
Excel如何快速找到并断开外部数据源链接_Excel外部数据源断开方法
XPath动态元素定位:如何精准选择文本内容变化的元素
优化长HTML属性值:SonarQube警告与实用策略
虫虫助手如何更新游戏
Teambition网盘如何共享文件
Lar*el如何创建自定义的辅助函数(Helpers)_Lar*el全局函数定义与加载方法
邮政快递寄件查询入口 邮政快递收件查询入口
抖音如何解除|直播|权限绑定_抖音关闭并解绑|直播|功能的方法
使用Python和GBGB API高效抓取指定日期范围和赛道比赛结果教程
重返未来:1999卡戎全方位攻略
使用Google服务账号实现Google Drive API无缝集成与文件访问
React应用中Commerce.js数据加载与状态管理最佳实践
快递物流路径揭秘
哔哩哔哩黑名单怎么查看
《小黑盒》删除历史浏览方法
TikTok收藏夹无法删除视频如何解决 TikTok收藏管理优化方法
雨课堂官网在线登录 网页版雨课堂登录链接
知乎APP怎么查看自己被邀请的问题_知乎APP邀请回答记录查看与参与方法
解决SQLAlchemy模型跨文件关联的Linter兼容性指南
纯CSS实现自适应宽度与响应式布局的水平按钮组
解决CSS容器溢出问题:使用calc()实现精确布局与边距控制
招商淘客入门指南
如何使用 Optional 类型并满足 Pylint 的类型检查
哔哩哔哩在线观看入口 B站官网免费进入
qq邮箱怎么注册_QQ邮箱注册步骤与注意事项
Win10截图远程协助 Win10远程桌面截屏法【场景应用】
大众点评了却看不到是怎么回事
Python定时发送QQ消息
支付宝网页版在线入口 支付宝官网电脑登录入口
J*aScript文本高亮功能优化:解决多词匹配错误与精确分割策略
Excel宏怎么删除_Excel中删除宏的详细操作流程
如何定制PrimeNG Sidebar的背景颜色
J*aScript实现下拉菜单驱动的动态表格数据展示
漫蛙app官方版手机正版入口-漫蛙漫画manwa在线漫画正版入口
优酷下载视频的清晰度怎么选_优酷缓存清晰度设置与选择指南
Python csv 模块处理非字符串数据:列表写入 CSV 文件的机制解析
魔法祈幻界兑换码礼包大全
电脑的“恢复环境(WinRE)”找不到怎么办_Windows系统恢复环境重建【高级修复】
顺丰官方查单号入口 顺丰快递单号查询官网入口
AffinityDesigner图层蒙版怎么用_AffinityDesigner图层蒙版设计应用
2025-11-29
运城市盐湖区信雨科技有限公司是一家深耕海外推广领域十年的专业服务商,作为谷歌推广与Facebook广告全球合作伙伴,聚焦外贸企业出海痛点,以数字化营销为核心,提供一站式海外营销解决方案。公司凭借十年行业沉淀与平台官方资源加持,打破传统外贸获客壁垒,助力企业高效开拓全球市场,成为中小企业出海的可靠合作伙伴。