Repository: beam Updated Branches: refs/heads/master b40b26501 -> c2c89eda9
Use processing time for synchronized processing time in Flink runner Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/05722acc Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/05722acc Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/05722acc Branch: refs/heads/master Commit: 05722accd3da1a92af9f73e85c7ae78742cd9db1 Parents: ae72456 Author: Kenneth Knowles <k...@google.com> Authored: Tue May 2 09:03:03 2017 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Tue May 2 09:03:05 2017 -0700 ---------------------------------------------------------------------- .../wrappers/streaming/DoFnOperator.java | 36 ++++++++++++-------- 1 file changed, 22 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/05722acc/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 54eb770..c130200 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -720,13 +720,17 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> @Override public void setTimer(TimerData timerKey) { long time = timerKey.getTimestamp().getMillis(); - if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) { - timerService.registerEventTimeTimer(timerKey, time); - } else if (timerKey.getDomain().equals(TimeDomain.PROCESSING_TIME)) { - timerService.registerProcessingTimeTimer(timerKey, time); - } else { - throw new UnsupportedOperationException( - "Unsupported time domain: " + timerKey.getDomain()); + switch (timerKey.getDomain()) { + case EVENT_TIME: + timerService.registerEventTimeTimer(timerKey, time); + break; + case PROCESSING_TIME: + case SYNCHRONIZED_PROCESSING_TIME: + timerService.registerProcessingTimeTimer(timerKey, time); + break; + default: + throw new UnsupportedOperationException( + "Unsupported time domain: " + timerKey.getDomain()); } } @@ -747,13 +751,17 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> @Override public void deleteTimer(TimerData timerKey) { long time = timerKey.getTimestamp().getMillis(); - if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) { - timerService.deleteEventTimeTimer(timerKey, time); - } else if (timerKey.getDomain().equals(TimeDomain.PROCESSING_TIME)) { - timerService.deleteProcessingTimeTimer(timerKey, time); - } else { - throw new UnsupportedOperationException( - "Unsupported time domain: " + timerKey.getDomain()); + switch (timerKey.getDomain()) { + case EVENT_TIME: + timerService.deleteEventTimeTimer(timerKey, time); + break; + case PROCESSING_TIME: + case SYNCHRONIZED_PROCESSING_TIME: + timerService.deleteProcessingTimeTimer(timerKey, time); + break; + default: + throw new UnsupportedOperationException( + "Unsupported time domain: " + timerKey.getDomain()); } }