squah-confluent commented on code in PR #21350:
URL: https://github.com/apache/kafka/pull/21350#discussion_r2724027915
##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java:
##########
@@ -315,163 +312,6 @@ boolean canTransitionFrom(CoordinatorState state) {
abstract boolean canTransitionFrom(CoordinatorState state);
}
- /**
- * The EventBasedCoordinatorTimer implements the CoordinatorTimer interface and provides an event based
- * timer which turns timeouts of a regular {@link Timer} into {@link CoordinatorWriteEvent} events which
- * are executed by the {@link CoordinatorEventProcessor} used by this coordinator runtime. This is done
- * to ensure that the timer respects the threading model of the coordinator runtime.
- *
- * The {@link CoordinatorWriteEvent} events pushed by the coordinator timer wraps the
- * {@link TimeoutOperation} operations scheduled by the coordinators.
- *
- * It also keeps track of all the scheduled {@link TimerTask}. This allows timeout operations to be
- * cancelled or rescheduled. When a timer is cancelled or overridden, the previous timer is guaranteed to
- * not be executed even if it already expired and got pushed to the event processor.
- *
- * When a timer fails with an unexpected exception, the timer is rescheduled with a backoff.
- */
- class EventBasedCoordinatorTimer implements CoordinatorTimer<Void, U> {
- /**
- * The logger.
- */
- final Logger log;
-
- /**
- * The topic partition.
- */
- final TopicPartition tp;
-
- /**
- * The scheduled timers keyed by their key.
- */
- final Map<String, TimerTask> tasks = new HashMap<>();
-
- EventBasedCoordinatorTimer(TopicPartition tp, LogContext logContext) {
- this.tp = tp;
- this.log = logContext.logger(EventBasedCoordinatorTimer.class);
- }
-
- @Override
- public void schedule(
- String key,
- long delay,
- TimeUnit unit,
- boolean retry,
- TimeoutOperation<Void, U> operation
- ) {
- schedule(key, delay, unit, retry, 500, operation);
- }
-
- @Override
- public void schedule(
- String key,
- long delay,
- TimeUnit unit,
- boolean retry,
- long retryBackoff,
- TimeoutOperation<Void, U> operation
- ) {
- // The TimerTask wraps the TimeoutOperation into a CoordinatorWriteEvent. When the TimerTask
- // expires, the event is pushed to the queue of the coordinator runtime to be executed. This
- // ensures that the threading model of the runtime is respected.
- TimerTask task = new TimerTask(unit.toMillis(delay)) {
- @Override
- public void run() {
- String eventName = "Timeout(tp=" + tp + ", key=" + key + ")";
- CoordinatorWriteEvent<Void> event = new CoordinatorWriteEvent<>(eventName, tp, writeTimeout, coordinator -> {
- log.debug("Executing write event {} for timer {}.", eventName, key);
-
- // If the task is different, it means that the timer has been
- // cancelled while the event was waiting to be processed.
- if (!tasks.remove(key, this)) {
- throw new RejectedExecutionException("Timer " + key + " was overridden or cancelled");
- }
-
- // Execute the timeout operation.
- return operation.generateRecords();
- });
-
- // If the write event fails, it is rescheduled with a small backoff except if retry
- // is disabled or if the error is fatal.
- event.future.exceptionally(ex -> {
- if (ex instanceof RejectedExecutionException) {
- log.debug("The write event {} for the timer {} was not executed because it was " +
- "cancelled or overridden.", event.name, key);
- return null;
- }
-
- if (ex instanceof NotCoordinatorException || ex instanceof CoordinatorLoadInProgressException) {
- log.debug("The write event {} for the timer {} failed due to {}. Ignoring it because " +
- "the coordinator is not active.", event.name, key, ex.getMessage());
- return null;
- }
-
- if (retry) {
- log.info("The write event {} for the timer {} failed due to {}. Rescheduling it. ",
- event.name, key, ex.getMessage());
- schedule(key, retryBackoff, TimeUnit.MILLISECONDS, true, retryBackoff, operation);
- } else {
- log.error("The write event {} for the timer {} failed due to {}. Ignoring it. ",
- event.name, key, ex.getMessage(), ex);
- }
-
- return null;
- });
-
- log.debug("Scheduling write event {} for timer {}.", event.name, key);
- try {
- enqueueLast(event);
- } catch (NotCoordinatorException ex) {
Review Comment:
The exceptions that come out of `scheduleWriteOperation`/`enqueueLast` are
very confusing. I think it would help if we were a bit more thorough with the
`@throws` tags in the javadocs, though not necessarily in this PR.
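To make the suggestion concrete, here is a minimal sketch of how the two failure channels could be documented separately. `DocumentedWriter` and `scheduleWrite` are hypothetical names invented for this illustration, not the actual `CoordinatorRuntime` API, so the javadoc below describes only this toy interface.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;

/**
 * Hypothetical write API (not the real CoordinatorRuntime) showing how the
 * synchronous and asynchronous failure channels could be documented apart.
 */
interface DocumentedWriter {
    /**
     * Schedules a write operation.
     *
     * @param name the event name, used for logging.
     * @return a future completed with the result of the operation. The future
     *         completes exceptionally with {@link RejectedExecutionException}
     *         if the operation is rejected when it is about to run.
     * @throws RejectedExecutionException if the operation cannot be enqueued
     *         at all; no future is returned in that case.
     */
    CompletableFuture<Void> scheduleWrite(String name);
}
```

With both channels spelled out, a caller can tell from the javadoc alone that it needs a `try`/`catch` around the call as well as a handler on the returned future.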
~~I think the `RejectedExecutionException` handler within the
`exceptionally` above will never be hit because `scheduleWriteOperation` will
throw it directly?~~ It seems that the future can complete with
`RejectedExecutionException` and `scheduleWriteOperation` can also throw it directly.
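The two failure paths can be illustrated with a small, self-contained toy. `ToyRuntime`, `ToyTimer`, `enqueueLast`, `submit` and `fire` are hypothetical stand-ins written for this sketch; they only mimic the shape of the code above: a synchronous throw from the enqueue call, a future that completes exceptionally, and the `tasks.remove(key, this)` guard against stale timers. Events run inline on the caller's thread rather than on an event processor. One possible way to reduce the confusion, shown in `submit`, is to funnel the synchronous throw into a failed future so that a single `exceptionally` handler sees every failure.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Supplier;

// Hypothetical stand-in for the runtime's event queue; NOT the Kafka API.
// Events run inline on the caller's thread to keep the sketch small.
final class ToyRuntime {
    boolean closed = false;

    // Channel 1: throws synchronously before any future exists.
    // Channel 2: the returned future completes exceptionally if the event throws.
    <T> CompletableFuture<T> enqueueLast(Supplier<T> event) {
        if (closed) {
            throw new RejectedExecutionException("runtime is closed");
        }
        CompletableFuture<T> future = new CompletableFuture<>();
        try {
            future.complete(event.get());
        } catch (RuntimeException ex) {
            future.completeExceptionally(ex);
        }
        return future;
    }

    // Funnels both channels into one future so a single handler sees every failure.
    <T> CompletableFuture<T> submit(Supplier<T> event) {
        try {
            return enqueueLast(event);
        } catch (RuntimeException ex) {
            return CompletableFuture.failedFuture(ex);
        }
    }
}

// Hypothetical timer keeping one live token per key, mirroring the tasks map above.
final class ToyTimer {
    final Map<String, Object> tasks = new HashMap<>();
    final ToyRuntime runtime;

    ToyTimer(ToyRuntime runtime) {
        this.runtime = runtime;
    }

    // Registers a new token for key; any earlier token for that key becomes stale.
    Object schedule(String key) {
        Object token = new Object();
        tasks.put(key, token);
        return token;
    }

    // Called when the timer behind token expires. Like tasks.remove(key, this)
    // in the diff, a stale token is rejected instead of executed.
    CompletableFuture<String> fire(String key, Object token) {
        return runtime.submit(() -> {
            if (!tasks.remove(key, token)) {
                throw new RejectedExecutionException("Timer " + key + " was overridden or cancelled");
            }
            return "executed " + key;
        });
    }
}
```

In this toy, both channels carry a `RejectedExecutionException`, but for different reasons (a stale timer versus a refused enqueue), which is exactly why documenting them separately would help.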