xinyuiscool commented on a change in pull request #1355:
URL: https://github.com/apache/samza/pull/1355#discussion_r421798671



##########
File path: samza-core/src/main/java/org/apache/samza/scheduler/EpochTimeScheduler.java
##########
@@ -57,9 +58,33 @@ private EpochTimeScheduler(ScheduledExecutorService executor) {
     this.executor = executor;
   }
 
+  @VisibleForTesting
+  Map<Object, ScheduledFuture> getScheduledFutures() {
+    return scheduledFutures;
+  }
+
   public <K> void setTimer(K key, long timestamp, ScheduledCallback<K> callback) {
-    checkState(!scheduledFutures.containsKey(key),
-        String.format("Duplicate key %s registration for the same timer", key));
+    if (scheduledFutures.containsKey(key)) {
+      LOG.warn("Registering duplicate callback for key: {}. Attempting to cancel the previous callback", key);
+      ScheduledFuture<?> scheduledFuture = scheduledFutures.get(key);
+
+      /*
+       * We can have a race between the time we check for the presence of the key and the time we attempt to cancel;
+       * Hence we check for non-null criteria to ensure the executor hasn't kicked off the callback for the key which
+       * removes the future from the map before invoking onTimer.
+       *  1. In the event that callback is running then we will not attempt to interrupt the action and
+       *     cancel will return as unsuccessful.
+       *  2. In case of the callback successfully executed, we want to allow duplicate registration to keep the
+       *     behavior consistent with the scenario where the callback is already executed or in progress even before
+       *     we entered this condition.
+       */
+      if (scheduledFuture != null
+          && !scheduledFuture.cancel(false)
+          && !scheduledFuture.isDone()) {
+        LOG.debug("Failed to cancel the previous callback successfully. Ignoring the current request to register new callback");

Review comment:
       This should probably be logged at warn level.
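
       For context on when this branch is reached, here is a minimal standalone sketch (not Samza code) of what `cancel(false)` and `isDone()` report on a `ScheduledFuture` from a plain JDK `ScheduledExecutorService`. The class and method names are hypothetical, and it assumes the default executor from `Executors.newSingleThreadScheduledExecutor()` rather than whatever executor `EpochTimeScheduler` is constructed with.

```java
import java.util.Arrays;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of the PR.
public class CancelSemanticsSketch {

  /** Cancels a task that has not started yet. Returns {cancel(false), isDone(), isCancelled()}. */
  static boolean[] cancelPending() {
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    try {
      // Scheduled far enough ahead that it cannot start before we cancel it.
      ScheduledFuture<?> future = executor.schedule(() -> { }, 1, TimeUnit.HOURS);
      boolean cancelled = future.cancel(false);
      return new boolean[] {cancelled, future.isDone(), future.isCancelled()};
    } finally {
      executor.shutdownNow();
    }
  }

  /** Tries to cancel a task that already finished. Returns {cancel(false), isDone(), isCancelled()}. */
  static boolean[] cancelCompleted() throws Exception {
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    try {
      ScheduledFuture<?> future = executor.schedule(() -> { }, 0, TimeUnit.MILLISECONDS);
      // Block until the task has completed normally.
      future.get(5, TimeUnit.SECONDS);
      boolean cancelled = future.cancel(false);
      return new boolean[] {cancelled, future.isDone(), future.isCancelled()};
    } finally {
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println("pending:   " + Arrays.toString(cancelPending()));
    System.out.println("completed: " + Arrays.toString(cancelCompleted()));
  }
}
```

       With the JDK executor, cancelling a pending task succeeds, while cancelling a completed task returns false but leaves `isDone()` true. It may be worth confirming under which conditions `!cancel(false) && !isDone()` can actually hold, since that determines how often this message would be logged.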





