Hi,
I've come across "unexpected" model behaviour when dealing with late data
and custom timestamp combiners. Let's take the following pipeline as an
example:
final PCollection<String> input = ...;
input
    .apply(
        "GlobalWindows",
        Window.<String>into(new GlobalWindows())
            .triggering(
                AfterWatermark.pastEndOfWindow()
                    .withEarlyFirings(
                        AfterProcessingTime.pastFirstElementInPane()
                            .plusDelayOf(Duration.standardSeconds(10))))
            .withTimestampCombiner(TimestampCombiner.LATEST)
            .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY)
            .accumulatingFiredPanes())
    .apply("Aggregate", Count.perElement());
The above pipeline emits updates with the latest input timestamp it has
seen so far (from non-late elements). We write this output, with that
timestamp, to Kafka and read it from another pipeline.
The problem arises when we need to handle late elements, i.e. elements
behind the output watermark. In this case Beam cannot use the combined
timestamp and uses the end-of-window (EOW) timestamp instead. Unfortunately,
this causes the downstream pipeline to advance its input watermark to the
end of the global window. Also, if we applied fixed windows after this
aggregation, the late update would be assigned to a window at the end of the
global window rather than near the earlier updates, which would yield
unexpected results.
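To make the downstream effect concrete, here is a minimal, Beam-free Java sketch. It assumes (hypothetically) that the downstream pipeline's watermark is simply the maximum record timestamp it has read from Kafka, and it uses a far-future stand-in for the end of the global window rather than Beam's actual constant. LatePaneEffect, maxObservedWatermark and fixedWindowStart are illustrative names only.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Comparator;
import java.util.List;

/**
 * Hypothetical, Beam-free model of the downstream pipeline. Assumption: its
 * watermark is the maximum record timestamp observed so far, and records are
 * then assigned to fixed windows by their timestamp.
 */
public class LatePaneEffect {

    // Stand-in for the end of the global window; any far-future timestamp shows
    // the same effect (Beam's actual constant differs).
    static final Instant END_OF_GLOBAL_WINDOW = Instant.parse("9999-12-31T23:59:59Z");

    // Assumed watermark estimator: the maximum timestamp among records read so far.
    static Instant maxObservedWatermark(List<Instant> recordTimestamps) {
        return recordTimestamps.stream()
            .max(Comparator.naturalOrder())
            .orElseThrow();
    }

    // Start of the fixed window of the given size that contains the timestamp.
    static Instant fixedWindowStart(Instant timestamp, Duration size) {
        long ts = timestamp.toEpochMilli();
        return Instant.ofEpochMilli(ts - Math.floorMod(ts, size.toMillis()));
    }

    public static void main(String[] args) {
        // Two early panes carry the LATEST combined timestamp...
        Instant early1 = Instant.parse("2020-01-01T10:00:05Z");
        Instant early2 = Instant.parse("2020-01-01T10:00:40Z");
        // ...while the pane containing a late element carries the EOW timestamp.
        List<Instant> panes = List.of(early1, early2, END_OF_GLOBAL_WINDOW);

        // Prints 9999-12-31T23:59:59Z: the late pane drags the watermark to EOW.
        System.out.println("downstream watermark: " + maxObservedWatermark(panes));
        // Prints 9999-12-31T23:59:00Z, far away from the early panes' windows.
        System.out.println("1-minute window of the late pane starts at: "
            + fixedWindowStart(END_OF_GLOBAL_WINDOW, Duration.ofMinutes(1)));
    }
}
```

In this model the watermark never moves backwards, so once the late pane has been read, every subsequent update from the early firings is behind the downstream watermark.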
The last section of the lateness design doc [1] does not explain the
reasoning behind this behaviour, so I'd like to open a discussion about
what the expected result should be.
In my opinion, the correct approach would be to emit late elements with
the currentOutputWatermark timestamp rather than EOW when using the
EARLIEST and LATEST timestamp combiners.
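A minimal sketch of the proposed rule as a standalone Java model. The Combiner enum only mirrors the names of Beam's TimestampCombiner values; chooseOutputTimestamp is a hypothetical helper, not how ReduceFnRunner actually computes output timestamps, and it ignores watermark holds entirely.

```java
import java.time.Instant;
import java.util.Comparator;
import java.util.List;

/**
 * Standalone sketch of the proposed rule. Combiner mirrors the names of Beam's
 * TimestampCombiner values, but chooseOutputTimestamp is a hypothetical helper,
 * not Beam's implementation, and ignores details such as watermark holds.
 */
public class ProposedLateTimestamp {

    enum Combiner { EARLIEST, LATEST, END_OF_WINDOW }

    static Instant chooseOutputTimestamp(
            Combiner combiner,
            List<Instant> paneTimestamps,
            boolean paneIsLate,
            Instant currentOutputWatermark,
            Instant endOfWindow) {
        if (combiner == Combiner.END_OF_WINDOW) {
            return endOfWindow;
        }
        if (paneIsLate) {
            // Proposal: use the current output watermark instead of EOW; it does
            // not fall behind the output watermark, yet does not jump to EOW.
            return currentOutputWatermark;
        }
        // Non-late panes keep the combined timestamp of their elements.
        if (combiner == Combiner.EARLIEST) {
            return paneTimestamps.stream().min(Comparator.naturalOrder()).orElseThrow();
        }
        return paneTimestamps.stream().max(Comparator.naturalOrder()).orElseThrow();
    }

    public static void main(String[] args) {
        Instant eow = Instant.parse("9999-12-31T23:59:59Z"); // stand-in for EOW of the global window
        Instant watermark = Instant.parse("2020-01-01T10:05:00Z");
        List<Instant> late = List.of(Instant.parse("2020-01-01T10:00:30Z"));

        // Late pane with LATEST: prints 2020-01-01T10:05:00Z instead of the EOW stand-in.
        System.out.println(chooseOutputTimestamp(Combiner.LATEST, late, true, watermark, eow));
    }
}
```

END_OF_WINDOW is left unchanged here because the proposal only concerns the EARLIEST and LATEST combiners.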
I've prepared a failing test case for ReduceFnRunner [3] if anyone wants to
play around with the issue.
I also think that BEAM-2262 [2] may be related to this discussion.
[1] https://s.apache.org/beam-lateness
[2] https://issues.apache.org/jira/browse/BEAM-2262
[3] https://github.com/dmvk/beam/commit/c93cd26681aa6fbc83c15d2a7a8146287f1e850b
Looking forward to hearing your thoughts.
Thanks,
D.