Andrey Paramonov <para...@acdlabs.ru> added the comment:

Hello!

Below is an updated implementation with more consistent error handling.
The main rough edges I encountered:

1. asyncio.Queue alone proved insufficient for precise control of the limit, 
because asyncio.create_task() schedules the new Task immediately, so it may 
start executing before it is added to the queue (leading to limit+1 tasks 
running). An additional semaphore is used to address that.

2. When an exception occurs, the other running tasks have to be cancel()ed and 
then awaited to ensure that all tasks have actually finished by the time 
igather exits. Just cancel()ing them proved insufficient.

3. When an exception occurs, unscheduled coroutines have to be wrapped with 
asyncio.create_task(coro).cancel() to avoid the RuntimeWarning "coroutine was 
never awaited". But maybe there is a more elegant way to suppress this warning 
for a coroutine?
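
A minimal sketch of points 2 and 3, not taken from the implementation in this 
message. The names worker, cancel_and_wait, never_run and close_unscheduled are 
hypothetical and exist only for illustration. It shows that a task is not yet 
done right after cancel() and only finishes once it is awaited, and that 
calling close() on a coroutine that was never scheduled seems to be one way to 
avoid the "never awaited" warning without creating a Task.

```python
import asyncio
import gc
import warnings


async def worker(log):
    # Hypothetical long-running task with asynchronous cleanup.
    try:
        await asyncio.sleep(3600)
    finally:
        # This cleanup needs another turn of the event loop to complete.
        await asyncio.sleep(0)
        log.append("cleaned up")


async def cancel_and_wait():
    log = []
    task = asyncio.create_task(worker(log))
    await asyncio.sleep(0)            # let the worker start running
    task.cancel()                     # only requests cancellation
    done_after_cancel = task.done()   # the task has not reacted yet
    # Awaiting gives the task a chance to handle CancelledError and clean up.
    await asyncio.gather(task, return_exceptions=True)
    return done_after_cancel, task.cancelled(), log


async def never_run():
    return 42


def close_unscheduled():
    # Record warnings so we can check whether "never awaited" was emitted.
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        coro = never_run()
        coro.close()                  # finish the coroutine without running it
        del coro
        gc.collect()
    return [w for w in caught if issubclass(w.category, RuntimeWarning)]


done_after_cancel, cancelled, log = asyncio.run(cancel_and_wait())
unawaited_warnings = close_unscheduled()
```

Whether coro.close() is preferable to asyncio.create_task(coro).cancel() inside 
igather is a judgment call. It avoids scheduling anything on the event loop for 
coroutines that never started.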

In my client code I have not so far encountered "an implicit requirement that 
back pressure from the consumer should be handled", but it should be possible 
to implement that separately and quite straightforwardly with the help of 
asyncio.Queue.
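
As a rough illustration of that idea (an assumption about how it could look, 
not part of the implementation in this message): a bounded asyncio.Queue makes 
the producer wait in put() whenever the consumer falls behind. The names 
producer, slow_consumer and main are hypothetical.

```python
import asyncio


async def producer(queue, n, sizes):
    for i in range(n):
        await queue.put(i)            # suspends while the queue is full
        sizes.append(queue.qsize())   # record how much is buffered
    await queue.put(None)             # sentinel: no more items


async def slow_consumer(queue, out):
    while True:
        item = await queue.get()
        if item is None:
            break
        await asyncio.sleep(0.001)    # simulate slow processing
        out.append(item)


async def main(maxsize=2, n=10):
    # maxsize bounds how far the producer can run ahead of the consumer.
    queue = asyncio.Queue(maxsize=maxsize)
    sizes, out = [], []
    await asyncio.gather(producer(queue, n, sizes), slow_consumer(queue, out))
    return sizes, out


sizes, out = asyncio.run(main())
```

Here the number of buffered items never exceeds maxsize, so a slow consumer 
throttles the producer instead of letting results pile up.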

    import asyncio
    import math

    async def igather(coros, limit=None):
        coros = iter(coros)

        buf = asyncio.Queue()
        sem = asyncio.Semaphore(limit or math.inf)

        async def submit(coros, buf):
            while True:
                await sem.acquire()
                try:
                    # TODO: additionally support async iterators
                    coro = next(coros)
                except StopIteration:
                    break
                task = asyncio.create_task(coro)
                buf.put_nowait(task)
            await buf.put(None)

        async def consume(buf):
            while True:
                task = await buf.get()
                if task:
                    v = await asyncio.wait_for(task, None)
                    sem.release()
                    yield v
                else:
                    break

        submit_task = asyncio.create_task(submit(coros, buf))
        try:
            async for result in consume(buf):
                yield result
        except:
            submit_task.cancel()
            # cancel scheduled
            while not buf.empty():
                task = buf.get_nowait()
                if task:
                    task.cancel()
                    try:
                        await task
                    except:
                        pass
            # cancel pending
            for coro in coros:
                asyncio.create_task(coro).cancel()
            raise

Shall I go ahead and prepare a PR with docs and tests?

----------

_______________________________________
Python tracker <rep...@bugs.python.org>
<https://bugs.python.org/issue30782>
_______________________________________