That is a really good way to describe my mental model as well.

On Tue, Jan 7, 2020 at 12:20 PM Kenneth Knowles <[email protected]> wrote:

>
>
> On Tue, Jan 7, 2020 at 1:39 AM Jan Lukavský <[email protected]> wrote:
>
>> Hi Kenn,
>>
>> I see that my terminology seems not to be 100% aligned with Beam's. I'll
>> work on that. :-)
>>
>> I agree with what you say, and by "late" I mostly meant "droppable"
>> (arriving too late after watermark).
>>
>> I'm definitely not proposing to get back to something like "out of order"
>> == "late" or anything like that. I'm also aware that stateful operation is
>> windowed operation, but the semantics of the windowing is different than of
>> a GBK. The difference is how time moves in GBK and how moves in stateful
>> DoFn. Throwing away some details (early triggers, late data triggers), the
>> main difference is that in GBK case, time hops just between window
>> boundaries, while in stateful DoFn time moves "smoothly" (with each
>> watermark update). Now, this difference brings the question about why the
>> definition of "droppable" data is the same for both types of operations,
>> when there is a difference in how users "perceive" time. As the more
>> generic operation, stateful DoFn might deserve a more general definition of
>> droppable data, which should degrade naturally to the one of GBK in
>> presence of "discrete time hops".
>>
>
> I understand what you mean. On the other hand, I encourage thinking of
> event time spatially, not as time passing. That is a big part of unifying
> batch/streaming real-time/archival processing. The event time window is a
> secondary key to partition the data (merging windows are slightly more
> complex). All event time windows exist simultaneously. So for both stateful
> ParDo and GBK, I find it helpful to consider this perspective where all
> windows are processed simultaneously / in an arbitrary order not assuming
> windows are ordered at all. Then you see that GBK and stateful ParDo do not
> really treat windows / watermark differently: both of them process a stream
> of data for each (key, window) pair until the watermark informs them that
> the stream is expired, then they GC the state associated with that (key,
> window) pair.
>
> Kenn
>
>> This might have some consequences on how the droppable data should be
>> handled in presence of (early) triggers, because triggerring is actually
>> what makes time to "hop", so we might arrive to a conclusion that we might
>> actually drop any data that has timestamp less than "last trigger time +
>> allowed lateness". This looks appealing to me, because IMO it has strong
>> internal logical consistency. Although it is possible that it would drop
>> more data, which is generally undesirable, I admit that.
>>
>> I'm looking for explanation why the current approach was chosen instead
>> of the other.
>>
>> Jan
>> On 1/7/20 12:52 AM, Kenneth Knowles wrote:
>>
>> This thread has a lot in it, so I am just top-posting.
>>
>>  - Stateful DoFn is a windowed operation; state is per-window. When the
>> window expires, any further inputs are dropped.
>>  - "Late" is not synonymous with out-of-order. It doesn't really have an
>> independent meaning.
>>     - For a GBK/Combine "late" means "not included prior to the on-time
>> output", and "droppable" means "arriving after window expiry".
>>     - For Stateful DoFn there is no real meaning to "late" except if one
>> is talking about "droppable", which still means "arriving after window
>> expiry". A user may have a special timer where they flip a flag and treat
>> elements after the timer differently.
>>
>> I think the definition of when data is droppable is very simple. We
>> explicitly moved to this definition, away from the "out of order == late",
>> because it is more robust and simpler to think about. Users saw lots of
>> confusing behavior when we had "out of order by allowed lateness ==
>> droppable" logic.
>>
>> Kenn
>>
>> On Mon, Jan 6, 2020 at 1:42 AM Jan Lukavský <[email protected]> wrote:
>>
>>> > Generally the watermark update can overtake elements, because runners
>>> can explicitly ignore late data in the watermark calculation (for good
>>> reason - those elements are already late, so no need to hold up the
>>> watermark advancing any more).
>>> This seems not to affect the decision of _not late_ vs. _late_, is it?
>>> If element is late and gets ignored from watermark calculation (whatever
>>> that includes in this context), than the watermark cannot move past
>>> elements that were not marked as _not late_ and thus nothing can make them
>>> _late_.
>>>
>>> > For GBK on-time data simply means the first pane marked as on time.
>>> For state+timers I don't think it makes sense for Beam to define on-time
>>> v.s. late, rather I think the user can come up with their own definition
>>> depending on their use case. For example, if you are buffering data into
>>> BagState and setting a timer to process it, it would be logical to say that
>>> any element that was buffered before the timer expired is on time, and any
>>> data that showed up after the timer fired is late. This would roughly
>>> correspond to what GBK does, and the answer would be very similar to simply
>>> comparing against the watermark (as the timers fire when the watermark
>>> advances).
>>>
>>> Yes, I'd say that stateful DoFns don't have (well defined) concept of
>>> pane, because that is related to concept of trigger and this is a concept
>>> of GBK (or windowed operations in general). The only semantic meaning of
>>> window in stateful DoFn is that it "scopes" state.
>>>
>>> This discussion might have got a little off the original question, so
>>> I'll try to rephrase it:
>>>
>>> Should stateful DoFn drop *all* late data, not just data that arrive
>>> after window boundary + allowed lateness? Some arguments why I think it
>>> should:
>>>  * in windowed operations (GBK), it is correct to drop data on window
>>> boundaries only, because time (as seen by user) effectively hops only on
>>> these discrete time points
>>>  * in stateful dofn on the other hand time move "smoothly" (yes, with
>>> some granularity, millisecond, nanosecond, whatever and with watermark
>>> updates only, but still)
>>>  * this could be viewed that dropping late data immediately as time
>>> (again, from user perspective) moves (not on some more or less artificial
>>> boundary having only little semantic meaning) is consistent with both the
>>> above properties
>>>
>>> The negative side effect of this would be, that more data could be
>>> dropped, but ... isn't this what defines allowed lateness? I don't want to
>>> discuss the implications on user pipelines of such a change (and if we can
>>> or cannot do it), just trying to build some theoretical understanding of
>>> the problem as a whole. The decision if any change could / should be made
>>> can be done afterwards.
>>>
>>> Thanks,
>>>  Jan
>>>
>>> On 1/4/20 10:35 PM, Reuven Lax wrote:
>>>
>>>
>>>
>>> On Sat, Jan 4, 2020 at 12:13 PM Jan Lukavský <[email protected]> wrote:
>>>
>>>> > Yes, but invariants should hold. If I add a ParDo that drops late
>>>> elements (or, more commonly,diverts the late elements  to a different
>>>> PCollection), then the result of that ParDo should _never_ introduce and
>>>> more late data. This cannot be guaranteed simply with watermark checks. The
>>>> ParDo may decide that the element was not late, but by the time it outputs
>>>> the element the watermark may have advanced, causing the element to
>>>> actually be late.
>>>>
>>>> This is actually very interesting. The question is - if I decide about
>>>> lateness based on output watermark of a PTransform, is it still the case,
>>>> that in downstream operator(s) the element could be changed from "not late"
>>>> to "late"? Provided the output watermark is updated synchronously based on
>>>> input data (which should be) and watermark update cannot "overtake"
>>>> elements, I think that the downstream decision should not be changed, so
>>>> the invariant should hold. Or am I missing something?
>>>>
>>>
>>> Generally the watermark update can overtake elements, because runners
>>> can explicitly ignore late data in the watermark calculation (for good
>>> reason - those elements are already late, so no need to hold up the
>>> watermark advancing any more).
>>>
>>> For GBK on-time data simply means the first pane marked as on time. For
>>> state+timers I don't think it makes sense for Beam to define on-time v.s.
>>> late, rather I think the user can come up with their own definition
>>> depending on their use case. For example, if you are buffering data into
>>> BagState and setting a timer to process it, it would be logical to say that
>>> any element that was buffered before the timer expired is on time, and any
>>> data that showed up after the timer fired is late. This would roughly
>>> correspond to what GBK does, and the answer would be very similar to simply
>>> comparing against the watermark (as the timers fire when the watermark
>>> advances).
>>>
>>> Reuven
>>>
>>>> On 1/4/20 8:11 PM, Reuven Lax wrote:
>>>>
>>>>
>>>>
>>>> On Sat, Jan 4, 2020 at 11:03 AM Jan Lukavský <[email protected]> wrote:
>>>>
>>>>> On 1/4/20 6:14 PM, Reuven Lax wrote:
>>>>>
>>>>> There is a very good reason not to define lateness directly in terms
>>>>> of the watermark. The model does not make any guarantees that the 
>>>>> watermark
>>>>> advances synchronously, and in fact for the Dataflow runner the watermark
>>>>> advances asynchronously (i.e. independent of element processing). This
>>>>> means that simply comparing an element timestamp against the watermark
>>>>> creates a race condition. There are cases where the answer could change
>>>>> depending on exactly when you examine the watermark, and if you examine
>>>>> again while processing the same bundle you might come to a different
>>>>> conclusion about lateness.
>>>>>
>>>>> Due to monotonicity of watermark, I don't think that the asynchronous
>>>>> updates of watermark can change the answer from "late" to "not late". That
>>>>> seems fine to me.
>>>>>
>>>>
>>>> It's the other way around. You check to see whether an element is late
>>>> and the answer is "not late." An instant later the answer changes to
>>>> "late"  This does cause many problems, and is why this was changed.
>>>>
>>>>>
>>>>> This non determinism is undesirable when considering lateness, as it
>>>>> can break many invariants that users may rely on (e.g. if I could write a
>>>>> ParDo that filtered all late data, yet still find late data showing up
>>>>> downstream of the ParDo which would be very surprising). For that reason,
>>>>> the SDK always marks things as late based on deterministic signals. e.g.
>>>>> for a triggered GBK everything in the first post-watermark pane is marked
>>>>> as on time (no matter what the watermark is) and everything in subsequent
>>>>> panes is marked as late.
>>>>>
>>>>> Dropping latecomers will always be non-deterministic, that is certain.
>>>>> This is true even in case where watermark is updated synchronously with
>>>>> element processing, due to shuffling and varying (random) differences of
>>>>> processing and event time in upstream operator(s). The question was only 
>>>>> if
>>>>> a latecomer should be dropped only at a window boundaries only (which is a
>>>>> sort of artificial time boundary), or right away when spotted (in stateful
>>>>> dofns only). Another question would be if latecomers should be dropped
>>>>> based on input or output watermark, dropping based on output watermark
>>>>> seems even to be stable in the sense, that all downstream operators should
>>>>> come to the same conclusion (this is a bit of a speculation).
>>>>>
>>>>
>>>> Yes, but invariants should hold. If I add a ParDo that drops late
>>>> elements (or, more commonly,diverts the late elements  to a different
>>>> PCollection), then the result of that ParDo should _never_ introduce and
>>>> more late data. This cannot be guaranteed simply with watermark checks. The
>>>> ParDo may decide that the element was not late, but by the time it outputs
>>>> the element the watermark may have advanced, causing the element to
>>>> actually be late.
>>>>
>>>> In practice this is important. And early version of Dataflow (pre Beam)
>>>> implemented lateness by comparing against the watermark, and it caused no
>>>> end of trouble for users.
>>>>
>>>>>
>>>>> FYI - this is also the reason why Beam does not currently provide
>>>>> users direct access to the watermark. The asynchronous nature of it  can 
>>>>> be
>>>>> very confusing, and often results in users writing bugs in their 
>>>>> pipelines.
>>>>> We decided instead to expose easier-to-reason-about signals such as timers
>>>>> (triggered by the watermark), windows, and lateness.
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Sat, Jan 4, 2020 at 1:15 AM Jan Lukavský <[email protected]> wrote:
>>>>>
>>>>>> I realized the problem. I misinterpreted the
>>>>>> LateDataDroppingDoFnRunner. It doesn't drop *all* late (arriving after
>>>>>> watermark - allowed lateness) data, but only data, that arrive after
>>>>>> maxTimestamp + allowedLateness of their respective windows.
>>>>>>
>>>>>> Stateful DoFn can run on global window (which was the case of my
>>>>>> tests) and there is no dropping then.
>>>>>>
>>>>>> Two questions arise then:
>>>>>>
>>>>>>  a) does it mean that this is one more argument to move this logic to
>>>>>> StatefulDoFnRunner? StatefulDoFnRunner performs state cleanup on window 
>>>>>> GC
>>>>>> time, so without LateDataDroppingDoFnRunner and late data will see empty
>>>>>> state and will produce wrong results.
>>>>>>
>>>>>>  b) is this behavior generally intentional and correct? Windows and
>>>>>> triggers are (in my point of view) features of GBK, not stateful DoFn.
>>>>>> Stateful DoFn is a low level primitive, which can be viewed to operate on
>>>>>> "instant" windows, which should then probably be defined as dropping 
>>>>>> every
>>>>>> single element arrive after allowed lateness. This might probably relate 
>>>>>> to
>>>>>> question if operations should be built bottom up from most primitive and
>>>>>> generic ones to more specific ones - that is GBK be implemented on top of
>>>>>> stateful DoFn and not vice versa.
>>>>>>
>>>>>> Thoughts?
>>>>>>
>>>>>> Jan
>>>>>> On 1/4/20 1:03 AM, Steve Niemitz wrote:
>>>>>>
>>>>>> I do agree that the direct runner doesn't drop late data arriving at
>>>>>> a stateful DoFn (I just tested as well).
>>>>>>
>>>>>> However, I believe this is consistent with other runners.  I'm fairly
>>>>>> certain (at least last time I checked) that at least Dataflow will also
>>>>>> only drop late data at GBK operations, and NOT stateful DoFns.  Whether 
>>>>>> or
>>>>>> not this is intentional is debatable however, without being able to 
>>>>>> inspect
>>>>>> the watermark inside the stateful DoFn, it'd be very difficult to do
>>>>>> anything useful with late data.
>>>>>>
>>>>>>
>>>>>> On Fri, Jan 3, 2020 at 5:47 PM Jan Lukavský <[email protected]> wrote:
>>>>>>
>>>>>>> I did write a test that tested if data is dropped in a plain
>>>>>>> stateful DoFn. I did this as part of validating that PR [1] didn't drop
>>>>>>> more data when using @RequiresTimeSortedInput than it would without this
>>>>>>> annotation. This test failed and I didn't commit it, yet.
>>>>>>>
>>>>>>> The test was basically as follows:
>>>>>>>
>>>>>>>  - use TestStream to generate three elements with timestamps 2, 1
>>>>>>> and 0
>>>>>>>
>>>>>>>  - between elements with timestamp 1 and 0 move watermark to 1
>>>>>>>
>>>>>>>  - use allowed lateness of zero
>>>>>>>
>>>>>>>  - use stateful dofn that just emits arbitrary data for each input
>>>>>>> element
>>>>>>>
>>>>>>>  - use Count.globally to count outputs
>>>>>>>
>>>>>>> The outcome was that stateful dofn using @RequiresTimeSortedInput
>>>>>>> output 2 elements, without the annotation it was 3 elements. I think the
>>>>>>> correct one would be 2 elements in this case. The difference is caused 
>>>>>>> by
>>>>>>> the annotation having (currently) its own logic for dropping data, which
>>>>>>> could be removed if we agree, that the data should be dropped in all 
>>>>>>> cases.
>>>>>>> On 1/3/20 11:23 PM, Kenneth Knowles wrote:
>>>>>>>
>>>>>>> Did you write such a @Category(ValidatesRunner.class) test? I
>>>>>>> believe the Java  direct runner does drop late data, for both GBK and
>>>>>>> stateful ParDo.
>>>>>>>
>>>>>>> Stateful ParDo is implemented on top of GBK:
>>>>>>> https://github.com/apache/beam/blob/64262a61402fad67d9ad8a66eaf6322593d3b5dc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java#L172
>>>>>>>
>>>>>>> And GroupByKey, via DirectGroupByKey, via DirectGroupAlsoByWindow,
>>>>>>> does drop late data:
>>>>>>> https://github.com/apache/beam/blob/c2f0d282337f3ae0196a7717712396a5a41fdde1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java#L220
>>>>>>>
>>>>>>> I'm not sure why it has its own code, since ReduceFnRunner also
>>>>>>> drops late data, and it does use ReduceFnRunner (the same code path all
>>>>>>> Java-based runners use).
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Jan 3, 2020 at 1:02 PM Jan Lukavský <[email protected]> wrote:
>>>>>>>
>>>>>>>> Yes, the non-reliability of late data dropping in distributed
>>>>>>>> runner is understood. But this is even where DirectRunner can play its
>>>>>>>> role, because only there it is actually possible to emulate and test
>>>>>>>> specific watermark conditions. Question regarding this for the java
>>>>>>>> DirectRunner - should we completely drop LataDataDroppingDoFnRunner and
>>>>>>>> delegate the late data dropping to StatefulDoFnRunner? Seems logical 
>>>>>>>> to me,
>>>>>>>> as if we agree that late data should always be dropped, then there 
>>>>>>>> would no
>>>>>>>> "valid" use of StatefulDoFnRunner without the late data dropping
>>>>>>>> functionality.
>>>>>>>> On 1/3/20 9:32 PM, Robert Bradshaw wrote:
>>>>>>>>
>>>>>>>> I agree, in fact we just recently enabled late data dropping to the
>>>>>>>> direct runner in Python to be able to develop better tests for 
>>>>>>>> Dataflow.
>>>>>>>>
>>>>>>>> It should be noted, however, that in a distributed runner (absent
>>>>>>>> the quiessence of TestStream) that one can't *count* on late data being
>>>>>>>> dropped at a certain point, and in fact (due to delays in fully 
>>>>>>>> propagating
>>>>>>>> the watermark) late data can even become on-time, so the promises about
>>>>>>>> what happens behind the watermark are necessarily a bit loose.
>>>>>>>>
>>>>>>>> On Fri, Jan 3, 2020 at 9:15 AM Luke Cwik <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> I agree that the DirectRunner should drop late data. Late data
>>>>>>>>> dropping is optional but the DirectRunner is used by many for testing 
>>>>>>>>> and
>>>>>>>>> we should have the same behaviour they would get on other runners or 
>>>>>>>>> users
>>>>>>>>> may be surprised.
>>>>>>>>>
>>>>>>>>> On Fri, Jan 3, 2020 at 3:33 AM Jan Lukavský <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I just found out that DirectRunner is apparently not using
>>>>>>>>>> LateDataDroppingDoFnRunner, which means that it doesn't drop late
>>>>>>>>>> data
>>>>>>>>>> in cases where there is no GBK operation involved (dropping in
>>>>>>>>>> GBK seems
>>>>>>>>>> to be correct). There is apparently no @Category(ValidatesRunner)
>>>>>>>>>> test
>>>>>>>>>> for that behavior (because DirectRunner would fail it), so the
>>>>>>>>>> question
>>>>>>>>>> is - should late data dropping be considered part of model (of
>>>>>>>>>> which
>>>>>>>>>> DirectRunner should be a canonical implementation) and therefore
>>>>>>>>>> that
>>>>>>>>>> should be fixed there, or is the late data dropping an optional
>>>>>>>>>> feature
>>>>>>>>>> of a runner?
>>>>>>>>>>
>>>>>>>>>> I'm strongly in favor of the first option, and I think it is
>>>>>>>>>> likely that
>>>>>>>>>> all real-world runners would probably adhere to that (I didn't
>>>>>>>>>> check
>>>>>>>>>> that, though).
>>>>>>>>>>
>>>>>>>>>> Opinions?
>>>>>>>>>>
>>>>>>>>>>   Jan
>>>>>>>>>>
>>>>>>>>>>

Reply via email to