That is a really good way to describe my mental model as well.
On Tue, Jan 7, 2020 at 12:20 PM Kenneth Knowles <[email protected]> wrote:
On Tue, Jan 7, 2020 at 1:39 AM Jan Lukavský <[email protected]> wrote:
Hi Kenn,
I see that my terminology seems not to be 100% aligned
with Beam's. I'll work on that. :-)
I agree with what you say, and by "late" I mostly meant
"droppable" (arriving too late after watermark).
I'm definitely not proposing to get back to something
like "out of order" == "late" or anything like that. I'm
also aware that stateful operation is windowed operation,
but the semantics of the windowing is different from that of a
GBK. The difference is how time moves in a GBK versus how it
moves in a stateful DoFn. Leaving aside some details (early
triggers, late data triggers), the main difference is
that in GBK case, time hops just between window
boundaries, while in stateful DoFn time moves "smoothly"
(with each watermark update). Now, this difference raises the
question of why the definition of "droppable" data is the same
for both types of operations, when there is a difference in how
users "perceive" time. As the more generic operation, stateful
DoFn might deserve a more general definition of droppable data,
one that degrades naturally to the GBK definition in the
presence of "discrete time hops".
I understand what you mean. On the other hand, I encourage
thinking of event time spatially, not as time passing. That
is a big part of unifying batch/streaming real-time/archival
processing. The event time window is a secondary key to
partition the data (merging windows are slightly more
complex). All event time windows exist simultaneously. So for
both stateful ParDo and GBK, I find it helpful to consider
this perspective where all windows are processed
simultaneously / in an arbitrary order, not assuming windows
are ordered at all. Then you see that GBK and stateful ParDo
do not really treat windows / watermark differently: both of
them process a stream of data for each (key, window) pair
until the watermark informs them that the stream is expired,
then they GC the state associated with that (key, window) pair.
Kenn
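Kenn's (key, window) framing can be illustrated with a small plain-Java model (my sketch, not Beam code; the names are made up): state is scoped to a (key, window) pair, and once the watermark passes the window's expiry (maxTimestamp + allowed lateness), the state is GC'd and further inputs for that pair are droppable.

```java
import java.util.*;

// Illustrative model, not Beam code: both GBK and stateful ParDo keep
// state per (key, window) and GC it once the watermark passes
// windowMaxTs + allowedLateness. This models a single window.
public class KeyWindowState {
    final Map<String, List<Long>> stateByKey = new HashMap<>(); // key -> buffered timestamps
    final long windowMaxTs;
    final long allowedLateness;

    KeyWindowState(long windowMaxTs, long allowedLateness) {
        this.windowMaxTs = windowMaxTs;
        this.allowedLateness = allowedLateness;
    }

    /** Returns false when the element is droppable because its window expired. */
    boolean process(String key, long elementTs, long inputWatermark) {
        if (inputWatermark > windowMaxTs + allowedLateness) {
            return false; // the stream for this (key, window) already expired
        }
        stateByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(elementTs);
        return true;
    }

    /** Called as the watermark advances; GCs all state once the window expires. */
    void advanceWatermarkTo(long watermark) {
        if (watermark > windowMaxTs + allowedLateness) {
            stateByKey.clear();
        }
    }
}
```

Under this model GBK and stateful ParDo behave identically with respect to the watermark; they differ only in what they do with the buffered state before it is GC'd.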
This might have some consequences for how droppable
data should be handled in the presence of (early) triggers,
because triggering is actually what makes time "hop",
so we might arrive at the conclusion that we could actually
drop any data with a timestamp less than "last trigger
time + allowed lateness". This looks appealing to me,
because IMO it has strong internal logical consistency.
It is possible that it would drop more data,
which is generally undesirable, I admit that.
I'm looking for an explanation of why the current approach
was chosen over the other.
Jan
On 1/7/20 12:52 AM, Kenneth Knowles wrote:
This thread has a lot in it, so I am just top-posting.
- Stateful DoFn is a windowed operation; state is
per-window. When the window expires, any further inputs
are dropped.
- "Late" is not synonymous with out-of-order. It
doesn't really have an independent meaning.
- For a GBK/Combine "late" means "not included prior
to the on-time output", and "droppable" means "arriving
after window expiry".
- For Stateful DoFn there is no real meaning to
"late" except if one is talking about "droppable", which
still means "arriving after window expiry". A user may
have a special timer where they flip a flag and treat
elements after the timer differently.
I think the definition of when data is droppable is very
simple. We explicitly moved to this definition, away
from "out of order == late", because it is more
robust and simpler to think about. Users saw lots of
confusing behavior when we had the "out of order by allowed
lateness == droppable" logic.
Kenn
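The two definitions above can be captured in a tiny model (illustrative plain Java, not Beam's actual API): "late" means arriving after the on-time pane has fired but before window expiry; "droppable" means arriving after window expiry.

```java
// Illustrative model of the definitions above (not Beam's API).
public class Lateness {
    enum Status { ON_TIME, LATE, DROPPABLE }

    static Status classify(long windowMaxTs, long allowedLateness,
                           boolean onTimePaneFired, long inputWatermark) {
        if (inputWatermark > windowMaxTs + allowedLateness) {
            return Status.DROPPABLE; // arriving after window expiry
        }
        if (onTimePaneFired) {
            return Status.LATE; // not included prior to the on-time output
        }
        return Status.ON_TIME;
    }
}
```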
On Mon, Jan 6, 2020 at 1:42 AM Jan Lukavský <[email protected]> wrote:
> Generally the watermark update can overtake
elements, because runners can explicitly ignore
late data in the watermark calculation (for good
reason - those elements are already late, so no need
to hold up the watermark advancing any more).
This seems not to affect the decision of _not late_
vs. _late_, does it? If an element is late and gets
ignored in the watermark calculation (whatever that
includes in this context), then the watermark cannot
move past elements that were marked as _not late_,
and thus nothing can make them _late_.
> For GBK on-time data simply means the first pane
marked as on time. For state+timers I don't think it
makes sense for Beam to define on-time v.s. late,
rather I think the user can come up with their own
definition depending on their use case. For example,
if you are buffering data into BagState and setting
a timer to process it, it would be logical to say
that any element that was buffered before the timer
expired is on time, and any data that showed up
after the timer fired is late. This would roughly
correspond to what GBK does, and the answer would be
very similar to simply comparing against the
watermark (as the timers fire when the watermark
advances).
Yes, I'd say that stateful DoFns don't have a (well
defined) concept of pane, because that is related to
the concept of a trigger, which is a concept of GBK (or
windowed operations in general). The only semantic
meaning of a window in a stateful DoFn is that it
"scopes" state.
This discussion might have got a little off the
original question, so I'll try to rephrase it:
Should stateful DoFn drop *all* late data, not just
data that arrives after the window boundary + allowed
lateness? Some arguments why I think it should:
* in windowed operations (GBK), it is correct to
drop data at window boundaries only, because time
(as seen by the user) effectively hops only at these
discrete time points
* in a stateful DoFn, on the other hand, time moves
"smoothly" (yes, with some granularity - millisecond,
nanosecond, whatever - and with watermark updates
only, but still)
* one could therefore argue that dropping late data
immediately as time (again, from the user's
perspective) moves, rather than at a more or less
artificial boundary with little semantic meaning, is
consistent with both of the above properties
The negative side effect of this would be that more
data could be dropped, but ... isn't that exactly what
allowed lateness defines? I don't want to discuss
the implications of such a change for user pipelines
(or whether we can make it at all); I'm just trying to
build some theoretical understanding of the problem
as a whole. The decision whether any change could /
should be made can come afterwards.
Thanks,
Jan
On 1/4/20 10:35 PM, Reuven Lax wrote:
On Sat, Jan 4, 2020 at 12:13 PM Jan Lukavský <[email protected]> wrote:
> Yes, but invariants should hold. If I add a
ParDo that drops late elements (or, more
commonly, diverts the late elements to a
different PCollection), then the result of that
ParDo should _never_ introduce any more late
data. This cannot be guaranteed simply with
watermark checks. The ParDo may decide that the
element was not late, but by the time it
outputs the element the watermark may have
advanced, causing the element to actually be late.
This is actually very interesting. The question
is: if I decide lateness based on the output
watermark of a PTransform, is it still the
case that in downstream operator(s) the
element could change from "not late" to
"late"? Provided the output watermark is
updated synchronously with input data
(as it should be) and watermark updates cannot
"overtake" elements, I think that the
downstream decision should not change, so
the invariant should hold. Or am I missing
something?
Generally the watermark update can overtake
elements, because runners can explicitly ignore
late data in the watermark calculation (for good
reason - those elements are already late, so no
need to hold up the watermark advancing any more).
For GBK on-time data simply means the first pane
marked as on time. For state+timers I don't think
it makes sense for Beam to define on-time v.s.
late, rather I think the user can come up with
their own definition depending on their use case.
For example, if you are buffering data into
BagState and setting a timer to process it, it
would be logical to say that any element that was
buffered before the timer expired is on time, and
any data that showed up after the timer fired is
late. This would roughly correspond to what GBK
does, and the answer would be very similar to
simply comparing against the watermark (as the
timers fire when the watermark advances).
Reuven
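The BagState-plus-timer pattern described above can be modeled in plain Java (a sketch with a simulated bag and event-time timer, not the actual Beam state/timer API):

```java
import java.util.*;

// Plain-Java model of the pattern above: buffer elements until an
// event-time timer fires, then treat everything that arrives afterwards
// as (user-defined) "late".
public class BufferUntilTimer {
    final List<Long> buffer = new ArrayList<>();   // stands in for BagState<Long>
    final List<String> output = new ArrayList<>();
    final long timerTs;                            // event-time deadline of the timer
    boolean fired = false;

    BufferUntilTimer(long timerTs) { this.timerTs = timerTs; }

    void processElement(long value) {
        if (fired) {
            output.add("late: " + value);  // arrived after the timer fired
        } else {
            buffer.add(value);             // buffered before the timer: on time
        }
    }

    /** Runners fire event-time timers as the watermark passes them. */
    void advanceWatermarkTo(long watermark) {
        if (!fired && watermark >= timerTs) {
            fired = true;
            for (long v : buffer) output.add("on-time: " + v);
            buffer.clear();
        }
    }
}
```

As the text says, this roughly corresponds to what GBK does: since the timer fires when the watermark advances, the answer is very similar to comparing against the watermark, but the decision point is deterministic.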
On 1/4/20 8:11 PM, Reuven Lax wrote:
On Sat, Jan 4, 2020 at 11:03 AM Jan Lukavský <[email protected]> wrote:
On 1/4/20 6:14 PM, Reuven Lax wrote:
There is a very good reason not to define
lateness directly in terms of the
watermark. The model does not make any
guarantees that the watermark advances
synchronously, and in fact for the
Dataflow runner the watermark advances
asynchronously (i.e. independent of
element processing). This means that
simply comparing an element timestamp
against the watermark creates a race
condition. There are cases where the
answer could change depending on exactly
when you examine the watermark, and if
you examine again while processing the
same bundle you might come to a different
conclusion about lateness.
Due to the monotonicity of the watermark, I don't
think that asynchronous watermark updates
can change the answer from
"late" to "not late". That seems fine to me.
It's the other way around. You check to see
whether an element is late and the answer is
"not late." An instant later the answer
changes to "late." This does cause many
problems, and is why this was changed.
This non-determinism is undesirable when
considering lateness, as it can break
many invariants that users may rely on
(e.g. I could write a ParDo that
filters out all late data, yet still find
late data showing up downstream of that
ParDo, which would be very surprising).
For that reason, the SDK always marks
things as late based on deterministic
signals. e.g. for a triggered GBK
everything in the first post-watermark
pane is marked as on time (no matter what
the watermark is) and everything in
subsequent panes is marked as late.
Dropping latecomers will always be
non-deterministic, that is certain. This
is true even in the case where the watermark is
updated synchronously with element
processing, due to shuffling and varying
(random) differences between processing and
event time in upstream operator(s). The
question was only whether a latecomer should be
dropped at window boundaries only
(which is a sort of artificial time
boundary), or right away when spotted (in
stateful DoFns only). Another question
would be whether latecomers should be dropped
based on the input or the output watermark;
dropping based on the output watermark even
seems to be stable, in the sense that all
downstream operators should come to the
same conclusion (this is a bit of
speculation).
Yes, but invariants should hold. If I add a
ParDo that drops late elements (or, more
commonly, diverts the late elements to a
different PCollection), then the result of
that ParDo should _never_ introduce any more
late data. This cannot be guaranteed simply
with watermark checks. The ParDo may decide
that the element was not late, but by the time
it outputs the element the watermark may have
advanced, causing the element to actually be late.
In practice this is important. An early
version of Dataflow (pre-Beam) implemented
lateness by comparing against the watermark,
and it caused no end of trouble for users.
FYI - this is also the reason why Beam
does not currently provide users direct
access to the watermark. The asynchronous
nature of it can be very confusing, and
often results in users writing bugs in
their pipelines. We decided instead to
expose easier-to-reason-about signals
such as timers (triggered by the
watermark), windows, and lateness.
Reuven
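The race described above can be shown with a minimal model (plain Java; the volatile field stands in for a runner's asynchronously advancing watermark):

```java
// Minimal model of the race: because the watermark advances
// asynchronously, "is this element late?" can change between two reads
// for the very same element. A timer, by contrast, fires exactly once.
public class WatermarkRace {
    volatile long watermark = Long.MIN_VALUE; // advanced by the runner, asynchronously

    boolean isLate(long elementTs) {
        return elementTs < watermark; // the answer depends on *when* you ask
    }
}
```

This is the confusion Beam avoids by exposing timers, windows, and lateness instead of the raw watermark.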
On Sat, Jan 4, 2020 at 1:15 AM Jan Lukavský <[email protected]> wrote:
I realized the problem. I
misinterpreted the
LateDataDroppingDoFnRunner. It
doesn't drop *all* late data (data
arriving after watermark - allowed
lateness), but only data that arrives
after maxTimestamp + allowedLateness
of its respective window.
A stateful DoFn can run on the global
window (which was the case in my
tests), and then there is no dropping at all.
Two questions arise then:
a) does this mean that this is one
more argument for moving this logic
into StatefulDoFnRunner?
StatefulDoFnRunner performs state
cleanup at window GC time, so without
LateDataDroppingDoFnRunner, late
data will see empty state and will
produce wrong results.
b) is this behavior generally
intentional and correct? Windows and
triggers are (from my point of view)
features of GBK, not of stateful DoFn.
Stateful DoFn is a low-level
primitive, which can be viewed as
operating on "instant" windows, and
should then probably be defined as
dropping every single element arriving
after allowed lateness. This probably
relates to the question of whether
operations should be built bottom-up,
from the most primitive and generic ones
to more specific ones - that is, GBK
implemented on top of stateful
DoFn and not vice versa.
Thoughts?
Jan
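The rule Jan describes can be written down as a small predicate (a sketch, not Beam's actual implementation): an element is dropped only when the watermark passes its window's maxTimestamp + allowedLateness, and the global window's maxTimestamp is effectively the end of time, so nothing is ever dropped there.

```java
// Sketch of the dropping rule described above (not Beam's actual code).
public class WindowExpiry {
    static final long GLOBAL_WINDOW_MAX = Long.MAX_VALUE; // "end of time"

    static boolean isDroppable(long windowMaxTs, long allowedLateness, long inputWatermark) {
        long gcTime = (windowMaxTs == GLOBAL_WINDOW_MAX)
            ? GLOBAL_WINDOW_MAX                   // guard against overflow
            : windowMaxTs + allowedLateness;
        return inputWatermark > gcTime;
    }
}
```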
On 1/4/20 1:03 AM, Steve Niemitz wrote:
I do agree that the direct runner
doesn't drop late data arriving at a
stateful DoFn (I just tested as well).
However, I believe this is
consistent with other runners. I'm
fairly certain (at least last time I
checked) that at least Dataflow will
also only drop late data at GBK
operations, and NOT stateful DoFns.
Whether or not this is intentional
is debatable; however, without being
able to inspect the watermark inside
the stateful DoFn, it'd be very
difficult to do anything useful with
late data.
On Fri, Jan 3, 2020 at 5:47 PM Jan Lukavský <[email protected]> wrote:
I did write a test that tested
if data is dropped in a plain
stateful DoFn. I did this as
part of validating that PR [1]
didn't drop more data when using
@RequiresTimeSortedInput than it
would without this annotation.
This test failed, and I haven't
committed it yet.
The test was basically as follows:
- use TestStream to generate
three elements with timestamps
2, 1 and 0
- between elements with
timestamp 1 and 0 move watermark
to 1
- use allowed lateness of zero
- use stateful dofn that just
emits arbitrary data for each
input element
- use Count.globally to count
outputs
The outcome was that the stateful
DoFn using
@RequiresTimeSortedInput output
2 elements; without the
annotation it was 3 elements. I
think the correct answer would be 2
elements in this case. The
difference is caused by the
annotation (currently) having
its own logic for dropping data,
which could be removed if we
agree that the data should be
dropped in all cases.
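Jan's scenario can be simulated in plain Java (a model of the hypothetical dropping rule, not actual Beam TestStream code): elements with timestamps 2, 1 and 0, the watermark advanced to 1 before the third element, and allowed lateness zero.

```java
// Plain-Java simulation of the test above: if data behind
// "watermark - allowedLateness" is dropped, the element with timestamp 0
// produces no output and only 2 outputs remain; otherwise there are 3.
public class DropBehindWatermark {
    static int countOutputs(long[] elementTs, long watermarkBeforeThird, long allowedLateness) {
        int outputs = 0;
        long watermark = Long.MIN_VALUE;
        for (int i = 0; i < elementTs.length; i++) {
            if (i == 2) watermark = watermarkBeforeThird; // TestStream-style watermark hop
            if (elementTs[i] + allowedLateness >= watermark) {
                outputs++; // not behind the watermark: the DoFn emits one output
            }
        }
        return outputs;
    }
}
```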
On 1/3/20 11:23 PM, Kenneth
Knowles wrote:
Did you write such
a @Category(ValidatesRunner.class)
test? I believe the Java
direct runner does drop late
data, for both GBK and stateful
ParDo.
Stateful ParDo is implemented
on top of GBK:
https://github.com/apache/beam/blob/64262a61402fad67d9ad8a66eaf6322593d3b5dc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java#L172
And GroupByKey, via
DirectGroupByKey, via
DirectGroupAlsoByWindow, does
drop late data:
https://github.com/apache/beam/blob/c2f0d282337f3ae0196a7717712396a5a41fdde1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java#L220
I'm not sure why it has its own
code, since ReduceFnRunner also
drops late data, and it does
use ReduceFnRunner (the same
code path all Java-based
runners use).
Kenn
On Fri, Jan 3, 2020 at 1:02 PM Jan Lukavský <[email protected]> wrote:
Yes, the non-reliability of late data dropping in a
distributed runner is understood. But this is exactly
where the DirectRunner can play its role, because only
there is it actually possible to emulate and test
specific watermark conditions. A question regarding
this for the Java DirectRunner - should we completely
drop LateDataDroppingDoFnRunner and delegate the late
data dropping to StatefulDoFnRunner? Seems logical to
me, because if we agree that late data should always
be dropped, then there would be no "valid" use of
StatefulDoFnRunner without the late data dropping
functionality.
On 1/3/20 9:32 PM, Robert
Bradshaw wrote:
I agree; in fact, we just recently enabled late data
dropping in the Python direct runner to be able to
develop better tests for Dataflow.
It should be noted, however, that in a distributed
runner (absent the quiescence of TestStream) one
can't *count* on late data being dropped at a certain
point, and in fact (due to delays in fully propagating
the watermark) late data can even become on-time, so
the promises about what happens behind the watermark
are necessarily a bit loose.
On Fri, Jan 3, 2020 at 9:15 AM Luke Cwik <[email protected]> wrote:
I agree that the DirectRunner should drop late data.
Late data dropping is optional, but the DirectRunner
is used by many for testing, and it should have the
same behaviour users would get on other runners, or
they may be surprised.
On Fri, Jan 3, 2020 at 3:33 AM Jan Lukavský <[email protected]> wrote:
Hi,
I just found out that DirectRunner is apparently not
using LateDataDroppingDoFnRunner, which means that it
doesn't drop late data in cases where there is no GBK
operation involved (dropping in GBK seems to be
correct). There is apparently no
@Category(ValidatesRunner) test for that behavior
(because DirectRunner would fail it), so the question
is: should late data dropping be considered part of
the model (of which DirectRunner should be a canonical
implementation), and therefore be fixed there, or is
late data dropping an optional feature of a runner?
I'm strongly in favor of the first option, and I think
it likely that all real-world runners already adhere
to it (I didn't check that, though).
Opinions?
Jan