[BEAM-2571] Change DoFnOperator to use Long.MAX_VALUE as max watermark. This is in line with what Flink does and what BoundedSourceWrapper and UnboundedSourceWrapper do.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/84499317 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/84499317 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/84499317 Branch: refs/heads/master Commit: 8449931708338dd854ba90b8d6f769fe42b81493 Parents: 5f1d136 Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Wed Jul 12 14:42:37 2017 +0200 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Mon Jul 24 09:47:44 2017 +0200 ---------------------------------------------------------------------- .../flink/translation/wrappers/streaming/DoFnOperator.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/84499317/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index a80f7b6..b1f3b86 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -334,7 +334,7 @@ public class DoFnOperator<InputT, OutputT> protected final long getPushbackWatermarkHold() { // if we don't have side inputs we never hold the watermark if (sideInputs.isEmpty()) { - return BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis(); + return Long.MAX_VALUE; } try { @@ -353,7 +353,7 @@ public class DoFnOperator<InputT, OutputT> BagState<WindowedValue<InputT>> pushedBack = pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag); - long 
min = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis(); + long min = Long.MAX_VALUE; for (WindowedValue<InputT> value : pushedBack.read()) { min = Math.min(min, value.getTimestamp().getMillis()); } @@ -426,7 +426,7 @@ public class DoFnOperator<InputT, OutputT> } pushedBack.clear(); - long min = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis(); + long min = Long.MAX_VALUE; for (WindowedValue<InputT> pushedBackValue : newPushedBack) { min = Math.min(min, pushedBackValue.getTimestamp().getMillis()); pushedBack.add(pushedBackValue); @@ -524,7 +524,7 @@ public class DoFnOperator<InputT, OutputT> pushedBack.clear(); - setPushedBackWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()); + setPushedBackWatermark(Long.MAX_VALUE); pushbackDoFnRunner.finishBundle(); }