This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5f0e368255b MINOR: Decouple timer and executor from CoordinatorRuntime 
(#21350)
5f0e368255b is described below

commit 5f0e368255b6c3638a7502ea787eed17ca69cf18
Author: David Jacot <[email protected]>
AuthorDate: Mon Jan 26 09:28:45 2026 +0100

    MINOR: Decouple timer and executor from CoordinatorRuntime (#21350)
    
    This patch refactors the coordinator runtime to improve modularity by
    decoupling the timer and executor components:
    - Extract EventBasedCoordinatorTimer from CoordinatorRuntime into
    standalone CoordinatorTimerImpl;
    - Introduce CoordinatorShardScheduler interface to allow shard-scoped
    components (timer, executor) to schedule write operations back to the
    runtime within their shard's scope;
    - Simplify CoordinatorExecutorImpl by replacing direct runtime
    dependency with the new scheduler interface;
    
    Reviewers: Sean Quah <[email protected]>, Chia-Ping Tsai
     <[email protected]>
---
 .../common/runtime/CoordinatorExecutorImpl.java    |  23 +-
 .../common/runtime/CoordinatorRuntime.java         | 195 +------
 .../common/runtime/CoordinatorShardScheduler.java  |  51 ++
 .../common/runtime/CoordinatorTimerImpl.java       | 173 ++++++
 .../runtime/CoordinatorExecutorImplTest.java       | 100 ++--
 .../common/runtime/CoordinatorTimerImplTest.java   | 624 +++++++++++++++++++++
 6 files changed, 925 insertions(+), 241 deletions(-)

diff --git 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImpl.java
 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImpl.java
index 9e6ab571744..5300a8b1583 100644
--- 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImpl.java
+++ 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImpl.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.coordinator.common.runtime;
 
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
 import org.apache.kafka.common.errors.NotCoordinatorException;
 import org.apache.kafka.common.utils.LogContext;
@@ -28,25 +27,22 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
 
-public class CoordinatorExecutorImpl<S extends CoordinatorShard<U>, U> 
implements CoordinatorExecutor<U> {
+public class CoordinatorExecutorImpl<U> implements CoordinatorExecutor<U> {
     private record TaskResult<R>(R result, Throwable exception) { }
 
     private final Logger log;
-    private final TopicPartition shard;
-    private final CoordinatorRuntime<S, U> runtime;
     private final ExecutorService executor;
+    private final CoordinatorShardScheduler<U> scheduler;
     private final Map<String, TaskRunnable<?>> tasks = new 
ConcurrentHashMap<>();
 
     public CoordinatorExecutorImpl(
         LogContext logContext,
-        TopicPartition shard,
-        CoordinatorRuntime<S, U> runtime,
-        ExecutorService executor
+        ExecutorService executor,
+        CoordinatorShardScheduler<U> scheduler
     ) {
         this.log = logContext.logger(CoordinatorExecutorImpl.class);
-        this.shard = shard;
-        this.runtime = runtime;
         this.executor = executor;
+        this.scheduler = scheduler;
     }
 
     private <R> TaskResult<R> executeTask(TaskRunnable<R> task) {
@@ -73,13 +69,12 @@ public class CoordinatorExecutorImpl<S extends 
CoordinatorShard<U>, U> implement
             if (tasks.get(key) != task) return;
 
             // Execute the task.
-            final TaskResult<R> result = executeTask(task);
+            var result = executeTask(task);
 
             // Schedule the operation.
-            runtime.scheduleWriteOperation(
+            scheduler.scheduleWriteOperation(
                 key,
-                shard,
-                coordinator -> {
+                () -> {
                     // If the task associated with the key is not us, it means
                     // that the task was either replaced or cancelled. We stop.
                     if (!tasks.remove(key, task)) {
@@ -87,7 +82,7 @@ public class CoordinatorExecutorImpl<S extends 
CoordinatorShard<U>, U> implement
                     }
 
                     // Call the underlying write operation with the result of 
the task.
-                    return operation.onComplete(result.result, 
result.exception);
+                    return operation.onComplete(result.result(), 
result.exception());
                 }
             ).exceptionally(exception -> {
                 // Remove the task after a failure.
diff --git 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
index d5b676ba9a7..9c8b6c319dd 100644
--- 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
+++ 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
@@ -50,8 +50,6 @@ import org.slf4j.Logger;
 import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -60,7 +58,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
@@ -315,163 +312,6 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
         abstract boolean canTransitionFrom(CoordinatorState state);
     }
 
-    /**
-     * The EventBasedCoordinatorTimer implements the CoordinatorTimer 
interface and provides an event based
-     * timer which turns timeouts of a regular {@link Timer} into {@link 
CoordinatorWriteEvent} events which
-     * are executed by the {@link CoordinatorEventProcessor} used by this 
coordinator runtime. This is done
-     * to ensure that the timer respects the threading model of the 
coordinator runtime.
-     *
-     * The {@link CoordinatorWriteEvent} events pushed by the coordinator 
timer wraps the
-     * {@link TimeoutOperation} operations scheduled by the coordinators.
-     *
-     * It also keeps track of all the scheduled {@link TimerTask}. This allows 
timeout operations to be
-     * cancelled or rescheduled. When a timer is cancelled or overridden, the 
previous timer is guaranteed to
-     * not be executed even if it already expired and got pushed to the event 
processor.
-     *
-     * When a timer fails with an unexpected exception, the timer is 
rescheduled with a backoff.
-     */
-    class EventBasedCoordinatorTimer implements CoordinatorTimer<Void, U> {
-        /**
-         * The logger.
-         */
-        final Logger log;
-
-        /**
-         * The topic partition.
-         */
-        final TopicPartition tp;
-
-        /**
-         * The scheduled timers keyed by their key.
-         */
-        final Map<String, TimerTask> tasks = new HashMap<>();
-
-        EventBasedCoordinatorTimer(TopicPartition tp, LogContext logContext) {
-            this.tp = tp;
-            this.log = logContext.logger(EventBasedCoordinatorTimer.class);
-        }
-
-        @Override
-        public void schedule(
-            String key,
-            long delay,
-            TimeUnit unit,
-            boolean retry,
-            TimeoutOperation<Void, U> operation
-        ) {
-            schedule(key, delay, unit, retry, 500, operation);
-        }
-
-        @Override
-        public void schedule(
-            String key,
-            long delay,
-            TimeUnit unit,
-            boolean retry,
-            long retryBackoff,
-            TimeoutOperation<Void, U> operation
-        ) {
-            // The TimerTask wraps the TimeoutOperation into a 
CoordinatorWriteEvent. When the TimerTask
-            // expires, the event is pushed to the queue of the coordinator 
runtime to be executed. This
-            // ensures that the threading model of the runtime is respected.
-            TimerTask task = new TimerTask(unit.toMillis(delay)) {
-                @Override
-                public void run() {
-                    String eventName = "Timeout(tp=" + tp + ", key=" + key + 
")";
-                    CoordinatorWriteEvent<Void> event = new 
CoordinatorWriteEvent<>(eventName, tp, writeTimeout, coordinator -> {
-                        log.debug("Executing write event {} for timer {}.", 
eventName, key);
-
-                        // If the task is different, it means that the timer 
has been
-                        // cancelled while the event was waiting to be 
processed.
-                        if (!tasks.remove(key, this)) {
-                            throw new RejectedExecutionException("Timer " + 
key + " was overridden or cancelled");
-                        }
-
-                        // Execute the timeout operation.
-                        return operation.generateRecords();
-                    });
-
-                    // If the write event fails, it is rescheduled with a 
small backoff except if retry
-                    // is disabled or if the error is fatal.
-                    event.future.exceptionally(ex -> {
-                        if (ex instanceof RejectedExecutionException) {
-                            log.debug("The write event {} for the timer {} was 
not executed because it was " +
-                                "cancelled or overridden.", event.name, key);
-                            return null;
-                        }
-
-                        if (ex instanceof NotCoordinatorException || ex 
instanceof CoordinatorLoadInProgressException) {
-                            log.debug("The write event {} for the timer {} 
failed due to {}. Ignoring it because " +
-                                "the coordinator is not active.", event.name, 
key, ex.getMessage());
-                            return null;
-                        }
-
-                        if (retry) {
-                            log.info("The write event {} for the timer {} 
failed due to {}. Rescheduling it. ",
-                                event.name, key, ex.getMessage());
-                            schedule(key, retryBackoff, TimeUnit.MILLISECONDS, 
true, retryBackoff, operation);
-                        } else {
-                            log.error("The write event {} for the timer {} 
failed due to {}. Ignoring it. ",
-                                event.name, key, ex.getMessage(), ex);
-                        }
-
-                        return null;
-                    });
-
-                    log.debug("Scheduling write event {} for timer {}.", 
event.name, key);
-                    try {
-                        enqueueLast(event);
-                    } catch (NotCoordinatorException ex) {
-                        log.info("Failed to enqueue write event {} for timer 
{} because the runtime is closed. Ignoring it.",
-                            event.name, key);
-                    }
-                }
-            };
-
-            log.debug("Registering timer {} with delay of {}ms.", key, 
unit.toMillis(delay));
-            TimerTask prevTask = tasks.put(key, task);
-            if (prevTask != null) prevTask.cancel();
-
-            timer.add(task);
-        }
-
-        @Override
-        public void scheduleIfAbsent(
-            String key,
-            long delay,
-            TimeUnit unit,
-            boolean retry,
-            TimeoutOperation<Void, U> operation
-        ) {
-            if (!tasks.containsKey(key)) {
-                schedule(key, delay, unit, retry, 500, operation);
-            }
-        }
-
-        @Override
-        public void cancel(String key) {
-            TimerTask prevTask = tasks.remove(key);
-            if (prevTask != null) prevTask.cancel();
-        }
-
-        @Override
-        public boolean isScheduled(String key) {
-            return tasks.containsKey(key);
-        }
-
-        public void cancelAll() {
-            Iterator<Map.Entry<String, TimerTask>> iterator = 
tasks.entrySet().iterator();
-            while (iterator.hasNext()) {
-                iterator.next().getValue().cancel();
-                iterator.remove();
-            }
-        }
-
-        public int size() {
-            return tasks.size();
-        }
-    }
-
     /**
      * A simple container class to hold all the attributes
      * related to a pending batch.
@@ -570,12 +410,12 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
         /**
          * The coordinator timer.
          */
-        final EventBasedCoordinatorTimer timer;
+        final CoordinatorTimerImpl<U> timer;
 
         /**
          * The coordinator executor.
          */
-        final CoordinatorExecutorImpl<S, U> executor;
+        final CoordinatorExecutorImpl<U> executor;
 
         /**
          * The current state.
@@ -639,12 +479,35 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
             this.state = CoordinatorState.INITIAL;
             this.epoch = -1;
             this.deferredEventQueue = new DeferredEventQueue(logContext);
-            this.timer = new EventBasedCoordinatorTimer(tp, logContext);
+            this.timer = new CoordinatorTimerImpl<>(
+                logContext,
+                CoordinatorRuntime.this.timer,
+                (operationName, operation) -> {
+                    try {
+                        return scheduleWriteOperation(
+                            operationName,
+                            tp,
+                            coordinator -> operation.generate()
+                        );
+                    } catch (Throwable t) {
+                        return CompletableFuture.failedFuture(t);
+                    }
+                }
+            );
             this.executor = new CoordinatorExecutorImpl<>(
                 logContext,
-                tp,
-                CoordinatorRuntime.this,
-                executorService
+                executorService,
+                (operationName, operation) -> {
+                    try {
+                        return scheduleWriteOperation(
+                            operationName,
+                            tp,
+                            coordinator -> operation.generate()
+                        );
+                    } catch (Throwable t) {
+                        return CompletableFuture.failedFuture(t);
+                    }
+                }
             );
             this.bufferSupplier = new BufferSupplier.GrowableBufferSupplier();
             this.cachedBufferSize = new AtomicLong(0);
diff --git 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorShardScheduler.java
 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorShardScheduler.java
new file mode 100644
index 00000000000..18fb1b86712
--- /dev/null
+++ 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorShardScheduler.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.common.runtime;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Scheduler interface for shard-scoped components to schedule write operations
+ * through the coordinator runtime without depending on the full runtime API.
+ *
+ * @param <U> The record type used by the coordinator.
+ */
+@FunctionalInterface
+public interface CoordinatorShardScheduler<U> {
+
+    /**
+     * A write operation that produces records.
+     *
+     * @param <U> The record type used by the coordinator.
+     */
+    @FunctionalInterface
+    interface WriteOperation<U> {
+        CoordinatorResult<Void, U> generate();
+    }
+
+    /**
+     * Schedules a write operation to be executed by the runtime.
+     *
+     * @param operationName The name of the operation for logging/debugging.
+     * @param operation     The write operation to execute.
+     * @return A future that completes when the operation is done.
+     */
+    CompletableFuture<Void> scheduleWriteOperation(
+        String operationName,
+        WriteOperation<U> operation
+    );
+}
diff --git 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimerImpl.java
 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimerImpl.java
new file mode 100644
index 00000000000..0c66c99c25f
--- /dev/null
+++ 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimerImpl.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.common.runtime;
+
+import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.server.util.timer.Timer;
+import org.apache.kafka.server.util.timer.TimerTask;
+
+import org.slf4j.Logger;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An event-based coordinator timer that bridges Kafka's utility Timer class 
with the
+ * coordinator runtime's event-driven architecture. It converts timeout 
expiries into
+ * write operations that are scheduled through the runtime, ensuring all 
timeout operations
+ * respect the coordinator's threading model.
+ *
+ * When a timer fails with an unexpected exception, the timer is rescheduled 
with a backoff.
+ */
+public class CoordinatorTimerImpl<U> implements CoordinatorTimer<Void, U> {
+    private final Logger log;
+    private final Timer timer;
+    private final CoordinatorShardScheduler<U> scheduler;
+    private final Map<String, TimerTask> tasks = new HashMap<>();
+
+    public CoordinatorTimerImpl(
+        LogContext logContext,
+        Timer timer,
+        CoordinatorShardScheduler<U> scheduler
+    ) {
+        this.log = logContext.logger(CoordinatorTimerImpl.class);
+        this.timer = timer;
+        this.scheduler = scheduler;
+    }
+
+    @Override
+    public void schedule(
+        String key,
+        long delay,
+        TimeUnit unit,
+        boolean retry,
+        TimeoutOperation<Void, U> operation
+    ) {
+        schedule(key, delay, unit, retry, 500, operation);
+    }
+
+    @Override
+    public void schedule(
+        String key,
+        long delay,
+        TimeUnit unit,
+        boolean retry,
+        long retryBackoff,
+        TimeoutOperation<Void, U> operation
+    ) {
+        // The TimerTask wraps the TimeoutOperation into a write operation. 
When the TimerTask
+        // expires, the operation is scheduled through the scheduler to be 
executed. This
+        // ensures that the threading model of the runtime is respected.
+        var task = new TimerTask(unit.toMillis(delay)) {
+            @Override
+            public void run() {
+                var operationName = "Timeout(key=" + key + ")";
+
+                scheduler.scheduleWriteOperation(
+                    operationName,
+                    () -> {
+                        log.debug("Executing write event {} for timer {}.", 
operationName, key);
+
+                        // If the task is different, it means that the timer 
has been
+                        // cancelled while the event was waiting to be 
processed.
+                        if (!tasks.remove(key, this)) {
+                            throw new RejectedExecutionException("Timer " + 
key + " was overridden or cancelled");
+                        }
+
+                        // Execute the timeout operation.
+                        return operation.generateRecords();
+                    }
+                ).exceptionally(ex -> {
+                    // Remove the task after a failure.
+                    tasks.remove(key, this);
+
+                    if (ex instanceof RejectedExecutionException) {
+                        log.debug("The write event {} for the timer {} was not 
executed because it was " +
+                            "cancelled or overridden.", operationName, key);
+                        return null;
+                    }
+
+                    if (ex instanceof NotCoordinatorException || ex instanceof 
CoordinatorLoadInProgressException) {
+                        log.debug("The write event {} for the timer {} failed 
due to {}. Ignoring it because " +
+                            "the coordinator is not active.", operationName, 
key, ex.getMessage());
+                        return null;
+                    }
+
+                    if (retry) {
+                        log.info("The write event {} for the timer {} failed 
due to {}. Rescheduling it. ",
+                            operationName, key, ex.getMessage());
+                        schedule(key, retryBackoff, TimeUnit.MILLISECONDS, 
true, retryBackoff, operation);
+                    } else {
+                        log.error("The write event {} for the timer {} failed 
due to {}. Ignoring it. ",
+                            operationName, key, ex.getMessage(), ex);
+                    }
+
+                    return null;
+                });
+
+                log.debug("Scheduling write event {} for timer {}.", 
operationName, key);
+            }
+        };
+
+        log.debug("Registering timer {} with delay of {}ms.", key, 
unit.toMillis(delay));
+        var prevTask = tasks.put(key, task);
+        if (prevTask != null) prevTask.cancel();
+
+        timer.add(task);
+    }
+
+    @Override
+    public void scheduleIfAbsent(
+        String key,
+        long delay,
+        TimeUnit unit,
+        boolean retry,
+        TimeoutOperation<Void, U> operation
+    ) {
+        if (!tasks.containsKey(key)) {
+            schedule(key, delay, unit, retry, 500, operation);
+        }
+    }
+
+    @Override
+    public void cancel(String key) {
+        var prevTask = tasks.remove(key);
+        if (prevTask != null) prevTask.cancel();
+    }
+
+    @Override
+    public boolean isScheduled(String key) {
+        return tasks.containsKey(key);
+    }
+
+    public void cancelAll() {
+        Iterator<Map.Entry<String, TimerTask>> iterator = 
tasks.entrySet().iterator();
+        while (iterator.hasNext()) {
+            iterator.next().getValue().cancel();
+            iterator.remove();
+        }
+    }
+
+    public int size() {
+        return tasks.size();
+    }
+}
diff --git 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImplTest.java
 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImplTest.java
index 5a64a84d23a..c6e1e763798 100644
--- 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImplTest.java
+++ 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImplTest.java
@@ -16,13 +16,11 @@
  */
 package org.apache.kafka.coordinator.common.runtime;
 
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.server.util.FutureUtils;
 
 import org.junit.jupiter.api.Test;
 
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -47,33 +45,27 @@ import static org.mockito.Mockito.when;
 @SuppressWarnings("unchecked")
 public class CoordinatorExecutorImplTest {
     private static final LogContext LOG_CONTEXT = new LogContext();
-    private static final TopicPartition SHARD_PARTITION = new 
TopicPartition("__consumer_offsets", 0);
-    private static final Duration WRITE_TIMEOUT = Duration.ofMillis(1000);
     private static final String TASK_KEY = "task";
 
     @Test
     public void testTaskSuccessfulLifecycle() {
-        CoordinatorShard<String> coordinatorShard = 
mock(CoordinatorShard.class);
-        CoordinatorRuntime<CoordinatorShard<String>, String> runtime = 
mock(CoordinatorRuntime.class);
+        CoordinatorShardScheduler<String> scheduler = 
mock(CoordinatorShardScheduler.class);
         ExecutorService executorService = mock(ExecutorService.class);
-        CoordinatorExecutorImpl<CoordinatorShard<String>, String> executor = 
new CoordinatorExecutorImpl<>(
+        CoordinatorExecutorImpl<String> executor = new 
CoordinatorExecutorImpl<>(
             LOG_CONTEXT,
-            SHARD_PARTITION,
-            runtime,
-            executorService
+            executorService,
+            scheduler
         );
 
-        when(runtime.scheduleWriteOperation(
+        when(scheduler.scheduleWriteOperation(
             eq(TASK_KEY),
-            eq(SHARD_PARTITION),
             any()
         )).thenAnswer(args -> {
             assertTrue(executor.isScheduled(TASK_KEY));
-            
CoordinatorRuntime.CoordinatorWriteOperation<CoordinatorShard<String>, Void, 
String> op =
-                args.getArgument(2);
+            CoordinatorShardScheduler.WriteOperation<String> op = 
args.getArgument(1);
             assertEquals(
                 new CoordinatorResult<>(List.of("record"), null),
-                op.generateRecordsAndResult(coordinatorShard)
+                op.generate()
             );
             return CompletableFuture.completedFuture(null);
         });
@@ -111,26 +103,22 @@ public class CoordinatorExecutorImplTest {
 
     @Test
     public void testTaskFailedLifecycle() {
-        CoordinatorShard<String> coordinatorShard = 
mock(CoordinatorShard.class);
-        CoordinatorRuntime<CoordinatorShard<String>, String> runtime = 
mock(CoordinatorRuntime.class);
+        CoordinatorShardScheduler<String> scheduler = 
mock(CoordinatorShardScheduler.class);
         ExecutorService executorService = mock(ExecutorService.class);
-        CoordinatorExecutorImpl<CoordinatorShard<String>, String> executor = 
new CoordinatorExecutorImpl<>(
+        CoordinatorExecutorImpl<String> executor = new 
CoordinatorExecutorImpl<>(
             LOG_CONTEXT,
-            SHARD_PARTITION,
-            runtime,
-            executorService
+            executorService,
+            scheduler
         );
 
-        when(runtime.scheduleWriteOperation(
+        when(scheduler.scheduleWriteOperation(
             eq(TASK_KEY),
-            eq(SHARD_PARTITION),
             any()
         )).thenAnswer(args -> {
-            
CoordinatorRuntime.CoordinatorWriteOperation<CoordinatorShard<String>, Void, 
String> op =
-                args.getArgument(2);
+            CoordinatorShardScheduler.WriteOperation<String> op = 
args.getArgument(1);
             assertEquals(
                 new CoordinatorResult<>(List.of(), null),
-                op.generateRecordsAndResult(coordinatorShard)
+                op.generate()
             );
             return CompletableFuture.completedFuture(null);
         });
@@ -168,13 +156,12 @@ public class CoordinatorExecutorImplTest {
 
     @Test
     public void testTaskCancelledBeforeBeingExecuted() {
-        CoordinatorRuntime<CoordinatorShard<String>, String> runtime = 
mock(CoordinatorRuntime.class);
+        CoordinatorShardScheduler<String> scheduler = 
mock(CoordinatorShardScheduler.class);
         ExecutorService executorService = mock(ExecutorService.class);
-        CoordinatorExecutorImpl<CoordinatorShard<String>, String> executor = 
new CoordinatorExecutorImpl<>(
+        CoordinatorExecutorImpl<String> executor = new 
CoordinatorExecutorImpl<>(
             LOG_CONTEXT,
-            SHARD_PARTITION,
-            runtime,
-            executorService
+            executorService,
+            scheduler
         );
 
         when(executorService.submit(any(Runnable.class))).thenAnswer(args -> {
@@ -211,27 +198,23 @@ public class CoordinatorExecutorImplTest {
 
     @Test
     public void 
testTaskCancelledAfterBeingExecutedButBeforeWriteOperationIsExecuted() {
-        CoordinatorShard<String> coordinatorShard = 
mock(CoordinatorShard.class);
-        CoordinatorRuntime<CoordinatorShard<String>, String> runtime = 
mock(CoordinatorRuntime.class);
+        CoordinatorShardScheduler<String> scheduler = 
mock(CoordinatorShardScheduler.class);
         ExecutorService executorService = mock(ExecutorService.class);
-        CoordinatorExecutorImpl<CoordinatorShard<String>, String> executor = 
new CoordinatorExecutorImpl<>(
+        CoordinatorExecutorImpl<String> executor = new 
CoordinatorExecutorImpl<>(
             LOG_CONTEXT,
-            SHARD_PARTITION,
-            runtime,
-            executorService
+            executorService,
+            scheduler
         );
 
-        when(runtime.scheduleWriteOperation(
+        when(scheduler.scheduleWriteOperation(
             eq(TASK_KEY),
-            eq(SHARD_PARTITION),
             any()
         )).thenAnswer(args -> {
             // Cancel the task before running the write operation.
             executor.cancel(TASK_KEY);
 
-            
CoordinatorRuntime.CoordinatorWriteOperation<CoordinatorShard<String>, Void, 
String> op =
-                args.getArgument(2);
-            Throwable ex = assertThrows(RejectedExecutionException.class, () 
-> op.generateRecordsAndResult(coordinatorShard));
+            CoordinatorShardScheduler.WriteOperation<String> op = 
args.getArgument(1);
+            Throwable ex = assertThrows(RejectedExecutionException.class, 
op::generate);
             return FutureUtils.failedFuture(ex);
         });
 
@@ -265,18 +248,16 @@ public class CoordinatorExecutorImplTest {
 
     @Test
     public void testTaskSchedulingWriteOperationFailed() {
-        CoordinatorRuntime<CoordinatorShard<String>, String> runtime = 
mock(CoordinatorRuntime.class);
+        CoordinatorShardScheduler<String> scheduler = 
mock(CoordinatorShardScheduler.class);
         ExecutorService executorService = mock(ExecutorService.class);
-        CoordinatorExecutorImpl<CoordinatorShard<String>, String> executor = 
new CoordinatorExecutorImpl<>(
+        CoordinatorExecutorImpl<String> executor = new 
CoordinatorExecutorImpl<>(
             LOG_CONTEXT,
-            SHARD_PARTITION,
-            runtime,
-            executorService
+            executorService,
+            scheduler
         );
 
-        when(runtime.scheduleWriteOperation(
+        when(scheduler.scheduleWriteOperation(
             eq(TASK_KEY),
-            eq(SHARD_PARTITION),
             any()
         )).thenReturn(FutureUtils.failedFuture(new Throwable("Oh no!")));
 
@@ -311,24 +292,21 @@ public class CoordinatorExecutorImplTest {
 
     @Test
     public void testCancelAllTasks() {
-        CoordinatorShard<String> coordinatorShard = 
mock(CoordinatorShard.class);
-        CoordinatorRuntime<CoordinatorShard<String>, String> runtime = 
mock(CoordinatorRuntime.class);
+        CoordinatorShardScheduler<String> scheduler = 
mock(CoordinatorShardScheduler.class);
         ExecutorService executorService = mock(ExecutorService.class);
-        CoordinatorExecutorImpl<CoordinatorShard<String>, String> executor = 
new CoordinatorExecutorImpl<>(
+        CoordinatorExecutorImpl<String> executor = new 
CoordinatorExecutorImpl<>(
             LOG_CONTEXT,
-            SHARD_PARTITION,
-            runtime,
-            executorService
+            executorService,
+            scheduler
         );
 
-        
List<CoordinatorRuntime.CoordinatorWriteOperation<CoordinatorShard<String>, 
Void, String>> writeOperations = new ArrayList<>();
+        List<CoordinatorShardScheduler.WriteOperation<String>> writeOperations 
= new ArrayList<>();
         List<CompletableFuture<Void>> writeFutures = new ArrayList<>();
-        when(runtime.scheduleWriteOperation(
+        when(scheduler.scheduleWriteOperation(
             anyString(),
-            eq(SHARD_PARTITION),
             any()
         )).thenAnswer(args -> {
-            writeOperations.add(args.getArgument(2));
+            writeOperations.add(args.getArgument(1));
             CompletableFuture<Void> writeFuture = new CompletableFuture<>();
             writeFutures.add(writeFuture);
             return writeFuture;
@@ -363,9 +341,9 @@ public class CoordinatorExecutorImplTest {
         executor.cancelAll();
 
         for (int i = 0; i < writeOperations.size(); i++) {
-            
CoordinatorRuntime.CoordinatorWriteOperation<CoordinatorShard<String>, Void, 
String> writeOperation = writeOperations.get(i);
+            CoordinatorShardScheduler.WriteOperation<String> writeOperation = 
writeOperations.get(i);
             CompletableFuture<Void> writeFuture = writeFutures.get(i);
-            Throwable ex = assertThrows(RejectedExecutionException.class, () 
-> writeOperation.generateRecordsAndResult(coordinatorShard));
+            Throwable ex = assertThrows(RejectedExecutionException.class, 
writeOperation::generate);
             writeFuture.completeExceptionally(ex);
         }
 
diff --git 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimerImplTest.java
 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimerImplTest.java
new file mode 100644
index 00000000000..eb0058db43e
--- /dev/null
+++ 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimerImplTest.java
@@ -0,0 +1,624 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.common.runtime;
+
+import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.server.util.FutureUtils;
+import org.apache.kafka.server.util.timer.MockTimer;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@SuppressWarnings("unchecked")
+public class CoordinatorTimerImplTest {
+    private static final LogContext LOG_CONTEXT = new LogContext();
+    private static final String TIMER_KEY = "timer-key";
+
+    @Test
+    public void testTimerSuccessfulLifecycle() throws InterruptedException {
+        var mockTimer = new MockTimer();
+        var operationCalled = new AtomicBoolean(false);
+
+        CoordinatorShardScheduler<String> scheduler = (operationName, 
operation) -> {
+            var result = operation.generate();
+            assertEquals(new CoordinatorResult<>(List.of("record"), null), 
result);
+            operationCalled.set(true);
+            return CompletableFuture.completedFuture(null);
+        };
+
+        var timer = new CoordinatorTimerImpl<>(
+            LOG_CONTEXT,
+            mockTimer,
+            scheduler
+        );
+
+        timer.schedule(
+            TIMER_KEY,
+            100,
+            TimeUnit.MILLISECONDS,
+            false,
+            () -> new CoordinatorResult<>(List.of("record"), null)
+        );
+
+        assertTrue(timer.isScheduled(TIMER_KEY));
+        assertEquals(1, timer.size());
+
+        // Advance time to trigger the timer.
+        mockTimer.advanceClock(100 + 1);
+
+        assertTrue(operationCalled.get());
+        assertFalse(timer.isScheduled(TIMER_KEY));
+        assertEquals(0, timer.size());
+    }
+
+    @Test
+    public void testTimerCancelledBeforeExpiry() throws InterruptedException {
+        var mockTimer = new MockTimer();
+        var operationCalled = new AtomicBoolean(false);
+
+        CoordinatorShardScheduler<String> scheduler = (operationName, 
operation) -> {
+            operationCalled.set(true);
+            return CompletableFuture.completedFuture(null);
+        };
+
+        var timer = new CoordinatorTimerImpl<>(
+            LOG_CONTEXT,
+            mockTimer,
+            scheduler
+        );
+
+        timer.schedule(
+            TIMER_KEY,
+            100,
+            TimeUnit.MILLISECONDS,
+            false,
+            () -> new CoordinatorResult<>(List.of("record"), null)
+        );
+
+        assertTrue(timer.isScheduled(TIMER_KEY));
+        assertEquals(1, timer.size());
+
+        // Cancel before expiry.
+        timer.cancel(TIMER_KEY);
+        assertFalse(timer.isScheduled(TIMER_KEY));
+        assertEquals(0, timer.size());
+
+        // Advance time.
+        mockTimer.advanceClock(100 + 1);
+
+        // Operation should not be called.
+        assertFalse(operationCalled.get());
+    }
+
+    @Test
+    public void testTimerCancelledAfterExpiryButBeforeWriteOperation() throws 
InterruptedException {
+        var mockTimer = new MockTimer();
+        var operationCalled = new AtomicBoolean(false);
+        var rejectedExceptionThrown = new AtomicBoolean(false);
+        var timerRef = new AtomicReference<CoordinatorTimerImpl<String>>();
+
+        CoordinatorShardScheduler<String> scheduler = (operationName, 
operation) -> {
+            // Cancel the timer BEFORE executing the write operation.
+            // This simulates the case where the timer is cancelled while the 
write
+            // event is waiting to be processed.
+            timerRef.get().cancel(TIMER_KEY);
+
+            try {
+                operation.generate();
+                operationCalled.set(true);
+            } catch (RejectedExecutionException e) {
+                rejectedExceptionThrown.set(true);
+                return FutureUtils.failedFuture(e);
+            }
+            return CompletableFuture.completedFuture(null);
+        };
+
+        var timer = new CoordinatorTimerImpl<>(
+            LOG_CONTEXT,
+            mockTimer,
+            scheduler
+        );
+        timerRef.set(timer);
+
+        timer.schedule(
+            TIMER_KEY,
+            100,
+            TimeUnit.MILLISECONDS,
+            false,
+            () -> new CoordinatorResult<>(List.of("record"), null)
+        );
+
+        assertTrue(timer.isScheduled(TIMER_KEY));
+        assertEquals(1, timer.size());
+
+        // Advance time to trigger the timer.
+        mockTimer.advanceClock(100 + 1);
+
+        // The operation should not have been called because we cancelled
+        // the timer before the write operation executed.
+        assertFalse(operationCalled.get());
+        assertTrue(rejectedExceptionThrown.get());
+        assertFalse(timer.isScheduled(TIMER_KEY));
+        assertEquals(0, timer.size());
+    }
+
+    @Test
+    public void testTimerOverridden() throws InterruptedException {
+        var mockTimer = new MockTimer();
+        var firstOperationCalled = new AtomicBoolean(false);
+        var secondOperationCalled = new AtomicBoolean(false);
+
+        CoordinatorShardScheduler<String> scheduler = (operationName, 
operation) -> {
+            try {
+                operation.generate();
+            } catch (RejectedExecutionException e) {
+                // Expected for the overridden timer.
+                return FutureUtils.failedFuture(e);
+            }
+            return CompletableFuture.completedFuture(null);
+        };
+
+        var timer = new CoordinatorTimerImpl<>(
+            LOG_CONTEXT,
+            mockTimer,
+            scheduler
+        );
+
+        // Schedule first timer.
+        timer.schedule(
+            TIMER_KEY,
+            100,
+            TimeUnit.MILLISECONDS,
+            false,
+            () -> {
+                firstOperationCalled.set(true);
+                return new CoordinatorResult<>(List.of("record1"), null);
+            }
+        );
+
+        // Override with second timer.
+        timer.schedule(
+            TIMER_KEY,
+            200,
+            TimeUnit.MILLISECONDS,
+            false,
+            () -> {
+                secondOperationCalled.set(true);
+                return new CoordinatorResult<>(List.of("record2"), null);
+            }
+        );
+
+        assertTrue(timer.isScheduled(TIMER_KEY));
+        assertEquals(1, timer.size());
+
+        // Advance time to trigger the second timer.
+        mockTimer.advanceClock(200 + 1);
+
+        assertFalse(firstOperationCalled.get());
+        assertTrue(secondOperationCalled.get());
+        assertFalse(timer.isScheduled(TIMER_KEY));
+        assertEquals(0, timer.size());
+    }
+
+    @Test
+    public void testTimerRetryOnFailure() throws InterruptedException {
+        var mockTimer = new MockTimer();
+        var callCount = new AtomicInteger(0);
+
+        CoordinatorShardScheduler<String> scheduler = (operationName, 
operation) -> {
+            operation.generate();
+            var count = callCount.incrementAndGet();
+            if (count == 1) {
+                // Fail the first time.
+                return FutureUtils.failedFuture(new 
RuntimeException("Simulated failure"));
+            }
+            return CompletableFuture.completedFuture(null);
+        };
+
+        var timer = new CoordinatorTimerImpl<>(
+            LOG_CONTEXT,
+            mockTimer,
+            scheduler
+        );
+
+        timer.schedule(
+            TIMER_KEY,
+            100,
+            TimeUnit.MILLISECONDS,
+            true, // retry enabled
+            50,   // retry backoff
+            () -> new CoordinatorResult<>(List.of("record"), null)
+        );
+
+        assertTrue(timer.isScheduled(TIMER_KEY));
+        assertEquals(1, timer.size());
+
+        // Advance time to trigger the first attempt.
+        mockTimer.advanceClock(100 + 1);
+
+        assertEquals(1, callCount.get());
+
+        // Advance time for retry backoff.
+        mockTimer.advanceClock(50 + 1);
+
+        assertEquals(2, callCount.get());
+        assertFalse(timer.isScheduled(TIMER_KEY));
+        assertEquals(0, timer.size());
+    }
+
+    @Test
+    public void testTimerNoRetryOnFailure() throws InterruptedException {
+        var mockTimer = new MockTimer();
+        var callCount = new AtomicInteger(0);
+
+        CoordinatorShardScheduler<String> scheduler = (operationName, 
operation) -> {
+            operation.generate();
+            callCount.incrementAndGet();
+            return FutureUtils.failedFuture(new RuntimeException("Simulated 
failure"));
+        };
+
+        var timer = new CoordinatorTimerImpl<>(
+            LOG_CONTEXT,
+            mockTimer,
+            scheduler
+        );
+
+        timer.schedule(
+            TIMER_KEY,
+            100,
+            TimeUnit.MILLISECONDS,
+            false, // retry disabled
+            () -> new CoordinatorResult<>(List.of("record"), null)
+        );
+
+        assertTrue(timer.isScheduled(TIMER_KEY));
+        assertEquals(1, timer.size());
+
+        // Advance time to trigger the timer.
+        mockTimer.advanceClock(100 + 1);
+
+        assertEquals(1, callCount.get());
+        assertFalse(timer.isScheduled(TIMER_KEY));
+        assertEquals(0, timer.size());
+
+        // Advance more time - should not retry.
+        mockTimer.advanceClock(500 + 1);
+
+        assertEquals(1, callCount.get());
+    }
+
+    @Test
+    public void testTimerIgnoredOnNotCoordinatorException() throws 
InterruptedException {
+        var mockTimer = new MockTimer();
+        var callCount = new AtomicInteger(0);
+
+        CoordinatorShardScheduler<String> scheduler = (operationName, 
operation) -> {
+            operation.generate();
+            callCount.incrementAndGet();
+            return FutureUtils.failedFuture(new NotCoordinatorException("Not 
coordinator"));
+        };
+
+        var timer = new CoordinatorTimerImpl<>(
+            LOG_CONTEXT,
+            mockTimer,
+            scheduler
+        );
+
+        timer.schedule(
+            TIMER_KEY,
+            100,
+            TimeUnit.MILLISECONDS,
+            true, // retry enabled, but should be ignored for 
NotCoordinatorException
+            () -> new CoordinatorResult<>(List.of("record"), null)
+        );
+
+        assertTrue(timer.isScheduled(TIMER_KEY));
+        assertEquals(1, timer.size());
+
+        // Advance time to trigger the timer.
+        mockTimer.advanceClock(100 + 1);
+
+        assertEquals(1, callCount.get());
+        assertFalse(timer.isScheduled(TIMER_KEY));
+        assertEquals(0, timer.size());
+
+        // Should not retry for NotCoordinatorException.
+        mockTimer.advanceClock(500 + 1);
+
+        assertEquals(1, callCount.get());
+    }
+
+    @Test
+    public void testTimerIgnoredOnCoordinatorLoadInProgressException() throws 
InterruptedException {
+        var mockTimer = new MockTimer();
+        var callCount = new AtomicInteger(0);
+
+        CoordinatorShardScheduler<String> scheduler = (operationName, 
operation) -> {
+            operation.generate();
+            callCount.incrementAndGet();
+            return FutureUtils.failedFuture(new 
CoordinatorLoadInProgressException("Loading"));
+        };
+
+        var timer = new CoordinatorTimerImpl<>(
+            LOG_CONTEXT,
+            mockTimer,
+            scheduler
+        );
+
+        timer.schedule(
+            TIMER_KEY,
+            100,
+            TimeUnit.MILLISECONDS,
+            true, // retry enabled, but should be ignored for 
CoordinatorLoadInProgressException
+            () -> new CoordinatorResult<>(List.of("record"), null)
+        );
+
+        assertTrue(timer.isScheduled(TIMER_KEY));
+        assertEquals(1, timer.size());
+
+        // Advance time to trigger the timer.
+        mockTimer.advanceClock(100 + 1);
+
+        assertEquals(1, callCount.get());
+        assertFalse(timer.isScheduled(TIMER_KEY));
+        assertEquals(0, timer.size());
+
+        // Should not retry for CoordinatorLoadInProgressException.
+        mockTimer.advanceClock(500 + 1);
+
+        assertEquals(1, callCount.get());
+    }
+
+    @Test
+    public void testScheduleIfAbsentWhenAbsent() throws InterruptedException {
+        var mockTimer = new MockTimer();
+        var operationCalled = new AtomicBoolean(false);
+
+        CoordinatorShardScheduler<String> scheduler = (operationName, 
operation) -> {
+            operation.generate();
+            return CompletableFuture.completedFuture(null);
+        };
+
+        var timer = new CoordinatorTimerImpl<>(
+            LOG_CONTEXT,
+            mockTimer,
+            scheduler
+        );
+
+        timer.scheduleIfAbsent(
+            TIMER_KEY,
+            100,
+            TimeUnit.MILLISECONDS,
+            false,
+            () -> {
+                operationCalled.set(true);
+                return new CoordinatorResult<>(List.of("record"), null);
+            }
+        );
+
+        assertTrue(timer.isScheduled(TIMER_KEY));
+        assertEquals(1, timer.size());
+
+        // Advance time to trigger the timer.
+        mockTimer.advanceClock(100 + 1);
+
+        assertTrue(operationCalled.get());
+        assertFalse(timer.isScheduled(TIMER_KEY));
+        assertEquals(0, timer.size());
+    }
+
+    @Test
+    public void testScheduleIfAbsentWhenPresent() throws InterruptedException {
+        var mockTimer = new MockTimer();
+        var firstOperationCalled = new AtomicBoolean(false);
+        var secondOperationCalled = new AtomicBoolean(false);
+
+        CoordinatorShardScheduler<String> scheduler = (operationName, 
operation) -> {
+            operation.generate();
+            return CompletableFuture.completedFuture(null);
+        };
+
+        var timer = new CoordinatorTimerImpl<>(
+            LOG_CONTEXT,
+            mockTimer,
+            scheduler
+        );
+
+        // Schedule first timer.
+        timer.schedule(
+            TIMER_KEY,
+            100,
+            TimeUnit.MILLISECONDS,
+            false,
+            () -> {
+                firstOperationCalled.set(true);
+                return new CoordinatorResult<>(List.of("record1"), null);
+            }
+        );
+
+        assertTrue(timer.isScheduled(TIMER_KEY));
+        assertEquals(1, timer.size());
+
+        // Try to schedule second timer with scheduleIfAbsent - should be 
ignored.
+        timer.scheduleIfAbsent(
+            TIMER_KEY,
+            200,
+            TimeUnit.MILLISECONDS,
+            false,
+            () -> {
+                secondOperationCalled.set(true);
+                return new CoordinatorResult<>(List.of("record2"), null);
+            }
+        );
+
+        // Size should still be 1.
+        assertEquals(1, timer.size());
+
+        // Advance time to trigger the first timer.
+        mockTimer.advanceClock(100 + 1);
+
+        assertTrue(firstOperationCalled.get());
+        assertFalse(secondOperationCalled.get());
+        assertFalse(timer.isScheduled(TIMER_KEY));
+        assertEquals(0, timer.size());
+    }
+
+    @Test
+    public void testCancelAll() throws InterruptedException {
+        var mockTimer = new MockTimer();
+        var operationCallCount = new AtomicInteger(0);
+
+        CoordinatorShardScheduler<String> scheduler = (operationName, 
operation) -> {
+            operation.generate();
+            operationCallCount.incrementAndGet();
+            return CompletableFuture.completedFuture(null);
+        };
+
+        var timer = new CoordinatorTimerImpl<>(
+            LOG_CONTEXT,
+            mockTimer,
+            scheduler
+        );
+
+        // Schedule multiple timers.
+        for (int i = 0; i < 3; i++) {
+            timer.schedule(
+                TIMER_KEY + i,
+                100,
+                TimeUnit.MILLISECONDS,
+                false,
+                () -> new CoordinatorResult<>(List.of("record"), null)
+            );
+        }
+
+        assertEquals(3, timer.size());
+
+        // Cancel all.
+        timer.cancelAll();
+
+        assertEquals(0, timer.size());
+        assertFalse(timer.isScheduled(TIMER_KEY + "0"));
+        assertFalse(timer.isScheduled(TIMER_KEY + "1"));
+        assertFalse(timer.isScheduled(TIMER_KEY + "2"));
+
+        // Advance time - no operations should be called.
+        mockTimer.advanceClock(100 + 1);
+
+        assertEquals(0, operationCallCount.get());
+    }
+
+    @Test
+    public void testDefaultRetryBackoff() throws InterruptedException {
+        var mockTimer = new MockTimer();
+        var callCount = new AtomicInteger(0);
+
+        CoordinatorShardScheduler<String> scheduler = (operationName, 
operation) -> {
+            operation.generate();
+            var count = callCount.incrementAndGet();
+            if (count == 1) {
+                return FutureUtils.failedFuture(new 
RuntimeException("Simulated failure"));
+            }
+            return CompletableFuture.completedFuture(null);
+        };
+
+        var timer = new CoordinatorTimerImpl<>(
+            LOG_CONTEXT,
+            mockTimer,
+            scheduler
+        );
+
+        // Use the schedule method without explicit retryBackoff (defaults to 
500ms).
+        timer.schedule(
+            TIMER_KEY,
+            100,
+            TimeUnit.MILLISECONDS,
+            true, // retry enabled
+            () -> new CoordinatorResult<>(List.of("record"), null)
+        );
+
+        assertTrue(timer.isScheduled(TIMER_KEY));
+        assertEquals(1, timer.size());
+
+        // Advance time to trigger the first attempt.
+        mockTimer.advanceClock(100 + 1);
+
+        assertEquals(1, callCount.get());
+
+        // Advance time for default retry backoff (500ms).
+        mockTimer.advanceClock(500 + 1);
+
+        assertEquals(2, callCount.get());
+        assertFalse(timer.isScheduled(TIMER_KEY));
+        assertEquals(0, timer.size());
+    }
+
+    @Test
+    public void testTaskCleanupOnFailedFutureWithoutOperationExecution() 
throws InterruptedException {
+        var mockTimer = new MockTimer();
+        var operationCalled = new AtomicBoolean(false);
+
+        // Scheduler returns failed future WITHOUT calling 
operation.generate().
+        // This simulates: (1) wrapped synchronous exceptions, or
+        // (2) events failing before being executed.
+        CoordinatorShardScheduler<String> scheduler = (operationName, 
operation) -> {
+            // Don't call operation.generate() - simulates event never being 
executed
+            return FutureUtils.failedFuture(new NotCoordinatorException("Not 
coordinator"));
+        };
+
+        var timer = new CoordinatorTimerImpl<>(
+            LOG_CONTEXT,
+            mockTimer,
+            scheduler
+        );
+
+        timer.schedule(
+            TIMER_KEY,
+            100,
+            TimeUnit.MILLISECONDS,
+            false,
+            () -> {
+                operationCalled.set(true);
+                return new CoordinatorResult<>(List.of("record"), null);
+            }
+        );
+
+        assertTrue(timer.isScheduled(TIMER_KEY));
+        assertEquals(1, timer.size());
+
+        // Advance time to trigger the timer.
+        mockTimer.advanceClock(100 + 1);
+
+        // Operation was never called.
+        assertFalse(operationCalled.get());
+        // But task should still be removed by exceptionally handler.
+        assertFalse(timer.isScheduled(TIMER_KEY));
+        assertEquals(0, timer.size());
+    }
+}

Reply via email to