[BEAM-2571] Respect watermark contract in Flink DoFnOperator
In Flink, a watermark T specifies that there will be no elements with a timestamp <= T in the future. In Beam, a watermark T specifies that there will not be elements with a timestamp < T in the future. This leads to problems when the watermark is exactly "on the timer timestamp". Most prominently, this happened with Triggers: Flink would fire the Trigger too early, and the Trigger would then determine (based on the watermark) that it was not yet time to fire the window, even though Flink considered it time.
This also adds a test that specifically tests this edge case. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ade50652 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ade50652 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ade50652 Branch: refs/heads/release-2.1.0 Commit: ade506526b4ff56eb4ed15e9eea04d1d3345bc13 Parents: 5c4a95a Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Wed Jul 12 15:38:06 2017 +0200 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Mon Jul 24 14:29:56 2017 +0200 ---------------------------------------------------------------------- .../wrappers/streaming/DoFnOperator.java | 13 ++- .../flink/streaming/DoFnOperatorTest.java | 117 ++++++++++++++++++- 2 files changed, 128 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/ade50652/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 8da8de5..8884ce1 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -472,7 +472,7 @@ public class DoFnOperator<InputT, OutputT> // hold back by the pushed back values waiting for side inputs long pushedBackInputWatermark = Math.min(getPushbackWatermarkHold(), mark.getTimestamp()); - timerService.advanceWatermark(pushedBackInputWatermark); + timerService.advanceWatermark(toFlinkRuntimeWatermark(pushedBackInputWatermark)); 
Instant watermarkHold = stateInternals.watermarkHold(); @@ -501,6 +501,17 @@ public class DoFnOperator<InputT, OutputT> } /** + * Converts a Beam watermark to a Flink watermark. This is only relevant when considering what + * event-time timers to fire: in Beam, a watermark {@code T} says there will not be any elements + * with a timestamp {@code < T} in the future. A Flink watermark {@code T} says there will not be + * any elements with a timestamp {@code <= T} in the future. We correct this by subtracting + * {@code 1} from a Beam watermark before passing to any relevant Flink runtime components. + */ + private static long toFlinkRuntimeWatermark(long beamWatermark) { + return beamWatermark - 1; + } + + /** * Emits all pushed-back data. This should be used once we know that there will not be * any future side input, i.e. that there is no point in waiting. */ http://git-wip-us.apache.org/repos/asf/beam/blob/ade50652/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java index ad9d236..4d2a912 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java @@ -33,6 +33,7 @@ import javax.annotation.Nullable; import org.apache.beam.runners.core.StatefulDoFnRunner; import org.apache.beam.runners.flink.FlinkPipelineOptions; import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; +import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer; import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; @@ 
-197,6 +198,118 @@ public class DoFnOperatorTest { testHarness.close(); } + /** + * This test specifically verifies that we correctly map Flink watermarks to Beam watermarks. In + * Beam, a watermark {@code T} guarantees there will not be elements with a timestamp + * {@code < T} in the future. In Flink, a watermark {@code T} guarantees there will not be + * elements with a timestamp {@code <= T} in the future. We have to make sure to take this into + * account when firing timers. + * + * <p>This not test the timer API in general or processing-time timers because there are generic + * tests for this in {@code ParDoTest}. + */ + @Test + public void testWatermarkContract() throws Exception { + + final Instant timerTimestamp = new Instant(1000); + final String outputMessage = "Timer fired"; + + WindowingStrategy<Object, IntervalWindow> windowingStrategy = + WindowingStrategy.of(FixedWindows.of(new Duration(10_000))); + + DoFn<Integer, String> fn = new DoFn<Integer, String>() { + private static final String EVENT_TIMER_ID = "eventTimer"; + + @TimerId(EVENT_TIMER_ID) + private final TimerSpec eventTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @ProcessElement + public void processElement(ProcessContext context, @TimerId(EVENT_TIMER_ID) Timer timer) { + timer.set(timerTimestamp); + } + + @OnTimer(EVENT_TIMER_ID) + public void onEventTime(OnTimerContext context) { + assertEquals( + "Timer timestamp must match set timestamp.", timerTimestamp, context.timestamp()); + context.outputWithTimestamp(outputMessage, context.timestamp()); + } + }; + + WindowedValue.FullWindowedValueCoder<Integer> inputCoder = + WindowedValue.getFullCoder( + VarIntCoder.of(), + windowingStrategy.getWindowFn().windowCoder()); + + WindowedValue.FullWindowedValueCoder<String> outputCoder = + WindowedValue.getFullCoder( + StringUtf8Coder.of(), + windowingStrategy.getWindowFn().windowCoder()); + + + TupleTag<String> outputTag = new TupleTag<>("main-output"); + + DoFnOperator<Integer, String> 
doFnOperator = new DoFnOperator<>( + fn, + "stepName", + inputCoder, + outputTag, + Collections.<TupleTag<?>>emptyList(), + new DoFnOperator.DefaultOutputManagerFactory<String>(), + windowingStrategy, + new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */ + Collections.<PCollectionView<?>>emptyList(), /* side inputs */ + PipelineOptionsFactory.as(FlinkPipelineOptions.class), + VarIntCoder.of() /* key coder */); + + OneInputStreamOperatorTestHarness<WindowedValue<Integer>, WindowedValue<String>> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>( + doFnOperator, + new KeySelector<WindowedValue<Integer>, Integer>() { + @Override + public Integer getKey(WindowedValue<Integer> integerWindowedValue) throws Exception { + return integerWindowedValue.getValue(); + } + }, + new CoderTypeInformation<>(VarIntCoder.of())); + + testHarness.setup(new CoderTypeSerializer<>(outputCoder)); + + testHarness.open(); + + testHarness.processWatermark(0); + + IntervalWindow window1 = new IntervalWindow(new Instant(0), Duration.millis(10_000)); + + // this should register a timer + testHarness.processElement( + new StreamRecord<>(WindowedValue.of(13, new Instant(0), window1, PaneInfo.NO_FIRING))); + + assertThat( + this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()), + emptyIterable()); + + // this does not yet fire the timer (in vanilla Flink it would) + testHarness.processWatermark(timerTimestamp.getMillis()); + + assertThat( + this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()), + emptyIterable()); + + testHarness.getOutput().clear(); + + // this must fire the timer + testHarness.processWatermark(timerTimestamp.getMillis() + 1); + + assertThat( + this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()), + contains( + WindowedValue.of( + outputMessage, new Instant(timerTimestamp), window1, PaneInfo.NO_FIRING))); + + testHarness.close(); + } + @Test public void testLateDroppingForStatefulFn() throws 
Exception { @@ -394,11 +507,13 @@ public class DoFnOperatorTest { // this should trigger both the window.maxTimestamp() timer and the GC timer // this tests that the GC timer fires after the user timer + // we have to add 1 here because Flink timers fire when watermark >= timestamp while Beam + // timers fire when watermark > timestamp testHarness.processWatermark( window1.maxTimestamp() .plus(windowingStrategy.getAllowedLateness()) .plus(StatefulDoFnRunner.TimeInternalsCleanupTimer.GC_DELAY_MS) - .getMillis()); + .getMillis() + 1); assertThat( this.<KV<String, Integer>>stripStreamRecordFromWindowedValue(testHarness.getOutput()),