[issue41505] asyncio.gather of large streams with limited resources

2020-08-23 Thread Kevin Amado


Kevin Amado added the comment:

Yeah, definitely it must be workers.

I've experimented a lot with this and finally arrived at something with an
interface similar to asyncio.as_completed:

- You control concurrency with the `workers` parameter
- You upper-bound memory usage with the `worker_greediness` parameter
- Results are yielded back in the same order as the input
- Results are yielded! So working over an unknown-sized iterable of
  `awaitables` like map(func, things_to_do) or a generator is no problem

The implementation may not be the cleanest, as it uses some Events and N
Queues, but the tests (keep reading to the end) show that the overhead is
negligible:

import asyncio
from itertools import tee
from typing import Awaitable, Dict, Iterable, TypeVar, cast

# NOTE: `schedule` and `force_loop_cycle` are helpers from the
# aioextensions library (linked below); they are not defined here.

T = TypeVar('T')


def resolve(
    awaitables: Iterable[Awaitable[T]],
    *,
    workers: int = 1024,
    worker_greediness: int = 0,
) -> Iterable[Awaitable[T]]:
    """Resolve the awaitables concurrently, yielding results in input order."""
    if workers < 1:
        raise ValueError('workers must be >= 1')
    if worker_greediness < 0:
        raise ValueError('worker_greediness must be >= 0')

    if hasattr(awaitables, '__len__'):
        # A small optimization can be done if we know the length
        workers = min(workers, len(awaitables))

    loop = asyncio.get_event_loop()
    store: Dict[int, asyncio.Queue] = {}
    stream, stream_copy = tee(enumerate(awaitables))
    stream_finished = asyncio.Event()
    workers_up = asyncio.Event()
    workers_tasks: Dict[int, asyncio.Task] = {}

    async def worker() -> None:
        # Each worker buffers at most `worker_greediness` finished
        # results in its queue (0 means unbounded).
        done: asyncio.Queue = asyncio.Queue(worker_greediness)
        for index, awaitable in stream:
            store[index] = done
            future = loop.create_future()
            future.set_result(await schedule(awaitable, loop=loop))
            await done.put(future)
            workers_up.set()
        workers_up.set()
        stream_finished.set()

    async def start_workers() -> None:
        for index in range(workers):
            if stream_finished.is_set():
                break
            workers_tasks[index] = asyncio.create_task(worker())
            await force_loop_cycle()
        await workers_up.wait()

    async def get_one(index: int) -> Awaitable[T]:
        if not workers_tasks:
            await start_workers()

        awaitable = await store.pop(index).get()
        result: Awaitable[T] = (await awaitable).result()
        return result

    for index, _ in stream_copy:
        yield cast(Awaitable[T], get_one(index))


Some notes on the usage and outputs are part of the docs of this library:

https://kamadorueda.github.io/aioextensions/#aioextensions.resolve
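
For illustration, here is a minimal usage sketch of the generator above,
assuming it is imported from the aioextensions library documented at the
link (the `fetch` coroutine is just a stand-in):

```
import asyncio

from aioextensions import resolve


async def fetch(n: int) -> int:
    await asyncio.sleep(0.1)
    return n * 2


async def main() -> None:
    # At most 4 awaitables are resolved concurrently; results are
    # yielded as awaitables, in the same order as the input.
    for future in resolve(map(fetch, range(10)), workers=4):
        print(await future)


asyncio.run(main())
```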

Here are some tests that demonstrate properties of the implementation:
- Concurrency is bounded:
  https://github.com/kamadorueda/aioextensions/blob/4a38cb343ceb0f931b655634195f311745e2db32/test/test___init__.py#L138
- Workers stay busy even if one of them is processing a long-running job:
  https://github.com/kamadorueda/aioextensions/blob/4a38cb343ceb0f931b655634195f311745e2db32/test/test___init__.py#L131
- Many workers do not add overhead:
  https://github.com/kamadorueda/aioextensions/blob/4a38cb343ceb0f931b655634195f311745e2db32/test/test___init__.py#L156
- Errors can be caught on retrieval:
  https://github.com/kamadorueda/aioextensions/blob/4a38cb343ceb0f931b655634195f311745e2db32/test/test___init__.py#L128

--
nosy: +kamado2


[issue41505] asyncio.gather of large streams with limited resources

2020-08-23 Thread Caleb Hattingh


Caleb Hattingh added the comment:

The traditional way this is done is with a finite number of workers pulling
work off a queue. This is straightforward to set up with builtins:


from uuid import uuid4
import asyncio, random


async def worker(q: asyncio.Queue):
    # Workers run until cancelled; q.get() never returns a falsy job
    # here because every job is a non-empty uuid hex string.
    while job := await q.get():
        print(f"working on job {job}")
        await asyncio.sleep(random.random() * 5)
        print(f"Completed job {job}")
        q.task_done()


async def scheduler(q, max_concurrency=5):
    # A fixed pool of workers bounds the concurrency.
    workers = []
    for i in range(max_concurrency):
        w = asyncio.create_task(worker(q))
        workers.append(w)

    try:
        await asyncio.gather(*workers)
    except asyncio.CancelledError:
        pass


async def main():
    jobs = [uuid4().hex for i in range(1_000)]
    q = asyncio.Queue()
    for job in jobs:
        await q.put(job)

    t = asyncio.create_task(scheduler(q))
    await q.join()  # wait until every job has been marked done
    t.cancel()
    await t


if __name__ == "__main__":
    asyncio.run(main())


A neater API would be something like our Executor API in concurrent.futures, 
but we don't yet have one of those for asyncio.  I started playing with some 
ideas for this a while ago here: https://github.com/cjrh/coroexecutor

Alas, I have not yet added a "max_workers" parameter, so that isn't available
in my lib yet. I discuss options for implementing it in an issue:
https://github.com/cjrh/coroexecutor/issues/2
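
To show the shape such an API could take, here is a rough, illustrative
sketch (this is not the coroexecutor API, just a semaphore-based stand-in
with made-up names):

```
import asyncio
from typing import Any, Awaitable, Callable, List


class BoundedExecutor:
    """Illustrative Executor-style wrapper; names are hypothetical."""

    def __init__(self, max_workers: int) -> None:
        self._semaphore = asyncio.Semaphore(max_workers)
        self._tasks: List[asyncio.Task] = []

    def submit(self, fn: Callable[..., Awaitable[Any]], *args: Any) -> asyncio.Task:
        async def bounded() -> Any:
            # The semaphore bounds how many submitted coroutines
            # actually run at the same time.
            async with self._semaphore:
                return await fn(*args)

        task = asyncio.create_task(bounded())
        self._tasks.append(task)
        return task

    async def __aenter__(self) -> "BoundedExecutor":
        return self

    async def __aexit__(self, *exc: object) -> None:
        # Like concurrent.futures.Executor: wait for all work on exit.
        await asyncio.gather(*self._tasks)


async def job(i: int) -> None:
    await asyncio.sleep(0.01)


async def main() -> None:
    async with BoundedExecutor(max_workers=10) as executor:
        for i in range(100):
            executor.submit(job, i)


asyncio.run(main())
```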

I believe that the core devs are working on a feature that might also help for 
this, called "task groups", but I haven't been following closely so I don't 
know where that's at currently.

--
nosy: +cjrh


[issue41505] asyncio.gather of large streams with limited resources

2020-08-07 Thread Kevin Amado


Change by Kevin Amado:


Removed file: https://bugs.python.org/file49377/materialize-implementation.py


[issue41505] asyncio.gather of large streams with limited resources

2020-08-07 Thread Kevin Amado


New submission from Kevin Amado:

Sometimes, when dealing with high-concurrency systems, developers face the
problem of executing a large number of tasks concurrently while taking care
of a finite pool of resources.

Just to mention some examples:
- reading a lot of files asynchronously without exceeding the operating
  system's limit on open files
- making millions of requests to a website, in sufficiently small batches so
  as not to be banned by the site's firewall or hit API limits
- making a lot of DNS lookups without exceeding the operating system's limit
  on open sockets
- and many more

What these examples have in common is that there is a hard limit on the
maximum concurrency with which the problem can be solved.

A naive approach is to split the long list of tasks into small batches and
use asyncio.gather on each batch. This, however, has a downside: if one task
takes more time than the others, at some point only that task is running and
the execution of the following batch is delayed, hurting performance and
overall throughput and execution time. A sketch of this approach follows.
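
Here is a minimal sketch of that batching approach (illustrative only):

```
import asyncio


async def gather_in_batches(coros, batch_size):
    # Each batch must fully finish before the next one starts, so one
    # slow task in a batch stalls every task behind it.
    results = []
    batch = []
    for coro in coros:
        batch.append(coro)
        if len(batch) == batch_size:
            results.extend(await asyncio.gather(*batch))
            batch = []
    if batch:
        results.extend(await asyncio.gather(*batch))
    return results
```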

Another approach is to use asyncio.wait on a subset of tasks, gathering the
done tasks and adding more tasks from the remaining subset until all tasks
are executed. This alternative is better, but still far from optimal because
a lot of boilerplate code is needed, as the sketch below shows.
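
A sketch of this asyncio.wait-based approach (illustrative only; note that
it yields results in completion order, not input order):

```
import asyncio
import itertools


async def run_bounded(coros, max_concurrency):
    iterator = iter(coros)
    # Start with a window of at most max_concurrency tasks.
    pending = {
        asyncio.ensure_future(coro)
        for coro in itertools.islice(iterator, max_concurrency)
    }
    results = []
    while pending:
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED)
        results.extend(task.result() for task in done)
        # Refill the window with as many new tasks as just completed.
        for coro in itertools.islice(iterator, len(done)):
            pending.add(asyncio.ensure_future(coro))
    return results
```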

The ideal approach is to operate on the possibly infinite stream of tasks
with a constant number of them being resolved concurrently. If one of the
tasks being executed finishes, another one is immediately fetched from the
input stream and added to the set of concurrently executing tasks. By doing
it this way we optimize the resources needed while minimizing the total
execution time and never exceeding the finite pool of resources (sockets,
open files, HTTP API limits, etc.).

What I'm attaching is a proof of concept of a new function for the
asyncio.tasks module that implements this ideal approach.

The proposed signature for such a function is:

  async def materialize(aws, *, max_concurrency=None)

It is used like this:

```
async def do(n: int) -> int:
    print('running', n)
    await asyncio.sleep(1)
    print('returning', n)
    return n


async def main():
    result = []
    async for x in materialize(map(do, range(5)), max_concurrency=2):
        print('got', x)
        result.append(x)

    print(result)
```

Its output is:

running 0
running 1
returning 0
returning 1
got 0
got 1
running 2
running 3
returning 2
returning 3
got 2
got 3
running 4
returning 4
got 4
[0, 1, 2, 3, 4]

As you can see, tasks are resolved concurrently without exceeding the
maximum concurrency allowed, yet as many tasks as the limit specifies are
always executing concurrently. Results are yielded as soon as they are
available, the memory footprint stays small (proportional to the maximum
concurrency allowed), and results are returned in the same order as the
input stream (unlike asyncio.as_completed).

Since it's an asynchronous generator it can deal with infinite input streams, 
which is nice!
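
For contrast, a small sketch of asyncio.as_completed, which yields results
in completion order rather than input order and needs the whole task list
up front:

```
import asyncio


async def do(n: int) -> int:
    await asyncio.sleep(n / 10)
    return n


async def demo() -> None:
    # Input order is 3, 1, 2 but results print as 1, 2, 3
    # because as_completed yields in completion order.
    for fut in asyncio.as_completed([do(3), do(1), do(2)]):
        print(await fut)


asyncio.run(demo())
```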

I'm willing to work further on a PR

--
components: asyncio
files: materialize.py
messages: 375028
nosy: asvetlov, kamadorueda, yselivanov
priority: normal
severity: normal
status: open
title: asyncio.gather of large streams with limited resources
type: enhancement
versions: Python 3.10
Added file: https://bugs.python.org/file49377/materialize.py
