New submission from Kevin Amado <kamadoru...@gmail.com>:

Sometimes, when dealing with high-concurrency systems, developers face the 
problem of executing a large number of tasks concurrently while taking care of 
a finite pool of resources.

Just to mention some examples:
- reading a lot of files asynchronously without exceeding the maximum number of 
open files allowed by the operating system
- making millions of requests to a website, in sufficiently small batches so as 
not to be banned by the site's firewall or hit API limits
- making a lot of DNS lookups without exceeding the maximum number of open 
sockets allowed by the operating system
- and many more

What these examples have in common is that there is a hard limit on the maximum 
concurrency with which the problem can be solved.

A naive approach is to split the long list of tasks into small batches and use 
asyncio.gather on each batch. This, however, has a downside if one of the 
tasks takes more time than the others: at some point only that task is 
running, the execution of the following batch is delayed, and overall 
throughput and total execution time suffer.

Another approach is to use asyncio.wait on a subset of tasks, gathering the 
done tasks and appending more tasks from the remaining subset until all tasks 
get executed. This alternative is good but still far from optimal, as a fair 
amount of boilerplate code is needed.
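A minimal sketch of that refill loop (the names `fetch` and `bounded` are hypothetical) shows the boilerplate involved; note also that completion order, not input order, is what comes back:

```python
import asyncio

async def fetch(n: int) -> int:
    await asyncio.sleep(0.01)
    return n

async def bounded(coros, limit):
    coros = iter(coros)
    pending = set()
    results = []
    # Keep up to `limit` tasks in flight; refill from the
    # iterator each time at least one task completes.
    while True:
        while len(pending) < limit:
            try:
                pending.add(asyncio.ensure_future(next(coros)))
            except StopIteration:
                break
        if not pending:
            break
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        results += [t.result() for t in done]
    return results

print(sorted(asyncio.run(bounded(map(fetch, range(5)), 2))))  # → [0, 1, 2, 3, 4]
```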

The ideal approach is to operate on the possibly infinite stream of tasks with 
a constant number of them being resolved concurrently. If one of the 
concurrently executing tasks finishes, another one is immediately fetched from 
the input stream and added to the set of running tasks. By doing it this way 
we optimize the resources needed while minimizing the total execution time, 
never exceeding the finite pool of resources (sockets, open files, HTTP API 
limits, etc.).
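For illustration only, the sliding-window idea above could be sketched as an async generator along these lines (a hypothetical sketch, not the attached proof of concept):

```python
import asyncio
from collections import deque

async def materialize(aws, *, max_concurrency=None):
    # Keep a sliding window of at most `max_concurrency` running
    # tasks; yield results in input order as the oldest completes.
    aws = iter(aws)
    window = deque()
    while True:
        # Refill the window from the input stream.
        # (With max_concurrency=None the whole stream is scheduled
        # at once, so a limit is needed for infinite streams.)
        while max_concurrency is None or len(window) < max_concurrency:
            try:
                window.append(asyncio.ensure_future(next(aws)))
            except StopIteration:
                break
        if not window:
            return
        # Await the oldest task so results keep input order.
        yield await window.popleft()
```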

What I'm attaching is a proof of concept of a new function to add to the 
asyncio.tasks module that implements the ideal approach.

The proposed signature for such a function is:

  async def materialize(aws, *, max_concurrency=None)

It works as follows:

```
import asyncio

async def do(n: int) -> int:
    print('running', n)
    await asyncio.sleep(1)
    print('returning', n)
    return n

async def main():
    result = []
    async for x in materialize(map(do, range(5)), max_concurrency=2):
        print('got', x)
        result.append(x)

    print(result)

asyncio.run(main())
```

The output is:

running 0
running 1
returning 0
returning 1
got 0
got 1
running 2
running 3
returning 2
returning 3
got 2
got 3
running 4
returning 4
got 4
[0, 1, 2, 3, 4]

As you can see, tasks are resolved concurrently without exceeding the maximum 
concurrency allowed, yet as many tasks as the limit specifies are always 
executing concurrently. Results are yielded as soon as they are available, the 
memory footprint stays small (proportional to the maximum concurrency 
allowed), and results come back in the same order as the input stream (unlike 
asyncio.as_completed).

Since it's an asynchronous generator, it can deal with infinite input streams, 
which is nice!

I'm willing to work further on a PR.

----------
components: asyncio
files: materialize.py
messages: 375028
nosy: asvetlov, kamadorueda, yselivanov
priority: normal
severity: normal
status: open
title: asyncio.gather of large streams with limited resources
type: enhancement
versions: Python 3.10
Added file: https://bugs.python.org/file49377/materialize.py

_______________________________________
Python tracker <rep...@bugs.python.org>
<https://bugs.python.org/issue41505>
_______________________________________