On Sat, Jan 4, 2020 at 11:03 AM Jan Lukavský wrote:
On 1/4/20 6:14 PM, Reuven Lax wrote:
There is a very good reason not to define lateness directly
in terms of the watermark. The model does not make any
guarantees that the watermark advances synchronously, and in
fact for the Dataflow runner the watermark advances
asynchronously (i.e. independent of element processing).
This means that simply comparing an element timestamp
against the watermark creates a race condition. There are
cases where the answer could change depending on exactly
when you examine the watermark, and if you examine again
while processing the same bundle you might come to a
different conclusion about lateness.
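To make the race concrete, here is a minimal single-threaded Java simulation. AsyncWatermark and RaceDemo are hypothetical names, not Beam API, and allowed lateness is ignored. The same lateness question is asked twice for one element, with a watermark advance interleaved between the two reads, as an asynchronous update could be:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch, not Beam API. The watermark only moves forward, but it
// is advanced independently of the code that reads it.
final class AsyncWatermark {
  private final AtomicLong watermark = new AtomicLong(Long.MIN_VALUE);

  // Monotonic advance: a smaller value never moves the watermark backwards.
  void advanceTo(long t) {
    watermark.accumulateAndGet(t, Math::max);
  }

  // Naive lateness check against whatever the watermark is right now
  // (allowed lateness ignored for simplicity).
  boolean isLate(long elementTimestamp) {
    return elementTimestamp < watermark.get();
  }
}

final class RaceDemo {
  // Asks "is this element late?" twice for the same element, with a watermark
  // update interleaved between the two reads. The interleaving is simulated
  // deterministically here; on a runner like Dataflow it would happen
  // independently of element processing.
  static boolean[] checkTwice(long elementTimestamp, long before, long after) {
    AsyncWatermark wm = new AsyncWatermark();
    wm.advanceTo(before);
    boolean first = wm.isLate(elementTimestamp);
    wm.advanceTo(after);
    boolean second = wm.isLate(elementTimestamp);
    return new boolean[] {first, second};
  }

  public static void main(String[] args) {
    boolean[] answers = checkTwice(10, 5, 20);
    // prints "first=false, second=true"
    System.out.println("first=" + answers[0] + ", second=" + answers[1]);
  }
}
```

Because the watermark is monotonic, the answer can only flip from "not late" to "late", which is exactly the direction that breaks a filter's guarantees.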
Due to the monotonicity of the watermark, I don't think
asynchronous watermark updates can change the answer from
"late" to "not late". That seems fine to me.
It's the other way around. You check to see whether an element is
late and the answer is "not late." An instant later the answer
changes to "late." This does cause many problems, and is why this
was changed.
This non-determinism is undesirable when considering
lateness, as it can break many invariants that users may
rely on (e.g. if I wrote a ParDo that filtered out all
late data, yet still found late data showing up downstream of
that ParDo, it would be very surprising). For that reason,
the SDK always marks things as late based on deterministic
signals. e.g. for a triggered GBK everything in the first
post-watermark pane is marked as on time (no matter what the
watermark is) and everything in subsequent panes is marked
as late.
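A rough sketch of the deterministic labeling described above, assuming a single window and a caller that reports the input watermark each time a pane fires. PaneLabeler and the Timing enum are illustrative names, loosely modeled on the EARLY/ON_TIME/LATE pane timing values Beam attaches to output; the point is that the label is decided once, when the pane is emitted, rather than by each reader comparing timestamps to a moving watermark:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical label values for panes of one window.
enum Timing { EARLY, ON_TIME, LATE }

// Sketch, not Beam's ReduceFnRunner: panes fired before the watermark passes
// the end of the window are EARLY, the first pane after it passes is ON_TIME
// (no matter how far the watermark has moved), and every later pane is LATE.
final class PaneLabeler {
  private final long windowMaxTimestamp;
  private boolean onTimeEmitted = false;

  PaneLabeler(long windowMaxTimestamp) {
    this.windowMaxTimestamp = windowMaxTimestamp;
  }

  // inputWatermark is the watermark at the moment this pane fires.
  Timing labelNextPane(long inputWatermark) {
    if (inputWatermark <= windowMaxTimestamp) {
      return Timing.EARLY;
    }
    if (!onTimeEmitted) {
      onTimeEmitted = true;
      return Timing.ON_TIME;
    }
    return Timing.LATE;
  }

  public static void main(String[] args) {
    PaneLabeler labeler = new PaneLabeler(99);
    List<Timing> labels = new ArrayList<>();
    for (long wm : new long[] {50, 120, 500, 900}) {
      labels.add(labeler.labelNextPane(wm));
    }
    System.out.println(labels); // prints [EARLY, ON_TIME, LATE, LATE]
  }
}
```

Downstream code that reads the attached label gets the same answer no matter when it looks, which is the deterministic property the watermark comparison lacks.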
Dropping latecomers will always be non-deterministic, that is
certain. This is true even when the watermark is updated
synchronously with element processing, due to shuffling and
varying (random) differences between processing and event time in
upstream operator(s). The question was only whether a latecomer
should be dropped only at window boundaries (which are
a sort of artificial time boundary), or right away when
spotted (in stateful DoFns only). Another question is
whether latecomers should be dropped based on the input or the output
watermark; dropping based on the output watermark even seems to
be stable, in the sense that all downstream operators should
come to the same conclusion (this is a bit of speculation).
Yes, but invariants should hold. If I add a ParDo that drops late
elements (or, more commonly, diverts the late elements to a
different PCollection), then the output of that ParDo should
_never_ introduce any more late data. This cannot be guaranteed
simply with watermark checks. The ParDo may decide that the
element was not late, but by the time it outputs the element the
watermark may have advanced, causing the element to actually be late.
In practice this is important. An early version of Dataflow (pre-Beam)
implemented lateness by comparing against the watermark,
and it caused no end of trouble for users.
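The invariant violation can be sketched without any Beam code. In this hypothetical two-stage simulation (LeakDemo and its methods are illustrative names only), a filter drops elements that are late relative to the watermark it sees, and a downstream stage either re-checks against the watermark it sees later or trusts the verdict attached upstream:

```java
import java.util.ArrayList;
import java.util.List;

final class LeakDemo {
  // An element plus the lateness verdict attached when it was produced.
  static final class Tagged {
    final long timestamp;
    final boolean late;

    Tagged(long timestamp, boolean late) {
      this.timestamp = timestamp;
      this.late = late;
    }
  }

  // "Filter" stage: decides lateness from a watermark snapshot, drops late
  // elements, and attaches its verdict to every element it keeps.
  static List<Tagged> keepOnTime(long[] timestamps, long watermarkAtFilter) {
    List<Tagged> kept = new ArrayList<>();
    for (long ts : timestamps) {
      boolean late = ts < watermarkAtFilter;
      if (!late) {
        kept.add(new Tagged(ts, false));
      }
    }
    return kept;
  }

  // Downstream stage that re-derives lateness from the watermark it sees later.
  static int countLateByRecheck(List<Tagged> in, long watermarkDownstream) {
    int n = 0;
    for (Tagged t : in) {
      if (t.timestamp < watermarkDownstream) {
        n++;
      }
    }
    return n;
  }

  // Downstream stage that trusts the verdict attached upstream.
  static int countLateByTag(List<Tagged> in) {
    int n = 0;
    for (Tagged t : in) {
      if (t.late) {
        n++;
      }
    }
    return n;
  }

  public static void main(String[] args) {
    // The filter ran while the watermark was 5; by the time downstream
    // looks, the watermark has advanced to 13.
    List<Tagged> onTime = keepOnTime(new long[] {3, 7, 12, 15}, 5);
    System.out.println(countLateByRecheck(onTime, 13)); // prints 2 (timestamps 7 and 12)
    System.out.println(countLateByTag(onTime)); // prints 0
  }
}
```

Only the re-check sees "late data" leaking past the filter. Carrying the decision with the element is a simplified stand-in for the deterministic signals the SDK uses, not a description of how Beam encodes them.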
FYI - this is also the reason why Beam does not currently
provide users direct access to the watermark. The
asynchronous nature of it can be very confusing, and often
results in users writing bugs in their pipelines. We decided
instead to expose easier-to-reason-about signals such as
timers (triggered by the watermark), windows, and lateness.
Reuven
On Sat, Jan 4, 2020 at 1:15 AM Jan Lukavský wrote:
I realized the problem. I misinterpreted the
LateDataDroppingDoFnRunner. It doesn't drop *all* late
data (elements behind the watermark minus allowed lateness), but
only data that arrive after the watermark has passed maxTimestamp +
allowedLateness of their respective windows.
A stateful DoFn can run in the global window (which was the
case in my tests), and then there is no dropping.
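A sketch of the dropping rule as described here, with hypothetical names and plain long timestamps. The rule looks only at the window, not at the element's own timestamp, and the global window's max timestamp is modeled (as an assumption, not Beam's actual constant) by a value no realistic watermark reaches:

```java
final class WindowExpiry {
  // Assumption for illustration: the global window ends so far in the future
  // that no realistic watermark passes it.
  static final long GLOBAL_WINDOW_MAX_TIMESTAMP = Long.MAX_VALUE / 2;

  // Element timestamps are deliberately absent: only the window matters.
  static boolean canDrop(long windowMaxTimestamp, long allowedLateness, long inputWatermark) {
    // Saturate instead of overflowing (assumes allowedLateness >= 0).
    long gcTime =
        windowMaxTimestamp > Long.MAX_VALUE - allowedLateness
            ? Long.MAX_VALUE
            : windowMaxTimestamp + allowedLateness;
    return gcTime < inputWatermark;
  }

  public static void main(String[] args) {
    long watermark = 20;
    // Element with timestamp 0 in fixed window [0, 10): maxTimestamp = 9.
    System.out.println(canDrop(9, 0, watermark)); // prints true
    // The same element in the global window is never dropped.
    System.out.println(canDrop(GLOBAL_WINDOW_MAX_TIMESTAMP, 0, watermark)); // prints false
  }
}
```

An element with timestamp 0 is behind a watermark of 20 in both cases, yet only the fixed-window element is dropped, which matches the observation about the global window.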
Two questions arise then:
a) Is this one more argument for moving
this logic to StatefulDoFnRunner? StatefulDoFnRunner
performs state cleanup at window GC time, so without
LateDataDroppingDoFnRunner, late data will see empty
state and produce wrong results.
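Point (a) can be illustrated with a hypothetical counting stateful DoFn whose per-window state is cleared at window GC time (maxTimestamp + allowedLateness). StateCleanupDemo is an illustrative name, not Beam code; without dropping, an element for an expired window silently starts over from empty state:

```java
import java.util.HashMap;
import java.util.Map;

final class StateCleanupDemo {
  // Per-window running count, keyed by the window's max timestamp.
  private final Map<Long, Integer> countByWindow = new HashMap<>();
  private final long allowedLateness;
  private final boolean dropExpired;

  StateCleanupDemo(long allowedLateness, boolean dropExpired) {
    this.allowedLateness = allowedLateness;
    this.dropExpired = dropExpired;
  }

  private boolean expired(long windowMaxTimestamp, long inputWatermark) {
    return windowMaxTimestamp + allowedLateness < inputWatermark;
  }

  // Emits the running count for the element's window, or -1 if dropped.
  int process(long windowMaxTimestamp, long inputWatermark) {
    if (dropExpired && expired(windowMaxTimestamp, inputWatermark)) {
      return -1;
    }
    return countByWindow.merge(windowMaxTimestamp, 1, Integer::sum);
  }

  // Window GC: when the watermark advances, clear state of expired windows.
  void onWatermark(long inputWatermark) {
    countByWindow.keySet().removeIf(max -> expired(max, inputWatermark));
  }

  public static void main(String[] args) {
    StateCleanupDemo noDropping = new StateCleanupDemo(0, false);
    noDropping.process(9, 0); // count 1
    noDropping.process(9, 5); // count 2
    noDropping.onWatermark(20); // state for window 9 is garbage collected
    System.out.println(noDropping.process(9, 20)); // prints 1: starts over from empty state

    StateCleanupDemo withDropping = new StateCleanupDemo(0, true);
    withDropping.process(9, 0);
    withDropping.process(9, 5);
    withDropping.onWatermark(20);
    System.out.println(withDropping.process(9, 20)); // prints -1: dropped
  }
}
```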
b) Is this behavior generally intentional and correct?
Windows and triggers are (from my point of view) features
of GBK, not of stateful DoFn. Stateful DoFn is a low-level
primitive, which can be viewed as operating on "instant"
windows; these should then probably be defined as
dropping every single element that arrives after the allowed
lateness. This probably relates to the question of whether
operations should be built bottom-up, from the most primitive
and generic ones to more specific ones, that is, GBK
implemented on top of stateful DoFn and not vice versa.
Thoughts?
Jan
On 1/4/20 1:03 AM, Steve Niemitz wrote:
I do agree that the direct runner doesn't drop late
data arriving at a stateful DoFn (I just tested as well).
However, I believe this is consistent with other
runners. I'm fairly certain (at least last time I
checked) that at least Dataflow will also only drop
late data at GBK operations, and NOT stateful DoFns.
Whether or not this is intentional is debatable;
however, without being able to inspect the watermark
inside the stateful DoFn, it'd be very difficult to do
anything useful with late data.
On Fri, Jan 3, 2020 at 5:47 PM Jan Lukavský wrote:
I did write a test that checked whether data is dropped
in a plain stateful DoFn. I did this as part of
validating that PR [1] didn't drop more data when
using @RequiresTimeSortedInput than it would
without this annotation. This test failed, and I
haven't committed it yet.
The test was basically as follows:
- use TestStream to generate three elements with
timestamps 2, 1 and 0
- between the elements with timestamps 1 and 0, move
the watermark to 1
- use an allowed lateness of zero
- use a stateful DoFn that just emits arbitrary data
for each input element
- use Count.globally to count the outputs
The outcome was that the stateful DoFn using
@RequiresTimeSortedInput output 2 elements; without
the annotation it output 3 elements. I think the
correct result would be 2 elements in this case. The
difference is caused by the annotation having
(currently) its own logic for dropping data, which
could be removed if we agree that the data should
be dropped in all cases.
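The test above can be approximated with a plain Java simulation. This is not TestStream or any runner code; Step, countOutputs and the per-element rule are assumptions for illustration. An element is dropped when timestamp + allowedLateness is behind the current watermark, which reproduces the 2 versus 3 outputs described:

```java
import java.util.Arrays;
import java.util.List;

final class TestStreamSim {
  // One step of the simulated stream: an element or a watermark advance.
  static final class Step {
    final boolean isWatermark;
    final long value;

    private Step(boolean isWatermark, long value) {
      this.isWatermark = isWatermark;
      this.value = value;
    }

    static Step element(long timestamp) {
      return new Step(false, timestamp);
    }

    static Step watermark(long t) {
      return new Step(true, t);
    }
  }

  // Counts outputs of a DoFn that emits one value per input it accepts.
  static int countOutputs(List<Step> steps, long allowedLateness, boolean dropLate) {
    long watermark = Long.MIN_VALUE;
    int outputs = 0;
    for (Step s : steps) {
      if (s.isWatermark) {
        watermark = Math.max(watermark, s.value); // watermark never regresses
      } else if (!(dropLate && s.value + allowedLateness < watermark)) {
        outputs++; // element accepted: the DoFn emits one arbitrary value
      }
    }
    return outputs;
  }

  public static void main(String[] args) {
    List<Step> steps =
        Arrays.asList(Step.element(2), Step.element(1), Step.watermark(1), Step.element(0));
    System.out.println(countOutputs(steps, 0, true)); // prints 2
    System.out.println(countOutputs(steps, 0, false)); // prints 3
  }
}
```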
On 1/3/20 11:23 PM, Kenneth Knowles wrote:
Did you write such
a @Category(ValidatesRunner.class) test? I believe
the Java direct runner does drop late data, for
both GBK and stateful ParDo.
Stateful ParDo is implemented on top of GBK:
https://github.com/apache/beam/blob/64262a61402fad67d9ad8a66eaf6322593d3b5dc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java#L172
And GroupByKey, via DirectGroupByKey, via
DirectGroupAlsoByWindow, does drop late data:
https://github.com/apache/beam/blob/c2f0d282337f3ae0196a7717712396a5a41fdde1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java#L220
I'm not sure why it has its own code, since
ReduceFnRunner also drops late data, and it does
use ReduceFnRunner (the same code path all
Java-based runners use).
Kenn
On Fri, Jan 3, 2020 at 1:02 PM Jan Lukavský wrote:
Yes, the unreliability of late data dropping
in a distributed runner is understood. But this
is exactly where the DirectRunner can play its role,
because only there is it actually possible to
emulate and test specific watermark
conditions. A question regarding this for the
Java DirectRunner: should we completely drop
LateDataDroppingDoFnRunner and delegate
late data dropping to StatefulDoFnRunner?
That seems logical to me: if we agree that late
data should always be dropped, then there
would be no "valid" use of StatefulDoFnRunner
without the late data dropping functionality.
On 1/3/20 9:32 PM, Robert Bradshaw wrote:
I agree; in fact, we just recently enabled
late data dropping in the direct runner for
Python to be able to develop better tests for
Dataflow.
It should be noted, however, that in a
distributed runner (absent the quiescence of
TestStream) one can't *count* on late
data being dropped at a certain point, and in
fact (due to delays in fully propagating the
watermark) late data can even become on-time,
so the promises about what happens behind the
watermark are necessarily a bit loose.
On Fri, Jan 3, 2020 at 9:15 AM Luke Cwik wrote:
I agree that the DirectRunner should drop
late data. Late data dropping is optional
but the DirectRunner is used by many for
testing and we should have the same
behaviour they would get on other runners
or users may be surprised.
On Fri, Jan 3, 2020 at 3:33 AM Jan Lukavský wrote:
Hi,
I just found out that the DirectRunner is
apparently not using
LateDataDroppingDoFnRunner, which
means that it doesn't drop late data
in cases where there is no GBK
operation involved (dropping in GBK
seems to be correct). There is apparently
no @Category(ValidatesRunner) test
for that behavior (because
the DirectRunner would fail it), so the
question is: should late data dropping be
considered part of the model (of which
the DirectRunner should be a canonical
implementation), and therefore be
fixed there, or is late
data dropping an optional feature
of a runner?
I'm strongly in favor of the first
option, and I think it is likely that
all real-world runners adhere
to it (I didn't check
that, though).
Opinions?
Jan