Hi Robie,

This all looks very well put together; I don't see any obvious shortcuts in
your code.

I was at first tempted to recommend using the asyncio.Queue class, but it
seems you really want to be able to have multiple consumers of the same
stream of events. I notice that your Event* classes don't contain any
coroutines, but you can't use a collections.deque because you want it to be
an infinite iterator. I guess a consumer that doesn't wait for the Futures
would be in trouble, but you seem to be okay with that.

If you are using Python 3.5 you might try changing things around to use PEP
492's `async for`; this should let you write

    async for result in <queue>:
        ...use result...

rather than

    for future in <queue>:
        result = yield from future  # or await future
        ...use result...

but it's not as easy to recover when a future has an exception instead of a
result, and still continue the loop, which also seems to be one of your
requirements.
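
To make the recovery problem concrete, here is a minimal sketch. It assumes a
recent Python (asyncio.run() and get_running_loop() need 3.7+), and
FutureStream is a hypothetical adapter of my own invention, not something from
your gist. A failed future re-raises inside `async for` and ends the loop, so
continuing means wrapping the loop and re-entering it on the same iterator:

```python
import asyncio


class FutureStream:
    """Hypothetical adapter: async-iterate over an iterable of futures."""

    def __init__(self, futures):
        self._futures = iter(futures)

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            future = next(self._futures)
        except StopIteration:
            raise StopAsyncIteration from None
        # A failed future re-raises here, which ends the `async for` loop.
        return await future


async def demo():
    loop = asyncio.get_running_loop()
    futures = [loop.create_future() for _ in range(3)]
    futures[0].set_result("a")
    futures[1].set_exception(ValueError("boom"))
    futures[2].set_result("c")

    stream = FutureStream(futures)
    seen = []
    while True:
        try:
            async for result in stream:
                seen.append(result)
            break  # the stream ended normally
        except ValueError as exc:
            # Recovery means re-entering the loop on the same iterator.
            seen.append("recovered from " + str(exc))
    return seen
```

Running asyncio.run(demo()) shows the extra while/try scaffolding that the
plain `for future in <queue>` form avoids, since there the try/except can sit
around the single `await future` inside the loop body.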

So I think you've arrived at a very reasonable way to factor your code
given all your constraints.

One last idea: Perhaps you could try to rethink the API in a completely
different way, not using infinite iterators? I was thinking of using an
asyncio.Queue instead of the iterator, where you can block on its .get()
method whenever you want to wait for the next event (this is an awaitable,
so you can combine waiting for multiple events, timeouts etc.). The ability
to have multiple consumers would then have to be supported directly by the
producer, e.g. using a simple registration interface, which would take the
role of EventQueue.__iter__() in your current design. The producer could
maintain a weakref.WeakSet of queues connecting it to consumers, so a
consumer that disappears (or drops its queue) simply disappears from the
set.
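
A rough sketch of what I have in mind, under stated assumptions: EventSource
and subscribe() are hypothetical names, and I'm assuming each consumer keeps
its own queue alive for as long as it wants events.

```python
import asyncio
import gc
import weakref


class EventSource:
    """Hypothetical producer that fans events out to per-consumer queues."""

    def __init__(self):
        # Weak references only: a queue dropped by its consumer vanishes here.
        self._queues = weakref.WeakSet()

    def subscribe(self):
        """Registration interface: each consumer gets its own queue."""
        queue = asyncio.Queue()
        self._queues.add(queue)
        return queue

    def send_result(self, result):
        # Iterate over a snapshot so the loop works on a stable list.
        for queue in list(self._queues):
            queue.put_nowait(result)


async def demo():
    source = EventSource()
    a = source.subscribe()
    b = source.subscribe()
    source.send_result("wakeup")

    # Both consumers see the same event; get() is awaitable, so it can be
    # combined with timeouts or with waiting on other events.
    first = await asyncio.wait_for(a.get(), timeout=1.0)
    second = await b.get()

    # A consumer that drops its queue disappears from the producer's set.
    del b
    gc.collect()  # CPython frees it at once; this covers other runtimes
    return first, second, len(source._queues)
```

Calling asyncio.run(demo()) exercises both consumers. Note that the queues are
unbounded here, so a consumer that holds its queue but never reads from it
will accumulate events.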

The one feature this design doesn't support is passing exceptions through
the queue -- you'd have to invent some kludge using a different type of
queue value that the consumer must check for if you need this. (Though your
examples don't seem to use send_exception(), so maybe it's a case of
over-generalization? I didn't read your more elaborate example though.)
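
If you do need it, the kludge could look something like this sketch. Failure
is a hypothetical wrapper type, and consume() is only there to illustrate the
check the consumer would have to make on every item:

```python
import asyncio


class Failure:
    """Hypothetical marker type that carries an exception through a queue."""

    def __init__(self, exception):
        self.exception = exception


async def consume(queue, count):
    """Read `count` items, handling failures without leaving the loop."""
    seen = []
    for _ in range(count):
        item = await queue.get()
        if isinstance(item, Failure):
            # The consumer decides what to do; here it records and carries on.
            seen.append(("error", str(item.exception)))
            continue
        seen.append(("ok", item))
    return seen


async def demo():
    queue = asyncio.Queue()
    queue.put_nowait(1)
    queue.put_nowait(Failure(ValueError("bad reading")))
    queue.put_nowait(0)
    return await consume(queue, 3)
```

The producer's equivalent of send_exception() would then just put a
Failure(exc) on each registered queue.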

Whatever you decide, good luck with the further development of this code!

--Guido



On Tue, Dec 29, 2015 at 5:31 PM, Robie Basak <ro...@justgohome.co.uk> wrote:

> Hi,
>
> Futures are really nice for one-off events, in that I can just
> arbitrarily "yield from" or "await" a future in the middle of a
> coroutine.
>
> Is there a pattern to do something similar for recurring similar
> events?
>
> The closest pattern of which I'm aware is a class with callbacks for
> each type of event (as used for example in a Transport or Protocol). And
> another pattern is a register callback mechanism. But both seem to lead
> to some convoluted code in some cases for me, with the need to store
> state in class instance attributes whereas in a coroutine the code path
> could carry the necessary state instead.
>
> So what can I do to get direct access to a series of events in turn from
> the middle of a coroutine that I can just "await"?
>
> I ended up implementing something to do this that should hopefully
> explain what I mean. My question is: is there a better way, or if not,
> what improvements could I make? There are some hacks in my abstractions
> that I don't particularly like.
>
> Let me start with use, then I'll show implementation. I would like to
> get an iterator that produces futures which complete in turn, one for
> each event:
>
> @asyncio.coroutine
> def foo():
>     for future in get_something_happened_events():
>         event = yield from future
>         # handle the event
>
> The reason I want this is because then I can handle logic for multiple
> events at once, too:
>
> @asyncio.coroutine
> def bar():
>     a_type_events = iter(...)
>     b_type_events = iter(...)
>
>     next_a_event = next(a_type_events)
>     next_b_event = next(b_type_events)
>
>     yield from asyncio.wait([next_a_event, next_b_event], timeout=...)
>     if next_a_event.done():
>         ...
>         next_a_event = next(a_type_events)
>
> I hope this summary explains why this is useful to me, but if not I have
> a concrete example further below.
>
> Let me go deeper.
>
> My implementation of this is:
>
> class _FutureLinkedListItem:
>     def __init__(self):
>         self.future = asyncio.Future()
>         self.next = None
>
> class EventQueue:
>     def __init__(self):
>         self._next_future_item = _FutureLinkedListItem()
>
>     def __iter__(self):
>         return _EventQueueReader(self)
>
>     def _get_next_future_item(self, item):
>         if item.next is None:
>             item.next = _FutureLinkedListItem()
>         return item.next
>
>     def _move_along(self):
>         self._next_future_item = self._get_next_future_item(
>             self._next_future_item)
>
>     def send_result(self, result):
>         self._next_future_item.future.set_result(result)
>         self._move_along()
>
>     def send_exception(self, exception):
>         self._next_future_item.future.set_exception(exception)
>         self._move_along()
>
> class _EventQueueReader:
>     def __init__(self, parent):
>         self.parent = parent
>         self._next_future_item = parent._next_future_item
>
>     def _move_along(self):
>         self._next_future_item = self.parent._get_next_future_item(
>             self._next_future_item)
>
>     def __iter__(self):
>         return self
>
>     def __next__(self):
>         future = self._next_future_item.future
>         self._move_along()
>         return future
>
>
> Now I can just create an EventQueue as a class attribute. Anything can
> send events into that queue using send_result() or send_exception().
> Anything can listen for events by iter() on the EventQueue instance
> returning my series of Futures. Multiple listeners see the same set of
> Futures. Old Futures no longer referenced, even "wasted" ones, just get
> garbage collected. Listeners can ask for Futures for the next ten events
> at once, and they will be instantiated on the spot and will eventually
> fire. But in the common case only one non-done Future exists at one time
> for the next event.
>
> I found that I needed to create long running "event processing"
> coroutines that listen for events and generate others. So I wrote a
> simple Worker class to track long lived coroutines, cancelling them if
> garbage collected.
>
> I've put all the code in
> https://gist.github.com/basak/c0a8f4f5a51d7fce1cc7, including the Worker
> implementation, etc.
>
> I have two concrete cases where I think this came in handy, which is why
> I'm seeking best practice on this. My second concrete example is a
> prototype XMPP client I'm using where there are various coroutines
> running during protocol negotiation etc, but I'll save that for another
> time. First is a case where I need to listen on system power events as
> well as time out on them.
>
> On my Ubuntu phone, when the system is asleep time doesn't pass as far
> as poll(2) etc are concerned, so asyncio.sleep() isn't sufficient for a
> wall clock based time interval wait. For example if the phone is on
> battery and is not being used, a sleep of 60 seconds may not complete
> for hours as the phone wakes only occasionally and for only a few
> milliseconds at a time. So I have to augment a sleep by listening for
> system power events so that after a wakeup I can check the RTC to see if
> more time has passed because the system was asleep.
>
> First, here's the class that watches power state. There's a program that
> will output state changes to stdout, so I just pick up on that and
> publish events to a public EventQueue attribute. This uses a coroutine
> that will get cancelled by the Worker if the class instance is garbage
> collected:
>
>
> class PowerStateWatcher:
>     def __init__(self, loop=None):
>         self.loop = loop or asyncio.get_event_loop()
>         self.power_events = util.EventQueue()
>
>         self._worker = util.Worker(loop=loop)
>         self._worker.create_task(self.watch_wakeups())
>
>     @asyncio.coroutine
>     def watch_wakeups(self):
>         create = asyncio.create_subprocess_exec(
>             'stdbuf', '-o0', 'powerd-cli', 'listen', stdout=subprocess.PIPE)
>         proc = yield from create
>
>         try:
>             while True:
>                 line = yield from proc.stdout.readline()
>                 if line == b"Received SysPowerStateChange: state=0\n":
>                     self.power_events.send_result(0)
>                 elif line == b"Received SysPowerStateChange: state=1\n":
>                     self.power_events.send_result(1)
>         finally:
>             proc.kill()
>
>
> Next, a class that creates events on a regular pulse that others can
> listen on by watching the EventQueue public instance attribute in the
> same way:
>
> class Clock:
>     def __init__(self, period, loop=None):
>         self.loop = loop or asyncio.get_event_loop()
>         self.trigger_events = util.EventQueue()
>
>         self._worker = util.Worker(loop=loop)
>         self._worker.create_task(self._watch(period))
>
>     @asyncio.coroutine
>     def _watch(self, period):
>         watcher = PowerStateWatcher(self.loop)
>         power_events = iter(watcher.power_events)
>         future_power_event = next(power_events)
>
>         deadline = time.time() + period
>         while True:
>             time_remaining = deadline - time.time()
>             if time_remaining < 0:
>                 self.trigger_events.send_result(-time_remaining)
>                 deadline = time.time() + period
>             else:
>                 yield from asyncio.wait(
>                     [future_power_event],
>                     timeout=time_remaining,
>                     loop=self.loop,
>                 )
>                 if future_power_event.done():
>                     future_power_event = next(power_events)
>
>
> Finally, I can just use this from a coroutine as follows:
>
> @asyncio.coroutine
> def create_periodic_reports(loop):
>     clock = Clock(period=300, loop=loop)
>     for future_trigger in clock.trigger_events:
>         tardiness = yield from future_trigger
>         # DO SOMETHING HERE
>
>
> Before I did it this way, I had a pretty convoluted class with a method
> that got called on a power state change event that then had to examine
> state stored in the class manipulated by other methods. Moving the logic
> into a single coroutine seemed to make things simpler to me.
>
> I have just noticed that I have classes each with two methods, one of
> which is __init__. So I could turn them into plain coroutines now, which
> is nice. I refactored extensively to get to where I am now so that's how
> I ended up here. I won't try and do it while drafting this email though
> as I don't have an easy way to test it will work.
>
> Thank you for reading this far! So what do you think? Am I insane, or is
> this a sensible way to arrange things?
>



-- 
--Guido van Rossum (python.org/~guido)
