On Tue, Jan 7, 2020 at 1:39 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi Kenn,
>
> I see that my terminology seems not to be 100% aligned with Beam's. I'll
> work on that. :-)
>
> I agree with what you say, and by "late" I mostly meant "droppable"
> (arriving too late after watermark).
>
> I'm definitely not proposing to get back to something like "out of order"
> == "late" or anything like that. I'm also aware that stateful operation is
> windowed operation, but the semantics of the windowing is different from that
> of a GBK. The difference is how time moves in GBK and how it moves in stateful
> DoFn. Throwing away some details (early triggers, late data triggers), the
> main difference is that in GBK case, time hops just between window
> boundaries, while in stateful DoFn time moves "smoothly" (with each
> watermark update). Now, this difference brings the question about why the
> definition of "droppable" data is the same for both types of operations,
> when there is a difference in how users "perceive" time. As the more
> generic operation, stateful DoFn might deserve a more general definition of
> droppable data, which should degrade naturally to the one of GBK in
> presence of "discrete time hops".
>

I understand what you mean. On the other hand, I encourage thinking of
event time spatially, not as time passing. That is a big part of unifying
batch/streaming real-time/archival processing. The event time window is a
secondary key to partition the data (merging windows are slightly more
complex). All event time windows exist simultaneously. So for both stateful
ParDo and GBK, I find it helpful to consider this perspective where all
windows are processed simultaneously / in an arbitrary order not assuming
windows are ordered at all. Then you see that GBK and stateful ParDo do not
really treat windows / watermark differently: both of them process a stream
of data for each (key, window) pair until the watermark informs them that
the stream is expired, then they GC the state associated with that (key,
window) pair.
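
As a rough illustration of this view, here is a minimal sketch in plain
Python (not Beam code). The names Window, is_expired and KeyedWindowState
are hypothetical, and the expiry rule "watermark > max_timestamp +
allowed_lateness" is an assumption based on the description of
LateDataDroppingDoFnRunner quoted further down. It models both GBK and
stateful ParDo as keeping state per (key, window) until the watermark
expires that window:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Window:
    """Hypothetical event-time window [start, end)."""
    start: int
    end: int

    @property
    def max_timestamp(self) -> int:
        return self.end - 1


def is_expired(window: Window, watermark: float, allowed_lateness: int) -> bool:
    # Assumed rule: a window is expired once the watermark has passed
    # its max timestamp plus the allowed lateness.
    return watermark > window.max_timestamp + allowed_lateness


class KeyedWindowState:
    """Toy model: one buffer per (key, window), GC'd on window expiry."""

    def __init__(self, allowed_lateness: int):
        self.allowed_lateness = allowed_lateness
        self.watermark = float("-inf")
        self.state = {}    # (key, window) -> list of buffered values
        self.dropped = []  # elements that arrived after window expiry

    def process(self, key, window: Window, value) -> bool:
        """Buffer the value, or drop it if its window is already expired."""
        if is_expired(window, self.watermark, self.allowed_lateness):
            self.dropped.append((key, window, value))
            return False
        self.state.setdefault((key, window), []).append(value)
        return True

    def advance_watermark(self, new_watermark: int) -> None:
        """Move the watermark forward (never backward) and GC expired state."""
        self.watermark = max(self.watermark, new_watermark)
        expired = [kw for kw in self.state
                   if is_expired(kw[1], self.watermark, self.allowed_lateness)]
        for kw in expired:
            del self.state[kw]
```

In this model there is no notion of windows being ordered: each (key,
window) pair is independent, and "droppable" only ever means "arrived
after that pair's window expired".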

Kenn

> This might have some consequences on how the droppable data should be
> handled in presence of (early) triggers, because triggering is actually
> what makes time "hop", so we might arrive at the conclusion that we might
> actually drop any data that has timestamp less than "last trigger time +
> allowed lateness". This looks appealing to me, because IMO it has strong
> internal logical consistency. Although it is possible that it would drop
> more data, which is generally undesirable, I admit that.
>
> I'm looking for an explanation of why the current approach was chosen
> instead of the other.
>
> Jan
> On 1/7/20 12:52 AM, Kenneth Knowles wrote:
>
> This thread has a lot in it, so I am just top-posting.
>
>  - Stateful DoFn is a windowed operation; state is per-window. When the
> window expires, any further inputs are dropped.
>  - "Late" is not synonymous with out-of-order. It doesn't really have an
> independent meaning.
>     - For a GBK/Combine "late" means "not included prior to the on-time
> output", and "droppable" means "arriving after window expiry".
>     - For Stateful DoFn there is no real meaning to "late" except if one
> is talking about "droppable", which still means "arriving after window
> expiry". A user may have a special timer where they flip a flag and treat
> elements after the timer differently.
>
> I think the definition of when data is droppable is very simple. We
> explicitly moved to this definition, away from the "out of order == late",
> because it is more robust and simpler to think about. Users saw lots of
> confusing behavior when we had "out of order by allowed lateness ==
> droppable" logic.
>
> Kenn
>
> On Mon, Jan 6, 2020 at 1:42 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> > Generally the watermark update can overtake elements, because runners
>> can explicitly ignore late data in the watermark calculation (for good
>> reason - those elements are already late, so no need to hold up the
>> watermark advancing any more).
>> This seems not to affect the decision of _not late_ vs. _late_, does it? If
>> an element is late and gets ignored in the watermark calculation (whatever
>> that includes in this context), then the watermark cannot move past elements
>> that were not marked as _not late_ and thus nothing can make them _late_.
>>
>> > For GBK on-time data simply means the first pane marked as on time. For
>> state+timers I don't think it makes sense for Beam to define on-time v.s.
>> late, rather I think the user can come up with their own definition
>> depending on their use case. For example, if you are buffering data into
>> BagState and setting a timer to process it, it would be logical to say that
>> any element that was buffered before the timer expired is on time, and any
>> data that showed up after the timer fired is late. This would roughly
>> correspond to what GBK does, and the answer would be very similar to simply
>> comparing against the watermark (as the timers fire when the watermark
>> advances).
>>
>> Yes, I'd say that stateful DoFns don't have a (well-defined) concept of
>> pane, because that is related to the concept of trigger and this is a concept
>> of GBK (or windowed operations in general). The only semantic meaning of
>> window in stateful DoFn is that it "scopes" state.
>>
>> This discussion might have got a little off the original question, so
>> I'll try to rephrase it:
>>
>> Should stateful DoFn drop *all* late data, not just data that arrive
>> after window boundary + allowed lateness? Some arguments why I think it
>> should:
>>  * in windowed operations (GBK), it is correct to drop data on window
>> boundaries only, because time (as seen by user) effectively hops only on
>> these discrete time points
>>  * in stateful dofn on the other hand time moves "smoothly" (yes, with
>> some granularity, millisecond, nanosecond, whatever and with watermark
>> updates only, but still)
>>  * dropping late data immediately as time (again, from the user's
>> perspective) moves, rather than on some more or less artificial boundary
>> having only little semantic meaning, would be consistent with both of the
>> above properties
>>
>> The negative side effect of this would be, that more data could be
>> dropped, but ... isn't this what defines allowed lateness? I don't want to
>> discuss the implications on user pipelines of such a change (and if we can
>> or cannot do it), just trying to build some theoretical understanding of
>> the problem as a whole. The decision if any change could / should be made
>> can be done afterwards.
>>
>> Thanks,
>>  Jan
>>
>> On 1/4/20 10:35 PM, Reuven Lax wrote:
>>
>>
>>
>> On Sat, Jan 4, 2020 at 12:13 PM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> > Yes, but invariants should hold. If I add a ParDo that drops late
>>> elements (or, more commonly, diverts the late elements to a different
>>> PCollection), then the result of that ParDo should _never_ introduce any
>>> more late data. This cannot be guaranteed simply with watermark checks. The
>>> ParDo may decide that the element was not late, but by the time it outputs
>>> the element the watermark may have advanced, causing the element to
>>> actually be late.
>>>
>>> This is actually very interesting. The question is - if I decide about
>>> lateness based on output watermark of a PTransform, is it still the case,
>>> that in downstream operator(s) the element could be changed from "not late"
>>> to "late"? Provided the output watermark is updated synchronously based on
>>> input data (which should be) and watermark update cannot "overtake"
>>> elements, I think that the downstream decision should not be changed, so
>>> the invariant should hold. Or am I missing something?
>>>
>>
>> Generally the watermark update can overtake elements, because runners
>> can explicitly ignore late data in the watermark calculation (for good
>> reason - those elements are already late, so no need to hold up the
>> watermark advancing any more).
>>
>> For GBK on-time data simply means the first pane marked as on time. For
>> state+timers I don't think it makes sense for Beam to define on-time v.s.
>> late, rather I think the user can come up with their own definition
>> depending on their use case. For example, if you are buffering data into
>> BagState and setting a timer to process it, it would be logical to say that
>> any element that was buffered before the timer expired is on time, and any
>> data that showed up after the timer fired is late. This would roughly
>> correspond to what GBK does, and the answer would be very similar to simply
>> comparing against the watermark (as the timers fire when the watermark
>> advances).
>>
>> Reuven
>>
>>> On 1/4/20 8:11 PM, Reuven Lax wrote:
>>>
>>>
>>>
>>> On Sat, Jan 4, 2020 at 11:03 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> On 1/4/20 6:14 PM, Reuven Lax wrote:
>>>>
>>>> There is a very good reason not to define lateness directly in terms of
>>>> the watermark. The model does not make any guarantees that the watermark
>>>> advances synchronously, and in fact for the Dataflow runner the watermark
>>>> advances asynchronously (i.e. independent of element processing). This
>>>> means that simply comparing an element timestamp against the watermark
>>>> creates a race condition. There are cases where the answer could change
>>>> depending on exactly when you examine the watermark, and if you examine
>>>> again while processing the same bundle you might come to a different
>>>> conclusion about lateness.
>>>>
>>>> Due to monotonicity of watermark, I don't think that the asynchronous
>>>> updates of watermark can change the answer from "late" to "not late". That
>>>> seems fine to me.
>>>>
>>>
>>> It's the other way around. You check to see whether an element is late
>>> and the answer is "not late." An instant later the answer changes to
>>> "late." This does cause many problems, and is why this was changed.
>>>
>>>>
>>>> This non-determinism is undesirable when considering lateness, as it
>>>> can break many invariants that users may rely on (e.g. I could write a
>>>> ParDo that filtered all late data, yet still find late data showing up
>>>> downstream of the ParDo, which would be very surprising). For that reason,
>>>> the SDK always marks things as late based on deterministic signals. e.g.
>>>> for a triggered GBK everything in the first post-watermark pane is marked
>>>> as on time (no matter what the watermark is) and everything in subsequent
>>>> panes is marked as late.
>>>>
>>>> Dropping latecomers will always be non-deterministic, that is certain.
>>>> This is true even in case where watermark is updated synchronously with
>>>> element processing, due to shuffling and varying (random) differences of
>>>> processing and event time in upstream operator(s). The question was only if
>>>> a latecomer should be dropped only at window boundaries (which is a
>>>> sort of artificial time boundary), or right away when spotted (in stateful
>>>> dofns only). Another question would be if latecomers should be dropped
>>>> based on input or output watermark, dropping based on output watermark
>>>> seems even to be stable in the sense, that all downstream operators should
>>>> come to the same conclusion (this is a bit of a speculation).
>>>>
>>>
>>> Yes, but invariants should hold. If I add a ParDo that drops late
>>> elements (or, more commonly, diverts the late elements to a different
>>> PCollection), then the result of that ParDo should _never_ introduce any
>>> more late data. This cannot be guaranteed simply with watermark checks. The
>>> ParDo may decide that the element was not late, but by the time it outputs
>>> the element the watermark may have advanced, causing the element to
>>> actually be late.
>>>
>>> In practice this is important. An early version of Dataflow (pre Beam)
>>> implemented lateness by comparing against the watermark, and it caused no
>>> end of trouble for users.
>>>
>>>>
>>>> FYI - this is also the reason why Beam does not currently provide users
>>>> direct access to the watermark. The asynchronous nature of it can be very
>>>> confusing, and often results in users writing bugs in their pipelines. We
>>>> decided instead to expose easier-to-reason-about signals such as timers
>>>> (triggered by the watermark), windows, and lateness.
>>>>
>>>> Reuven
>>>>
>>>> On Sat, Jan 4, 2020 at 1:15 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>> I realized the problem. I misinterpreted the
>>>>> LateDataDroppingDoFnRunner. It doesn't drop *all* late (arriving after
>>>>> watermark - allowed lateness) data, but only data, that arrive after
>>>>> maxTimestamp + allowedLateness of their respective windows.
>>>>>
>>>>> Stateful DoFn can run on global window (which was the case of my
>>>>> tests) and there is no dropping then.
>>>>>
>>>>> Two questions arise then:
>>>>>
>>>>>  a) does it mean that this is one more argument to move this logic to
>>>>> StatefulDoFnRunner? StatefulDoFnRunner performs state cleanup on window GC
>>>>> time, so without LateDataDroppingDoFnRunner, late data will see empty
>>>>> state and will produce wrong results.
>>>>>
>>>>>  b) is this behavior generally intentional and correct? Windows and
>>>>> triggers are (in my point of view) features of GBK, not stateful DoFn.
>>>>> Stateful DoFn is a low level primitive, which can be viewed as operating on
>>>>> "instant" windows, which should then probably be defined as dropping every
>>>>> single element arriving after allowed lateness. This probably relates to
>>>>> the question of whether operations should be built bottom up from the most
>>>>> primitive and generic ones to more specific ones - that is, GBK would be
>>>>> implemented on top of stateful DoFn and not vice versa.
>>>>>
>>>>> Thoughts?
>>>>>
>>>>> Jan
>>>>> On 1/4/20 1:03 AM, Steve Niemitz wrote:
>>>>>
>>>>> I do agree that the direct runner doesn't drop late data arriving at a
>>>>> stateful DoFn (I just tested as well).
>>>>>
>>>>> However, I believe this is consistent with other runners.  I'm fairly
>>>>> certain (at least last time I checked) that at least Dataflow will also
>>>>> only drop late data at GBK operations, and NOT stateful DoFns.  Whether or
>>>>> not this is intentional is debatable; however, without being able to
>>>>> inspect the watermark inside the stateful DoFn, it'd be very difficult to
>>>>> do anything useful with late data.
>>>>>
>>>>>
>>>>> On Fri, Jan 3, 2020 at 5:47 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>
>>>>>> I did write a test that tested if data is dropped in a plain stateful
>>>>>> DoFn. I did this as part of validating that PR [1] didn't drop more data
>>>>>> when using @RequiresTimeSortedInput than it would without this
>>>>>> annotation. This test failed and I didn't commit it, yet.
>>>>>>
>>>>>> The test was basically as follows:
>>>>>>
>>>>>>  - use TestStream to generate three elements with timestamps 2, 1 and 0
>>>>>>
>>>>>>  - between elements with timestamp 1 and 0 move watermark to 1
>>>>>>
>>>>>>  - use allowed lateness of zero
>>>>>>
>>>>>>  - use stateful dofn that just emits arbitrary data for each input
>>>>>> element
>>>>>>
>>>>>>  - use Count.globally to count outputs
>>>>>>
>>>>>> The outcome was that stateful dofn using @RequiresTimeSortedInput
>>>>>> output 2 elements; without the annotation it was 3 elements. I think the
>>>>>> correct one would be 2 elements in this case. The difference is caused by
>>>>>> the annotation having (currently) its own logic for dropping data, which
>>>>>> could be removed if we agree that the data should be dropped in all
>>>>>> cases.
>>>>>> On 1/3/20 11:23 PM, Kenneth Knowles wrote:
>>>>>>
>>>>>> Did you write such a @Category(ValidatesRunner.class) test? I believe
>>>>>> the Java direct runner does drop late data, for both GBK and stateful
>>>>>> ParDo.
>>>>>>
>>>>>> Stateful ParDo is implemented on top of GBK:
>>>>>> https://github.com/apache/beam/blob/64262a61402fad67d9ad8a66eaf6322593d3b5dc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java#L172
>>>>>>
>>>>>> And GroupByKey, via DirectGroupByKey, via DirectGroupAlsoByWindow,
>>>>>> does drop late data:
>>>>>> https://github.com/apache/beam/blob/c2f0d282337f3ae0196a7717712396a5a41fdde1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java#L220
>>>>>>
>>>>>> I'm not sure why it has its own code, since ReduceFnRunner also drops
>>>>>> late data, and it does use ReduceFnRunner (the same code path all
>>>>>> Java-based runners use).
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>>
>>>>>> On Fri, Jan 3, 2020 at 1:02 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>>
>>>>>>> Yes, the non-reliability of late data dropping in distributed runner
>>>>>>> is understood. But this is where DirectRunner can play its role,
>>>>>>> because only there it is actually possible to emulate and test specific
>>>>>>> watermark conditions. Question regarding this for the Java DirectRunner -
>>>>>>> should we completely drop LateDataDroppingDoFnRunner and delegate the
>>>>>>> late data dropping to StatefulDoFnRunner? Seems logical to me, as if we
>>>>>>> agree that late data should always be dropped, then there would be no
>>>>>>> "valid" use of StatefulDoFnRunner without the late data dropping
>>>>>>> functionality.
>>>>>>> On 1/3/20 9:32 PM, Robert Bradshaw wrote:
>>>>>>>
>>>>>>> I agree, in fact we just recently enabled late data dropping in the
>>>>>>> direct runner in Python to be able to develop better tests for Dataflow.
>>>>>>>
>>>>>>> It should be noted, however, that in a distributed runner (absent
>>>>>>> the quiescence of TestStream) one can't *count* on late data being
>>>>>>> dropped at a certain point, and in fact (due to delays in fully
>>>>>>> propagating the watermark) late data can even become on-time, so the
>>>>>>> promises about what happens behind the watermark are necessarily a bit
>>>>>>> loose.
>>>>>>>
>>>>>>> On Fri, Jan 3, 2020 at 9:15 AM Luke Cwik <lc...@google.com> wrote:
>>>>>>>
>>>>>>>> I agree that the DirectRunner should drop late data. Late data
>>>>>>>> dropping is optional but the DirectRunner is used by many for testing
>>>>>>>> and we should have the same behaviour they would get on other runners
>>>>>>>> or users may be surprised.
>>>>>>>>
>>>>>>>> On Fri, Jan 3, 2020 at 3:33 AM Jan Lukavský <je...@seznam.cz>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I just found out that DirectRunner is apparently not using
>>>>>>>>> LateDataDroppingDoFnRunner, which means that it doesn't drop late data
>>>>>>>>> in cases where there is no GBK operation involved (dropping in GBK
>>>>>>>>> seems to be correct). There is apparently no @Category(ValidatesRunner)
>>>>>>>>> test for that behavior (because DirectRunner would fail it), so the
>>>>>>>>> question is - should late data dropping be considered part of the model
>>>>>>>>> (of which DirectRunner should be a canonical implementation) and
>>>>>>>>> therefore be fixed there, or is the late data dropping an optional
>>>>>>>>> feature of a runner?
>>>>>>>>>
>>>>>>>>> I'm strongly in favor of the first option, and I think it is likely
>>>>>>>>> that all real-world runners would adhere to that (I didn't check that,
>>>>>>>>> though).
>>>>>>>>>
>>>>>>>>> Opinions?
>>>>>>>>>
>>>>>>>>>   Jan
>>>>>>>>>
>>>>>>>>>
