[issue30782] Allow limiting the number of concurrent tasks in asyncio.as_completed

2021-10-13 Thread Jack Wong


Change by Jack Wong:


--
nosy: +iforapsy

[issue30782] Allow limiting the number of concurrent tasks in asyncio.as_completed

2019-09-09 Thread Yury Selivanov


Yury Selivanov added the comment:

> it will help simplify some Mock things.

Yeah, we'll need to chat about that so I can use Mock requirements in the PEP. :)

[issue30782] Allow limiting the number of concurrent tasks in asyncio.as_completed

2019-09-09 Thread Lisa Roach


Lisa Roach added the comment:

Oh nice, I remember talking to you about the MultiError before, it will
help simplify some Mock things. Happy to help out if you want more eyes on
it.

[issue30782] Allow limiting the number of concurrent tasks in asyncio.as_completed

2019-09-09 Thread Yury Selivanov


Yury Selivanov added the comment:

FWIW I've been using TaskGroups in the EdgeDB codebase extensively:
https://github.com/edgedb/edgedb/blob/master/edb/common/taskgroup.py (you can use the code; it's Apache 2)

The only thing that prevented us from merging them in 3.8 is that we need to 
formally define & implement ExceptionGroup (or MultiError) in CPython.  I'm 
going to work on an initial PEP for that this week.

[issue30782] Allow limiting the number of concurrent tasks in asyncio.as_completed

2019-09-09 Thread Lisa Roach


Lisa Roach added the comment:

Sounds good, thanks for the explanation Yury. I look forward to the TaskGroups!

[issue30782] Allow limiting the number of concurrent tasks in asyncio.as_completed

2019-09-09 Thread Yury Selivanov


Yury Selivanov added the comment:

We want to add TaskGroups to asyncio (a similar concept to Trio's nurseries).  
TaskGroups use the `async with` statement to clearly define where Tasks are 
created and at which point they are expected to be completed or destroyed.

asyncio.gather(), asyncio.as_completed(), and a few others will be considered legacy APIs after we implement TaskGroups. Implementing rate limiting on top of TaskGroups is easier and more reliable.
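
[A sketch of that rate-limiting pattern, assuming the asyncio.TaskGroup API that eventually shipped in Python 3.11; neither asyncio.TaskGroup nor this helper existed at the time of the comment, and fetch() and bounded_gather() are illustrative names only:

import asyncio

async def fetch(x):
    # Hypothetical stand-in for real work.
    await asyncio.sleep(1)
    return x * 2

async def bounded_gather(coros, limit):
    # At most `limit` tasks exist at any moment: a new task is only
    # created once a slot has been released by a finished one.
    sem = asyncio.Semaphore(limit)
    tasks = []
    async with asyncio.TaskGroup() as tg:  # Python 3.11+
        for coro in coros:
            await sem.acquire()  # wait for a free slot
            task = tg.create_task(coro)
            task.add_done_callback(lambda _: sem.release())
            tasks.append(task)
    # The `async with` block exits only once every task has finished;
    # a child failure cancels the rest and raises an ExceptionGroup.
    return [t.result() for t in tasks]

async def main():
    print(await bounded_gather((fetch(x) for x in range(50)), limit=10))

asyncio.run(main())]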

I'd really prefer to keep as_completed() and especially gather() as is, as I 
consider them a bit broken already.

[issue30782] Allow limiting the number of concurrent tasks in asyncio.as_completed

2019-09-09 Thread Lisa Roach


Lisa Roach added the comment:

I would like to see this implemented. I run into memory and speed issues quite frequently when running with 1000+ tasks, and have to write my own rate limiters around it.

It doesn't look to me like it adds a large amount of complexity to as_completed. This was closed in 2017; perhaps it is worth revisiting?

--
nosy: +lisroach

[issue30782] Allow limiting the number of concurrent tasks in asyncio.as_completed

2019-04-16 Thread Andrey Paramonov


Andrey Paramonov added the comment:

Hello!

Below is an updated implementation with more consistent error handling.
The main rough edges encountered:

1. asyncio.Queue alone proved insufficient for precise control of the limit, as asyncio.create_task() schedules the created Task immediately, and it may start executing before being added to the queue (thus leading to limit+1 tasks running). An additional semaphore is used to tackle that.

2. On exception, the other running tasks have to be cancel()ed and then await'ed to ensure all tasks have finished by the time igather exits. Just cancel()ing proved insufficient.

3. On exception, unscheduled coroutines have to be wrapped with asyncio.create_task(coro).cancel() to avoid the RuntimeWarning "coroutine was never awaited". But maybe there is a more elegant way to suppress this warning for a coroutine?

In my client code I haven't so far encountered "an implicit requirement that back pressure from the consumer should be handled", but it should be possible to implement that separately and quite straightforwardly, with the help of asyncio.Queue.

import asyncio
import math

async def igather(coros, limit=None):
    coros = iter(coros)

    buf = asyncio.Queue()
    sem = asyncio.Semaphore(limit or math.inf)

    async def submit(coros, buf):
        while True:
            await sem.acquire()
            try:
                # TODO: additionally support async iterators
                coro = next(coros)
            except StopIteration:
                break
            task = asyncio.create_task(coro)
            buf.put_nowait(task)
        await buf.put(None)  # sentinel: no more tasks

    async def consume(buf):
        while True:
            task = await buf.get()
            if task:
                v = await asyncio.wait_for(task, None)
                sem.release()
                yield v
            else:
                break

    submit_task = asyncio.create_task(submit(coros, buf))
    try:
        async for result in consume(buf):
            yield result
    except:
        submit_task.cancel()
        # cancel scheduled
        while not buf.empty():
            task = buf.get_nowait()
            if task:
                task.cancel()
                try:
                    await task
                except:
                    pass
        # cancel pending
        for coro in coros:
            asyncio.create_task(coro).cancel()
        raise
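
[For illustration, a hypothetical driver for the igather() above; fetch() is an assumed stand-in for real work:

import asyncio

async def fetch(x):
    await asyncio.sleep(1)
    return x * 2

async def main():
    # A generator, so coroutine objects are created lazily; igather()
    # keeps at most 100 tasks scheduled at any moment and yields
    # results in completion order.
    coros = (fetch(x) for x in range(10000))
    async for result in igather(coros, limit=100):
        print(result)

asyncio.run(main())]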

Shall I go ahead and prepare a PR with docs and tests?

[issue30782] Allow limiting the number of concurrent tasks in asyncio.as_completed

2019-02-25 Thread Andrey Paramonov


Andrey Paramonov added the comment:

> an implicit requirement that back pressure from the consumer should be 
> handled (i.e. if whoever's iterating through "async for fut in 
> as_completed(...)" is too slow, then the tasks should pause until it catches 
> up)

No, I don't think it is required or desired to be handled.

My initial sketch was imprecise: it's better to asynchronously yield task results, not "completed task" futures. This way, no additional buffer is needed, all error handling can be consolidated inside `igather()`, and it's actually more compatible with `asyncio.gather()`.

I.e., instead of

yield next_fut

use

yield await next_fut


[issue30782] Allow limiting the number of concurrent tasks in asyncio.as_completed

2019-02-21 Thread twisteroid ambassador


twisteroid ambassador added the comment:

I feel like once you lay out all the requirements (taking futures from an (async) generator, limiting the number of concurrent tasks, getting completed tasks to one consumer "as completed", and an implicit requirement that back pressure from the consumer should be handled, i.e. if whoever is iterating through "async for fut in as_completed(...)" is too slow, then the tasks should pause until it catches up), there are too many moving parts, and this should really be implemented using several tasks.

So a straightforward implementation may look like this:

import asyncio

async def better_as_completed(futs, limit):
    MAX_DONE_FUTS_HELD = 10  # or some small number

    sem = asyncio.Semaphore(limit)
    done_q = asyncio.Queue(MAX_DONE_FUTS_HELD)

    async def run_futs():
        async for fut in futs:
            await sem.acquire()
            asyncio.create_task(run_one_fut(fut))

        # All futures submitted: wait for a free slot, then signal the end.
        async with sem:
            await done_q.put(None)

    async def run_one_fut(fut):
        try:
            fut = asyncio.ensure_future(fut)
            await asyncio.wait((fut,))
            await done_q.put(fut)
        finally:
            sem.release()

    asyncio.create_task(run_futs())

    while True:
        next_fut = await done_q.get()
        if next_fut is None:
            return
        yield next_fut


Add proper handling for cancellation and exceptions and whatnot, and it may 
become a usable implementation.
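
[As a usage sketch, with hypothetical helpers; note that better_as_completed expects an async iterable of awaitables:

import asyncio

async def fetch(x):
    await asyncio.sleep(1)
    return x * 2

async def gen_futs():
    # Lazily produce awaitables, as better_as_completed expects.
    for x in range(1000):
        yield fetch(x)

async def main():
    # At most 10 fetches run concurrently; each yielded future is
    # already done, so awaiting it returns (or raises) immediately.
    async for fut in better_as_completed(gen_futs(), limit=10):
        print(await fut)

asyncio.run(main())]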

And no, I do not feel like this should be added to asyncio.as_completed.

--
nosy: +twisteroid ambassador

[issue30782] Allow limiting the number of concurrent tasks in asyncio.as_completed

2019-02-21 Thread Andrey Paramonov


Andrey Paramonov added the comment:

as_completed() might be considered a low-level API, but as of Python 3.7 there are seemingly no ready alternatives to achieve the proposed behavior. asyncio.gather(), asyncio.wait(), and asyncio.as_completed() all expect a list of awaitables of limited size; doing something like https://www.artificialworlds.net/blog/2017/06/12/making-100-million-requests-with-python-aiohttp is not straightforward.

A function that takes an iterator/async iterator of tasks and is itself a generator/async generator is very much wanted, something in the spirit of (but more efficient than?):

import asyncio
from itertools import islice

async def igather(tasks, limit=None):
    pending = set()
    while True:
        # Top up `pending` from the (possibly infinite) task iterator.
        for task in islice(tasks, limit - len(pending) if limit else None):
            pending.add(task)
        if pending:
            done, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED)
            for task in done:
                yield task
        else:
            break


It is an open question whether such a function should yield results in task submission order. Albeit useful, that is a bit harder to implement and (most importantly) has terrible worst-case memory behavior.

See also:
https://bugs.python.org/issue33533
https://github.com/h2non/paco/issues/38

--
nosy: +aparamon

[issue30782] Allow limiting the number of concurrent tasks in asyncio.as_completed

2019-02-08 Thread glin


glin added the comment:

@Andrew Svetlov: I was surprised when you wrote "as_completed() is low-level API", so I wondered what the high-level API is. The first thing a search turned up was the official docs:

https://docs.python.org/3/library/asyncio-api-index.html

where as_completed() is in fact listed as a high-level API.

IMHO, without a limit all of these functions (as_completed, wait, gather, ...) are quite impractical. I have simple to moderately complex scripts and I'm running into problems with them: API servers limiting the number of requests per minute, /tmp/ (a 4 GB ramdisk) running out of space, memory issues, and so on.

Please reconsider adding a limit to these functions, since this is supposed to be the high-level API, not the low-level one.

Thanks

--
nosy: +glin

[issue30782] Allow limiting the number of concurrent tasks in asyncio.as_completed

2018-01-11 Thread Andrew Svetlov

Andrew Svetlov added the comment:

Third-party libraries are not the responsibility of Python core devs.

I'm not aware of an existing library with such functionality.

[issue30782] Allow limiting the number of concurrent tasks in asyncio.as_completed

2018-01-09 Thread Andy Balaam

Andy Balaam added the comment:

I would argue that this makes as_completed a lot more useful than it is now, so it would be worth adding (maybe after 3.7).

But if this does not go into asyncio, is there another library where it would belong? Or should this be a completely new library?

[issue30782] Allow limiting the number of concurrent tasks in asyncio.as_completed

2017-12-20 Thread Yury Selivanov

Yury Selivanov added the comment:

I agree, let's keep as_completed() simple for now.  Handling generators+async 
correctly is hard, so we definitely don't have time for this in 3.7.

--
versions: +Python 3.8 -Python 3.7

[issue30782] Allow limiting the number of concurrent tasks in asyncio.as_completed

2017-12-20 Thread Andrew Svetlov

Andrew Svetlov added the comment:

as_completed() is a low-level API.
Let's not overload it with different parameters.

Anyway, `as_completed()` uses only asyncio.Future and its public API, like `add_done_callback()` etc.

You can build everything you need without modifying asyncio.
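
[One reading of that suggestion, sketched with only stock asyncio; fetch() is a hypothetical placeholder, and asyncio.run() is used for brevity. A semaphore caps how many coroutine bodies run at once, though all the awaitables are still created upfront, which is exactly the memory concern raised elsewhere in this thread:

import asyncio

async def fetch(x):
    await asyncio.sleep(1)
    return x * 2

async def main(limit=100):
    sem = asyncio.Semaphore(limit)

    async def guarded(coro):
        # Only `limit` coroutine bodies make progress at any moment.
        async with sem:
            return await coro

    for fut in asyncio.as_completed([guarded(fetch(x)) for x in range(1000)]):
        print(await fut)

asyncio.run(main())]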

Let's close the issue with a "won't fix" resolution.

--
nosy: +asvetlov

[issue30782] Allow limiting the number of concurrent tasks in asyncio.as_completed

2017-08-17 Thread Andy Balaam

Andy Balaam added the comment:

bump

[issue30782] Allow limiting the number of concurrent tasks in asyncio.as_completed

2017-06-26 Thread Andy Balaam

Changes by Andy Balaam:


--
pull_requests: +2475

[issue30782] Allow limiting the number of concurrent tasks in asyncio.as_completed

2017-06-26 Thread Andy Balaam

New submission from Andy Balaam:

asyncio.as_completed allows us to provide lots of coroutines (or Futures) to schedule, and then deal with the results as soon as they are available, in a loop, in a streaming style.

I propose to allow as_completed to work on very large numbers of coroutines, 
provided through a generator (rather than a list).  In order to make this 
practical, we need to limit the number of coroutines that are scheduled 
simultaneously to a reasonable number.

For tasks that open files or sockets, a reasonable number might be 1000 or 
fewer.  For other tasks, a much larger number might be reasonable, but we would 
still like some limit to prevent us running out of memory.

I suggest adding a "limit" argument to as_completed that limits the number of 
coroutines that it schedules simultaneously.

For me, the key advantage of as_completed (in the proposed modified form) is 
that it enables a streaming style that looks quite like synchronous code, but 
is efficient in terms of memory usage (as you'd expect from a streaming style):


#!/usr/bin/env python3

import asyncio
import sys

limit = int(sys.argv[1])

async def double(x):
await asyncio.sleep(1)
return x * 2

async def print_doubles():
coros = (double(x) for x in range(100))
for res in asyncio.as_completed(coros, limit=limit):
r = await res
if r % 10 == 0:
print(r)

loop = asyncio.get_event_loop()
loop.run_until_complete(print_doubles())
loop.close()


Using my prototype implementation, this runs faster and uses much less memory 
on my machine when you run it with a limit of 100K instead of 1 million 
concurrent tasks:

$ /usr/bin/time --format "Memory usage: %MKB\tTime: %e seconds" ./example 1000000
Memory usage: 2234552KB Time: 97.52 seconds

$ /usr/bin/time --format "Memory usage: %MKB\tTime: %e seconds" ./example 100000
Memory usage: 252732KB  Time: 94.13 seconds

I have been working on an implementation and there is some discussion in my blog posts:
http://www.artificialworlds.net/blog/2017/06/12/making-100-million-requests-with-python-aiohttp/
and
http://www.artificialworlds.net/blog/2017/06/27/adding-a-concurrency-limit-to-pythons-asyncio-as_completed/

Possibly the most controversial thing about this proposal is the fact that we 
need to allow passing a generator to as_completed instead of enforcing that it 
be a list.  This is fundamental to allowing the style I outlined above, but 
it's possible that we can do better than the blanket allowing of all generators 
that I did.

--
components: asyncio
messages: 296982
nosy: andybalaam, yselivanov
priority: normal
severity: normal
status: open
title: Allow limiting the number of concurrent tasks in asyncio.as_completed
type: enhancement
versions: Python 3.7
