Hi,

Futures are really nice for one-off events, in that I can just
arbitrarily "yield from" or "await" a future in the middle of a
coroutine.

Is there an pattern to do something similar for recurring similar
events?

The closest pattern of which I'm aware is a class with callbacks for
each type of event (as used for example in a Transport or Protocol). And
another pattern is a register callback mechanism. But both seem to lead
to some convulated code in some cases for me, with the need to store
state in class instance attributes whereas in a coroutine the code path
could carry the necessary state instead.

So what can I do to get direct access to a series of events in turn from
the middle of a coroutine that I can just "await"?

I ended up implementing something to do this that should hopefully
explain what I mean. My question is: is there a better way, or if not,
what improvements could I make? There are some hacks in my abstractions
that I don't particularly like.

Let me start with use, then I'll show implementation. I would like to
get an iterator that produces futures which complete in turn, one for
each event:

@asyncio.coroutine
def foo():
    for future in get_something_happened_events():
        event = yield from future
        # handle the event

The reason I want this is because then I can handle logic for multiple
events at once, too:

@asyncio.coroutine
def bar():
    a_type_events = iter(...)
    b_type_events = iter(...)

    next_a_event = next(a_type_events)
    next_b_event = next(b_type_events)

    asyncio.wait([next_a_event, next_b_event], timeout=...)
    if next_a_event.done():
       ...
       next_a_event = next(a_type_events)

I hope this summary explains why this is useful to me, but if not I have
a concrete example further below.

Let me go deeper.

My implementation of this is:

class _FutureLinkedListItem:
    def __init__(self):
        self.future = asyncio.Future()
        self.next = None

class EventQueue:
    def __init__(self):
        self._next_future_item = _FutureLinkedListItem()

    def __iter__(self):
        return _EventQueueReader(self)

    def _get_next_future_item(self, item):
        if item.next is None:
            item.next = _FutureLinkedListItem()
        return item.next

    def _move_along(self):
        self._next_future_item = 
self._get_next_future_item(self._next_future_item)

    def send_result(self, result):
        self._next_future_item.future.set_result(result)
        self._move_along()

    def send_exception(self, exception):
        self._next_future_item.future.set_exception(exception)
        self._move_along()

class _EventQueueReader:
    def __init__(self, parent):
        self.parent = parent
        self._next_future_item = parent._next_future_item

    def _move_along(self):
        self._next_future_item = 
self.parent._get_next_future_item(self._next_future_item)

    def __iter__(self):
        return self

    def __next__(self):
        future = self._next_future_item.future
        self._move_along()
        return future


Now I can just create an EventQueue as a class attribute. Anything can
send events into that queue using send_result() or send_exception().
Anything can listen for events by iter() on the EventQueue instance
returning my series of Futures. Multiple listeners see the same set of
Futures. Old Futures no longer referenced, even "wasted" ones, just get
garbage collected. Listeners can ask for Futures for the next ten events
at once, and they will be instantiated on the spot and will eventually
fire. But in the common case only one non-done Future exists at one time
for the next event.

I found that I needed to create long running "event processing"
coroutines that listen for events and generate others. So I wrote a
simple Worker class to track long lived coroutines, cancelling them if
garbage collected.

I've put all the code in
https://gist.github.com/basak/c0a8f4f5a51d7fce1cc7, including the Worker
implementation, etc.

I have two concrete cases where I think this came in handy, which is why
I'm seeking best practice on this. My second concrete example is a
prototype XMPP client I'm using where there are various coroutines
running during protocol negotation etc, but I'll save that for another
time. First is a case where I need to listen on system power events as
well as time out on them.

On my Ubuntu phone, when the system is asleep time doesn't pass as far
as poll(2) etc are concerned, so asyncio.sleep() isn't sufficient for a
wall clock based time interval wait. For example if the phone is on
battery and is not being used, a sleep of 60 seconds may not complete
for hours as the phone wakes only occasinoally and for only a few
milliseconds at a time. So I have to augment a sleep by listening for
system power events so that after a wakeup I can check the RTC to see if
more time has passed because the system was asleep.

First, here's the class that watches power state. There's a program that
will output state changes to stdout, so I just pick up on that and
publish events to a public EventQueue attribute. This uses a coroutine
that will get cancelled by the Worker if the class instance is garbage
collected:


class PowerStateWatcher:
    def __init__(self, loop=None):
        self.loop = loop or asyncio.get_event_loop()
        self.power_events = util.EventQueue()

        self._worker = util.Worker(loop=loop)
        self._worker.create_task(self.watch_wakeups())

    @asyncio.coroutine
    def watch_wakeups(self):
        create = asyncio.create_subprocess_exec('stdbuf', '-o0', 'powerd-cli', 
'listen', stdout=subprocess.PIPE)
        proc = yield from create

        try:
            while True:
                line = yield from proc.stdout.readline()
                if line == b"Received SysPowerStateChange: state=0\n":
                    self.power_events.send_result(0)
                elif line == b"Received SysPowerStateChange: state=1\n":
                    self.power_events.send_result(1)
        finally:
            proc.kill()


Next, a class that creates events on a regular pulse that others can
listen on by watching the EventQueue public instance attribute in the
same way:

class Clock:
    def __init__(self, period, loop=None):
        self.loop = loop or asyncio.get_event_loop()
        self.trigger_events = util.EventQueue()

        self._worker = util.Worker(loop=loop)
        self._worker.create_task(self._watch(period))

    @asyncio.coroutine
    def _watch(self, period):
        watcher = PowerStateWatcher(self.loop)
        power_events = iter(watcher.power_events)
        future_power_event = next(power_events)

        deadline = time.time() + period
        while True:
            time_remaining = deadline - time.time()
            if time_remaining < 0:
                self.trigger_events.send_result(-time_remaining)
                deadline = time.time() + period
            else:
                yield from asyncio.wait(
                    [future_power_event],
                    timeout=time_remaining,
                    loop=self.loop,
                )
                if future_power_event.done():
                    future_power_event = next(power_events)


Finally, I can just use this from a coroutine as follows:

@asyncio.coroutine
def create_periodic_reports(loop):
    clock = Clock(period=300, loop=loop)
    for future_trigger in clock.trigger_events:
        tardiness = yield from future_trigger
        # DO SOMETHING HERE


Before I did it this way, I had a pretty convoluted class with a method
that got called on a power state change event that then had to examine
state stored in the class manipulated by other methods. Moving the logic
into a single coroutine seemed to make things simpler to me.

I have just noticed that I have classes each with two methods, one of
which is __init__. So I could turn them into plain coroutines now, which
is nice. I refactored extensively to get to where I am now so that's how
I ended up here. I won't try and do it while drafting this email though
as I don't have an easy way to test it will work.

Thank you for reading this far! So what do you think? Am I insane, or is
this a sensible way to arrange things?

Reply via email to