dajac commented on code in PR #13961: URL: https://github.com/apache/kafka/pull/13961#discussion_r1256105702
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ########## @@ -190,6 +216,127 @@ boolean canTransitionFrom(CoordinatorState state) { abstract boolean canTransitionFrom(CoordinatorState state); } + /** + * The EventBasedCoordinatorTimer implements the CoordinatorTimer interface and provides an event based + * timer which turns timeouts of a regular {@link Timer} into {@link CoordinatorWriteEvent} events which + * are executed by the {@link CoordinatorEventProcessor} used by this coordinator runtime. This is done + * to ensure that the timer respects the threading model of the coordinator runtime. + * + * The {@link CoordinatorWriteEvent} events pushed by the coordinator timer wraps the + * {@link TimeoutOperation} operations scheduled by the coordinators. + * + * It also keeps track of all the scheduled {@link TimerTask}. This allows timeout operations to be + * cancelled or rescheduled. When a timer is cancelled or overridden, the previous timer is guaranteed to + * not be executed even if it already expired and got pushed to the event processor. + * + * When a timer fails with an unexpected exception, the timer is rescheduled with a backoff. + */ + class EventBasedCoordinatorTimer implements CoordinatorTimer<U> { + /** + * The logger. + */ + final Logger log; + + /** + * The topic partition. + */ + final TopicPartition tp; + + /** + * The scheduled timers keyed by their key. + */ + final Map<String, TimerTask> tasks = new HashMap<>(); + + EventBasedCoordinatorTimer(TopicPartition tp, LogContext logContext) { + this.tp = tp; + this.log = logContext.logger(EventBasedCoordinatorTimer.class); + } + + @Override + public void schedule( + String key, + long delay, + TimeUnit unit, + TimeoutOperation<U> operation + ) { + // The TimerTask wraps the TimeoutOperation into a CoordinatorWriteEvent. When the TimerTask + // expires, the event is push to the queue of the coordinator runtime to be executed. 
This + // ensure that the threading model of the runtime is respected. + TimerTask task = new TimerTask(unit.toMillis(delay)) { + @Override + public void run() { + String eventName = "Timeout(tp=" + tp + ", key=" + key + ")"; + CoordinatorWriteEvent<Void> event = new CoordinatorWriteEvent<>(eventName, tp, coordinator -> { + log.debug("Executing write event {} for timer {}.", eventName, key); + + // If the task is different, it means that the timer has been + // cancelled while the event was waiting to be processed. + if (!tasks.remove(key, this)) { + throw new RejectedExecutionException("Timer " + key + " was overridden or cancelled"); + } + + // Execute the timeout operation. + return new CoordinatorResult<>(operation.generateRecords(), null); + }); + + // If the write event fails, it is rescheduled with a small backoff except if the + // error is fatal. + event.future.exceptionally(ex -> { + if (ex instanceof RejectedExecutionException) { + log.debug("The delayed write event {} for the timer {} was not executed because it was " + Review Comment: Actually, no, we don’t, because we expect the timer to be rescheduled before it expires and pushes the event to the queue. We would only see it when the timer expires, the event gets pushed to the queue, and the timer is then rescheduled before the event is actually executed. This could, for instance, happen if the timeout event gets queued right behind a heartbeat event.
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ########## @@ -190,6 +216,127 @@ boolean canTransitionFrom(CoordinatorState state) { abstract boolean canTransitionFrom(CoordinatorState state); } + /** + * The EventBasedCoordinatorTimer implements the CoordinatorTimer interface and provides an event based + * timer which turns timeouts of a regular {@link Timer} into {@link CoordinatorWriteEvent} events which + * are executed by the {@link CoordinatorEventProcessor} used by this coordinator runtime. This is done + * to ensure that the timer respects the threading model of the coordinator runtime. + * + * The {@link CoordinatorWriteEvent} events pushed by the coordinator timer wraps the + * {@link TimeoutOperation} operations scheduled by the coordinators. + * + * It also keeps track of all the scheduled {@link TimerTask}. This allows timeout operations to be + * cancelled or rescheduled. When a timer is cancelled or overridden, the previous timer is guaranteed to + * not be executed even if it already expired and got pushed to the event processor. + * + * When a timer fails with an unexpected exception, the timer is rescheduled with a backoff. + */ + class EventBasedCoordinatorTimer implements CoordinatorTimer<U> { + /** + * The logger. + */ + final Logger log; + + /** + * The topic partition. + */ + final TopicPartition tp; + + /** + * The scheduled timers keyed by their key. + */ + final Map<String, TimerTask> tasks = new HashMap<>(); + + EventBasedCoordinatorTimer(TopicPartition tp, LogContext logContext) { + this.tp = tp; + this.log = logContext.logger(EventBasedCoordinatorTimer.class); + } + + @Override + public void schedule( + String key, + long delay, + TimeUnit unit, + TimeoutOperation<U> operation + ) { + // The TimerTask wraps the TimeoutOperation into a CoordinatorWriteEvent. When the TimerTask + // expires, the event is push to the queue of the coordinator runtime to be executed. 
This + // ensure that the threading model of the runtime is respected. + TimerTask task = new TimerTask(unit.toMillis(delay)) { + @Override + public void run() { + String eventName = "Timeout(tp=" + tp + ", key=" + key + ")"; + CoordinatorWriteEvent<Void> event = new CoordinatorWriteEvent<>(eventName, tp, coordinator -> { + log.debug("Executing write event {} for timer {}.", eventName, key); + + // If the task is different, it means that the timer has been + // cancelled while the event was waiting to be processed. + if (!tasks.remove(key, this)) { + throw new RejectedExecutionException("Timer " + key + " was overridden or cancelled"); + } + + // Execute the timeout operation. + return new CoordinatorResult<>(operation.generateRecords(), null); + }); + + // If the write event fails, it is rescheduled with a small backoff except if the + // error is fatal. + event.future.exceptionally(ex -> { + if (ex instanceof RejectedExecutionException) { + log.debug("The delayed write event {} for the timer {} was not executed because it was " + Review Comment: Actually, no, we don’t, because we expect the timer to be rescheduled before it expires and pushes the event to the queue. We would only see it when the timer expires, the event gets pushed to the queue, and the timer is then rescheduled before the event is actually executed. This could, for instance, happen if the timeout event gets queued right behind a heartbeat event. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org