On Wed, Sep 19, 2018 at 2:46 PM Robert Bradshaw <[email protected]> wrote:

> On Wed, Sep 19, 2018 at 8:31 PM Lukasz Cwik <[email protected]> wrote:
>
>> *How does modelling a timer as a PCollection help the Beam model?*
>>
>> The largest concern was how to model timers within Apache Beam in a way
>> that:
>> 1) removes the need for the watermark hold that typically accompanies
>> state/timer implementations
>> 2) allows the explicit output time to be set independently of the firing
>> time for all timer specifications [1]
>>
>> I felt as though treating timers as a self-loop around the ParDo
>> PTransform allowed us to use the natural definition of output watermark =
>> min(all input watermarks) as a way to define how timers hold output and
>> using windowed values that contained timers as a natural way to represent
>> the output time to be independent of the firing time. The purpose of the
>> PCollection right now is to store the representation of how timers are
>> encoded. I suspect that at some point in time we will have different timer
>> encodings.
>>
>
> I agree that being able to separate the hold time from firing time of a
> timer is important, but in retrospect I don't think timers as PCollections is
> the only (or most natural) way to represent that (in the model or in runner
> implementations).
>
>
>> Having this fit well with how timers are delivered between the SDK and
>> Runner was an added bonus. Also, a good portion of the code that I needed
>> to fix up was more related to the assumption that there was only ever a
>> single input producer to an executable stage and plumbing of timer
>> specifications through all the runner library support layers.
>>
>> ----------
>> *There is no "clear" for timers.*
>>
>> The current Java API for timers only allows you to set them. Clearing
>> timers is not exposed to users and is only used by internal implementations
>> to support runners[2] via TimerInternals. Usage of a timer is like so:
>>   @TimerId("timer")
>>   private final TimerSpec timerSpec =
>>       TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>
>>   @ProcessElement
>>   public void process(
>>       ProcessContext context,
>>       BoundedWindow window,
>>       @TimerId("timer") Timer myTimer) {
>>
>>     myTimer.set(window.maxTimestamp().plus(allowedLateness));
>>   }
>>
>
> We'll probably want clear. But currently there's already exactly one timer
> per window per key, and setting another one overwrites the previous one,
> again making it more like state. Maybe, as you said, it could involve
> retractions (but every output being a retraction seems odd.)
>

This is not true. We support multiple (tagged) timers per key.
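
To make the addressing concrete, here is a toy sketch in plain Java. It is
not Beam code: TimerTable and its methods are hypothetical names, and the
only assumption is that a timer is addressed by (key, window, timer id) and
that setting it again overwrites the pending value.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model, not a Beam class: pending timers addressed like state cells. */
final class TimerTable {
    // Address is (key, window, timerId); value is the pending firing time in millis.
    private final Map<List<String>, Long> pending = new HashMap<>();

    /** Sets a timer; an earlier value at the same address is overwritten. */
    void set(String key, String window, String timerId, long fireAtMillis) {
        pending.put(Arrays.asList(key, window, timerId), fireAtMillis);
    }

    /** Returns the pending firing time, or null if no timer is set at this address. */
    Long get(String key, String window, String timerId) {
        return pending.get(Arrays.asList(key, window, timerId));
    }

    /** Number of distinct pending timers across all addresses. */
    int pendingCount() {
        return pending.size();
    }
}
```

In this model, setting "gc" twice for the same key and window leaves a single
pending "gc" timer, while a separately tagged "flush" timer on the same key
is unaffected.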


>
>
>>
>> ---------
>> I'm not a big fan of having timers as a separate field in the elements
>> proto. I still think they should be treated as an input/output and we could
>> update the representation so that inputs/outputs for PTransforms don't need
>> to be "PCollections". I was thinking that our current PCollection
>> representation assumes that we'll never want to change it to add extra
>> information or do backwards incompatible changes like beam:pcollection:v2.
>>
>
> If the data never travels from one PTransform to another, but always goes
> directly to/from the runner harness, I think using explicit channels to
> communicate this information in the fn api makes more sense than
> complicating the graph with special types of PCollections. This is
> consistent with how we do side inputs and state, and I think more
> consistent with the DAG a user has in their head when writing a pipeline.
> (And I could also see speculatively pushing state or side input information
> in the data channel too.)
>
> Especially when writing, they feel a lot more like they belong to state.
> And it could make sense to try to read unfired timers as well.
>
>
>>
>> ---------
>> Other points:
>> * side inputs already require a runner to introspect the ParDo payload to
>> get the SideInputSpec; requiring it to have knowledge of the TimerSpec is
>> no different.
>>
>
> My point was that once it has knowledge of the TimerSpec, there is no need
> for (meaning no additional information provided by) the timer PCollection
> nor its edges.
>
>
>> * multimap side input over timers where the key is the key that the timer
>> is associated with. iterable side input over timers would allow you to
>> iterate over <key, timer> pairs. This could be useful for skew control in
>> sources since they would want to know how far they are ahead vs other
>> restrictions.
>> * user state as a PCollection can make sense but I can't see how we can
>> get past problems when we treat it as an "input" since the input watermark
>> would be ignored (or infinity?). I do agree that this could open the door to
>> sharing "state" such as multi-key transactions, but it is very speculative,
>> as you say.
>>
>>
>> 1: https://issues.apache.org/jira/browse/BEAM-2535
>> 2:
>> https://github.com/apache/beam/search?q=%22org.apache.beam.sdk.state.Timers%22&unscoped_q=%22org.apache.beam.sdk.state.Timers%22
>>
>> On Wed, Sep 19, 2018 at 6:28 AM Thomas Weise <[email protected]> wrote:
>>
>>> Robert,
>>>
>>> Thanks for presenting these thoughts. Your attempt to implement the
>>> timer support in the Python runner is the first strong signal we have and
>>> it is the right time to make changes - AFAIK no other runner work has been
>>> done.
>>>
>>> I'm also a bit concerned about the acrobatics required in the PR to make
>>> this work. Luke will be in the best position to comment, but as I recall we
>>> considered modeling timers as special PCollections a simplification for SDK
>>> <> Runner interaction and overall implementation. The special treatment
>>> (and slight confusion) at the graph level perhaps was an early warning
>>> sign; discovering the extra complexity of wiring this into a runner should
>>> be a reason to revisit.
>>>
>>> Conceptually timers are special state, they are certainly more state
>>> than stream :) Regardless of how they are passed to the harness, the runner
>>> will need to treat them similarly to side inputs and user state.
>>>
>>> Thanks,
>>> Thomas
>>>
>>>
>>>
>>>
>>> On Wed, Sep 19, 2018 at 3:33 AM Robert Bradshaw <[email protected]>
>>> wrote:
>>>
>>>> TLDR Perhaps we should revisit
>>>> https://s.apache.org/beam-portability-timers in light of the fact that
>>>> Timers are more like State than PCollections.
>>>>
>>>> --
>>>>
>>>> While looking at implementing State and Timers in the Python SDK, I've
>>>> been revisiting the ideas presented at
>>>> https://s.apache.org/beam-portability-timers , and am now starting to
>>>> wonder if this is actually the best way to model things (at least at the
>>>> Runner level). Instead it seems Timers more closely resemble, and are
>>>> tightly bound to, State than PCollections.
>>>>
>>>> This is especially clear when writing timers. These timers are not a
>>>> bag of emitted elements, rather one sets (and clears) timers and the set of
>>>> timers that end up firing is a result of this *ordered* sequence of
>>>> operations. It is also often important that the setting of timers be
>>>> ordered with respect to the setting and clearing of state itself (and is
>>>> more often than not collocated with such requests).
>>>>
>>>> In addition, these self-loops add complexity to the graph but provide
>>>> no additional information--they are entirely redundant with the timerspecs
>>>> already present on DoFns. Generally I prefer less redundancy in the spec,
>>>> rather than have it be over-constrained. It's unclear what a runner that
>>>> didn't introspect the DoFn's TimerSpecs would do with these special
>>>> edges, and also unclear how they would differ from possible self-loops due
>>>> to more traditional iteration.
>>>>
>>>> The primary motivation to express timers in this way seems to be the
>>>> desire to push them to workers using the data plane, rather than inventing
>>>> another mechanism or making them pull-based like with state. I think this
>>>> could be done by simply adding a Timer field to the Elements (or Data)
>>>> proto. (Note that this is not the same as having hacky ElementOrTimer
>>>> elements flow through the graph.) Writes would be state requests, and
>>>> perhaps it would even make sense to "read" the current value of an unfired
>>>> timer over the state API, to be able to set things like
>>>> {min,max}(new_timestamp, old_timestamp).
>>>>
>>>> (We could alternatively attempt to model State(s) as a PCollection(s),
>>>> but this is more speculative and would likely exacerbate some of the issues
>>>> above (though it could open the door for DoFns that somehow *share* state).
>>>> They seem like different objects though, one is a mutable store, the other
>>>> an immutable stream.)
>>>>
>>>> I realize this is a big shift, but we could probably adapt the existing
>>>> Python/Java implementations fairly easily (and it would probably simplify
>>>> them). And it's easier to do simplifications like this sooner rather than
>>>> later.
>>>>
>>>> What do people think about this? Any obvious (or not-so-obvious)
>>>> downsides that I'm missing?
>>>>
>>>> - Robert
>>>>
>>>>
