Repository: beam
Updated Branches:
  refs/heads/master b40b26501 -> c2c89eda9


Use processing time for synchronized processing time in Flink runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/05722acc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/05722acc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/05722acc

Branch: refs/heads/master
Commit: 05722accd3da1a92af9f73e85c7ae78742cd9db1
Parents: ae72456
Author: Kenneth Knowles <k...@google.com>
Authored: Tue May 2 09:03:03 2017 -0700
Committer: Kenneth Knowles <k...@google.com>
Committed: Tue May 2 09:03:05 2017 -0700

----------------------------------------------------------------------
 .../wrappers/streaming/DoFnOperator.java        | 36 ++++++++++++--------
 1 file changed, 22 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/05722acc/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 54eb770..c130200 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -720,13 +720,17 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
     @Override
     public void setTimer(TimerData timerKey) {
       long time = timerKey.getTimestamp().getMillis();
-      if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) {
-        timerService.registerEventTimeTimer(timerKey, time);
-      } else if (timerKey.getDomain().equals(TimeDomain.PROCESSING_TIME)) {
-        timerService.registerProcessingTimeTimer(timerKey, time);
-      } else {
-        throw new UnsupportedOperationException(
-            "Unsupported time domain: " + timerKey.getDomain());
+      switch (timerKey.getDomain()) {
+        case EVENT_TIME:
+          timerService.registerEventTimeTimer(timerKey, time);
+          break;
+        case PROCESSING_TIME:
+        case SYNCHRONIZED_PROCESSING_TIME:
+          timerService.registerProcessingTimeTimer(timerKey, time);
+          break;
+        default:
+          throw new UnsupportedOperationException(
+              "Unsupported time domain: " + timerKey.getDomain());
       }
     }
 
@@ -747,13 +751,17 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
     @Override
     public void deleteTimer(TimerData timerKey) {
       long time = timerKey.getTimestamp().getMillis();
-      if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) {
-        timerService.deleteEventTimeTimer(timerKey, time);
-      } else if (timerKey.getDomain().equals(TimeDomain.PROCESSING_TIME)) {
-        timerService.deleteProcessingTimeTimer(timerKey, time);
-      } else {
-        throw new UnsupportedOperationException(
-            "Unsupported time domain: " + timerKey.getDomain());
+      switch (timerKey.getDomain()) {
+        case EVENT_TIME:
+          timerService.deleteEventTimeTimer(timerKey, time);
+          break;
+        case PROCESSING_TIME:
+        case SYNCHRONIZED_PROCESSING_TIME:
+          timerService.deleteProcessingTimeTimer(timerKey, time);
+          break;
+        default:
+          throw new UnsupportedOperationException(
+              "Unsupported time domain: " + timerKey.getDomain());
       }
     }
 

Reply via email to