[BEAM-2571] Change DoFnOperator to use Long.MAX_VALUE as max watermark

This is in line with what Flink does and what BoundedSourceWrapper and
UnboundedSourceWrapper do.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/84499317
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/84499317
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/84499317

Branch: refs/heads/master
Commit: 8449931708338dd854ba90b8d6f769fe42b81493
Parents: 5f1d136
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Wed Jul 12 14:42:37 2017 +0200
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Mon Jul 24 09:47:44 2017 +0200

----------------------------------------------------------------------
 .../flink/translation/wrappers/streaming/DoFnOperator.java   | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/84499317/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index a80f7b6..b1f3b86 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -334,7 +334,7 @@ public class DoFnOperator<InputT, OutputT>
   protected final long getPushbackWatermarkHold() {
     // if we don't have side inputs we never hold the watermark
     if (sideInputs.isEmpty()) {
-      return BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
+      return Long.MAX_VALUE;
     }
 
     try {
@@ -353,7 +353,7 @@ public class DoFnOperator<InputT, OutputT>
       BagState<WindowedValue<InputT>> pushedBack =
           pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
 
-      long min = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
+      long min = Long.MAX_VALUE;
       for (WindowedValue<InputT> value : pushedBack.read()) {
         min = Math.min(min, value.getTimestamp().getMillis());
       }
@@ -426,7 +426,7 @@ public class DoFnOperator<InputT, OutputT>
     }
 
     pushedBack.clear();
-    long min = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
+    long min = Long.MAX_VALUE;
     for (WindowedValue<InputT> pushedBackValue : newPushedBack) {
       min = Math.min(min, pushedBackValue.getTimestamp().getMillis());
       pushedBack.add(pushedBackValue);
@@ -524,7 +524,7 @@ public class DoFnOperator<InputT, OutputT>
 
     pushedBack.clear();
 
-    setPushedBackWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
+    setPushedBackWatermark(Long.MAX_VALUE);
 
     pushbackDoFnRunner.finishBundle();
   }

Reply via email to