优化Asyncio嵌套函数调度:使用生产者-消费者模式实现并发流处理


优化asyncio嵌套函数调度:使用生产者-消费者模式实现并发流处理

本文深入探讨了在Python asyncio中调度嵌套异步函数时遇到的并发挑战。通过分析传统`await`操作的阻塞特性,揭示了其在复杂流处理场景中的局限性。文章提出并详细阐述了基于`asyncio.Queue`和`asyncio.Event`的生产者-消费者模式,作为实现任务间解耦和真正并发执行的有效策略,从而显著提升异步应用的响应性和效率。

在异步编程中,我们经常需要处理数据流,其中一个任务负责生成数据,另一个任务负责处理数据。Python的asyncio库提供了强大的工具来构建并发应用程序,但在调度嵌套异步函数时,如果不正确地理解await关键字的行为,可能会导致程序并非按预期并发执行,而是串行阻塞。

传统await的阻塞特性及其局限性

考虑一个常见的场景:我们有一个字符流生成器,它逐步产生字符;一个句子生成器,它从字符流中收集字符并形成完整的句子;以及一个句子处理器,它对每个句子执行耗时操作。

以下是初始实现的代码结构:

import asyncio

async def stream():
    char_string = "Hi. Hello. Hello."
    for char in char_string:
        await asyncio.sleep(0.1)  # 模拟耗时操作
        print("got char:", char)
        yield char

async def sentences_generator():
    sentence = ""
    async for char in stream():
        sentence += char
        if char in [".", "!", "?"]:
            print("got sentence: ", sentence)
            yield sentence
            sentence = ""

async def process_sentence(sentence: str):
    print("waiting for processing sentence: ", sentence)
    await asyncio.sleep(len(sentence)*0.1) # 模拟耗时操作
    print("sentence processed!")

async def main():
    i = 0
    async for sentence in sentences_generator():
        print("processing sentence: ", i)
        await process_sentence(sentence) # 这里的await是关键
        i += 1

asyncio.run(main())

运行上述代码,其输出大致如下:

got char: H
got char: i
got char: .
got sentence:  Hi.
processing sentence:  0
waiting for processing sentence:  Hi.
sentence processed!
got char:  
got char: H
got char: e
got char: l
got char: l
got char: o
got char: .
got sentence:   Hello.
processing sentence:  1
waiting for processing sentence:   Hello.
sentence processed!

从输出可以看出,当process_sentence函数被await时,main协程会暂停,直到process_sentence完全执行完毕。这意味着在process_sentence处理第一个句子期间,stream和sentences_generator无法继续生成新的字符和句子。这并非我们期望的并发行为,我们希望在处理一个句子的同时,上游的字符流能够继续生成,从而提高整体吞吐量。

造成这种现象的根本原因在于await关键字的语义。当一个协程await另一个协程时,它会暂停自身的执行,并将控制权交给被await的协程。只有当被await的协程完成或自身也await了其他操作时,控制权才可能回到调用者。因此,在上述例子中,main函数中的await process_sentence(sentence)会完全阻塞main函数,直到当前句子处理完毕,才能继续从sentences_generator中获取下一个句子。

解决方案:基于asyncio.Queue的生产者-消费者模式

为了实现真正的并发,即在process_sentence处理句子的同时,sentences_generator和stream能够继续生成数据,我们可以采用经典的生产者-消费者模式。

在这种模式中:

  1. 生产者(Producer):负责生成数据(例如,sentences_generator生成句子),并将数据放入一个共享队列中。
  2. 消费者(Consumer):负责从共享队列中取出数据(例如,process_sentence处理句子),并独立执行其任务。

asyncio提供了asyncio.Queue来实现这种异步安全的共享队列。此外,为了优雅地处理生产者完成后的消费者关闭问题,我们还可以引入asyncio.Event来发出生产者完成的信号。

Sitekick Sitekick

一个AI登陆页面自动构建器

Sitekick 121 查看详情 Sitekick

以下是使用生产者-消费者模式重构后的代码:

import asyncio

async def stream():
    char_string = "Hi. Hello. Thank you." # 更改了字符串以展示更长的流
    for char in char_string:
        await asyncio.sleep(0.1)
        print("got char:", char)
        yield char

async def sentences_generator(q: asyncio.Queue[str], flag: asyncio.Event):
    """
    生产者:从字符流生成句子,并放入队列。
    当字符流结束时,设置Event标志通知消费者。
    """
    sentence = ""
    async for char in stream():
        sentence += char
        if char in [".", "!", "?"]:
            print("got sentence: ", sentence)
            await q.put(sentence) # 将生成的句子放入队列
            sentence = ""
    flag.set() # 生产者完成所有句子的生成,设置Event

async def process_sentence(q: asyncio.Queue[str], flag: asyncio.Event):
    """
    消费者:从队列中取出句子进行处理。
    当队列为空且生产者已完成时,消费者停止。
    """
    global i # 用于计数处理的句子
    while True:
        # 检查是否应该停止:队列为空且生产者已完成
        if q.empty() and flag.is_set():
            break
        try:
            # 尝试从队列获取项,如果队列为空,会等待
            item = await asyncio.wait_for(q.get(), timeout=1.0) # 增加超时,避免无限等待
        except asyncio.TimeoutError:
            # 如果超时且生产者已完成,则退出
            if flag.is_set():
                break
            continue # 否则继续等待

        print("processing sentence: ", i)
        print("waiting for processing sentence: ", item)
        await asyncio.sleep(len(item) * 0.1)
        print("sentence processed!")
        i += 1

async def main():
    global i
    i = 1 # 初始化句子计数器
    event = asyncio.Event() # 用于生产者通知消费者完成
    queue = asyncio.Queue[str]() # 共享队列

    # 创建生产者和消费者任务
    producer_task = sentences_generator(queue, event)
    consumer_task = process_sentence(queue, event)

    # 并发运行生产者和消费者任务
    await asyncio.gather(producer_task, consumer_task)

asyncio.run(main())

代码解析:

  1. sentences_generator (生产者):

    • 接收一个asyncio.Queue实例q和一个asyncio.Event实例flag。
    • 它继续从stream()生成字符并构建句子。
    • 一旦一个完整的句子形成,它不再直接yield句子,而是使用await q.put(sentence)将句子异步地放入队列。
    • 当stream()耗尽所有字符,即生产者完成其所有工作时,它调用flag.set()来通知消费者没有更多的句子会生成。
  2. process_sentence (消费者):

    • 同样接收q和flag。
    • 在一个无限循环中运行,直到满足退出条件。
    • 使用await q.get()异步地从队列中取出句子。如果队列为空,q.get()会暂停当前协程,直到有新的项可用。
    • 为了更健壮地处理消费者退出,我们添加了asyncio.wait_for和超时机制。当队列为空且flag.is_set()为真时(表示生产者已完成且队列中不再有新数据),消费者将退出循环。
    • 取出句子后,它执行模拟的耗时处理await asyncio.sleep(...)。
  3. main 函数:

    • 初始化asyncio.Event和asyncio.Queue。
    • 使用asyncio.gather(producer_task, consumer_task)同时启动生产者和消费者两个独立的协程。asyncio.gather会等待所有传入的协程完成。由于生产者会在完成后设置事件,而消费者会在队列清空且事件设置后退出,因此gather最终会完成。

预期输出(部分):

got char: H
got char: i
got char: .
got sentence:  Hi.
got char:  
got char: H
got char: e
got char: l
got char: l
got char: o
got char: .
got sentence:   Hello.
processing sentence:  1
waiting for processing sentence:  Hi.
got char:  
got char: T
got char: h
got char: a
got char: n
got char: k
got char:  
got char: y
got char: o
got char: u
got char: .
got sentence:   Thank you.
sentence processed!
processing sentence:  2
waiting for processing sentence:   Hello.
sentence processed!
processing sentence:  3
waiting for processing sentence:   Thank you.
sentence processed!

从新的输出中可以看到,当process_sentence正在处理"Hi."时,stream和sentences_generator已经继续生成了"Hello."甚至"Thank you."。这种交错的输出表明生产者和消费者正在并发地工作,显著提高了程序的效率和响应性。

注意事项与总结

  1. asyncio.Queue的重要性: 它是实现任务间安全通信和解耦的关键。生产者将数据放入队列,消费者从队列取出数据,两者无需直接等待对方完成,只需通过队列进行协调。
  2. asyncio.Event的用途: 在生产者-消费者模式中,asyncio.Event常用于信号通知。生产者完成所有工作后,设置Event,消费者在队列为空时检查此Event,以判断是否可以安全退出,避免消费者在生产者已无数据生成后无限期等待。
  3. 优雅地关闭消费者: 确保消费者能够识别生产者完成的信号并优雅地退出,是构建健壮异步应用的重要一环。除了asyncio.Event,也可以考虑使用特殊的“哨兵值”(Sentinel Value)放入队列来指示生产者结束。
  4. asyncio.gather: 用于同时运行多个协程,并等待它们全部完成。它是协调多个独立或半独立任务的强大工具。
  5. 理解并发与并行: asyncio实现的是并发(concurrency),而非真正的并行(parallelism)。这意味着在单个CPU核心上,任务仍然是交替执行的,但通过await的非阻塞特性,可以在等待I/O操作(如asyncio.sleep)时切换到其他任务,从而提高资源利用率。

通过采用生产者-消费者模式并结合asyncio.Queue和asyncio.Event,我们可以有效地管理异步任务间的依赖关系,实现更高效、更具响应性的并发数据流处理。这对于构建复杂的异步系统,如网络服务、数据管道等,是至关重要的技术。

以上就是优化Asyncio嵌套函数调度:使用生产者-消费者模式实现并发流处理的详细内容,更多请关注其它相关文章!


# 我们可以  # 石家庄网站搜索优化服务  # seo优化排名官方版  # 浦东seo优化报价  # 新乡搜狗关键词排名技术  # 投资公司网站建设流程  # 上海营销推广企业排名榜  # seo实战进展  # 罗湖手机网站优化效果好  # 宿松网站建设制作外包  # 卖车位营销推广关键词  # 的是  # 几种  # 并将  # python  # 它是  # 会在  # 重构  # 浮点  # 多个  # 为空  # 异步任务  # stream  # ai  # 工具  # 处理器  # go 


相关栏目: 【 Google疑问12 】 【 Facebook疑问10 】 【 优化推广96088 】 【 技术知识133117 】 【 IDC资讯59369 】 【 网络运营7196 】 【 IT资讯61894


相关推荐: 晓晓优选app支付宝绑定方法  Microsoft Edge网页字体太淡看不清怎么办_Microsoft Edge字体渲染优化技巧  快递查询,一键速查  《随手记》备份数据方法  Python实战:高效处理实时数据流中的最小/最大值  抖音作品被限流怎么办 抖音内容优化与流量恢复方法  《飞猪旅行》购买汽车票方法  植物大战僵尸95版游戏版下载_植物大战僵尸95版游戏版安装指南  mysql怎么查询数据_mysql基础查询语句使用教程  《王者荣耀世界》英雄获取攻略  Excel如何快速找到并断开外部数据源链接_Excel外部数据源断开方法  XPath动态元素定位:如何精准选择文本内容变化的元素  优化长HTML属性值:SonarQube警告与实用策略  虫虫助手如何更新游戏  Teambition网盘如何共享文件  Lar*el如何创建自定义的辅助函数(Helpers)_Lar*el全局函数定义与加载方法  邮政快递寄件查询入口 邮政快递收件查询入口  抖音如何解除|直播|权限绑定_抖音关闭并解绑|直播|功能的方法  使用Python和GBGB API高效抓取指定日期范围和赛道比赛结果教程  重返未来:1999卡戎全方位攻略  使用Google服务账号实现Google Drive API无缝集成与文件访问  React应用中Commerce.js数据加载与状态管理最佳实践  快递物流路径揭秘  哔哩哔哩黑名单怎么查看  《小黑盒》删除历史浏览方法  TikTok收藏夹无法删除视频如何解决 TikTok收藏管理优化方法  雨课堂官网在线登录 网页版雨课堂登录链接  知乎APP怎么查看自己被邀请的问题_知乎APP邀请回答记录查看与参与方法  解决SQLAlchemy模型跨文件关联的Linter兼容性指南  纯CSS实现自适应宽度与响应式布局的水平按钮组  解决CSS容器溢出问题:使用calc()实现精确布局与边距控制  招商淘客入门指南  如何使用 Optional 类型并满足 Pylint 的类型检查  哔哩哔哩在线观看入口 B站官网免费进入  qq邮箱怎么注册_QQ邮箱注册步骤与注意事项  Win10截图远程协助 Win10远程桌面截屏法【场景应用】  大众点评了却看不到是怎么回事  Python定时发送QQ消息  支付宝网页版在线入口 支付宝官网电脑登录入口  J*aScript文本高亮功能优化:解决多词匹配错误与精确分割策略  Excel宏怎么删除_Excel中删除宏的详细操作流程  如何定制PrimeNG Sidebar的背景颜色  J*aScript实现下拉菜单驱动的动态表格数据展示  漫蛙app官方版手机正版入口-漫蛙漫画manwa在线漫画正版入口  优酷下载视频的清晰度怎么选_优酷缓存清晰度设置与选择指南  Python csv 模块处理非字符串数据:列表写入 CSV 文件的机制解析  魔法祈幻界兑换码礼包大全  电脑的“恢复环境(WinRE)”找不到怎么办_Windows系统恢复环境重建【高级修复】  顺丰官方查单号入口 顺丰快递单号查询官网入口  AffinityDesigner图层蒙版怎么用_AffinityDesigner图层蒙版设计应用 

 2025-11-29

了解您产品搜索量及市场趋势,制定营销计划

同行竞争及网站分析保障您的广告效果

点击免费数据支持

提交您的需求,1小时内享受我们的专业解答。

运城市盐湖区信雨科技有限公司


运城市盐湖区信雨科技有限公司

运城市盐湖区信雨科技有限公司是一家深耕海外推广领域十年的专业服务商,作为谷歌推广与Facebook广告全球合作伙伴,聚焦外贸企业出海痛点,以数字化营销为核心,提供一站式海外营销解决方案。公司凭借十年行业沉淀与平台官方资源加持,打破传统外贸获客壁垒,助力企业高效开拓全球市场,成为中小企业出海的可靠合作伙伴。

 8156699

 13765294890

 8156699@qq.com

Notice

We and selected third parties use cookies or similar technologies for technical purposes and, with your consent, for other purposes as specified in the cookie policy.
You can consent to the use of such technologies by closing this notice, by interacting with any link or button outside of this notice or by continuing to browse otherwise.