Hi,

What seems most "surprising" to me is that we use TimestampCombiners to do two (orthogonal) things:

 a) calculate a watermark hold for a window, so that on-time elements emitted from a pane are not late in downstream processing

 b) calculate the timestamp of elements in the output pane

These two follow slightly different constraints: in case a) it is not allowed to shift the watermark "back in time", while in case b) it seems OK to output data with a timestamp lower than the output watermark (what comes late may leave late). So, while it seems OK to discard late elements for the sake of calculating the output watermark, it seems wrong to discard them when calculating the output timestamp. Maybe these two timestamps could be held in different states (the state would be held until GC time for accumulating panes and reset for discarding panes)?
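The split above could be sketched roughly as follows. This is a hypothetical plain-Java illustration (class and method names are mine, not Beam's internals), tracking the two concerns in separate per-window states with TimestampCombiner.LATEST-style (max) semantics:

```java
// Hypothetical sketch, not actual Beam internals: keep the watermark hold
// and the pane output timestamp in two separate states.
public class PaneTimestamps {
    // a) watermark hold: may only be set by non-late elements, so the
    //    output watermark is never shifted "back in time"
    private long watermarkHold = Long.MIN_VALUE;
    // b) output timestamp: combined (here: max) over ALL elements,
    //    late ones included
    private long outputTimestamp = Long.MIN_VALUE;

    public void addElement(long elementTs, long inputWatermark) {
        // Late elements must not hold the watermark back.
        if (elementTs >= inputWatermark) {
            watermarkHold = Math.max(watermarkHold, elementTs);
        }
        // But they still contribute to the pane's output timestamp.
        outputTimestamp = Math.max(outputTimestamp, elementTs);
    }

    public long getWatermarkHold() { return watermarkHold; }
    public long getOutputTimestamp() { return outputTimestamp; }
}
```

With this split, a pane containing only late data would still carry a meaningful combined timestamp instead of falling back to something unrelated to its contents.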

Jan

On 5/28/20 5:02 PM, David Morávek wrote:
Hi,

I've come across "unexpected" model behaviour when dealing with late data and custom timestamp combiners. Let's take the following pipeline as an example:

final PCollection<String> input = ...;
input
    .apply(
        "GlobalWindows",
        Window.<String>into(new GlobalWindows())
            .triggering(
                AfterWatermark.pastEndOfWindow()
                    .withEarlyFirings(
                        AfterProcessingTime.pastFirstElementInPane()
                            .plusDelayOf(Duration.standardSeconds(10))))
            .withTimestampCombiner(TimestampCombiner.LATEST)
            .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY)
            .accumulatingFiredPanes())
    .apply("Aggregate", Count.perElement());

The above pipeline emits updates with the latest input timestamp it has seen so far (from non-late elements). We write the output to Kafka with this timestamp and read it from another pipeline.

The problem comes when we need to handle late elements behind the output watermark. In this case Beam cannot use the combined timestamp and uses the EOW timestamp instead. Unfortunately, this results in the downstream pipeline advancing its input watermark to the end of the global window. Also, if we used fixed windows after this aggregation, it would yield unexpected results.
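To make the fallback concrete, here is a plain-Java sketch of the behaviour described above (the names and the stand-in EOW constant are mine, not the actual ReduceFnRunner code; for GlobalWindows the end-of-window timestamp is effectively "end of time"):

```java
// Hypothetical sketch of the current fallback: a pane whose combined
// timestamp is behind the output watermark is emitted at end-of-window,
// which for the global window pushes downstream watermarks to "end of time".
public class LatePaneTimestamp {
    // Stand-in for the end-of-global-window timestamp.
    static final long END_OF_GLOBAL_WINDOW = Long.MAX_VALUE;

    static long currentBehaviour(long combinedTs, long outputWatermark) {
        boolean late = combinedTs < outputWatermark;
        return late ? END_OF_GLOBAL_WINDOW : combinedTs;
    }
}
```

So a single late element is enough to make the pane's output timestamp jump from "latest element seen" to the end of the global window, which is exactly the downstream watermark jump described above.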

There is no reasoning about this behaviour in the last section of the lateness design doc [1], so I'd like to open a discussion about what the expected result should be.

My personal opinion is that the correct approach would be to emit late elements with currentOutputWatermark rather than EOW in the case of the EARLIEST and LATEST timestamp combiners.
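As a sketch, the proposal amounts to the following (again a hypothetical plain-Java illustration, not a patch against Beam):

```java
// Hypothetical sketch of the proposed behaviour: for EARLIEST/LATEST
// combiners, a late combined timestamp is clamped up to the current
// output watermark instead of jumping to end-of-window.
public class ProposedLateTimestamp {
    static long proposedTimestamp(long combinedTs, long currentOutputWatermark) {
        // On-time combined timestamps are kept as-is; late ones are
        // emitted at the output watermark.
        return Math.max(combinedTs, currentOutputWatermark);
    }
}
```

This keeps the output timestamp close to the actual data while still never emitting behind the output watermark, so downstream fixed windows would see the pane in a sensible window rather than at the end of the global window.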

I've prepared a failing test case for ReduceFnRunner [3], if anyone wants to play around with the issue.

I also think that BEAM-2262 [2] may be related to this discussion.

[1] https://s.apache.org/beam-lateness
[2] https://issues.apache.org/jira/browse/BEAM-2262
[3] https://github.com/dmvk/beam/commit/c93cd26681aa6fbc83c15d2a7a8146287f1e850b

Looking forward to hearing your thoughts.

Thanks,
D.
