Robert,

Thanks for presenting these thoughts. Your attempt to implement timer support in the Python runner is the first strong signal we have, and it is the right time to make changes: AFAIK, no other runner work has been done.
I'm also a bit concerned about the acrobatics the PR requires to make this work. Luke will be in the best position to comment, but as I recall, we considered modeling timers as special PCollections a simplification for SDK <> Runner interaction and for the overall implementation. The special treatment (and slight confusion) at the graph level was perhaps an early warning sign; discovering the extra complexity of wiring this into a runner should be reason enough to revisit. Conceptually, timers are special state; they are certainly more state than stream :) Regardless of how they are passed to the harness, the runner will need to treat them similarly to side inputs and user state.

Thanks,
Thomas

On Wed, Sep 19, 2018 at 3:33 AM Robert Bradshaw <[email protected]> wrote:
> TLDR Perhaps we should revisit
> https://s.apache.org/beam-portability-timers in light of the fact that
> Timers are more like State than PCollections.
>
> --
>
> While looking at implementing State and Timers in the Python SDK, I've
> been revisiting the ideas presented at
> https://s.apache.org/beam-portability-timers , and am now starting to
> wonder if this is actually the best way to model things (at least at the
> Runner level). Instead, it seems Timers more closely resemble, and are
> more tightly bound to, State than PCollections.
>
> This is especially clear when writing timers. These timers are not a bag
> of emitted elements; rather, one sets (and clears) timers, and the set of
> timers that end up firing is the result of this *ordered* sequence of
> operations. It is also often important that the setting of timers be
> ordered with respect to the setting and clearing of state itself (and it
> is more often than not collocated with such requests).
>
> In addition, these self-loops add complexity to the graph but provide no
> additional information: they are entirely redundant with the TimerSpecs
> already present on DoFns. Generally I prefer less redundancy in the spec,
> rather than having it be over-constrained.
> It's unclear what a runner that
> didn't introspect the DoFn's TimerSpecs would do with these special
> edges, and also unclear how they would differ from possible self-loops due
> to more traditional iteration.
>
> The primary motivation to express timers in this way seems to be the
> desire to push them to workers using the data plane, rather than inventing
> another mechanism or making them pull-based like with state. I think this
> could be done by simply adding a Timer field to the Elements (or Data)
> proto. (Note that this is not the same as having hacky ElementOrTimer
> elements flow through the graph.) Writes would be state requests, and
> perhaps it would even make sense to "read" the current value of an unfired
> timer over the state API, to be able to set things like
> {min,max}(new_timestamp, old_timestamp).
>
> (We could alternatively attempt to model State(s) as PCollection(s), but
> this is more speculative and would likely exacerbate some of the issues
> above (though it could open the door for DoFns that somehow *share* state).
> They seem like different objects, though: one is a mutable store, the other
> an immutable stream.)
>
> I realize this is a big shift, but we could probably adapt the existing
> Python/Java implementations fairly easily (and it would probably simplify
> them). And it's easier to do simplifications like this sooner rather than
> later.
>
> What do people think about this? Any obvious (or not-so-obvious) downsides
> that I'm missing?
>
> - Robert
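To make the ordering argument concrete, here is a minimal Python sketch of timers treated as mutable per-key state. It assumes a toy in-memory store and assumes that re-setting a timer overwrites its pending timestamp; `KeyedStore`, `SetTimer`, `ClearTimer`, `AppendState`, `ClearState`, `read_timer` and `fire` are all hypothetical names, not Beam Fn API messages or methods.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple, Union


# Hypothetical request types for illustration only; NOT Beam Fn API messages.
@dataclass(frozen=True)
class SetTimer:
    timer_id: str
    timestamp: int


@dataclass(frozen=True)
class ClearTimer:
    timer_id: str


@dataclass(frozen=True)
class AppendState:
    state_id: str
    value: object


@dataclass(frozen=True)
class ClearState:
    state_id: str


Request = Union[SetTimer, ClearTimer, AppendState, ClearState]


class KeyedStore:
    """Toy store for a single key: user state and pending timers side by side."""

    def __init__(self) -> None:
        self.state: Dict[str, List[object]] = {}
        self.timers: Dict[str, int] = {}  # timer_id -> pending timestamp

    def apply(self, requests: List[Request]) -> None:
        # Requests are applied strictly in issue order, so a later ClearTimer
        # cancels an earlier SetTimer, and timer writes interleave with state writes.
        for req in requests:
            if isinstance(req, SetTimer):
                self.timers[req.timer_id] = req.timestamp  # assumed: overwrite
            elif isinstance(req, ClearTimer):
                self.timers.pop(req.timer_id, None)
            elif isinstance(req, AppendState):
                self.state.setdefault(req.state_id, []).append(req.value)
            elif isinstance(req, ClearState):
                self.state.pop(req.state_id, None)

    def read_timer(self, timer_id: str) -> Optional[int]:
        # "Reading" the current value of an unfired timer, as floated above.
        return self.timers.get(timer_id)

    def fire(self, watermark: int) -> List[Tuple[str, int]]:
        # Remove every pending timer at or before the watermark, earliest first.
        due = sorted((ts, tid) for tid, ts in self.timers.items() if ts <= watermark)
        for _, tid in due:
            del self.timers[tid]
        return [(tid, ts) for ts, tid in due]


if __name__ == "__main__":
    ops = [SetTimer("flush", 10), ClearTimer("flush")]
    in_order, reversed_order = KeyedStore(), KeyedStore()
    in_order.apply(ops)
    reversed_order.apply(list(reversed(ops)))
    print(in_order.fire(100), reversed_order.fire(100))  # [] [('flush', 10)]

    # Read-modify-write: keep the earlier of the old and new timestamps.
    store = KeyedStore()
    store.apply([AppendState("buffer", "a"), SetTimer("gc", 20)])
    old, new_ts = store.read_timer("gc"), 15
    store.apply([SetTimer("gc", new_ts if old is None else min(old, new_ts))])
    print(store.fire(100))  # [('gc', 15)]
```

The same two operations applied in the opposite order leave a different set of pending timers, which is the sense in which timers behave like a mutable store rather than a bag of emitted elements.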

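The suggestion of adding a Timer field to the Elements (or Data) proto, so firings are pushed over the data plane without an ElementOrTimer element type in the graph, could look roughly like the minimal Python sketch below. Everything here is an assumption for illustration: `DataChunk`, `TimerFiring`, `Elements` and `dispatch` are hypothetical stand-ins, not the Beam Fn API protos, and only firings travel this path because, per the proposal, timer writes would be state requests.

```python
from dataclasses import dataclass, field
from typing import Callable, List


# Hypothetical stand-ins for a pushed data-plane message. Field names are
# illustrative and are NOT taken from the Beam Fn API protos.
@dataclass
class DataChunk:
    transform_id: str
    elements: List[object]


@dataclass
class TimerFiring:
    transform_id: str
    timer_id: str
    key: object
    timestamp: int


@dataclass
class Elements:
    # Timer firings ride in their own field next to data, rather than as a
    # special element type flowing through a PCollection.
    data: List[DataChunk] = field(default_factory=list)
    timers: List[TimerFiring] = field(default_factory=list)


def dispatch(
    msg: Elements,
    on_element: Callable[[str, object], None],
    on_timer: Callable[[str, str, object, int], None],
) -> None:
    """Route pushed data to element processing and firings to timer callbacks.

    Handling all data before all timers within one message is an arbitrary
    choice for this sketch, not a claimed protocol guarantee.
    """
    for chunk in msg.data:
        for element in chunk.elements:
            on_element(chunk.transform_id, element)
    for firing in msg.timers:
        on_timer(firing.transform_id, firing.timer_id, firing.key, firing.timestamp)


if __name__ == "__main__":
    log: List[tuple] = []
    msg = Elements(
        data=[DataChunk("StatefulParDo", [("k", 1), ("k", 2)])],
        timers=[TimerFiring("StatefulParDo", "flush", "k", 10)],
    )
    dispatch(
        msg,
        on_element=lambda t, e: log.append(("element", t, e)),
        on_timer=lambda t, tid, k, ts: log.append(("timer", t, tid, k, ts)),
    )
    for entry in log:
        print(entry)
```

The graph keeps no timer self-loops in this shape; the runner only needs the DoFn's TimerSpecs to know which timer IDs it may push.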