Re: [Python-ideas] asyncio: return from multiple coroutines

2020-06-25 Thread Kyle Stanley
(Resending this email since it didn't originally go through to
python-list, sorry for the duplicate Pablo)

> Yes, I want to have multiple results: the connections listening forever, 
> returning a result for each message received.

> I forgot to mention that I did try to use asyncio.wait with `FIRST_COMPLETED`;
> however, the problem is that it seems to evict the not-completed coroutines,
> so the messenger that arrives second does not send the message. To check it,
> I have run that script without the random sleep: just msgr1 waits 1s and
> msgr2 waits 2s, so msgr1 always ends first. I expect a result like this 
> (which I am currently getting with queues):

FYI, the other coroutines are not evicted or cancelled, they are
simply in the "pending" set of the "done, pending" tuple returned by
`asyncio.wait()` and were not returned. I misunderstood what you were
actually looking for, but I think that I understand now.
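
As a rough illustration of that point (a minimal sketch, not code from the
original script; `recv()`, `demo()` and the short delays are made up for the
example), the task left in `pending` is still running after `asyncio.wait()`
returns and can be waited on again:

```python
import asyncio

async def recv(message: str, sleep_time: float) -> str:
    # Stand-in for a websocket recv(); hypothetical helper for this sketch.
    await asyncio.sleep(sleep_time)
    return f'{message} awaited for {sleep_time:.2f}s'

async def demo():
    tasks = [
        asyncio.create_task(recv("Messenger 1", 0.05)),
        asyncio.create_task(recv("Messenger 2", 0.2)),
    ]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    results = [task.result() for task in done]
    # Nothing in "pending" has been cancelled; it is simply not finished yet.
    still_alive = all(not task.cancelled() for task in pending)
    if pending:
        # Waiting on the leftover tasks collects the slower result as well.
        done, _ = await asyncio.wait(pending)
        results.extend(task.result() for task in done)
    return still_alive, results

if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In the generator example from earlier in the thread, the pending set is
discarded (`done, _ = ...`) and fresh tasks are created on each pass, so the
slower task's result is never read.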

Since it seems like you want to be able to receive the results out of
order from both "messengers" and have them continuously listen to
their respective socket (without affecting the other), a queue is
likely going to be the best approach. I think you had the right
general idea with your example, but here's a way to make it
significantly less cumbersome and easy to expand upon (such as
expanding the number of websockets to listen to):

```
import asyncio

class MessengerQueue:
    def __init__(self):
        self._queue = asyncio.Queue()

    async def get(self):
        return await self._queue.get()

    async def start_messengers(self):
        # Could be easily modified to create any number of "messengers" set to
        # listen on specific websockets, by passing a list and creating a task
        # for each one.
        asyncio.create_task(self._messenger("Messenger 1", 1))
        asyncio.create_task(self._messenger("Messenger 2", 2))

    async def _messenger(self, message: str, sleep_time: int):
        while True:
            await asyncio.sleep(sleep_time)
            await self._queue.put(f'{message} awaited for {sleep_time:.2f}s')


async def main():
    mqueue = MessengerQueue()
    asyncio.create_task(mqueue.start_messengers())
    while True:
        result = await mqueue.get()
        print(result)

asyncio.run(main())
```

This results in your desired output:
Messenger 1 awaited for 1.00s
Messenger 2 awaited for 2.00s
Messenger 1 awaited for 1.00s
Messenger 1 awaited for 1.00s
Messenger 2 awaited for 2.00s
Messenger 1 awaited for 1.00s
Messenger 1 awaited for 1.00s
Messenger 2 awaited for 2.00s
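
The comment in `start_messengers()` about handling any number of messengers
could look roughly like the following. This is only a sketch under a few
assumptions: the `specs` parameter, the `stop()` method and the `collect()`
helper are hypothetical additions, and `asyncio.sleep()` still stands in for
a real websocket `recv()`.

```python
import asyncio

class MessengerQueue:
    def __init__(self):
        self._queue = asyncio.Queue()
        self._tasks = []

    async def get(self):
        return await self._queue.get()

    def start_messengers(self, specs):
        # specs: iterable of (message, sleep_time) pairs, one per messenger
        # (hypothetical parameter). Must be called while the loop is running.
        for message, sleep_time in specs:
            task = asyncio.create_task(self._messenger(message, sleep_time))
            # Keep a reference so the tasks can be cancelled later.
            self._tasks.append(task)

    async def stop(self):
        # Cancel every messenger and wait for the cancellations to finish.
        for task in self._tasks:
            task.cancel()
        await asyncio.gather(*self._tasks, return_exceptions=True)

    async def _messenger(self, message, sleep_time):
        while True:
            await asyncio.sleep(sleep_time)
            await self._queue.put(f'{message} awaited for {sleep_time:.2f}s')

async def collect(specs, count):
    # Gather the first `count` results, then shut the messengers down.
    mqueue = MessengerQueue()
    mqueue.start_messengers(specs)
    try:
        return [await mqueue.get() for _ in range(count)]
    finally:
        await mqueue.stop()

if __name__ == "__main__":
    for line in asyncio.run(collect([("Messenger 1", 0.1), ("Messenger 2", 0.2)], 6)):
        print(line)
```

Holding on to the task objects is a design choice here: it makes a clean
shutdown possible, which the endless `while True` loop in the example above
does not need.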

Note: it would probably be more idiomatic to call these "consumers" or
"listeners" rather than "messengers"/"messagers" (the websocket docs
refer to them as "consumer handlers"), but I used "messengers" to make
it a bit more easily comparable to the original queue example from the
OP: https://pastebin.com/BzaxRbtF.

I hope the above example is of some use. :-)

Regards,
Kyle Stanley


Re: [Python-ideas] asyncio: return from multiple coroutines

2020-06-23 Thread Pablo Alcain
Thank you very much Kyle for your answer, I am moving this conversation to
the more proper python-list for whoever wants to chime in. I summarize here
the key points of my original question (full question on the quoted email):

I have an application that listens on two websockets through the async
library https://websockets.readthedocs.io/ and I have to perform the same
function on the result, no matter where the message came from. I have
implemented a rather cumbersome solution with async Queues:
https://pastebin.com/BzaxRbtF, but I think there has to be a more
async-friendly option I am missing.

Now I move on to the comments that Kyle made

On Tue, Jun 23, 2020 at 12:32 AM Kyle Stanley  wrote:

> I believe asyncio.wait() with "return_when=FIRST_COMPLETED" would
> perform the functionality you're looking for with the
> "asyncio.on_first_return()". For details on the functionality of
> asyncio.wait(), see
> https://docs.python.org/3/library/asyncio-task.html#asyncio.wait.
>
> > I understand that I can create two coroutines that call the same
> > function, but it would be much cleaner (because of implementation issues)
> > if I can simply create a coroutine that yields the result of whichever
> > connection arrives first.
>
> You can use an asynchronous generator that will continuously yield the
> result of the first recv() that finishes (I'm assuming you mean
> "yields" literally and want multiple results from a generator, but I
> might be misinterpreting that part).
>

Yes, I want to have multiple results: the connections listening forever,
returning a result for each message received.


>
> Here's a brief example, using the recv() coroutine function from the
> pastebin linked:
>
> ```
> import asyncio
> import random
>
> async def recv(message: str, max_sleep: int):
>     sleep_time = max_sleep * random.random()
>     await asyncio.sleep(sleep_time)
>     return f'{message} awaited for {sleep_time:.2f}s'
>
> async def _start():
>     while True:
>         msgs = [
>             asyncio.create_task(recv("Messager 1", max_sleep=1)),
>             asyncio.create_task(recv("Messager 2", max_sleep=1))
>         ]
>         done, _ = await asyncio.wait(msgs,
>             return_when=asyncio.FIRST_COMPLETED)
>         result = done.pop()
>         yield await result
>
> async def main():
>     async for result in _start():
>         print(result)
>
> asyncio.run(main())
> ```


I forgot to mention that I did try to use asyncio.wait with
`FIRST_COMPLETED`; however, the problem is that it seems to evict the
not-completed coroutines, so the messenger that arrives second does not
send the message. To check it, I have run that script without the random
sleep: just msgr1 waits 1s and msgr2 waits 2s, so msgr1 always ends first.
I expect a result like this (which I am currently getting with queues):

Messenger 1 waits for 1.0s
Messenger 1 waits for 1.0s
Messenger 2 waits for 2.0s
Messenger 1 waits for 1.0s
Messenger 1 waits for 1.0s
Messenger 2 waits for 2.0s
Messenger 1 waits for 1.0s
...

but instead I got this:

Messenger 1 waits for 1.0s
Messenger 1 waits for 1.0s
Messenger 1 waits for 1.0s
Messenger 1 waits for 1.0s
Messenger 1 waits for 1.0s
...
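
For reference, a hedged sketch of how the `asyncio.wait()` generator could be
adapted to produce the interleaved output: it keeps the pending tasks between
iterations and re-creates only the task that just finished. The names
`merged()` and `take()` and the `(name, delay)` pairs are hypothetical, and
`recv()` is a sleep-based stand-in like the one in the quoted example.

```python
import asyncio

async def recv(message, sleep_time):
    # Stand-in for a websocket recv(); hypothetical helper for this sketch.
    await asyncio.sleep(sleep_time)
    return f'{message} waits for {sleep_time:.1f}s'

async def merged(sources):
    # Map each running task to the (name, delay) pair that created it.
    pending = {asyncio.create_task(recv(name, delay)): (name, delay)
               for name, delay in sources}
    try:
        while pending:
            done, _ = await asyncio.wait(set(pending),
                                         return_when=asyncio.FIRST_COMPLETED)
            for task in done:
                name, delay = pending.pop(task)
                # Re-arm only the source that just delivered a result;
                # the other tasks stay in "pending" and keep running.
                pending[asyncio.create_task(recv(name, delay))] = (name, delay)
                yield task.result()
    finally:
        # Runs when the generator is closed: stop the leftover tasks.
        for task in pending:
            task.cancel()

async def take(sources, count):
    # Collect the first `count` results, then close the generator explicitly.
    gen = merged(sources)
    results = []
    async for result in gen:
        results.append(result)
        if len(results) == count:
            break
    await gen.aclose()
    return results

if __name__ == "__main__":
    for line in asyncio.run(take([("Messenger 1", 0.1), ("Messenger 2", 0.2)], 6)):
        print(line)
```

Whether this is preferable to the queue-based version is a judgment call;
the queue keeps producers and consumer decoupled, while this variant avoids
the extra class.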





> Note that in the above example, in "msgs", you can technically pass
> the coroutine objects directly to asyncio.wait(), as they will be
> implicitly converted to tasks. However, we decided to deprecate that
> functionality in Python 3.8 since it can be rather confusing. So
> creating and passing the tasks is a better practice.
>

Thanks for that info, I am still trying to grasp the best practices
surrounding mostly the explicitness in async.


> > Again, it's quite likely I am not seeing something obvious, but I didn't
> > know where else to ask.
>
> If you're not fairly certain, or are relatively inexperienced with the
> specific area that the question pertains to, I'd recommend asking on
> python-list first (or another Python user community). python-ideas is
> primarily intended for new feature proposals/suggestions. Although if
> you've tried other resources and haven't found an answer, it's
> perfectly fine to ask a question as part of the suggestion post.
>
>
>
Original question, as posted in python-ideas:


> On Mon, Jun 22, 2020 at 6:24 PM Pablo Alcain 
> wrote:
> >
> > Hey everyone. I have been looking into asyncio lately, and even though I
> have had my fair share of work, I still have some of it very shaky, so
> first of all forgive me if what I am saying here is already implemented and
> I totally missed it (so far, it looks *really* likely).
> >
> > Basically this is the situation: I have an application that listens on
> two websockets through the async library
> https://websockets.readthedocs.io/ and I have to perform the same
> function on the result, no matter where the message came from. I understand
> that I can create two coroutines that call the same function, but it would
> be much cleaner (because of implementation issues) if I can simply create a
>