This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5f0e368255b MINOR: Decouple timer and executor from CoordinatorRuntime
(#21350)
5f0e368255b is described below
commit 5f0e368255b6c3638a7502ea787eed17ca69cf18
Author: David Jacot <[email protected]>
AuthorDate: Mon Jan 26 09:28:45 2026 +0100
MINOR: Decouple timer and executor from CoordinatorRuntime (#21350)
This patch refactors the coordinator runtime to improve modularity by
decoupling the timer and executor components:
- Extract EventBasedCoordinatorTimer from CoordinatorRuntime into a
standalone CoordinatorTimerImpl;
- Introduce the CoordinatorShardScheduler interface to allow shard-scoped
components (timer, executor) to schedule write operations back to the
runtime within their shard's scope;
- Simplify CoordinatorExecutorImpl by replacing its direct runtime
dependency with the new scheduler interface.
Reviewers: Sean Quah <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../common/runtime/CoordinatorExecutorImpl.java | 23 +-
.../common/runtime/CoordinatorRuntime.java | 195 +------
.../common/runtime/CoordinatorShardScheduler.java | 51 ++
.../common/runtime/CoordinatorTimerImpl.java | 173 ++++++
.../runtime/CoordinatorExecutorImplTest.java | 100 ++--
.../common/runtime/CoordinatorTimerImplTest.java | 624 +++++++++++++++++++++
6 files changed, 925 insertions(+), 241 deletions(-)
diff --git
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImpl.java
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImpl.java
index 9e6ab571744..5300a8b1583 100644
---
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImpl.java
+++
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImpl.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.coordinator.common.runtime;
-import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
import org.apache.kafka.common.errors.NotCoordinatorException;
import org.apache.kafka.common.utils.LogContext;
@@ -28,25 +27,22 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
-public class CoordinatorExecutorImpl<S extends CoordinatorShard<U>, U>
implements CoordinatorExecutor<U> {
+public class CoordinatorExecutorImpl<U> implements CoordinatorExecutor<U> {
private record TaskResult<R>(R result, Throwable exception) { }
private final Logger log;
- private final TopicPartition shard;
- private final CoordinatorRuntime<S, U> runtime;
private final ExecutorService executor;
+ private final CoordinatorShardScheduler<U> scheduler;
private final Map<String, TaskRunnable<?>> tasks = new
ConcurrentHashMap<>();
public CoordinatorExecutorImpl(
LogContext logContext,
- TopicPartition shard,
- CoordinatorRuntime<S, U> runtime,
- ExecutorService executor
+ ExecutorService executor,
+ CoordinatorShardScheduler<U> scheduler
) {
this.log = logContext.logger(CoordinatorExecutorImpl.class);
- this.shard = shard;
- this.runtime = runtime;
this.executor = executor;
+ this.scheduler = scheduler;
}
private <R> TaskResult<R> executeTask(TaskRunnable<R> task) {
@@ -73,13 +69,12 @@ public class CoordinatorExecutorImpl<S extends
CoordinatorShard<U>, U> implement
if (tasks.get(key) != task) return;
// Execute the task.
- final TaskResult<R> result = executeTask(task);
+ var result = executeTask(task);
// Schedule the operation.
- runtime.scheduleWriteOperation(
+ scheduler.scheduleWriteOperation(
key,
- shard,
- coordinator -> {
+ () -> {
// If the task associated with the key is not us, it means
// that the task was either replaced or cancelled. We stop.
if (!tasks.remove(key, task)) {
@@ -87,7 +82,7 @@ public class CoordinatorExecutorImpl<S extends
CoordinatorShard<U>, U> implement
}
// Call the underlying write operation with the result of
the task.
- return operation.onComplete(result.result,
result.exception);
+ return operation.onComplete(result.result(),
result.exception());
}
).exceptionally(exception -> {
// Remove the task after a failure.
diff --git
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
index d5b676ba9a7..9c8b6c319dd 100644
---
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
+++
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
@@ -50,8 +50,6 @@ import org.slf4j.Logger;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -60,7 +58,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
@@ -315,163 +312,6 @@ public class CoordinatorRuntime<S extends
CoordinatorShard<U>, U> implements Aut
abstract boolean canTransitionFrom(CoordinatorState state);
}
- /**
- * The EventBasedCoordinatorTimer implements the CoordinatorTimer
interface and provides an event based
- * timer which turns timeouts of a regular {@link Timer} into {@link
CoordinatorWriteEvent} events which
- * are executed by the {@link CoordinatorEventProcessor} used by this
coordinator runtime. This is done
- * to ensure that the timer respects the threading model of the
coordinator runtime.
- *
- * The {@link CoordinatorWriteEvent} events pushed by the coordinator
timer wraps the
- * {@link TimeoutOperation} operations scheduled by the coordinators.
- *
- * It also keeps track of all the scheduled {@link TimerTask}. This allows
timeout operations to be
- * cancelled or rescheduled. When a timer is cancelled or overridden, the
previous timer is guaranteed to
- * not be executed even if it already expired and got pushed to the event
processor.
- *
- * When a timer fails with an unexpected exception, the timer is
rescheduled with a backoff.
- */
- class EventBasedCoordinatorTimer implements CoordinatorTimer<Void, U> {
- /**
- * The logger.
- */
- final Logger log;
-
- /**
- * The topic partition.
- */
- final TopicPartition tp;
-
- /**
- * The scheduled timers keyed by their key.
- */
- final Map<String, TimerTask> tasks = new HashMap<>();
-
- EventBasedCoordinatorTimer(TopicPartition tp, LogContext logContext) {
- this.tp = tp;
- this.log = logContext.logger(EventBasedCoordinatorTimer.class);
- }
-
- @Override
- public void schedule(
- String key,
- long delay,
- TimeUnit unit,
- boolean retry,
- TimeoutOperation<Void, U> operation
- ) {
- schedule(key, delay, unit, retry, 500, operation);
- }
-
- @Override
- public void schedule(
- String key,
- long delay,
- TimeUnit unit,
- boolean retry,
- long retryBackoff,
- TimeoutOperation<Void, U> operation
- ) {
- // The TimerTask wraps the TimeoutOperation into a
CoordinatorWriteEvent. When the TimerTask
- // expires, the event is pushed to the queue of the coordinator
runtime to be executed. This
- // ensures that the threading model of the runtime is respected.
- TimerTask task = new TimerTask(unit.toMillis(delay)) {
- @Override
- public void run() {
- String eventName = "Timeout(tp=" + tp + ", key=" + key +
")";
- CoordinatorWriteEvent<Void> event = new
CoordinatorWriteEvent<>(eventName, tp, writeTimeout, coordinator -> {
- log.debug("Executing write event {} for timer {}.",
eventName, key);
-
- // If the task is different, it means that the timer
has been
- // cancelled while the event was waiting to be
processed.
- if (!tasks.remove(key, this)) {
- throw new RejectedExecutionException("Timer " +
key + " was overridden or cancelled");
- }
-
- // Execute the timeout operation.
- return operation.generateRecords();
- });
-
- // If the write event fails, it is rescheduled with a
small backoff except if retry
- // is disabled or if the error is fatal.
- event.future.exceptionally(ex -> {
- if (ex instanceof RejectedExecutionException) {
- log.debug("The write event {} for the timer {} was
not executed because it was " +
- "cancelled or overridden.", event.name, key);
- return null;
- }
-
- if (ex instanceof NotCoordinatorException || ex
instanceof CoordinatorLoadInProgressException) {
- log.debug("The write event {} for the timer {}
failed due to {}. Ignoring it because " +
- "the coordinator is not active.", event.name,
key, ex.getMessage());
- return null;
- }
-
- if (retry) {
- log.info("The write event {} for the timer {}
failed due to {}. Rescheduling it. ",
- event.name, key, ex.getMessage());
- schedule(key, retryBackoff, TimeUnit.MILLISECONDS,
true, retryBackoff, operation);
- } else {
- log.error("The write event {} for the timer {}
failed due to {}. Ignoring it. ",
- event.name, key, ex.getMessage(), ex);
- }
-
- return null;
- });
-
- log.debug("Scheduling write event {} for timer {}.",
event.name, key);
- try {
- enqueueLast(event);
- } catch (NotCoordinatorException ex) {
- log.info("Failed to enqueue write event {} for timer
{} because the runtime is closed. Ignoring it.",
- event.name, key);
- }
- }
- };
-
- log.debug("Registering timer {} with delay of {}ms.", key,
unit.toMillis(delay));
- TimerTask prevTask = tasks.put(key, task);
- if (prevTask != null) prevTask.cancel();
-
- timer.add(task);
- }
-
- @Override
- public void scheduleIfAbsent(
- String key,
- long delay,
- TimeUnit unit,
- boolean retry,
- TimeoutOperation<Void, U> operation
- ) {
- if (!tasks.containsKey(key)) {
- schedule(key, delay, unit, retry, 500, operation);
- }
- }
-
- @Override
- public void cancel(String key) {
- TimerTask prevTask = tasks.remove(key);
- if (prevTask != null) prevTask.cancel();
- }
-
- @Override
- public boolean isScheduled(String key) {
- return tasks.containsKey(key);
- }
-
- public void cancelAll() {
- Iterator<Map.Entry<String, TimerTask>> iterator =
tasks.entrySet().iterator();
- while (iterator.hasNext()) {
- iterator.next().getValue().cancel();
- iterator.remove();
- }
- }
-
- public int size() {
- return tasks.size();
- }
- }
-
/**
* A simple container class to hold all the attributes
* related to a pending batch.
@@ -570,12 +410,12 @@ public class CoordinatorRuntime<S extends
CoordinatorShard<U>, U> implements Aut
/**
* The coordinator timer.
*/
- final EventBasedCoordinatorTimer timer;
+ final CoordinatorTimerImpl<U> timer;
/**
* The coordinator executor.
*/
- final CoordinatorExecutorImpl<S, U> executor;
+ final CoordinatorExecutorImpl<U> executor;
/**
* The current state.
@@ -639,12 +479,35 @@ public class CoordinatorRuntime<S extends
CoordinatorShard<U>, U> implements Aut
this.state = CoordinatorState.INITIAL;
this.epoch = -1;
this.deferredEventQueue = new DeferredEventQueue(logContext);
- this.timer = new EventBasedCoordinatorTimer(tp, logContext);
+ this.timer = new CoordinatorTimerImpl<>(
+ logContext,
+ CoordinatorRuntime.this.timer,
+ (operationName, operation) -> {
+ try {
+ return scheduleWriteOperation(
+ operationName,
+ tp,
+ coordinator -> operation.generate()
+ );
+ } catch (Throwable t) {
+ return CompletableFuture.failedFuture(t);
+ }
+ }
+ );
this.executor = new CoordinatorExecutorImpl<>(
logContext,
- tp,
- CoordinatorRuntime.this,
- executorService
+ executorService,
+ (operationName, operation) -> {
+ try {
+ return scheduleWriteOperation(
+ operationName,
+ tp,
+ coordinator -> operation.generate()
+ );
+ } catch (Throwable t) {
+ return CompletableFuture.failedFuture(t);
+ }
+ }
);
this.bufferSupplier = new BufferSupplier.GrowableBufferSupplier();
this.cachedBufferSize = new AtomicLong(0);
diff --git
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorShardScheduler.java
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorShardScheduler.java
new file mode 100644
index 00000000000..18fb1b86712
--- /dev/null
+++
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorShardScheduler.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.common.runtime;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Scheduler interface for shard-scoped components to schedule write operations
+ * through the coordinator runtime without depending on the full runtime API.
+ *
+ * @param <U> The record type used by the coordinator.
+ */
+@FunctionalInterface
+public interface CoordinatorShardScheduler<U> {
+
+ /**
+ * A write operation that produces records.
+ *
+ * @param <U> The record type used by the coordinator.
+ */
+ @FunctionalInterface
+ interface WriteOperation<U> {
+ CoordinatorResult<Void, U> generate();
+ }
+
+ /**
+ * Schedules a write operation to be executed by the runtime.
+ *
+ * @param operationName The name of the operation for logging/debugging.
+ * @param operation The write operation to execute.
+ * @return A future that completes when the operation is done.
+ */
+ CompletableFuture<Void> scheduleWriteOperation(
+ String operationName,
+ WriteOperation<U> operation
+ );
+}
diff --git
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimerImpl.java
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimerImpl.java
new file mode 100644
index 00000000000..0c66c99c25f
--- /dev/null
+++
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimerImpl.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.common.runtime;
+
+import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.server.util.timer.Timer;
+import org.apache.kafka.server.util.timer.TimerTask;
+
+import org.slf4j.Logger;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An event-based coordinator timer that bridges Kafka's utility Timer class
with the
+ * coordinator runtime's event-driven architecture. It converts timeout
expiries into
+ * write operations that are scheduled through the runtime, ensuring all
timeout operations
+ * respect the coordinator's threading model.
+ *
+ * When a timer fails with an unexpected exception, the timer is rescheduled
with a backoff.
+ */
+public class CoordinatorTimerImpl<U> implements CoordinatorTimer<Void, U> {
+ private final Logger log;
+ private final Timer timer;
+ private final CoordinatorShardScheduler<U> scheduler;
+ private final Map<String, TimerTask> tasks = new HashMap<>();
+
+ public CoordinatorTimerImpl(
+ LogContext logContext,
+ Timer timer,
+ CoordinatorShardScheduler<U> scheduler
+ ) {
+ this.log = logContext.logger(CoordinatorTimerImpl.class);
+ this.timer = timer;
+ this.scheduler = scheduler;
+ }
+
+ @Override
+ public void schedule(
+ String key,
+ long delay,
+ TimeUnit unit,
+ boolean retry,
+ TimeoutOperation<Void, U> operation
+ ) {
+ schedule(key, delay, unit, retry, 500, operation);
+ }
+
+ @Override
+ public void schedule(
+ String key,
+ long delay,
+ TimeUnit unit,
+ boolean retry,
+ long retryBackoff,
+ TimeoutOperation<Void, U> operation
+ ) {
+ // The TimerTask wraps the TimeoutOperation into a write operation.
When the TimerTask
+ // expires, the operation is scheduled through the scheduler to be
executed. This
+ // ensures that the threading model of the runtime is respected.
+ var task = new TimerTask(unit.toMillis(delay)) {
+ @Override
+ public void run() {
+ var operationName = "Timeout(key=" + key + ")";
+
+ scheduler.scheduleWriteOperation(
+ operationName,
+ () -> {
+ log.debug("Executing write event {} for timer {}.",
operationName, key);
+
+ // If the task is different, it means that the timer
has been
+ // cancelled while the event was waiting to be
processed.
+ if (!tasks.remove(key, this)) {
+ throw new RejectedExecutionException("Timer " +
key + " was overridden or cancelled");
+ }
+
+ // Execute the timeout operation.
+ return operation.generateRecords();
+ }
+ ).exceptionally(ex -> {
+ // Remove the task after a failure.
+ tasks.remove(key, this);
+
+ if (ex instanceof RejectedExecutionException) {
+ log.debug("The write event {} for the timer {} was not
executed because it was " +
+ "cancelled or overridden.", operationName, key);
+ return null;
+ }
+
+ if (ex instanceof NotCoordinatorException || ex instanceof
CoordinatorLoadInProgressException) {
+ log.debug("The write event {} for the timer {} failed
due to {}. Ignoring it because " +
+ "the coordinator is not active.", operationName,
key, ex.getMessage());
+ return null;
+ }
+
+ if (retry) {
+ log.info("The write event {} for the timer {} failed
due to {}. Rescheduling it. ",
+ operationName, key, ex.getMessage());
+ schedule(key, retryBackoff, TimeUnit.MILLISECONDS,
true, retryBackoff, operation);
+ } else {
+ log.error("The write event {} for the timer {} failed
due to {}. Ignoring it. ",
+ operationName, key, ex.getMessage(), ex);
+ }
+
+ return null;
+ });
+
+ log.debug("Scheduling write event {} for timer {}.",
operationName, key);
+ }
+ };
+
+ log.debug("Registering timer {} with delay of {}ms.", key,
unit.toMillis(delay));
+ var prevTask = tasks.put(key, task);
+ if (prevTask != null) prevTask.cancel();
+
+ timer.add(task);
+ }
+
+ @Override
+ public void scheduleIfAbsent(
+ String key,
+ long delay,
+ TimeUnit unit,
+ boolean retry,
+ TimeoutOperation<Void, U> operation
+ ) {
+ if (!tasks.containsKey(key)) {
+ schedule(key, delay, unit, retry, 500, operation);
+ }
+ }
+
+ @Override
+ public void cancel(String key) {
+ var prevTask = tasks.remove(key);
+ if (prevTask != null) prevTask.cancel();
+ }
+
+ @Override
+ public boolean isScheduled(String key) {
+ return tasks.containsKey(key);
+ }
+
+ public void cancelAll() {
+ Iterator<Map.Entry<String, TimerTask>> iterator =
tasks.entrySet().iterator();
+ while (iterator.hasNext()) {
+ iterator.next().getValue().cancel();
+ iterator.remove();
+ }
+ }
+
+ public int size() {
+ return tasks.size();
+ }
+}
diff --git
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImplTest.java
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImplTest.java
index 5a64a84d23a..c6e1e763798 100644
---
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImplTest.java
+++
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImplTest.java
@@ -16,13 +16,11 @@
*/
package org.apache.kafka.coordinator.common.runtime;
-import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.server.util.FutureUtils;
import org.junit.jupiter.api.Test;
-import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -47,33 +45,27 @@ import static org.mockito.Mockito.when;
@SuppressWarnings("unchecked")
public class CoordinatorExecutorImplTest {
private static final LogContext LOG_CONTEXT = new LogContext();
- private static final TopicPartition SHARD_PARTITION = new
TopicPartition("__consumer_offsets", 0);
- private static final Duration WRITE_TIMEOUT = Duration.ofMillis(1000);
private static final String TASK_KEY = "task";
@Test
public void testTaskSuccessfulLifecycle() {
- CoordinatorShard<String> coordinatorShard =
mock(CoordinatorShard.class);
- CoordinatorRuntime<CoordinatorShard<String>, String> runtime =
mock(CoordinatorRuntime.class);
+ CoordinatorShardScheduler<String> scheduler =
mock(CoordinatorShardScheduler.class);
ExecutorService executorService = mock(ExecutorService.class);
- CoordinatorExecutorImpl<CoordinatorShard<String>, String> executor =
new CoordinatorExecutorImpl<>(
+ CoordinatorExecutorImpl<String> executor = new
CoordinatorExecutorImpl<>(
LOG_CONTEXT,
- SHARD_PARTITION,
- runtime,
- executorService
+ executorService,
+ scheduler
);
- when(runtime.scheduleWriteOperation(
+ when(scheduler.scheduleWriteOperation(
eq(TASK_KEY),
- eq(SHARD_PARTITION),
any()
)).thenAnswer(args -> {
assertTrue(executor.isScheduled(TASK_KEY));
-
CoordinatorRuntime.CoordinatorWriteOperation<CoordinatorShard<String>, Void,
String> op =
- args.getArgument(2);
+ CoordinatorShardScheduler.WriteOperation<String> op =
args.getArgument(1);
assertEquals(
new CoordinatorResult<>(List.of("record"), null),
- op.generateRecordsAndResult(coordinatorShard)
+ op.generate()
);
return CompletableFuture.completedFuture(null);
});
@@ -111,26 +103,22 @@ public class CoordinatorExecutorImplTest {
@Test
public void testTaskFailedLifecycle() {
- CoordinatorShard<String> coordinatorShard =
mock(CoordinatorShard.class);
- CoordinatorRuntime<CoordinatorShard<String>, String> runtime =
mock(CoordinatorRuntime.class);
+ CoordinatorShardScheduler<String> scheduler =
mock(CoordinatorShardScheduler.class);
ExecutorService executorService = mock(ExecutorService.class);
- CoordinatorExecutorImpl<CoordinatorShard<String>, String> executor =
new CoordinatorExecutorImpl<>(
+ CoordinatorExecutorImpl<String> executor = new
CoordinatorExecutorImpl<>(
LOG_CONTEXT,
- SHARD_PARTITION,
- runtime,
- executorService
+ executorService,
+ scheduler
);
- when(runtime.scheduleWriteOperation(
+ when(scheduler.scheduleWriteOperation(
eq(TASK_KEY),
- eq(SHARD_PARTITION),
any()
)).thenAnswer(args -> {
-
CoordinatorRuntime.CoordinatorWriteOperation<CoordinatorShard<String>, Void,
String> op =
- args.getArgument(2);
+ CoordinatorShardScheduler.WriteOperation<String> op =
args.getArgument(1);
assertEquals(
new CoordinatorResult<>(List.of(), null),
- op.generateRecordsAndResult(coordinatorShard)
+ op.generate()
);
return CompletableFuture.completedFuture(null);
});
@@ -168,13 +156,12 @@ public class CoordinatorExecutorImplTest {
@Test
public void testTaskCancelledBeforeBeingExecuted() {
- CoordinatorRuntime<CoordinatorShard<String>, String> runtime =
mock(CoordinatorRuntime.class);
+ CoordinatorShardScheduler<String> scheduler =
mock(CoordinatorShardScheduler.class);
ExecutorService executorService = mock(ExecutorService.class);
- CoordinatorExecutorImpl<CoordinatorShard<String>, String> executor =
new CoordinatorExecutorImpl<>(
+ CoordinatorExecutorImpl<String> executor = new
CoordinatorExecutorImpl<>(
LOG_CONTEXT,
- SHARD_PARTITION,
- runtime,
- executorService
+ executorService,
+ scheduler
);
when(executorService.submit(any(Runnable.class))).thenAnswer(args -> {
@@ -211,27 +198,23 @@ public class CoordinatorExecutorImplTest {
@Test
public void
testTaskCancelledAfterBeingExecutedButBeforeWriteOperationIsExecuted() {
- CoordinatorShard<String> coordinatorShard =
mock(CoordinatorShard.class);
- CoordinatorRuntime<CoordinatorShard<String>, String> runtime =
mock(CoordinatorRuntime.class);
+ CoordinatorShardScheduler<String> scheduler =
mock(CoordinatorShardScheduler.class);
ExecutorService executorService = mock(ExecutorService.class);
- CoordinatorExecutorImpl<CoordinatorShard<String>, String> executor =
new CoordinatorExecutorImpl<>(
+ CoordinatorExecutorImpl<String> executor = new
CoordinatorExecutorImpl<>(
LOG_CONTEXT,
- SHARD_PARTITION,
- runtime,
- executorService
+ executorService,
+ scheduler
);
- when(runtime.scheduleWriteOperation(
+ when(scheduler.scheduleWriteOperation(
eq(TASK_KEY),
- eq(SHARD_PARTITION),
any()
)).thenAnswer(args -> {
// Cancel the task before running the write operation.
executor.cancel(TASK_KEY);
-
CoordinatorRuntime.CoordinatorWriteOperation<CoordinatorShard<String>, Void,
String> op =
- args.getArgument(2);
- Throwable ex = assertThrows(RejectedExecutionException.class, ()
-> op.generateRecordsAndResult(coordinatorShard));
+ CoordinatorShardScheduler.WriteOperation<String> op =
args.getArgument(1);
+ Throwable ex = assertThrows(RejectedExecutionException.class,
op::generate);
return FutureUtils.failedFuture(ex);
});
@@ -265,18 +248,16 @@ public class CoordinatorExecutorImplTest {
@Test
public void testTaskSchedulingWriteOperationFailed() {
- CoordinatorRuntime<CoordinatorShard<String>, String> runtime =
mock(CoordinatorRuntime.class);
+ CoordinatorShardScheduler<String> scheduler =
mock(CoordinatorShardScheduler.class);
ExecutorService executorService = mock(ExecutorService.class);
- CoordinatorExecutorImpl<CoordinatorShard<String>, String> executor =
new CoordinatorExecutorImpl<>(
+ CoordinatorExecutorImpl<String> executor = new
CoordinatorExecutorImpl<>(
LOG_CONTEXT,
- SHARD_PARTITION,
- runtime,
- executorService
+ executorService,
+ scheduler
);
- when(runtime.scheduleWriteOperation(
+ when(scheduler.scheduleWriteOperation(
eq(TASK_KEY),
- eq(SHARD_PARTITION),
any()
)).thenReturn(FutureUtils.failedFuture(new Throwable("Oh no!")));
@@ -311,24 +292,21 @@ public class CoordinatorExecutorImplTest {
@Test
public void testCancelAllTasks() {
- CoordinatorShard<String> coordinatorShard =
mock(CoordinatorShard.class);
- CoordinatorRuntime<CoordinatorShard<String>, String> runtime =
mock(CoordinatorRuntime.class);
+ CoordinatorShardScheduler<String> scheduler =
mock(CoordinatorShardScheduler.class);
ExecutorService executorService = mock(ExecutorService.class);
- CoordinatorExecutorImpl<CoordinatorShard<String>, String> executor =
new CoordinatorExecutorImpl<>(
+ CoordinatorExecutorImpl<String> executor = new
CoordinatorExecutorImpl<>(
LOG_CONTEXT,
- SHARD_PARTITION,
- runtime,
- executorService
+ executorService,
+ scheduler
);
-
List<CoordinatorRuntime.CoordinatorWriteOperation<CoordinatorShard<String>,
Void, String>> writeOperations = new ArrayList<>();
+ List<CoordinatorShardScheduler.WriteOperation<String>> writeOperations
= new ArrayList<>();
List<CompletableFuture<Void>> writeFutures = new ArrayList<>();
- when(runtime.scheduleWriteOperation(
+ when(scheduler.scheduleWriteOperation(
anyString(),
- eq(SHARD_PARTITION),
any()
)).thenAnswer(args -> {
- writeOperations.add(args.getArgument(2));
+ writeOperations.add(args.getArgument(1));
CompletableFuture<Void> writeFuture = new CompletableFuture<>();
writeFutures.add(writeFuture);
return writeFuture;
@@ -363,9 +341,9 @@ public class CoordinatorExecutorImplTest {
executor.cancelAll();
for (int i = 0; i < writeOperations.size(); i++) {
-
CoordinatorRuntime.CoordinatorWriteOperation<CoordinatorShard<String>, Void,
String> writeOperation = writeOperations.get(i);
+ CoordinatorShardScheduler.WriteOperation<String> writeOperation =
writeOperations.get(i);
CompletableFuture<Void> writeFuture = writeFutures.get(i);
- Throwable ex = assertThrows(RejectedExecutionException.class, ()
-> writeOperation.generateRecordsAndResult(coordinatorShard));
+ Throwable ex = assertThrows(RejectedExecutionException.class,
writeOperation::generate);
writeFuture.completeExceptionally(ex);
}
diff --git
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimerImplTest.java
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimerImplTest.java
new file mode 100644
index 00000000000..eb0058db43e
--- /dev/null
+++
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimerImplTest.java
@@ -0,0 +1,624 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.common.runtime;
+
+import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.server.util.FutureUtils;
+import org.apache.kafka.server.util.timer.MockTimer;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@SuppressWarnings("unchecked")
+public class CoordinatorTimerImplTest {
+ private static final LogContext LOG_CONTEXT = new LogContext();
+ private static final String TIMER_KEY = "timer-key";
+
+ @Test
+ public void testTimerSuccessfulLifecycle() throws InterruptedException {
+ var mockTimer = new MockTimer();
+ var operationCalled = new AtomicBoolean(false);
+
+ CoordinatorShardScheduler<String> scheduler = (operationName,
operation) -> {
+ var result = operation.generate();
+ assertEquals(new CoordinatorResult<>(List.of("record"), null),
result);
+ operationCalled.set(true);
+ return CompletableFuture.completedFuture(null);
+ };
+
+ var timer = new CoordinatorTimerImpl<>(
+ LOG_CONTEXT,
+ mockTimer,
+ scheduler
+ );
+
+ timer.schedule(
+ TIMER_KEY,
+ 100,
+ TimeUnit.MILLISECONDS,
+ false,
+ () -> new CoordinatorResult<>(List.of("record"), null)
+ );
+
+ assertTrue(timer.isScheduled(TIMER_KEY));
+ assertEquals(1, timer.size());
+
+ // Advance time to trigger the timer.
+ mockTimer.advanceClock(100 + 1);
+
+ assertTrue(operationCalled.get());
+ assertFalse(timer.isScheduled(TIMER_KEY));
+ assertEquals(0, timer.size());
+ }
+
+ @Test
+ public void testTimerCancelledBeforeExpiry() throws InterruptedException {
+ var mockTimer = new MockTimer();
+ var operationCalled = new AtomicBoolean(false);
+
+ CoordinatorShardScheduler<String> scheduler = (operationName,
operation) -> {
+ operationCalled.set(true);
+ return CompletableFuture.completedFuture(null);
+ };
+
+ var timer = new CoordinatorTimerImpl<>(
+ LOG_CONTEXT,
+ mockTimer,
+ scheduler
+ );
+
+ timer.schedule(
+ TIMER_KEY,
+ 100,
+ TimeUnit.MILLISECONDS,
+ false,
+ () -> new CoordinatorResult<>(List.of("record"), null)
+ );
+
+ assertTrue(timer.isScheduled(TIMER_KEY));
+ assertEquals(1, timer.size());
+
+ // Cancel before expiry.
+ timer.cancel(TIMER_KEY);
+ assertFalse(timer.isScheduled(TIMER_KEY));
+ assertEquals(0, timer.size());
+
+ // Advance time.
+ mockTimer.advanceClock(100 + 1);
+
+ // Operation should not be called.
+ assertFalse(operationCalled.get());
+ }
+
+ @Test
+ public void testTimerCancelledAfterExpiryButBeforeWriteOperation() throws
InterruptedException {
+ var mockTimer = new MockTimer();
+ var operationCalled = new AtomicBoolean(false);
+ var rejectedExceptionThrown = new AtomicBoolean(false);
+ var timerRef = new AtomicReference<CoordinatorTimerImpl<String>>();
+
+ CoordinatorShardScheduler<String> scheduler = (operationName,
operation) -> {
+ // Cancel the timer BEFORE executing the write operation.
+ // This simulates the case where the timer is cancelled while the write
+ // event is waiting to be processed.
+ timerRef.get().cancel(TIMER_KEY);
+
+ try {
+ operation.generate();
+ operationCalled.set(true);
+ } catch (RejectedExecutionException e) {
+ rejectedExceptionThrown.set(true);
+ return FutureUtils.failedFuture(e);
+ }
+ return CompletableFuture.completedFuture(null);
+ };
+
+ var timer = new CoordinatorTimerImpl<>(
+ LOG_CONTEXT,
+ mockTimer,
+ scheduler
+ );
+ timerRef.set(timer);
+
+ timer.schedule(
+ TIMER_KEY,
+ 100,
+ TimeUnit.MILLISECONDS,
+ false,
+ () -> new CoordinatorResult<>(List.of("record"), null)
+ );
+
+ assertTrue(timer.isScheduled(TIMER_KEY));
+ assertEquals(1, timer.size());
+
+ // Advance time to trigger the timer.
+ mockTimer.advanceClock(100 + 1);
+
+ // The operation should not have been called because we cancelled
+ // the timer before the write operation executed.
+ assertFalse(operationCalled.get());
+ assertTrue(rejectedExceptionThrown.get());
+ assertFalse(timer.isScheduled(TIMER_KEY));
+ assertEquals(0, timer.size());
+ }
+
+ @Test
+ public void testTimerOverridden() throws InterruptedException {
+ var mockTimer = new MockTimer();
+ var firstOperationCalled = new AtomicBoolean(false);
+ var secondOperationCalled = new AtomicBoolean(false);
+
+ CoordinatorShardScheduler<String> scheduler = (operationName,
operation) -> {
+ try {
+ operation.generate();
+ } catch (RejectedExecutionException e) {
+ // Expected for the overridden timer.
+ return FutureUtils.failedFuture(e);
+ }
+ return CompletableFuture.completedFuture(null);
+ };
+
+ var timer = new CoordinatorTimerImpl<>(
+ LOG_CONTEXT,
+ mockTimer,
+ scheduler
+ );
+
+ // Schedule first timer.
+ timer.schedule(
+ TIMER_KEY,
+ 100,
+ TimeUnit.MILLISECONDS,
+ false,
+ () -> {
+ firstOperationCalled.set(true);
+ return new CoordinatorResult<>(List.of("record1"), null);
+ }
+ );
+
+ // Override with second timer.
+ timer.schedule(
+ TIMER_KEY,
+ 200,
+ TimeUnit.MILLISECONDS,
+ false,
+ () -> {
+ secondOperationCalled.set(true);
+ return new CoordinatorResult<>(List.of("record2"), null);
+ }
+ );
+
+ assertTrue(timer.isScheduled(TIMER_KEY));
+ assertEquals(1, timer.size());
+
+ // Advance time to trigger the second timer.
+ mockTimer.advanceClock(200 + 1);
+
+ assertFalse(firstOperationCalled.get());
+ assertTrue(secondOperationCalled.get());
+ assertFalse(timer.isScheduled(TIMER_KEY));
+ assertEquals(0, timer.size());
+ }
+
+ @Test
+ public void testTimerRetryOnFailure() throws InterruptedException {
+ var mockTimer = new MockTimer();
+ var callCount = new AtomicInteger(0);
+
+ CoordinatorShardScheduler<String> scheduler = (operationName,
operation) -> {
+ operation.generate();
+ var count = callCount.incrementAndGet();
+ if (count == 1) {
+ // Fail the first time.
+ return FutureUtils.failedFuture(new
RuntimeException("Simulated failure"));
+ }
+ return CompletableFuture.completedFuture(null);
+ };
+
+ var timer = new CoordinatorTimerImpl<>(
+ LOG_CONTEXT,
+ mockTimer,
+ scheduler
+ );
+
+ timer.schedule(
+ TIMER_KEY,
+ 100,
+ TimeUnit.MILLISECONDS,
+ true, // retry enabled
+ 50, // retry backoff
+ () -> new CoordinatorResult<>(List.of("record"), null)
+ );
+
+ assertTrue(timer.isScheduled(TIMER_KEY));
+ assertEquals(1, timer.size());
+
+ // Advance time to trigger the first attempt.
+ mockTimer.advanceClock(100 + 1);
+
+ assertEquals(1, callCount.get());
+
+ // Advance time for retry backoff.
+ mockTimer.advanceClock(50 + 1);
+
+ assertEquals(2, callCount.get());
+ assertFalse(timer.isScheduled(TIMER_KEY));
+ assertEquals(0, timer.size());
+ }
+
+ @Test
+ public void testTimerNoRetryOnFailure() throws InterruptedException {
+ var mockTimer = new MockTimer();
+ var callCount = new AtomicInteger(0);
+
+ CoordinatorShardScheduler<String> scheduler = (operationName,
operation) -> {
+ operation.generate();
+ callCount.incrementAndGet();
+ return FutureUtils.failedFuture(new RuntimeException("Simulated
failure"));
+ };
+
+ var timer = new CoordinatorTimerImpl<>(
+ LOG_CONTEXT,
+ mockTimer,
+ scheduler
+ );
+
+ timer.schedule(
+ TIMER_KEY,
+ 100,
+ TimeUnit.MILLISECONDS,
+ false, // retry disabled
+ () -> new CoordinatorResult<>(List.of("record"), null)
+ );
+
+ assertTrue(timer.isScheduled(TIMER_KEY));
+ assertEquals(1, timer.size());
+
+ // Advance time to trigger the timer.
+ mockTimer.advanceClock(100 + 1);
+
+ assertEquals(1, callCount.get());
+ assertFalse(timer.isScheduled(TIMER_KEY));
+ assertEquals(0, timer.size());
+
+ // Advance more time - should not retry.
+ mockTimer.advanceClock(500 + 1);
+
+ assertEquals(1, callCount.get());
+ }
+
+ @Test
+ public void testTimerIgnoredOnNotCoordinatorException() throws
InterruptedException {
+ var mockTimer = new MockTimer();
+ var callCount = new AtomicInteger(0);
+
+ CoordinatorShardScheduler<String> scheduler = (operationName,
operation) -> {
+ operation.generate();
+ callCount.incrementAndGet();
+ return FutureUtils.failedFuture(new NotCoordinatorException("Not
coordinator"));
+ };
+
+ var timer = new CoordinatorTimerImpl<>(
+ LOG_CONTEXT,
+ mockTimer,
+ scheduler
+ );
+
+ timer.schedule(
+ TIMER_KEY,
+ 100,
+ TimeUnit.MILLISECONDS,
+ true, // retry enabled, but should be ignored for NotCoordinatorException
+ () -> new CoordinatorResult<>(List.of("record"), null)
+ );
+
+ assertTrue(timer.isScheduled(TIMER_KEY));
+ assertEquals(1, timer.size());
+
+ // Advance time to trigger the timer.
+ mockTimer.advanceClock(100 + 1);
+
+ assertEquals(1, callCount.get());
+ assertFalse(timer.isScheduled(TIMER_KEY));
+ assertEquals(0, timer.size());
+
+ // Should not retry for NotCoordinatorException.
+ mockTimer.advanceClock(500 + 1);
+
+ assertEquals(1, callCount.get());
+ }
+
+ @Test
+ public void testTimerIgnoredOnCoordinatorLoadInProgressException() throws
InterruptedException {
+ var mockTimer = new MockTimer();
+ var callCount = new AtomicInteger(0);
+
+ CoordinatorShardScheduler<String> scheduler = (operationName,
operation) -> {
+ operation.generate();
+ callCount.incrementAndGet();
+ return FutureUtils.failedFuture(new
CoordinatorLoadInProgressException("Loading"));
+ };
+
+ var timer = new CoordinatorTimerImpl<>(
+ LOG_CONTEXT,
+ mockTimer,
+ scheduler
+ );
+
+ timer.schedule(
+ TIMER_KEY,
+ 100,
+ TimeUnit.MILLISECONDS,
+ true, // retry enabled, but should be ignored for CoordinatorLoadInProgressException
+ () -> new CoordinatorResult<>(List.of("record"), null)
+ );
+
+ assertTrue(timer.isScheduled(TIMER_KEY));
+ assertEquals(1, timer.size());
+
+ // Advance time to trigger the timer.
+ mockTimer.advanceClock(100 + 1);
+
+ assertEquals(1, callCount.get());
+ assertFalse(timer.isScheduled(TIMER_KEY));
+ assertEquals(0, timer.size());
+
+ // Should not retry for CoordinatorLoadInProgressException.
+ mockTimer.advanceClock(500 + 1);
+
+ assertEquals(1, callCount.get());
+ }
+
+ @Test
+ public void testScheduleIfAbsentWhenAbsent() throws InterruptedException {
+ var mockTimer = new MockTimer();
+ var operationCalled = new AtomicBoolean(false);
+
+ CoordinatorShardScheduler<String> scheduler = (operationName,
operation) -> {
+ operation.generate();
+ return CompletableFuture.completedFuture(null);
+ };
+
+ var timer = new CoordinatorTimerImpl<>(
+ LOG_CONTEXT,
+ mockTimer,
+ scheduler
+ );
+
+ timer.scheduleIfAbsent(
+ TIMER_KEY,
+ 100,
+ TimeUnit.MILLISECONDS,
+ false,
+ () -> {
+ operationCalled.set(true);
+ return new CoordinatorResult<>(List.of("record"), null);
+ }
+ );
+
+ assertTrue(timer.isScheduled(TIMER_KEY));
+ assertEquals(1, timer.size());
+
+ // Advance time to trigger the timer.
+ mockTimer.advanceClock(100 + 1);
+
+ assertTrue(operationCalled.get());
+ assertFalse(timer.isScheduled(TIMER_KEY));
+ assertEquals(0, timer.size());
+ }
+
+ @Test
+ public void testScheduleIfAbsentWhenPresent() throws InterruptedException {
+ var mockTimer = new MockTimer();
+ var firstOperationCalled = new AtomicBoolean(false);
+ var secondOperationCalled = new AtomicBoolean(false);
+
+ CoordinatorShardScheduler<String> scheduler = (operationName,
operation) -> {
+ operation.generate();
+ return CompletableFuture.completedFuture(null);
+ };
+
+ var timer = new CoordinatorTimerImpl<>(
+ LOG_CONTEXT,
+ mockTimer,
+ scheduler
+ );
+
+ // Schedule first timer.
+ timer.schedule(
+ TIMER_KEY,
+ 100,
+ TimeUnit.MILLISECONDS,
+ false,
+ () -> {
+ firstOperationCalled.set(true);
+ return new CoordinatorResult<>(List.of("record1"), null);
+ }
+ );
+
+ assertTrue(timer.isScheduled(TIMER_KEY));
+ assertEquals(1, timer.size());
+
+ // Try to schedule second timer with scheduleIfAbsent - should be ignored.
+ timer.scheduleIfAbsent(
+ TIMER_KEY,
+ 200,
+ TimeUnit.MILLISECONDS,
+ false,
+ () -> {
+ secondOperationCalled.set(true);
+ return new CoordinatorResult<>(List.of("record2"), null);
+ }
+ );
+
+ // Size should still be 1.
+ assertEquals(1, timer.size());
+
+ // Advance time to trigger the first timer.
+ mockTimer.advanceClock(100 + 1);
+
+ assertTrue(firstOperationCalled.get());
+ assertFalse(secondOperationCalled.get());
+ assertFalse(timer.isScheduled(TIMER_KEY));
+ assertEquals(0, timer.size());
+ }
+
+ @Test
+ public void testCancelAll() throws InterruptedException {
+ var mockTimer = new MockTimer();
+ var operationCallCount = new AtomicInteger(0);
+
+ CoordinatorShardScheduler<String> scheduler = (operationName,
operation) -> {
+ operation.generate();
+ operationCallCount.incrementAndGet();
+ return CompletableFuture.completedFuture(null);
+ };
+
+ var timer = new CoordinatorTimerImpl<>(
+ LOG_CONTEXT,
+ mockTimer,
+ scheduler
+ );
+
+ // Schedule multiple timers.
+ for (int i = 0; i < 3; i++) {
+ timer.schedule(
+ TIMER_KEY + i,
+ 100,
+ TimeUnit.MILLISECONDS,
+ false,
+ () -> new CoordinatorResult<>(List.of("record"), null)
+ );
+ }
+
+ assertEquals(3, timer.size());
+
+ // Cancel all.
+ timer.cancelAll();
+
+ assertEquals(0, timer.size());
+ assertFalse(timer.isScheduled(TIMER_KEY + "0"));
+ assertFalse(timer.isScheduled(TIMER_KEY + "1"));
+ assertFalse(timer.isScheduled(TIMER_KEY + "2"));
+
+ // Advance time - no operations should be called.
+ mockTimer.advanceClock(100 + 1);
+
+ assertEquals(0, operationCallCount.get());
+ }
+
+ @Test
+ public void testDefaultRetryBackoff() throws InterruptedException {
+ var mockTimer = new MockTimer();
+ var callCount = new AtomicInteger(0);
+
+ CoordinatorShardScheduler<String> scheduler = (operationName,
operation) -> {
+ operation.generate();
+ var count = callCount.incrementAndGet();
+ if (count == 1) {
+ return FutureUtils.failedFuture(new
RuntimeException("Simulated failure"));
+ }
+ return CompletableFuture.completedFuture(null);
+ };
+
+ var timer = new CoordinatorTimerImpl<>(
+ LOG_CONTEXT,
+ mockTimer,
+ scheduler
+ );
+
+ // Use the schedule method without explicit retryBackoff (defaults to 500ms).
+ timer.schedule(
+ TIMER_KEY,
+ 100,
+ TimeUnit.MILLISECONDS,
+ true, // retry enabled
+ () -> new CoordinatorResult<>(List.of("record"), null)
+ );
+
+ assertTrue(timer.isScheduled(TIMER_KEY));
+ assertEquals(1, timer.size());
+
+ // Advance time to trigger the first attempt.
+ mockTimer.advanceClock(100 + 1);
+
+ assertEquals(1, callCount.get());
+
+ // Advance time for default retry backoff (500ms).
+ mockTimer.advanceClock(500 + 1);
+
+ assertEquals(2, callCount.get());
+ assertFalse(timer.isScheduled(TIMER_KEY));
+ assertEquals(0, timer.size());
+ }
+
+ @Test
+ public void testTaskCleanupOnFailedFutureWithoutOperationExecution()
throws InterruptedException {
+ var mockTimer = new MockTimer();
+ var operationCalled = new AtomicBoolean(false);
+
+ // Scheduler returns failed future WITHOUT calling operation.generate().
+ // This simulates: (1) wrapped synchronous exceptions, or
+ // (2) events failing before being executed.
+ CoordinatorShardScheduler<String> scheduler = (operationName,
operation) -> {
+ // Don't call operation.generate() - simulates event never being executed
+ return FutureUtils.failedFuture(new NotCoordinatorException("Not
coordinator"));
+ };
+
+ var timer = new CoordinatorTimerImpl<>(
+ LOG_CONTEXT,
+ mockTimer,
+ scheduler
+ );
+
+ timer.schedule(
+ TIMER_KEY,
+ 100,
+ TimeUnit.MILLISECONDS,
+ false,
+ () -> {
+ operationCalled.set(true);
+ return new CoordinatorResult<>(List.of("record"), null);
+ }
+ );
+
+ assertTrue(timer.isScheduled(TIMER_KEY));
+ assertEquals(1, timer.size());
+
+ // Advance time to trigger the timer.
+ mockTimer.advanceClock(100 + 1);
+
+ // Operation was never called.
+ assertFalse(operationCalled.get());
+ // But task should still be removed by exceptionally handler.
+ assertFalse(timer.isScheduled(TIMER_KEY));
+ assertEquals(0, timer.size());
+ }
+}