dajac commented on code in PR #13961:
URL: https://github.com/apache/kafka/pull/13961#discussion_r1255503145


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -190,6 +216,127 @@ boolean canTransitionFrom(CoordinatorState state) {
         abstract boolean canTransitionFrom(CoordinatorState state);
     }
 
+    /**
+     * The EventBasedCoordinatorTimer implements the CoordinatorTimer 
interface and provides an event based
+     * timer which turns timeouts of a regular {@link Timer} into {@link 
CoordinatorWriteEvent} events which
+     * are executed by the {@link CoordinatorEventProcessor} used by this 
coordinator runtime. This is done
+     * to ensure that the timer respects the threading model of the 
coordinator runtime.
+     *
+     * The {@link CoordinatorWriteEvent} events pushed by the coordinator 
timer wraps the
+     * {@link TimeoutOperation} operations scheduled by the coordinators.
+     *
+     * It also keeps track of all the scheduled {@link TimerTask}. This allows 
timeout operations to be
+     * cancelled or rescheduled. When a timer is cancelled or overridden, the 
previous timer is guaranteed to
+     * not be executed even if it already expired and got pushed to the event 
processor.
+     *
+     * When a timer fails with an unexpected exception, the timer is 
rescheduled with a backoff.
+     */
+    class EventBasedCoordinatorTimer implements CoordinatorTimer<U> {
+        /**
+         * The logger.
+         */
+        final Logger log;
+
+        /**
+         * The topic partition.
+         */
+        final TopicPartition tp;
+
+        /**
+         * The scheduled timers keyed by their key.
+         */
+        final Map<String, TimerTask> tasks = new HashMap<>();
+
+        EventBasedCoordinatorTimer(TopicPartition tp, LogContext logContext) {
+            this.tp = tp;
+            this.log = logContext.logger(EventBasedCoordinatorTimer.class);
+        }

Review Comment:
   If you look carefully at the `schedule` method, there is the following: 
   
   ```
               log.debug("Registering timer {} with delay of {}ms.", key, 
unit.toMillis(delay));
               TimerTask prevTask = tasks.put(key, task);
               if (prevTask != null) prevTask.cancel();
   
               timer.add(task);
   ```
   
   So the task is added to the Timer object.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to