This closes #1160: Support set and delete of timer by ID in 
InMemoryTimerInternals


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7ee8c86d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7ee8c86d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7ee8c86d

Branch: refs/heads/python-sdk
Commit: 7ee8c86d3b0553d8cb7de60b0dc1a03103dfbbc5
Parents: a9447a2 df2e540
Author: Kenneth Knowles <k...@google.com>
Authored: Wed Dec 21 11:02:02 2016 -0800
Committer: Kenneth Knowles <k...@google.com>
Committed: Wed Dec 21 11:02:02 2016 -0800

----------------------------------------------------------------------
 .../runners/core/InMemoryTimerInternals.java    |  65 +++++++----
 .../core/InMemoryTimerInternalsTest.java        | 112 +++++++++++++------
 2 files changed, 120 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7ee8c86d/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
----------------------------------------------------------------------
diff --cc runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
index 5ddd5a7,292ac23..2c3d78a
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
@@@ -104,10 -106,9 +106,10 @@@ public class InMemoryTimerInternals imp
    @Override
    public void setTimer(StateNamespace namespace, String timerId, Instant target,
        TimeDomain timeDomain) {
-     throw new UnsupportedOperationException("Setting a timer by ID is not yet supported.");
+     setTimer(TimerData.of(timerId, namespace, target, timeDomain));
    }
  
 +  @Deprecated
    @Override
    public void setTimer(TimerData timerData) {
      WindowTracing.trace("{}.setTimer: {}", getClass().getSimpleName(), timerData);
@@@ -117,17 -133,13 +134,20 @@@
    }
  
    @Override
 +  public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
 +    throw new UnsupportedOperationException("Canceling a timer by ID is not yet supported.");
 +  }
 +
 +  @Deprecated
 +  @Override
    public void deleteTimer(StateNamespace namespace, String timerId) {
-     throw new UnsupportedOperationException("Canceling a timer by ID is not yet supported.");
+     TimerData existing = existingTimers.get(namespace, timerId);
+     if (existing != null) {
+       deleteTimer(existing);
+     }
    }
  
 +  @Deprecated
    @Override
    public void deleteTimer(TimerData timer) {
      WindowTracing.trace("{}.deleteTimer: {}", getClass().getSimpleName(), timer);

Reply via email to