Repository: incubator-beam Updated Branches: refs/heads/master 0d0a5e287 -> 4843dc59c
Require TimeDomain to delete a timer Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/35a02740 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/35a02740 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/35a02740 Branch: refs/heads/master Commit: 35a02740748182ee52729d8bfb621a3c342b8312 Parents: 0d0a5e2 Author: Kenneth Knowles <k...@google.com> Authored: Tue Dec 20 20:09:25 2016 -0800 Committer: Kenneth Knowles <k...@google.com> Committed: Wed Dec 21 08:20:28 2016 -0800 ---------------------------------------------------------------------- .../operators/ApexGroupByKeyOperator.java | 8 ++++++++ .../beam/runners/core/InMemoryTimerInternals.java | 8 ++++++++ .../beam/runners/direct/DirectTimerInternals.java | 8 ++++++++ .../wrappers/streaming/WindowDoFnOperator.java | 9 +++++++++ .../org/apache/beam/sdk/util/TimerInternals.java | 17 +++++++++++++++-- 5 files changed, 48 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35a02740/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java index 48ac177..49ec1c8 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java @@ -425,12 +425,19 @@ public class ApexGroupByKeyOperator<K, V> implements Operator { */ public class ApexTimerInternals implements TimerInternals { + @Deprecated @Override public void setTimer(TimerData timerData) { registerActiveTimer(context.element().key(), timerData); } @Override + public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) { + throw new UnsupportedOperationException("Canceling of timer by ID is not yet supported."); + } + + @Deprecated + @Override public void deleteTimer(TimerData timerKey) { unregisterActiveTimer(context.element().key(), timerKey); } @@ -463,6 +470,7 @@ public class ApexGroupByKeyOperator<K, V> implements Operator { throw new UnsupportedOperationException("Setting timer by ID not yet supported."); } + @Deprecated @Override public void deleteTimer(StateNamespace namespace, String timerId) { throw new UnsupportedOperationException("Canceling of timer by ID is not yet supported."); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35a02740/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java index 5fcd088..5ddd5a7 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java @@ -107,6 +107,7 @@ public class InMemoryTimerInternals implements TimerInternals { throw new UnsupportedOperationException("Setting a timer by ID is not yet supported."); } + @Deprecated @Override public void setTimer(TimerData timerData) { WindowTracing.trace("{}.setTimer: {}", getClass().getSimpleName(), timerData); @@ -116,10 +117,17 @@ public class InMemoryTimerInternals implements TimerInternals { } @Override + public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) { + throw new UnsupportedOperationException("Canceling a timer by ID is not yet supported."); + } + + @Deprecated + @Override public void deleteTimer(StateNamespace namespace, String timerId) { throw new UnsupportedOperationException("Canceling a timer by ID is not yet supported."); } + @Deprecated @Override public void deleteTimer(TimerData timer) { WindowTracing.trace("{}.deleteTimer: {}", getClass().getSimpleName(), timer); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35a02740/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java index 8970b4b..5ca276d 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java @@ -52,16 +52,24 @@ class DirectTimerInternals implements TimerInternals { throw new UnsupportedOperationException("Setting timer by ID not yet supported."); } + @Deprecated @Override public void setTimer(TimerData timerData) { timerUpdateBuilder.setTimer(timerData); } @Override + public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) { + throw new UnsupportedOperationException("Canceling of timer by ID is not yet supported."); + } + + @Deprecated + @Override public void deleteTimer(StateNamespace namespace, String timerId) { throw new UnsupportedOperationException("Canceling of timer by ID is not yet supported."); } + @Deprecated @Override public void deleteTimer(TimerData timerKey) { timerUpdateBuilder.deletedTimer(timerKey); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35a02740/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java index 9cea529..5398d7b 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java @@ -455,6 +455,7 @@ public class WindowDoFnOperator<K, InputT, OutputT> throw new UnsupportedOperationException("Setting a timer by ID is not yet supported."); } + @Deprecated @Override public void setTimer(TimerData timerKey) { if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) { @@ -468,11 +469,19 @@ public class WindowDoFnOperator<K, InputT, OutputT> } @Override + public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) { + throw new UnsupportedOperationException( + "Canceling of a timer by ID is not yet supported."); + } + + @Deprecated + @Override public void deleteTimer(StateNamespace namespace, String timerId) { throw new UnsupportedOperationException( "Canceling of a timer by ID is not yet supported."); } + @Deprecated @Override public void deleteTimer(TimerData timerKey) { if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35a02740/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java index c3e498e..0bfcddc 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java @@ -34,6 +34,7 @@ import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.InstantCoder; import org.apache.beam.sdk.coders.StandardCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.util.state.StateNamespaces; @@ -61,18 +62,30 @@ public interface TimerInternals { void setTimer(StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain); /** - * Sets the timer described by {@code timerData}. + * @deprecated use {@link #setTimer(StateNamespace, String, Instant, TimeDomain)}. */ + @Deprecated void setTimer(TimerData timerData); /** * Deletes the given timer. + * + * <p>A timer's ID is enforced to be unique in validation of a {@link DoFn}, but runners + * often manage timers for different time domains in very different ways, thus the + * {@link TimeDomain} is a required parameter. + */ + void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain); + + /** + * @deprecated use {@link #deleteTimer(StateNamespace, String, TimeDomain)}. */ + @Deprecated void deleteTimer(StateNamespace namespace, String timerId); /** - * Deletes the timer with the ID contained in the provided {@link TimerData}. + * @deprecated use {@link #deleteTimer(StateNamespace, String, TimeDomain)}. */ + @Deprecated void deleteTimer(TimerData timerKey); /**