Caleb Hattingh <[email protected]> added the comment:
The traditional way to do this is with a finite number of workers pulling work
off a queue. This is straightforward to set up with builtins:

from uuid import uuid4
import asyncio, random

async def worker(q: asyncio.Queue):
    while job := await q.get():
        print(f"working on job {job}")
        await asyncio.sleep(random.random() * 5)
        print(f"Completed job {job}")
        q.task_done()

async def scheduler(q, max_concurrency=5):
    workers = []
    for i in range(max_concurrency):
        w = asyncio.create_task(worker(q))
        workers.append(w)
    try:
        await asyncio.gather(*workers)
    except asyncio.CancelledError:
        pass

async def main():
    jobs = [uuid4().hex for i in range(1_000)]
    q = asyncio.Queue()
    for job in jobs:
        await q.put(job)
    t = asyncio.create_task(scheduler(q))
    await q.join()
    t.cancel()
    await t

if __name__ == "__main__":
    asyncio.run(main())

A neater API would be something like our Executor API in concurrent.futures,
but we don't yet have one of those for asyncio. I started playing with some
ideas for this a while ago here: https://github.com/cjrh/coroexecutor
Alas, I have not yet added a "max_workers" parameter, so that isn't available
in my lib. I discuss options for implementing it in an issue:
https://github.com/cjrh/coroexecutor/issues/2
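
For illustration only, one common way to get a "max_workers"-style limit
without a queue is asyncio.Semaphore. This is a minimal sketch and is not
taken from coroexecutor or from the linked issue; the names bounded_gather,
job and demo are hypothetical.

```python
import asyncio


async def bounded_gather(coros, max_workers=5):
    """Run coroutines with at most max_workers active at once (hypothetical helper)."""
    sem = asyncio.Semaphore(max_workers)

    async def run(coro):
        # Each coroutine has to acquire a slot before it starts running.
        async with sem:
            return await coro

    # gather() returns results in the same order as the inputs.
    return await asyncio.gather(*(run(c) for c in coros))


async def job(i):
    # Stand-in for real work.
    await asyncio.sleep(0.01)
    return i * 2


def demo():
    return asyncio.run(bounded_gather([job(i) for i in range(20)], max_workers=5))
```

Unlike the queue-based version, this wraps every coroutine in a task up front
and only limits how many are running at a time, so it is best suited to job
lists that comfortably fit in memory.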
I believe that the core devs are working on a feature that might also help for
this, called "task groups", but I haven't been following closely so I don't
know where that's at currently.
----------
nosy: +cjrh
_______________________________________
Python tracker <[email protected]>
<https://bugs.python.org/issue41505>
_______________________________________