Clear can be modeled with a boolean state cell that says whether to process or ignore the next timer firing. That is not as good for watermark advancement, though, as being able to eagerly clear the timer.
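A minimal sketch of that flag approach, in plain Java with no Beam dependency. The class `ClearableTimerSketch` and all of its methods are hypothetical names made up for illustration; they stand in for a per-key timer plus a per-key boolean state cell and are not Beam APIs. It also shows the watermark drawback: a logically cleared timer stays pending until it fires.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of "clear" emulated with a boolean state cell (hypothetical, not a Beam API). */
class ClearableTimerSketch {
    // Stand-in for the pending timer of one key; null means no timer is set.
    private Long firingTime = null;
    // Stand-in for the boolean state cell: process (true) or ignore (false) the next firing.
    private boolean armed = false;
    // Firing times whose callback body was actually run.
    private final List<Long> processed = new ArrayList<>();

    /** Sets the timer and marks the next firing as one to process. */
    void set(long time) {
        firingTime = time;
        armed = true;
    }

    /** "Clears" the timer by flipping the flag only; the underlying timer stays set. */
    void clear() {
        armed = false;
    }

    /** Called by the simulated runner when the watermark advances. */
    void advanceWatermark(long watermark) {
        if (firingTime != null && firingTime <= watermark) {
            long fired = firingTime;
            firingTime = null;
            if (armed) {
                processed.add(fired); // the timer callback body would run here
            }
            armed = false;
        }
    }

    /** True while a timer is still pending, even if it was logically cleared. */
    boolean holdsWatermark() {
        return firingTime != null;
    }

    List<Long> processed() {
        return processed;
    }

    public static void main(String[] args) {
        ClearableTimerSketch t = new ClearableTimerSketch();
        t.set(10L);
        t.clear();
        System.out.println("still pending after clear: " + t.holdsWatermark());
        t.advanceWatermark(10L);
        System.out.println("processed firings: " + t.processed());
    }
}
```

After `set(10)` followed by `clear()`, the firing at 10 is ignored, but `holdsWatermark()` stays true until the watermark actually passes 10. An eager clear would release that hold right away.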
Longer term you could retract a "timer" from a PCollection once retractions are supported.

On Wed, Sep 19, 2018 at 11:40 AM Reuven Lax <re...@google.com> wrote:

> I believe that clearTimer has been a feature request before though.
>
> On Wed, Sep 19, 2018 at 11:31 AM Lukasz Cwik <lc...@google.com> wrote:
>
>> *How does modelling a timer as a PCollection help the Beam model?*
>>
>> The largest concern was about how to model timers within Apache Beam that:
>> 1) removed the need for the watermark hold that is typically accompanied
>> with state/timer implementations
>> 2) enabled the ability to set the explicit output time to be independent
>> of the firing time for all timer specifications [1]
>>
>> I felt as though treating timers as a self-loop around the ParDo
>> PTransform allowed us to use the natural definition of output watermark =
>> min(all input watermarks) as a way to define how timers hold output and
>> using windowed values that contained timers as a natural way to represent
>> the output time to be independent of the firing time. The purpose of the
>> PCollection right now is to store the representation of how timers are
>> encoded. I suspect that at some point in time we will have different timer
>> encodings.
>>
>> Having this fit well with how timers are delivered between the SDK and
>> Runner was an added bonus. Also, a good portion of the code that I needed
>> to fix up was more related to the assumption that there was ever only a
>> single input producer to an executable stage and plumbing of timer
>> specifications through all the runner library support layers.
>>
>> ----------
>> *There is no "clear" for timers.*
>>
>> The current Java API for timers only allows you to set them. Clearing
>> timers is not exposed to users and is only used by internal implementations
>> to support runners[2] via TimerInternals.
>> Usage of a timer is like so:
>> @TimerId("timer")
>> private final TimerSpec timerSpec =
>>     TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>
>> @ProcessElement
>> public void process(
>>     ProcessContext context,
>>     BoundedWindow window,
>>     @TimerId("timer") Timer myTimer) {
>>
>>   myTimer.set(window.maxTimestamp().plus(allowedLateness));
>> }
>>
>> ---------
>> I'm not a big fan of having timers as a separate field in the elements
>> proto. I still think they should be treated as an input/output and we could
>> update the representation so that inputs/outputs for PTransforms don't need
>> to be "PCollections". I was thinking that our current PCollection
>> representation assumes that we'll never want to change it to add extra
>> information or do backwards incompatible changes like beam:pcollection:v2.
>>
>> ---------
>> Other points:
>> * side inputs already require a runner to introspect the ParDo payload to
>> get the SideInputSpec, requiring it to have knowledge of the TimerSpec is
>> no different.
>> * multimap side input over timers where the key is the key that the timer
>> is associated with. iterable side input over timers would allow you to
>> iterate over <key, timer> pairs. This could be useful for skew control in
>> sources since they would want to know how far they are ahead vs other
>> restrictions.
>> * user state as a PCollection can make sense but I can't see how we can
>> get past problems when we treat it as an "input" since the input watermark
>> would be ignored or infinity? I do agree that this could open the door to
>> sharing "state" such as multi-key transactions but very speculative as you
>> say.
>>
>> 1: https://issues.apache.org/jira/browse/BEAM-2535
>> 2:
>> https://github.com/apache/beam/search?q=%22org.apache.beam.sdk.state.Timers%22&unscoped_q=%22org.apache.beam.sdk.state.Timers%22
>>
>> On Wed, Sep 19, 2018 at 6:28 AM Thomas Weise <t...@apache.org> wrote:
>>
>>> Robert,
>>>
>>> Thanks for presenting these thoughts.
>>> Your attempt to implement the
>>> timer support in the Python runner is the first strong signal we have and
>>> it is the right time to make changes - AFAIK no other runner work has been
>>> done.
>>>
>>> I'm also a bit concerned about the acrobatics required in the PR to make
>>> this work. Luke will be in the best position to comment, but as I recall we
>>> considered modeling timers as special PCollections a simplification for SDK
>>> <> Runner interaction and overall implementation. The special treatment
>>> (and slight confusion) at the graph level perhaps was an early warning
>>> sign; discovering the extra complexity of wiring this into a runner should
>>> be a reason to revisit.
>>>
>>> Conceptually timers are special state, they are certainly more state
>>> than stream :) Regardless of how they are passed to the harness, the runner
>>> will need to treat them similarly to side inputs and user state.
>>>
>>> Thanks,
>>> Thomas
>>>
>>> On Wed, Sep 19, 2018 at 3:33 AM Robert Bradshaw <rober...@google.com>
>>> wrote:
>>>
>>>> TLDR Perhaps we should revisit
>>>> https://s.apache.org/beam-portability-timers in light of the fact that
>>>> Timers are more like State than PCollections.
>>>>
>>>> --
>>>>
>>>> While looking at implementing State and Timers in the Python SDK, I've
>>>> been revisiting the ideas presented at
>>>> https://s.apache.org/beam-portability-timers , and am now starting to
>>>> wonder if this is actually the best way to model things (at least at the
>>>> Runner level). Instead it seems Timers more closely resemble, and are
>>>> more tightly bound to, State than PCollections.
>>>>
>>>> This is especially clear when writing timers. These timers are not a
>>>> bag of emitted elements, rather one sets (and clears) timers and the set of
>>>> timers that end up firing are a result of this *ordered* sequence of
>>>> operations.
>>>> It is also often important that the setting of timers be
>>>> ordered with respect to the setting and clearing of state itself (and is
>>>> more often than not collocated with such requests).
>>>>
>>>> In addition, these self-loops add complexity to the graph but provide
>>>> no additional information--they are entirely redundant with the timerspecs
>>>> already present on DoFns. Generally I prefer less redundancy in the spec,
>>>> rather than have it be over-constrained. It's unclear what a runner that
>>>> didn't introspect the DoFn's TimerSpecs would do with these special
>>>> edges, and also unclear how they would differ from possible self-loops due
>>>> to more traditional iteration.
>>>>
>>>> The primary motivation to express timers in this way seems to be the
>>>> desire to push them to workers using the data plane, rather than inventing
>>>> another mechanism or making them pull-based like with state. I think this
>>>> could be done by simply adding a Timer field to the Elements (or Data)
>>>> proto. (Note that this is not the same as having hacky ElementOrTimer
>>>> elements flow through the graph.) Writes would be state requests, and
>>>> perhaps it would even make sense to "read" the current value of an unfired
>>>> timer over the state API, to be able to set things like
>>>> {min,max}(new_timestamp, old_timestamp).
>>>>
>>>> (We could alternatively attempt to model State(s) as a PCollection(s),
>>>> but this is more speculative and would likely exacerbate some of the issues
>>>> above (though it could open the door for DoFns that somehow *share* state).
>>>> They seem like different objects though: one is a mutable store, the other
>>>> an immutable stream.)
>>>>
>>>> I realize this is a big shift, but we could probably adapt the existing
>>>> Python/Java implementations fairly easily (and it would probably simplify
>>>> them). And it's easier to do simplifications like this sooner rather than
>>>> later.
>>>>
>>>> What do people think about this?
>>>> Any obvious (or not-so-obvious)
>>>> downsides that I'm missing?
>>>>
>>>> - Robert