I believe that clearTimer has been a feature request before though.

On Wed, Sep 19, 2018 at 11:31 AM Lukasz Cwik <[email protected]> wrote:

> *How does modelling a timer as a PCollection help the Beam model?*
>
> The largest concern was about how to model timers within Apache Beam that:
> 1) removed the need for the watermark hold that typically accompanies
> state/timer implementations
> 2) allowed the explicit output time to be set independently of the
> firing time for all timer specifications [1]
>
> I felt that treating timers as a self-loop around the ParDo PTransform
> allowed us to use the natural definition of output watermark =
> min(all input watermarks) to define how timers hold output, and to use
> windowed values that contain timers as a natural way to represent an
> output time that is independent of the firing time. The purpose of the
> PCollection right now is to store the representation of how timers are
> encoded. I suspect that at some point in time we will have different timer
> encodings.
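
A minimal sketch of the watermark argument above, assuming the timer self-loop is treated as one more input to the ParDo. The output watermark is then just the minimum over all inputs, so a pending timer's output timestamp holds the output back without a separate hold mechanism. The class and method names here are hypothetical and are not part of Beam.

```java
class TimerLoopWatermark {
    // Output watermark = min(all input watermarks). The timer self-loop is
    // modelled as an extra input whose watermark is the earliest output
    // timestamp among the pending timers (an assumption of this sketch).
    public static long outputWatermark(long mainInputWatermark,
                                       long... pendingTimerOutputTimestamps) {
        long timerInputWatermark = Long.MAX_VALUE; // no pending timers: nothing held
        for (long ts : pendingTimerOutputTimestamps) {
            timerInputWatermark = Math.min(timerInputWatermark, ts);
        }
        return Math.min(mainInputWatermark, timerInputWatermark);
    }

    public static void main(String[] args) {
        // Two pending timers with output times 40 and 70 hold the output at 40.
        System.out.println(outputWatermark(100L, 40L, 70L));
        // With no pending timers the main input watermark passes through.
        System.out.println(outputWatermark(100L));
    }
}
```

In this sketch the firing time of a timer plays no role in the hold; only its output timestamp does, which is the independence described in point 2.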
>
> Having this fit well with how timers are delivered between the SDK and
> Runner was an added bonus. Also, a good portion of the code that I needed
> to fix up was more related to the assumption that there was ever only a
> single input producer to an executable stage and plumbing of timer
> specifications through all the runner library support layers.
>
> ----------
> *There is no "clear" for timers.*
>
> The current Java API for timers only allows you to set them. Clearing
> timers is not exposed to users and is only used by internal implementations
> to support runners[2] via TimerInternals. Usage of a timer is like so:
>   @TimerId("timer")
>   private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
>
>   @ProcessElement
>   public void process(
>       ProcessContext context,
>       BoundedWindow window,
>       @TimerId("timer") Timer myTimer) {
>
>     myTimer.set(window.maxTimestamp().plus(allowedLateness));
>   }
>
> ---------
> I'm not a big fan of having timers as a separate field in the elements
> proto. I still think they should be treated as an input/output and we could
> update the representation so that inputs/outputs for PTransforms don't need
> to be "PCollections". I was thinking that our current PCollection
> representation assumes that we'll never want to change it to add extra
> information or do backwards incompatible changes like beam:pcollection:v2.
>
> ---------
> Other points:
> * side inputs already require a runner to introspect the ParDo payload to
> get the SideInputSpec; requiring it to have knowledge of the TimerSpec is
> no different.
> * a multimap side input over timers could use the key that the timer is
> associated with as its key, and an iterable side input over timers would
> allow you to iterate over <key, timer> pairs. This could be useful for skew
> control in sources since they would want to know how far ahead they are
> compared to other restrictions.
> * user state as a PCollection can make sense, but I can't see how we can
> get past the problems that arise when we treat it as an "input", since the
> input watermark would be ignored (or infinity?). I do agree that this could
> open the door to sharing "state" such as multi-key transactions, but that is
> very speculative, as you say.
>
>
> 1: https://issues.apache.org/jira/browse/BEAM-2535
> 2:
> https://github.com/apache/beam/search?q=%22org.apache.beam.sdk.state.Timers%22&unscoped_q=%22org.apache.beam.sdk.state.Timers%22
>
> On Wed, Sep 19, 2018 at 6:28 AM Thomas Weise <[email protected]> wrote:
>
>> Robert,
>>
>> Thanks for presenting these thoughts. Your attempt to implement the timer
>> support in the Python runner is the first strong signal we have and it is
>> the right time to make changes - AFAIK no other runner work has been done.
>>
>> I'm also a bit concerned about the acrobatics required in the PR to make
>> this work. Luke will be in the best position to comment, but as I recall we
>> considered modeling timers as special PCollections a simplification for SDK
>> <> Runner interaction and overall implementation. The special treatment
>> (and slight confusion) at the graph level perhaps was an early warning
>> sign; discovering the extra complexity of wiring this into a runner should
>> be a reason to revisit.
>>
>> Conceptually, timers are special state; they are certainly more state than
>> stream :) Regardless of how they are passed to the harness, the runner will
>> need to treat them similarly to side inputs and user state.
>>
>> Thanks,
>> Thomas
>>
>>
>>
>>
>> On Wed, Sep 19, 2018 at 3:33 AM Robert Bradshaw <[email protected]>
>> wrote:
>>
>>> TLDR Perhaps we should revisit
>>> https://s.apache.org/beam-portability-timers in light of the fact that
>>> Timers are more like State than PCollections.
>>>
>>> --
>>>
>>> While looking at implementing State and Timers in the Python SDK, I've
>>> been revisiting the ideas presented at
>>> https://s.apache.org/beam-portability-timers , and am now starting to
>>> wonder if this is actually the best way to model things (at least at the
>>> Runner level). Instead, it seems Timers more closely resemble, and are
>>> more tightly bound to, State than PCollections.
>>>
>>> This is especially clear when writing timers. These timers are not a bag
>>> of emitted elements; rather, one sets (and clears) timers, and the set of
>>> timers that end up firing is a result of this *ordered* sequence of
>>> operations. It is also often important that the setting of timers be
>>> ordered with respect to the setting and clearing of state itself (and is
>>> more often than not collocated with such requests).
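
A small sketch of why order matters here, assuming a per-key map of pending timers. The same two operations applied in a different order leave a different set of timers to fire, which an unordered bag of emitted elements cannot express. `OrderedTimerOps` and its methods are hypothetical names for illustration only, not Beam API.

```java
import java.util.Map;
import java.util.TreeMap;

class OrderedTimerOps {
    // Pending timers for a single key: timer id -> firing timestamp.
    private final Map<String, Long> pending = new TreeMap<>();

    // Setting a timer overwrites any earlier value for the same id.
    public OrderedTimerOps set(String timerId, long fireAt) {
        pending.put(timerId, fireAt);
        return this;
    }

    // Clearing removes the timer if it is currently set; otherwise a no-op.
    public OrderedTimerOps clear(String timerId) {
        pending.remove(timerId);
        return this;
    }

    public Map<String, Long> pending() {
        return pending;
    }

    public static void main(String[] args) {
        // set then clear: nothing is left to fire.
        System.out.println(new OrderedTimerOps().set("gc", 10L).clear("gc").pending());
        // clear then set: the timer is still pending.
        System.out.println(new OrderedTimerOps().clear("gc").set("gc", 10L).pending());
    }
}
```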
>>>
>>> In addition, these self-loops add complexity to the graph but provide no
>>> additional information--they are entirely redundant with the timerspecs
>>> already present on DoFns. Generally I prefer less redundancy in the spec,
>>> rather than have it be over-constrained. It's unclear what a runner that
>>> didn't introspect the DoFn's TimerSpecs would do with these special
>>> edges, and also unclear how they would differ from possible self-loops due
>>> to more traditional iteration.
>>>
>>> The primary motivation to express timers in this way seems to be the
>>> desire to push them to workers using the data plane, rather than inventing
>>> another mechanism or making them pull-based like with state. I think this
>>> could be done by simply adding a Timer field to the Elements (or Data)
>>> proto. (Note that this is not the same as having hacky ElementOrTimer
>>> elements flow through the graph.) Writes would be state requests, and
>>> perhaps it would even make sense to "read" the current value of an unfired
>>> timer over the state API, to be able to set things like
>>> {min,max}(new_timestamp, old_timestamp).
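
A rough sketch of the read-then-set idea, assuming an unfired timer's current value can be read back like state. `ReadableTimers`, `read` and `setToEarlier` are hypothetical names invented for this illustration; no such call is exposed by the current Java timer API described in this thread.

```java
import java.util.HashMap;
import java.util.Map;

class ReadableTimers {
    // Unfired timers for a single key: timer id -> firing timestamp.
    private final Map<String, Long> unfired = new HashMap<>();

    // Hypothetical "read" of an unfired timer; null means no timer is set.
    public Long read(String timerId) {
        return unfired.get(timerId);
    }

    public void set(String timerId, long timestamp) {
        unfired.put(timerId, timestamp);
    }

    // min(new_timestamp, old_timestamp): keep whichever fires earlier.
    public long setToEarlier(String timerId, long newTimestamp) {
        Long old = read(timerId);
        long result = (old == null) ? newTimestamp : Math.min(newTimestamp, old);
        set(timerId, result);
        return result;
    }

    public static void main(String[] args) {
        ReadableTimers timers = new ReadableTimers();
        System.out.println(timers.setToEarlier("flush", 50L)); // first set
        System.out.println(timers.setToEarlier("flush", 80L)); // later time is ignored
        System.out.println(timers.setToEarlier("flush", 20L)); // earlier time wins
    }
}
```

The max variant would be the same read-modify-write with `Math.max` in place of `Math.min`.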
>>>
>>> (We could alternatively attempt to model State(s) as a PCollection(s),
>>> but this is more speculative and would likely exacerbate some of the issues
>>> above (though it could open the door for DoFns that somehow *share* state).
>>> They seem like different objects though, one is a mutable store, the other
>>> an immutable stream.)
>>>
>>> I realize this is a big shift, but we could probably adapt the existing
>>> Python/Java implementations fairly easily (and it would probably simplify
>>> them). And it's easier to do simplifications like this sooner rather than
>>> later.
>>>
>>> What do people think about this? Any obvious (or not-so-obvious)
>>> downsides that I'm missing?
>>>
>>> - Robert
>>>
>>>