But there's no ordering inside a window. A stateful DoFn can see the input
elements inside of a window in any order at all. This is another reason
it's best to think of time spatially - as another data dimension - rather
than like normal processing time.
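
To make the two dropping rules debated in this thread concrete, here is a
minimal sketch in plain Python. It is not Beam code: every name in it
(`run`, `window_max_timestamp`, `SCENARIO`) is hypothetical, timestamps are
bare integers, and fixed windows are assumed to span
[start, start + size) with a max timestamp of start + size - 1. The
scenario is the one from the test described further down the thread:
elements with timestamps 2, 1 and 0, the watermark moved to 1 before the
last element, and an allowed lateness of zero.

```python
# Simplified model of two "droppable" rules; an illustration, not Beam's implementation.
GLOBAL_WINDOW_MAX_TS = float("inf")  # stand-in for the end of the global window

# Events are (kind, value): an element timestamp or a new watermark value.
SCENARIO = [("element", 2), ("element", 1), ("watermark", 1), ("element", 0)]


def window_max_timestamp(ts, window_size):
    """Max timestamp of the fixed window holding ts; window_size=None means global window."""
    if window_size is None:
        return GLOBAL_WINDOW_MAX_TS
    start = ts - (ts % window_size)
    return start + window_size - 1


def run(events, allowed_lateness, window_size, drop_per_element):
    """Return the timestamps of elements that survive the chosen dropping rule."""
    watermark = float("-inf")
    emitted = []
    for kind, value in events:
        if kind == "watermark":
            watermark = max(watermark, value)  # the watermark never moves backwards
            continue
        if drop_per_element:
            # Rule proposed in the thread: drop anything behind watermark - allowed lateness.
            droppable = value + allowed_lateness < watermark
        else:
            # Window-expiry rule: drop only once the element's window has expired.
            droppable = window_max_timestamp(value, window_size) + allowed_lateness < watermark
        if not droppable:
            emitted.append(value)
    return emitted
```

Under these assumptions, the window-expiry rule in the global window keeps
all three elements, while the per-element rule keeps only two, which
matches the 3 vs. 2 outputs reported for the test. With fixed windows of
size 1 the two rules agree, which is one way to read the claim that the
per-element rule degrades to the window-based one.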

On Wed, Jan 8, 2020 at 2:26 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi Luke and Kenn,
>
> I agree, my mental model fits this as well. But still, even in the
> presence of simultaneous existence of all windows at once - GBK and
> stateful DoFns differ in the way they handle time *inside* each window (and
> I'm as well leaving merging windows outside, partly because they are not
> currently supported in stateful DoFns). GBK discretizes time (visible to
> user) through triggers, while stateful DoFn doesn't. That is where
> differences of these two come from.
>
> Jan
> On 1/7/20 10:16 PM, Luke Cwik wrote:
>
> That is a really good way to describe my mental model as well.
>
> On Tue, Jan 7, 2020 at 12:20 PM Kenneth Knowles <k...@apache.org> wrote:
>
>>
>>
>> On Tue, Jan 7, 2020 at 1:39 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Hi Kenn,
>>>
>>> I see that my terminology seems not to be 100% aligned with Beam's. I'll
>>> work on that. :-)
>>>
>>> I agree with what you say, and by "late" I mostly meant "droppable"
>>> (arriving too late after watermark).
>>>
>>> I'm definitely not proposing to get back to something like "out of
>>> order" == "late" or anything like that. I'm also aware that stateful
>>> operation is windowed operation, but the semantics of the windowing is
>>> different from that of a GBK. The difference is how time moves in GBK and
>>> how it moves in stateful DoFn. Throwing away some details (early triggers, late
>>> data triggers), the main difference is that in GBK case, time hops just
>>> between window boundaries, while in stateful DoFn time moves "smoothly"
>>> (with each watermark update). Now, this difference brings the question
>>> about why the definition of "droppable" data is the same for both types of
>>> operations, when there is a difference in how users "perceive" time. As the
>>> more generic operation, stateful DoFn might deserve a more general
>>> definition of droppable data, which should degrade naturally to the one of
>>> GBK in presence of "discrete time hops".
>>>
>>
>> I understand what you mean. On the other hand, I encourage thinking of
>> event time spatially, not as time passing. That is a big part of unifying
>> batch/streaming real-time/archival processing. The event time window is a
>> secondary key to partition the data (merging windows are slightly more
>> complex). All event time windows exist simultaneously. So for both stateful
>> ParDo and GBK, I find it helpful to consider this perspective where all
>> windows are processed simultaneously / in an arbitrary order not assuming
>> windows are ordered at all. Then you see that GBK and stateful ParDo do not
>> really treat windows / watermark differently: both of them process a stream
>> of data for each (key, window) pair until the watermark informs them that
>> the stream is expired, then they GC the state associated with that (key,
>> window) pair.
>>
>> Kenn
>>
>>> This might have some consequences on how the droppable data should be
>>> handled in presence of (early) triggers, because triggering is actually
>>> what makes time "hop", so we might arrive at the conclusion that we might
>>> actually drop any data that has timestamp less than "last trigger time +
>>> allowed lateness". This looks appealing to me, because IMO it has strong
>>> internal logical consistency. Although it is possible that it would drop
>>> more data, which is generally undesirable, I admit that.
>>>
>>> I'm looking for explanation why the current approach was chosen instead
>>> of the other.
>>>
>>> Jan
>>> On 1/7/20 12:52 AM, Kenneth Knowles wrote:
>>>
>>> This thread has a lot in it, so I am just top-posting.
>>>
>>>  - Stateful DoFn is a windowed operation; state is per-window. When the
>>> window expires, any further inputs are dropped.
>>>  - "Late" is not synonymous with out-of-order. It doesn't really have an
>>> independent meaning.
>>>     - For a GBK/Combine "late" means "not included prior to the on-time
>>> output", and "droppable" means "arriving after window expiry".
>>>     - For Stateful DoFn there is no real meaning to "late" except if one
>>> is talking about "droppable", which still means "arriving after window
>>> expiry". A user may have a special timer where they flip a flag and treat
>>> elements after the timer differently.
>>>
>>> I think the definition of when data is droppable is very simple. We
>>> explicitly moved to this definition, away from the "out of order == late",
>>> because it is more robust and simpler to think about. Users saw lots of
>>> confusing behavior when we had "out of order by allowed lateness ==
>>> droppable" logic.
>>>
>>> Kenn
>>>
>>> On Mon, Jan 6, 2020 at 1:42 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> > Generally the watermark update can overtake elements, because
>>>> runners  can explicitly ignore late data in the watermark calculation (for
>>>> good reason - those elements are already late, so no need to hold up the
>>>> watermark advancing any more).
>>>> This does not seem to affect the decision of _not late_ vs. _late_, does it?
>>>> If an element is late and gets ignored in the watermark calculation (whatever
>>>> that includes in this context), then the watermark cannot move past
>>>> elements that were not marked as _not late_ and thus nothing can make them
>>>> _late_.
>>>>
>>>> > For GBK on-time data simply means the first pane marked as on time.
>>>> For state+timers I don't think it makes sense for Beam to define on-time
>>>> v.s. late, rather I think the user can come up with their own definition
>>>> depending on their use case. For example, if you are buffering data into
>>>> BagState and setting a timer to process it, it would be logical to say that
>>>> any element that was buffered before the timer expired is on time, and any
>>>> data that showed up after the timer fired is late. This would roughly
>>>> correspond to what GBK does, and the answer would be very similar to simply
>>>> comparing against the watermark (as the timers fire when the watermark
>>>> advances).
>>>>
>>>> Yes, I'd say that stateful DoFns don't have a (well-defined) concept of a
>>>> pane, because that is tied to the concept of a trigger, which is a concept
>>>> of GBK (or windowed operations in general). The only semantic meaning of
>>>> a window in a stateful DoFn is that it "scopes" state.
>>>>
>>>> This discussion might have got a little off the original question, so
>>>> I'll try to rephrase it:
>>>>
>>>> Should stateful DoFn drop *all* late data, not just data that arrive
>>>> after window boundary + allowed lateness? Some arguments why I think it
>>>> should:
>>>>  * in windowed operations (GBK), it is correct to drop data on window
>>>> boundaries only, because time (as seen by user) effectively hops only on
>>>> these discrete time points
>>>>  * in stateful dofn on the other hand time moves "smoothly" (yes, with
>>>> some granularity, millisecond, nanosecond, whatever and with watermark
>>>> updates only, but still)
>>>>  * this could be viewed that dropping late data immediately as time
>>>> (again, from user perspective) moves (not on some more or less artificial
>>>> boundary having only little semantic meaning) is consistent with both the
>>>> above properties
>>>>
>>>> The negative side effect of this would be, that more data could be
>>>> dropped, but ... isn't this what defines allowed lateness? I don't want to
>>>> discuss the implications on user pipelines of such a change (and if we can
>>>> or cannot do it), just trying to build some theoretical understanding of
>>>> the problem as a whole. The decision if any change could / should be made
>>>> can be done afterwards.
>>>>
>>>> Thanks,
>>>>  Jan
>>>>
>>>> On 1/4/20 10:35 PM, Reuven Lax wrote:
>>>>
>>>>
>>>>
>>>> On Sat, Jan 4, 2020 at 12:13 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>> > Yes, but invariants should hold. If I add a ParDo that drops late
>>>>> elements (or, more commonly, diverts the late elements to a different
>>>>> PCollection), then the result of that ParDo should _never_ introduce any
>>>>> more late data. This cannot be guaranteed simply with watermark checks. The
>>>>> ParDo may decide that the element was not late, but by the time it outputs
>>>>> the element the watermark may have advanced, causing the element to
>>>>> actually be late.
>>>>>
>>>>> This is actually very interesting. The question is - if I decide about
>>>>> lateness based on the output watermark of a PTransform, is it still the
>>>>> case that in downstream operator(s) the element could be changed from
>>>>> "not late" to "late"? Provided the output watermark is updated
>>>>> synchronously based on input data (which it should be) and a watermark
>>>>> update cannot "overtake" elements, I think that the downstream decision
>>>>> should not be changed, so the invariant should hold. Or am I missing
>>>>> something?
>>>>>
>>>>
>>>> Generally the watermark update can overtake elements, because runners
>>>> can explicitly ignore late data in the watermark calculation (for good
>>>> reason - those elements are already late, so no need to hold up the
>>>> watermark advancing any more).
>>>>
>>>> For GBK on-time data simply means the first pane marked as on time. For
>>>> state+timers I don't think it makes sense for Beam to define on-time v.s.
>>>> late, rather I think the user can come up with their own definition
>>>> depending on their use case. For example, if you are buffering data into
>>>> BagState and setting a timer to process it, it would be logical to say that
>>>> any element that was buffered before the timer expired is on time, and any
>>>> data that showed up after the timer fired is late. This would roughly
>>>> correspond to what GBK does, and the answer would be very similar to simply
>>>> comparing against the watermark (as the timers fire when the watermark
>>>> advances).
>>>>
>>>> Reuven
>>>>
>>>>> On 1/4/20 8:11 PM, Reuven Lax wrote:
>>>>>
>>>>>
>>>>>
>>>>> On Sat, Jan 4, 2020 at 11:03 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>
>>>>>> On 1/4/20 6:14 PM, Reuven Lax wrote:
>>>>>>
>>>>>> There is a very good reason not to define lateness directly in terms
>>>>>> of the watermark. The model does not make any guarantees that the
>>>>>> watermark advances synchronously, and in fact for the Dataflow runner the
>>>>>> watermark advances asynchronously (i.e. independent of element
>>>>>> processing). This means that simply comparing an element timestamp
>>>>>> against the watermark creates a race condition. There are cases where the
>>>>>> answer could change depending on exactly when you examine the watermark,
>>>>>> and if you examine it again while processing the same bundle you might
>>>>>> come to a different conclusion about lateness.
>>>>>>
>>>>>> Due to monotonicity of the watermark, I don't think that asynchronous
>>>>>> updates of the watermark can change the answer from "late" to "not late".
>>>>>> That seems fine to me.
>>>>>>
>>>>>
>>>>> It's the other way around. You check to see whether an element is late
>>>>> and the answer is "not late." An instant later the answer changes to
>>>>> "late." This does cause many problems, and is why this was changed.
>>>>>
>>>>>>
>>>>>> This nondeterminism is undesirable when considering lateness, as it
>>>>>> can break many invariants that users may rely on (e.g. I could write a
>>>>>> ParDo that filtered all late data, yet still find late data showing up
>>>>>> downstream of the ParDo, which would be very surprising). For that reason,
>>>>>> the SDK always marks things as late based on deterministic signals, e.g.
>>>>>> for a triggered GBK everything in the first post-watermark pane is marked
>>>>>> as on time (no matter what the watermark is) and everything in subsequent
>>>>>> panes is marked as late.
>>>>>>
>>>>>> Dropping latecomers will always be non-deterministic, that is
>>>>>> certain. This is true even in the case where the watermark is updated
>>>>>> synchronously with element processing, due to shuffling and varying
>>>>>> (random) differences between processing time and event time in upstream
>>>>>> operator(s). The question was only whether a latecomer should be dropped
>>>>>> only at window boundaries (which are a sort of artificial time boundary),
>>>>>> or right away when spotted (in stateful dofns only). Another question
>>>>>> would be whether latecomers should be dropped based on the input or the
>>>>>> output watermark; dropping based on the output watermark even seems to be
>>>>>> stable in the sense that all downstream operators should come to the same
>>>>>> conclusion (this is a bit of a speculation).
>>>>>>
>>>>>
>>>>> Yes, but invariants should hold. If I add a ParDo that drops late
>>>>> elements (or, more commonly, diverts the late elements to a different
>>>>> PCollection), then the result of that ParDo should _never_ introduce any
>>>>> more late data. This cannot be guaranteed simply with watermark checks. The
>>>>> ParDo may decide that the element was not late, but by the time it outputs
>>>>> the element the watermark may have advanced, causing the element to
>>>>> actually be late.
>>>>>
>>>>> In practice this is important. An early version of Dataflow (pre
>>>>> Beam) implemented lateness by comparing against the watermark, and it
>>>>> caused no end of trouble for users.
>>>>>
>>>>>>
>>>>>> FYI - this is also the reason why Beam does not currently provide
>>>>>> users direct access to the watermark. The asynchronous nature of it can
>>>>>> be very confusing, and often results in users writing bugs in their
>>>>>> pipelines. We decided instead to expose easier-to-reason-about signals
>>>>>> such as timers (triggered by the watermark), windows, and lateness.
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Sat, Jan 4, 2020 at 1:15 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>>
>>>>>>> I realized the problem. I misinterpreted the
>>>>>>> LateDataDroppingDoFnRunner. It doesn't drop *all* late (arriving after
>>>>>>> watermark - allowed lateness) data, but only data, that arrive after
>>>>>>> maxTimestamp + allowedLateness of their respective windows.
>>>>>>>
>>>>>>> Stateful DoFn can run on global window (which was the case of my
>>>>>>> tests) and there is no dropping then.
>>>>>>>
>>>>>>> Two questions arise then:
>>>>>>>
>>>>>>>  a) does it mean that this is one more argument to move this logic
>>>>>>> to StatefulDoFnRunner? StatefulDoFnRunner performs state cleanup at
>>>>>>> window GC time, so without LateDataDroppingDoFnRunner, late data will see
>>>>>>> empty state and will produce wrong results.
>>>>>>>
>>>>>>>  b) is this behavior generally intentional and correct? Windows and
>>>>>>> triggers are (in my point of view) features of GBK, not stateful DoFn.
>>>>>>> Stateful DoFn is a low level primitive, which can be viewed as operating
>>>>>>> on "instant" windows, which should then probably be defined as dropping
>>>>>>> every single element arriving after allowed lateness. This probably
>>>>>>> relates to the question of whether operations should be built bottom up
>>>>>>> from the most primitive and generic ones to more specific ones - that is,
>>>>>>> GBK being implemented on top of stateful DoFn and not vice versa.
>>>>>>>
>>>>>>> Thoughts?
>>>>>>>
>>>>>>> Jan
>>>>>>> On 1/4/20 1:03 AM, Steve Niemitz wrote:
>>>>>>>
>>>>>>> I do agree that the direct runner doesn't drop late data arriving at
>>>>>>> a stateful DoFn (I just tested as well).
>>>>>>>
>>>>>>> However, I believe this is consistent with other runners. I'm
>>>>>>> fairly certain (at least last time I checked) that at least Dataflow
>>>>>>> will also only drop late data at GBK operations, and NOT stateful DoFns.
>>>>>>> Whether or not this is intentional is debatable; however, without being
>>>>>>> able to inspect the watermark inside the stateful DoFn, it'd be very
>>>>>>> difficult to do anything useful with late data.
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Jan 3, 2020 at 5:47 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>>>
>>>>>>>> I did write a test that tested if data is dropped in a plain
>>>>>>>> stateful DoFn. I did this as part of validating that PR [1] didn't drop
>>>>>>>> more data when using @RequiresTimeSortedInput than it would without
>>>>>>>> this annotation. This test failed and I didn't commit it, yet.
>>>>>>>>
>>>>>>>> The test was basically as follows:
>>>>>>>>
>>>>>>>>  - use TestStream to generate three elements with timestamps 2, 1
>>>>>>>> and 0
>>>>>>>>
>>>>>>>>  - between elements with timestamp 1 and 0 move watermark to 1
>>>>>>>>
>>>>>>>>  - use allowed lateness of zero
>>>>>>>>
>>>>>>>>  - use stateful dofn that just emits arbitrary data for each input
>>>>>>>> element
>>>>>>>>
>>>>>>>>  - use Count.globally to count outputs
>>>>>>>>
>>>>>>>> The outcome was that stateful dofn using @RequiresTimeSortedInput
>>>>>>>> output 2 elements, without the annotation it was 3 elements. I think
>>>>>>>> the correct one would be 2 elements in this case. The difference is
>>>>>>>> caused by the annotation having (currently) its own logic for dropping
>>>>>>>> data, which could be removed if we agree that the data should be dropped
>>>>>>>> in all cases.
>>>>>>>> On 1/3/20 11:23 PM, Kenneth Knowles wrote:
>>>>>>>>
>>>>>>>> Did you write such a @Category(ValidatesRunner.class) test? I
>>>>>>>> believe the Java direct runner does drop late data, for both GBK and
>>>>>>>> stateful ParDo.
>>>>>>>>
>>>>>>>> Stateful ParDo is implemented on top of GBK:
>>>>>>>> https://github.com/apache/beam/blob/64262a61402fad67d9ad8a66eaf6322593d3b5dc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java#L172
>>>>>>>>
>>>>>>>> And GroupByKey, via DirectGroupByKey, via DirectGroupAlsoByWindow,
>>>>>>>> does drop late data:
>>>>>>>> https://github.com/apache/beam/blob/c2f0d282337f3ae0196a7717712396a5a41fdde1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java#L220
>>>>>>>>
>>>>>>>> I'm not sure why it has its own code, since ReduceFnRunner also
>>>>>>>> drops late data, and it does use ReduceFnRunner (the same code path all
>>>>>>>> Java-based runners use).
>>>>>>>>
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Jan 3, 2020 at 1:02 PM Jan Lukavský <je...@seznam.cz>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Yes, the non-reliability of late data dropping in distributed
>>>>>>>>> runner is understood. But this is exactly where DirectRunner can play its
>>>>>>>>> role, because only there it is actually possible to emulate and test
>>>>>>>>> specific watermark conditions. Question regarding this for the java
>>>>>>>>> DirectRunner - should we completely drop LateDataDroppingDoFnRunner
>>>>>>>>> and delegate the late data dropping to StatefulDoFnRunner? Seems logical
>>>>>>>>> to me, as if we agree that late data should always be dropped, then
>>>>>>>>> there would be no "valid" use of StatefulDoFnRunner without the late
>>>>>>>>> data dropping functionality.
>>>>>>>>> On 1/3/20 9:32 PM, Robert Bradshaw wrote:
>>>>>>>>>
>>>>>>>>> I agree, in fact we just recently enabled late data dropping in
>>>>>>>>> the direct runner in Python to be able to develop better tests for
>>>>>>>>> Dataflow.
>>>>>>>>>
>>>>>>>>> It should be noted, however, that in a distributed runner (absent
>>>>>>>>> the quiescence of TestStream) one can't *count* on late data being
>>>>>>>>> dropped at a certain point, and in fact (due to delays in fully
>>>>>>>>> propagating the watermark) late data can even become on-time, so the
>>>>>>>>> promises about what happens behind the watermark are necessarily a bit
>>>>>>>>> loose.
>>>>>>>>>
>>>>>>>>> On Fri, Jan 3, 2020 at 9:15 AM Luke Cwik <lc...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> I agree that the DirectRunner should drop late data. Late data
>>>>>>>>>> dropping is optional but the DirectRunner is used by many for testing
>>>>>>>>>> and we should have the same behaviour they would get on other runners,
>>>>>>>>>> or users may be surprised.
>>>>>>>>>>
>>>>>>>>>> On Fri, Jan 3, 2020 at 3:33 AM Jan Lukavský <je...@seznam.cz>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> I just found out that DirectRunner is apparently not using
>>>>>>>>>>> LateDataDroppingDoFnRunner, which means that it doesn't drop late data
>>>>>>>>>>> in cases where there is no GBK operation involved (dropping in GBK
>>>>>>>>>>> seems to be correct). There is apparently no @Category(ValidatesRunner)
>>>>>>>>>>> test for that behavior (because DirectRunner would fail it), so the
>>>>>>>>>>> question is - should late data dropping be considered part of the model
>>>>>>>>>>> (of which DirectRunner should be a canonical implementation) and
>>>>>>>>>>> therefore be fixed there, or is late data dropping an optional feature
>>>>>>>>>>> of a runner?
>>>>>>>>>>>
>>>>>>>>>>> I'm strongly in favor of the first option, and I think it is likely
>>>>>>>>>>> that all real-world runners adhere to that (I didn't check that,
>>>>>>>>>>> though).
>>>>>>>>>>>
>>>>>>>>>>> Opinions?
>>>>>>>>>>>
>>>>>>>>>>>   Jan
>>>>>>>>>>>
>>>>>>>>>>>
