Hi, Futures are really nice for one-off events, in that I can just arbitrarily "yield from" or "await" a future in the middle of a coroutine.
Is there a pattern to do something similar for recurring similar events? The closest pattern of which I'm aware is a class with callbacks for each type of event (as used, for example, in a Transport or Protocol). Another pattern is a register-callback mechanism. But both seem to lead to some convoluted code in some cases for me, with the need to store state in class instance attributes, whereas in a coroutine the code path could carry the necessary state instead.

So what can I do to get direct access to a series of events in turn from the middle of a coroutine that I can just "await"?

I ended up implementing something to do this that should hopefully explain what I mean. My question is: is there a better way, or if not, what improvements could I make? There are some hacks in my abstractions that I don't particularly like.

Let me start with use, then I'll show implementation. I would like to get an iterator that produces futures which complete in turn, one for each event:

    @asyncio.coroutine
    def foo():
        for future in get_something_happened_events():
            event = yield from future
            # handle the event

The reason I want this is because then I can handle logic for multiple events at once, too:

    @asyncio.coroutine
    def bar():
        a_type_events = iter(...)
        b_type_events = iter(...)
        next_a_event = next(a_type_events)
        next_b_event = next(b_type_events)
        yield from asyncio.wait([next_a_event, next_b_event], timeout=...)
        if next_a_event.done():
            ...
            next_a_event = next(a_type_events)

I hope this summary explains why this is useful to me, but if not I have a concrete example further below. Let me go deeper.
My implementation of this is:

    import asyncio


    class _FutureLinkedListItem:
        # One link in the chain: a Future plus a pointer to the next link.
        def __init__(self):
            self.future = asyncio.Future()
            self.next = None


    class EventQueue:
        def __init__(self):
            # The link whose Future the next send_*() call will complete.
            self._next_future_item = _FutureLinkedListItem()

        def __iter__(self):
            return _EventQueueReader(self)

        def _get_next_future_item(self, item):
            # Links are created on demand, by whoever gets there first.
            if item.next is None:
                item.next = _FutureLinkedListItem()
            return item.next

        def _move_along(self):
            self._next_future_item = self._get_next_future_item(
                self._next_future_item)

        def send_result(self, result):
            self._next_future_item.future.set_result(result)
            self._move_along()

        def send_exception(self, exception):
            self._next_future_item.future.set_exception(exception)
            self._move_along()


    class _EventQueueReader:
        def __init__(self, parent):
            self.parent = parent
            # Start reading at the queue's current write position.
            self._next_future_item = parent._next_future_item

        def _move_along(self):
            self._next_future_item = self.parent._get_next_future_item(
                self._next_future_item)

        def __iter__(self):
            return self

        def __next__(self):
            future = self._next_future_item.future
            self._move_along()
            return future

Now I can just create an EventQueue as an instance attribute. Anything can send events into that queue using send_result() or send_exception(). Anything can listen for events by calling iter() on the EventQueue instance, which returns my series of Futures. Multiple listeners see the same set of Futures. Old Futures that are no longer referenced, even "wasted" ones, just get garbage collected. Listeners can ask for Futures for the next ten events at once, and they will be instantiated on the spot and will eventually fire. But in the common case only one non-done Future exists at one time for the next event.

I found that I needed to create long-running "event processing" coroutines that listen for events and generate others. So I wrote a simple Worker class to track long-lived coroutines, cancelling them if garbage collected. I've put all the code in https://gist.github.com/basak/c0a8f4f5a51d7fce1cc7, including the Worker implementation, etc.
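To make the "multiple listeners see the same set of Futures" behaviour concrete, here is a condensed, self-contained sketch of the same idea written with async/await. It is only an illustration, not the code from the gist: the names _Node, _advance, listener and demo are made up for this example, send_exception() is left out for brevity, and the futures are created from the running loop so that it works under asyncio.run().

```python
import asyncio


class _Node:
    """One link in the chain: a future plus a pointer to the next link."""

    def __init__(self):
        self.future = asyncio.get_running_loop().create_future()
        self.next = None


def _advance(node):
    """Return the link after `node`, creating it on demand."""
    if node.next is None:
        node.next = _Node()
    return node.next


class EventQueue:
    def __init__(self):
        # Link whose future the next send_result() call will complete.
        self._head = _Node()

    def __iter__(self):
        # self._head is read here, at iter() time, so each reader starts
        # at the write position that was current when it subscribed.
        return self._read_from(self._head)

    @staticmethod
    def _read_from(node):
        while True:
            yield node.future
            node = _advance(node)

    def send_result(self, result):
        self._head.future.set_result(result)
        self._head = _advance(self._head)


async def listener(name, futures, count, seen):
    """Await `count` events in turn and record them under `name`."""
    for _ in range(count):
        event = await next(futures)
        seen.append((name, event))


async def demo():
    queue = EventQueue()
    seen = []
    # Both readers subscribe before anything is sent, so each one
    # receives every event independently of the other.
    readers = [
        asyncio.ensure_future(listener(name, iter(queue), 3, seen))
        for name in ("a", "b")
    ]
    for value in (10, 20, 30):
        queue.send_result(value)
        await asyncio.sleep(0)  # give the listeners a chance to run
    await asyncio.gather(*readers)
    return seen


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Nothing is copied per listener here: each reader just walks the same chain of links at its own pace, and a link can be garbage collected once neither the queue nor any reader still points at it.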
I have two concrete cases where I think this came in handy, which is why I'm seeking best practice on this. My second concrete example is a prototype XMPP client I'm using where there are various coroutines running during protocol negotiation etc., but I'll save that for another time.

First is a case where I need to listen on system power events as well as time out on them. On my Ubuntu phone, when the system is asleep time doesn't pass as far as poll(2) etc. are concerned, so asyncio.sleep() isn't sufficient for a wall clock based time interval wait. For example, if the phone is on battery and is not being used, a sleep of 60 seconds may not complete for hours, as the phone wakes only occasionally and for only a few milliseconds at a time. So I have to augment a sleep by listening for system power events so that after a wakeup I can check the RTC to see if more time has passed because the system was asleep.

First, here's the class that watches power state. There's a program that will output state changes to stdout, so I just pick up on that and publish events to a public EventQueue attribute.
This uses a coroutine that will get cancelled by the Worker if the class instance is garbage collected:

    import asyncio
    import subprocess


    class PowerStateWatcher:
        def __init__(self, loop=None):
            self.loop = loop or asyncio.get_event_loop()
            self.power_events = util.EventQueue()
            self._worker = util.Worker(loop=loop)
            self._worker.create_task(self.watch_wakeups())

        @asyncio.coroutine
        def watch_wakeups(self):
            # stdbuf -o0 turns off stdout buffering so lines arrive promptly.
            create = asyncio.create_subprocess_exec(
                'stdbuf', '-o0', 'powerd-cli', 'listen',
                stdout=subprocess.PIPE)
            proc = yield from create
            try:
                while True:
                    line = yield from proc.stdout.readline()
                    if line == b"Received SysPowerStateChange: state=0\n":
                        self.power_events.send_result(0)
                    elif line == b"Received SysPowerStateChange: state=1\n":
                        self.power_events.send_result(1)
            finally:
                proc.kill()

Next, a class that creates events on a regular pulse that others can listen on by watching the EventQueue public instance attribute in the same way:

    import time


    class Clock:
        def __init__(self, period, loop=None):
            self.loop = loop or asyncio.get_event_loop()
            self.trigger_events = util.EventQueue()
            self._worker = util.Worker(loop=loop)
            self._worker.create_task(self._watch(period))

        @asyncio.coroutine
        def _watch(self, period):
            watcher = PowerStateWatcher(self.loop)
            power_events = iter(watcher.power_events)
            future_power_event = next(power_events)
            deadline = time.time() + period
            while True:
                time_remaining = deadline - time.time()
                if time_remaining < 0:
                    # Deadline passed: report how late we are, then re-arm.
                    self.trigger_events.send_result(-time_remaining)
                    deadline = time.time() + period
                else:
                    # Wait for the deadline or the next power event,
                    # whichever comes first, then recheck the wall clock.
                    yield from asyncio.wait(
                        [future_power_event],
                        timeout=time_remaining,
                        loop=self.loop,
                    )
                    if future_power_event.done():
                        future_power_event = next(power_events)

Finally, I can just use this from a coroutine as follows:

    @asyncio.coroutine
    def create_periodic_reports(loop):
        clock = Clock(period=300, loop=loop)
        for future_trigger in clock.trigger_events:
            tardiness = yield from future_trigger
            # DO SOMETHING HERE

Before I did it this way, I had a pretty convoluted class with a method that got called on a power
state change event that then had to examine state stored in the class and manipulated by other methods. Moving the logic into a single coroutine seemed to make things simpler to me.

I have just noticed that I have classes each with two methods, one of which is __init__. So I could turn them into plain coroutines now, which is nice. I refactored extensively to get to where I am now, so that's how I ended up here. I won't try to do it while drafting this email though, as I don't have an easy way to test that it will work.

Thank you for reading this far! So what do you think? Am I insane, or is this a sensible way to arrange things?