Pyper Python 并发编程框架
Pyper 是一个旨在简化 Python 并发编程的框架,它提供了一个直观的 API,支持基于函数式编程模式的并发和并行数据处理,它能解决并发编程复杂、易出错的问题,核心价值在于简化并发和并行数据处理。
主要特性
- 直观的 API,无缝统一线程、多进程和异步工作
- 高效的懒加载执行,使用队列和生成器提升性能
- 完全用 Python 实现,零依赖
- 函数式理念,Python 函数是数据管道的构建块,让你自然地编写干净且可重用的代码
- 安全,隐藏了底层任务执行和资源清理的复杂工作,再也不用担心竞态条件、内存泄漏或线程级别的错误处理
示例代码
import asyncioimport time
from pyper import task
def get_data(limit: int):
for i in range(limit):
yield i
async def step1(data: int):
await asyncio.sleep(1)
print("Finished async wait", data)
return data
def step2(data: int):
time.sleep(1)
print("Finished sync wait", data)
return data
def step3(data: int):
for i in range(10_000_000):
_ = i*i
print("Finished heavy computation", data)
return data
async def main():
# Define a pipeline of tasks using `pyper.task`
pipeline = task(get_data, branch=True) \
| task(step1, workers=20) \
| task(step2, workers=20) \
| task(step3, workers=20, multiprocess=True)
# Call the pipeline
total = 0
async for output in pipeline(limit=20):
total += output
print("Total:", total)
if __name__ == "__main__":
asyncio.run(main())
11
