This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ec12d360a1a MINOR: Move inner test classes out of
CoordinatorRuntimeTest (#19258)
ec12d360a1a is described below
commit ec12d360a1ad21229a125153f0982d68fed06aa8
Author: Sean Quah <[email protected]>
AuthorDate: Fri Mar 21 15:36:23 2025 +0000
MINOR: Move inner test classes out of CoordinatorRuntimeTest (#19258)
Some of these classes are generally useful for testing.
MockCoordinatorShard is already shared by SnapshottableCoordinatorTest.
Also do some minor refactors.
Reviewers: TengYao Chi <[email protected]>, Chia-Ping Tsai
<[email protected]>, David Jacot <[email protected]>
---
.../common/runtime/CoordinatorRuntimeTest.java | 445 +--------------------
.../common/runtime/DirectEventProcessor.java | 46 +++
.../common/runtime/ManualEventProcessor.java | 59 +++
.../common/runtime/MockCoordinatorLoader.java | 58 +++
.../common/runtime/MockCoordinatorShard.java | 140 +++++++
.../runtime/MockCoordinatorShardBuilder.java | 92 +++++
.../MockCoordinatorShardBuilderSupplier.java | 27 ++
.../common/runtime/MockPartitionWriter.java | 89 +++++
.../runtime/SnapshottableCoordinatorTest.java | 1 -
.../common/runtime/StringSerializer.java | 34 ++
.../common/runtime/ThrowingSerializer.java | 50 +++
11 files changed, 598 insertions(+), 443 deletions(-)
diff --git
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
index 72f31b0a7d8..fb5b4a68572 100644
---
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
+++
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
@@ -36,7 +36,6 @@ import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
@@ -46,8 +45,6 @@ import org.apache.kafka.server.util.timer.MockTimer;
import org.apache.kafka.storage.internals.log.LogConfig;
import org.apache.kafka.storage.internals.log.VerificationGuard;
import org.apache.kafka.timeline.SnapshotRegistry;
-import org.apache.kafka.timeline.TimelineHashMap;
-import org.apache.kafka.timeline.TimelineHashSet;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -60,18 +57,13 @@ import java.nio.charset.Charset;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Comparator;
-import java.util.Deque;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
@@ -111,437 +103,6 @@ public class CoordinatorRuntimeTest {
private static final short TXN_OFFSET_COMMIT_LATEST_VERSION =
ApiKeys.TXN_OFFSET_COMMIT.latestVersion();
- private static class StringSerializer implements Serializer<String> {
- @Override
- public byte[] serializeKey(String record) {
- return null;
- }
-
- @Override
- public byte[] serializeValue(String record) {
- return record.getBytes(Charset.defaultCharset());
- }
- }
-
- private static class ThrowingSerializer<T> implements Serializer<T> {
- private final Serializer<T> serializer;
- private boolean throwOnNextOperation;
-
- public ThrowingSerializer(Serializer<T> serializer) {
- this.serializer = serializer;
- this.throwOnNextOperation = false;
- }
-
- public void throwOnNextOperation() {
- throwOnNextOperation = true;
- }
-
- @Override
- public byte[] serializeKey(T record) {
- return serializer.serializeKey(record);
- }
-
- @Override
- public byte[] serializeValue(T record) {
- if (throwOnNextOperation) {
- throwOnNextOperation = false;
- throw new BufferOverflowException();
- }
- return serializer.serializeValue(record);
- }
- }
-
- /**
- * A CoordinatorEventProcessor that directly executes the operations. This
is
- * useful in unit tests where execution in threads is not required.
- */
- private static class DirectEventProcessor implements
CoordinatorEventProcessor {
- @Override
- public void enqueueLast(CoordinatorEvent event) throws
RejectedExecutionException {
- try {
- event.run();
- } catch (Throwable ex) {
- event.complete(ex);
- }
- }
-
- @Override
- public void enqueueFirst(CoordinatorEvent event) throws
RejectedExecutionException {
- try {
- event.run();
- } catch (Throwable ex) {
- event.complete(ex);
- }
- }
-
- @Override
- public void close() {}
- }
-
- /**
- * A CoordinatorEventProcessor that queues event and execute the next one
- * when poll() is called.
- */
- private static class ManualEventProcessor implements
CoordinatorEventProcessor {
- private final Deque<CoordinatorEvent> queue = new LinkedList<>();
-
- @Override
- public void enqueueLast(CoordinatorEvent event) throws
RejectedExecutionException {
- queue.addLast(event);
- }
-
- @Override
- public void enqueueFirst(CoordinatorEvent event) throws
RejectedExecutionException {
- queue.addFirst(event);
- }
-
- public boolean poll() {
- CoordinatorEvent event = queue.poll();
- if (event == null) return false;
-
- try {
- event.run();
- } catch (Throwable ex) {
- event.complete(ex);
- }
-
- return true;
- }
-
- public int size() {
- return queue.size();
- }
-
- @Override
- public void close() {
-
- }
- }
-
- /**
- * A CoordinatorLoader that always succeeds.
- */
- private static class MockCoordinatorLoader implements
CoordinatorLoader<String> {
- private final LoadSummary summary;
- private final List<Long> lastWrittenOffsets;
- private final List<Long> lastCommittedOffsets;
-
- public MockCoordinatorLoader(
- LoadSummary summary,
- List<Long> lastWrittenOffsets,
- List<Long> lastCommittedOffsets
- ) {
- this.summary = summary;
- this.lastWrittenOffsets = lastWrittenOffsets;
- this.lastCommittedOffsets = lastCommittedOffsets;
- }
-
- public MockCoordinatorLoader() {
- this(null, List.of(), List.of());
- }
-
- @Override
- public CompletableFuture<LoadSummary> load(
- TopicPartition tp,
- CoordinatorPlayback<String> replayable
- ) {
- lastWrittenOffsets.forEach(replayable::updateLastWrittenOffset);
-
lastCommittedOffsets.forEach(replayable::updateLastCommittedOffset);
- return CompletableFuture.completedFuture(summary);
- }
-
- @Override
- public void close() { }
- }
-
- /**
- * An in-memory partition writer that accepts a maximum number of writes.
- */
- private static class MockPartitionWriter extends InMemoryPartitionWriter {
- private final Time time;
- private final int maxWrites;
- private final boolean failEndMarker;
- private final AtomicInteger writeCount = new AtomicInteger(0);
-
- public MockPartitionWriter() {
- this(new MockTime(), Integer.MAX_VALUE, false);
- }
-
- public MockPartitionWriter(int maxWrites) {
- this(new MockTime(), maxWrites, false);
- }
-
- public MockPartitionWriter(boolean failEndMarker) {
- this(new MockTime(), Integer.MAX_VALUE, failEndMarker);
- }
-
- public MockPartitionWriter(Time time, int maxWrites, boolean
failEndMarker) {
- super(false);
- this.time = time;
- this.maxWrites = maxWrites;
- this.failEndMarker = failEndMarker;
- }
-
- @Override
- public void registerListener(TopicPartition tp, Listener listener) {
- super.registerListener(tp, listener);
- }
-
- @Override
- public void deregisterListener(TopicPartition tp, Listener listener) {
- super.deregisterListener(tp, listener);
- }
-
- @Override
- public long append(
- TopicPartition tp,
- VerificationGuard verificationGuard,
- MemoryRecords batch
- ) {
- if (batch.sizeInBytes() > config(tp).maxMessageSize())
- throw new RecordTooLargeException("Batch is larger than the
max message size");
-
- // We don't want the coordinator to write empty batches.
- if (batch.validBytes() <= 0)
- throw new KafkaException("Coordinator tried to write an empty
batch");
-
- if (writeCount.incrementAndGet() > maxWrites)
- throw new KafkaException("Maximum number of writes reached");
-
- if (failEndMarker && batch.firstBatch().isControlBatch())
- throw new KafkaException("Couldn't write end marker.");
-
- time.sleep(10);
- return super.append(tp, verificationGuard, batch);
- }
- }
-
- /**
- * A simple Coordinator implementation that stores the records into a set.
- */
- static class MockCoordinatorShard implements CoordinatorShard<String> {
- static class RecordAndMetadata {
- public final long offset;
- public final long producerId;
- public final short producerEpoch;
- public final String record;
-
- public RecordAndMetadata(
- long offset,
- String record
- ) {
- this(
- offset,
- RecordBatch.NO_PRODUCER_ID,
- RecordBatch.NO_PRODUCER_EPOCH,
- record
- );
- }
-
- public RecordAndMetadata(
- long offset,
- long producerId,
- short producerEpoch,
- String record
- ) {
- this.offset = offset;
- this.producerId = producerId;
- this.producerEpoch = producerEpoch;
- this.record = record;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- RecordAndMetadata that = (RecordAndMetadata) o;
-
- if (offset != that.offset) return false;
- if (producerId != that.producerId) return false;
- if (producerEpoch != that.producerEpoch) return false;
- return Objects.equals(record, that.record);
- }
-
- @Override
- public int hashCode() {
- int result = (int) (offset ^ (offset >>> 32));
- result = 31 * result + (int) (producerId ^ (producerId >>>
32));
- result = 31 * result + (int) producerEpoch;
- result = 31 * result + (record != null ? record.hashCode() :
0);
- return result;
- }
-
- @Override
- public String toString() {
- return "RecordAndMetadata(" +
- "offset=" + offset +
- ", producerId=" + producerId +
- ", producerEpoch=" + producerEpoch +
- ", record='" + record.substring(0, Math.min(10,
record.length())) + '\'' +
- ')';
- }
- }
-
- private final SnapshotRegistry snapshotRegistry;
- private final TimelineHashSet<RecordAndMetadata> records;
- private final TimelineHashMap<Long,
TimelineHashSet<RecordAndMetadata>> pendingRecords;
- private final CoordinatorTimer<Void, String> timer;
- private final CoordinatorExecutor<String> executor;
-
- MockCoordinatorShard(
- SnapshotRegistry snapshotRegistry,
- CoordinatorTimer<Void, String> timer
- ) {
- this(snapshotRegistry, timer, null);
- }
-
- MockCoordinatorShard(
- SnapshotRegistry snapshotRegistry,
- CoordinatorTimer<Void, String> timer,
- CoordinatorExecutor<String> executor
- ) {
- this.snapshotRegistry = snapshotRegistry;
- this.records = new TimelineHashSet<>(snapshotRegistry, 0);
- this.pendingRecords = new TimelineHashMap<>(snapshotRegistry, 0);
- this.timer = timer;
- this.executor = executor;
- }
-
- @Override
- public void replay(
- long offset,
- long producerId,
- short producerEpoch,
- String record
- ) throws RuntimeException {
- RecordAndMetadata recordAndMetadata = new RecordAndMetadata(
- offset,
- producerId,
- producerEpoch,
- record
- );
-
- if (producerId == RecordBatch.NO_PRODUCER_ID) {
- records.add(recordAndMetadata);
- } else {
- pendingRecords
- .computeIfAbsent(producerId, __ -> new
TimelineHashSet<>(snapshotRegistry, 0))
- .add(recordAndMetadata);
- }
- }
-
- @Override
- public void replayEndTransactionMarker(
- long producerId,
- short producerEpoch,
- TransactionResult result
- ) throws RuntimeException {
- if (result == TransactionResult.COMMIT) {
- TimelineHashSet<RecordAndMetadata> pending =
pendingRecords.remove(producerId);
- if (pending == null) return;
- records.addAll(pending);
- } else {
- pendingRecords.remove(producerId);
- }
- }
-
- Set<String> pendingRecords(long producerId) {
- TimelineHashSet<RecordAndMetadata> pending =
pendingRecords.get(producerId);
- if (pending == null) return Set.of();
- return pending.stream().map(record ->
record.record).collect(Collectors.toUnmodifiableSet());
- }
-
- Set<String> records() {
- return records.stream().map(record ->
record.record).collect(Collectors.toUnmodifiableSet());
- }
-
- List<RecordAndMetadata> fullRecords() {
- return records
- .stream()
- .sorted(Comparator.comparingLong(record -> record.offset))
- .collect(Collectors.toList());
- }
- }
-
- /**
- * A CoordinatorBuilder that creates a MockCoordinator.
- */
- private static class MockCoordinatorShardBuilder implements
CoordinatorShardBuilder<MockCoordinatorShard, String> {
- private SnapshotRegistry snapshotRegistry;
- private CoordinatorTimer<Void, String> timer;
- private CoordinatorExecutor<String> executor;
-
- @Override
- public CoordinatorShardBuilder<MockCoordinatorShard, String>
withSnapshotRegistry(
- SnapshotRegistry snapshotRegistry
- ) {
- this.snapshotRegistry = snapshotRegistry;
- return this;
- }
-
- @Override
- public CoordinatorShardBuilder<MockCoordinatorShard, String>
withLogContext(
- LogContext logContext
- ) {
- return this;
- }
-
- @Override
- public CoordinatorShardBuilder<MockCoordinatorShard, String> withTime(
- Time time
- ) {
- return this;
- }
-
- @Override
- public CoordinatorShardBuilder<MockCoordinatorShard, String>
withExecutor(
- CoordinatorExecutor<String> executor
- ) {
- this.executor = executor;
- return this;
- }
-
- @Override
- public CoordinatorShardBuilder<MockCoordinatorShard, String> withTimer(
- CoordinatorTimer<Void, String> timer
- ) {
- this.timer = timer;
- return this;
- }
-
- @Override
- public CoordinatorShardBuilder<MockCoordinatorShard, String>
withCoordinatorMetrics(CoordinatorMetrics coordinatorMetrics) {
- return this;
- }
-
- @Override
- public CoordinatorShardBuilder<MockCoordinatorShard, String>
withTopicPartition(
- TopicPartition topicPartition
- ) {
- return this;
- }
-
- @Override
- public MockCoordinatorShard build() {
- return new MockCoordinatorShard(
- Objects.requireNonNull(this.snapshotRegistry),
- Objects.requireNonNull(this.timer),
- Objects.requireNonNull(this.executor)
- );
- }
- }
-
- /**
- * A CoordinatorBuilderSupplier that returns a MockCoordinatorBuilder.
- */
- private static class MockCoordinatorShardBuilderSupplier implements
CoordinatorShardBuilderSupplier<MockCoordinatorShard, String> {
- @Override
- public CoordinatorShardBuilder<MockCoordinatorShard, String> get() {
- return new MockCoordinatorShardBuilder();
- }
- }
-
private static MemoryRecords records(
long timestamp,
String... records
@@ -4917,7 +4478,7 @@ public class CoordinatorRuntimeTest {
// Schedule a write which schedules an async tasks.
CompletableFuture<String> write1 =
runtime.scheduleWriteOperation("write#1", TP, writeTimeout,
state -> {
- state.executor.schedule(
+ state.executor().schedule(
"write#1#task",
() -> "task result",
(result, exception) -> {
@@ -4935,7 +4496,7 @@ public class CoordinatorRuntimeTest {
// We should have a new write event in the queue as a result of the
// task being executed immediately.
- assertEquals(1, processor.queue.size());
+ assertEquals(1, processor.size());
// Verify the state.
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext
ctx = runtime.contextOrThrow(TP);
@@ -4949,7 +4510,7 @@ public class CoordinatorRuntimeTest {
processor.poll();
// The processor must be empty now.
- assertEquals(0, processor.queue.size());
+ assertEquals(0, processor.size());
// Verify the state.
assertEquals(2L, ctx.coordinator.lastWrittenOffset());
diff --git
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/DirectEventProcessor.java
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/DirectEventProcessor.java
new file mode 100644
index 00000000000..60b74c3a12f
--- /dev/null
+++
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/DirectEventProcessor.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.common.runtime;
+
+import java.util.concurrent.RejectedExecutionException;
+
+/**
+ * A CoordinatorEventProcessor that directly executes the operations. This is
+ * useful in unit tests where execution in threads is not required.
+ */
+public class DirectEventProcessor implements CoordinatorEventProcessor {
+ @Override
+ public void enqueueLast(CoordinatorEvent event) throws
RejectedExecutionException {
+ try {
+ event.run();
+ } catch (Throwable ex) {
+ event.complete(ex);
+ }
+ }
+
+ @Override
+ public void enqueueFirst(CoordinatorEvent event) throws
RejectedExecutionException {
+ try {
+ event.run();
+ } catch (Throwable ex) {
+ event.complete(ex);
+ }
+ }
+
+ @Override
+ public void close() {}
+}
diff --git
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/ManualEventProcessor.java
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/ManualEventProcessor.java
new file mode 100644
index 00000000000..0a55bcfafa5
--- /dev/null
+++
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/ManualEventProcessor.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.common.runtime;
+
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.concurrent.RejectedExecutionException;
+
+/**
+ * A CoordinatorEventProcessor that queues events and execute the next one
+ * when poll() is called.
+ */
+public class ManualEventProcessor implements CoordinatorEventProcessor {
+ private final Deque<CoordinatorEvent> queue = new LinkedList<>();
+
+ @Override
+ public void enqueueLast(CoordinatorEvent event) throws
RejectedExecutionException {
+ queue.addLast(event);
+ }
+
+ @Override
+ public void enqueueFirst(CoordinatorEvent event) throws
RejectedExecutionException {
+ queue.addFirst(event);
+ }
+
+ public boolean poll() {
+ CoordinatorEvent event = queue.poll();
+ if (event == null) return false;
+
+ try {
+ event.run();
+ } catch (Throwable ex) {
+ event.complete(ex);
+ }
+
+ return true;
+ }
+
+ public int size() {
+ return queue.size();
+ }
+
+ @Override
+ public void close() {}
+}
diff --git
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorLoader.java
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorLoader.java
new file mode 100644
index 00000000000..54fae743274
--- /dev/null
+++
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorLoader.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.common.runtime;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A CoordinatorLoader that always succeeds.
+ */
+public class MockCoordinatorLoader implements CoordinatorLoader<String> {
+ private final LoadSummary summary;
+ private final List<Long> lastWrittenOffsets;
+ private final List<Long> lastCommittedOffsets;
+
+ public MockCoordinatorLoader(
+ LoadSummary summary,
+ List<Long> lastWrittenOffsets,
+ List<Long> lastCommittedOffsets
+ ) {
+ this.summary = summary;
+ this.lastWrittenOffsets = lastWrittenOffsets;
+ this.lastCommittedOffsets = lastCommittedOffsets;
+ }
+
+ public MockCoordinatorLoader() {
+ this(null, List.of(), List.of());
+ }
+
+ @Override
+ public CompletableFuture<LoadSummary> load(
+ TopicPartition tp,
+ CoordinatorPlayback<String> replayable
+ ) {
+ lastWrittenOffsets.forEach(replayable::updateLastWrittenOffset);
+ lastCommittedOffsets.forEach(replayable::updateLastCommittedOffset);
+ return CompletableFuture.completedFuture(summary);
+ }
+
+ @Override
+ public void close() {}
+}
diff --git
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorShard.java
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorShard.java
new file mode 100644
index 00000000000..1fec7a9e000
--- /dev/null
+++
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorShard.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.common.runtime;
+
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineHashSet;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * A simple Coordinator implementation that stores the records into a set.
+ */
+public class MockCoordinatorShard implements CoordinatorShard<String> {
+ static record RecordAndMetadata(
+ long offset,
+ long producerId,
+ short producerEpoch,
+ String record
+ ) {
+ public RecordAndMetadata(
+ long offset,
+ String record
+ ) {
+ this(
+ offset,
+ RecordBatch.NO_PRODUCER_ID,
+ RecordBatch.NO_PRODUCER_EPOCH,
+ record
+ );
+ }
+ }
+
+ private final SnapshotRegistry snapshotRegistry;
+ private final TimelineHashSet<RecordAndMetadata> records;
+ private final TimelineHashMap<Long, TimelineHashSet<RecordAndMetadata>>
pendingRecords;
+ private final CoordinatorTimer<Void, String> timer;
+ private final CoordinatorExecutor<String> executor;
+
+ MockCoordinatorShard(
+ SnapshotRegistry snapshotRegistry,
+ CoordinatorTimer<Void, String> timer
+ ) {
+ this(snapshotRegistry, timer, null);
+ }
+
+ MockCoordinatorShard(
+ SnapshotRegistry snapshotRegistry,
+ CoordinatorTimer<Void, String> timer,
+ CoordinatorExecutor<String> executor
+ ) {
+ this.snapshotRegistry = snapshotRegistry;
+ this.records = new TimelineHashSet<>(snapshotRegistry, 0);
+ this.pendingRecords = new TimelineHashMap<>(snapshotRegistry, 0);
+ this.timer = timer;
+ this.executor = executor;
+ }
+
+ @Override
+ public void replay(
+ long offset,
+ long producerId,
+ short producerEpoch,
+ String record
+ ) throws RuntimeException {
+ RecordAndMetadata recordAndMetadata = new RecordAndMetadata(
+ offset,
+ producerId,
+ producerEpoch,
+ record
+ );
+
+ if (producerId == RecordBatch.NO_PRODUCER_ID) {
+ records.add(recordAndMetadata);
+ } else {
+ pendingRecords
+ .computeIfAbsent(producerId, __ -> new
TimelineHashSet<>(snapshotRegistry, 0))
+ .add(recordAndMetadata);
+ }
+ }
+
+ @Override
+ public void replayEndTransactionMarker(
+ long producerId,
+ short producerEpoch,
+ TransactionResult result
+ ) throws RuntimeException {
+ if (result == TransactionResult.COMMIT) {
+ TimelineHashSet<RecordAndMetadata> pending =
pendingRecords.remove(producerId);
+ if (pending == null) return;
+ records.addAll(pending);
+ } else {
+ pendingRecords.remove(producerId);
+ }
+ }
+
+ Set<String> pendingRecords(long producerId) {
+ TimelineHashSet<RecordAndMetadata> pending =
pendingRecords.get(producerId);
+ if (pending == null) return Set.of();
+ return pending.stream().map(record ->
record.record).collect(Collectors.toUnmodifiableSet());
+ }
+
+ Set<String> records() {
+ return records.stream().map(record ->
record.record).collect(Collectors.toUnmodifiableSet());
+ }
+
+ List<RecordAndMetadata> fullRecords() {
+ return records
+ .stream()
+ .sorted(Comparator.comparingLong(record -> record.offset))
+ .toList();
+ }
+
+ CoordinatorTimer<Void, String> timer() {
+ return timer;
+ }
+
+ CoordinatorExecutor<String> executor() {
+ return executor;
+ }
+}
diff --git
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorShardBuilder.java
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorShardBuilder.java
new file mode 100644
index 00000000000..dea2dcc1c17
--- /dev/null
+++
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorShardBuilder.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.common.runtime;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.timeline.SnapshotRegistry;
+
+import java.util.Objects;
+
+/**
+ * A CoordinatorBuilder that creates a MockCoordinator.
+ */
+public class MockCoordinatorShardBuilder implements
CoordinatorShardBuilder<MockCoordinatorShard, String> {
+ private SnapshotRegistry snapshotRegistry;
+ private CoordinatorTimer<Void, String> timer;
+ private CoordinatorExecutor<String> executor;
+
+ @Override
+ public CoordinatorShardBuilder<MockCoordinatorShard, String>
withSnapshotRegistry(
+ SnapshotRegistry snapshotRegistry
+ ) {
+ this.snapshotRegistry = snapshotRegistry;
+ return this;
+ }
+
+ @Override
+ public CoordinatorShardBuilder<MockCoordinatorShard, String>
withLogContext(
+ LogContext logContext
+ ) {
+ return this;
+ }
+
+ @Override
+ public CoordinatorShardBuilder<MockCoordinatorShard, String> withTime(
+ Time time
+ ) {
+ return this;
+ }
+
+ @Override
+ public CoordinatorShardBuilder<MockCoordinatorShard, String> withExecutor(
+ CoordinatorExecutor<String> executor
+ ) {
+ this.executor = executor;
+ return this;
+ }
+
+ @Override
+ public CoordinatorShardBuilder<MockCoordinatorShard, String> withTimer(
+ CoordinatorTimer<Void, String> timer
+ ) {
+ this.timer = timer;
+ return this;
+ }
+
+ @Override
+ public CoordinatorShardBuilder<MockCoordinatorShard, String>
withCoordinatorMetrics(CoordinatorMetrics coordinatorMetrics) {
+ return this;
+ }
+
+ @Override
+ public CoordinatorShardBuilder<MockCoordinatorShard, String>
withTopicPartition(
+ TopicPartition topicPartition
+ ) {
+ return this;
+ }
+
+ @Override
+ public MockCoordinatorShard build() {
+ return new MockCoordinatorShard(
+ Objects.requireNonNull(this.snapshotRegistry),
+ Objects.requireNonNull(this.timer),
+ Objects.requireNonNull(this.executor)
+ );
+ }
+}
diff --git
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorShardBuilderSupplier.java
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorShardBuilderSupplier.java
new file mode 100644
index 00000000000..09efdba5e81
--- /dev/null
+++
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorShardBuilderSupplier.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.common.runtime;
+
+/**
+ * A CoordinatorBuilderSupplier that returns a MockCoordinatorBuilder.
+ */
+public class MockCoordinatorShardBuilderSupplier implements
CoordinatorShardBuilderSupplier<MockCoordinatorShard, String> {
+ @Override
+ public CoordinatorShardBuilder<MockCoordinatorShard, String> get() {
+ return new MockCoordinatorShardBuilder();
+ }
+}
diff --git
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockPartitionWriter.java
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockPartitionWriter.java
new file mode 100644
index 00000000000..19564093bb4
--- /dev/null
+++
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockPartitionWriter.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.common.runtime;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.storage.internals.log.VerificationGuard;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * An in-memory partition writer that accepts a maximum number of writes.
+ */
+public class MockPartitionWriter extends InMemoryPartitionWriter {
+ private final Time time;
+ private final int maxWrites;
+ private final boolean failEndMarker;
+ private final AtomicInteger writeCount = new AtomicInteger(0);
+
+ public MockPartitionWriter() {
+ this(new MockTime(), Integer.MAX_VALUE, false);
+ }
+
+ public MockPartitionWriter(int maxWrites) {
+ this(new MockTime(), maxWrites, false);
+ }
+
+ public MockPartitionWriter(boolean failEndMarker) {
+ this(new MockTime(), Integer.MAX_VALUE, failEndMarker);
+ }
+
+ public MockPartitionWriter(Time time, int maxWrites, boolean
failEndMarker) {
+ super(false);
+ this.time = time;
+ this.maxWrites = maxWrites;
+ this.failEndMarker = failEndMarker;
+ }
+
+ @Override
+ public void registerListener(TopicPartition tp, Listener listener) {
+ super.registerListener(tp, listener);
+ }
+
+ @Override
+ public void deregisterListener(TopicPartition tp, Listener listener) {
+ super.deregisterListener(tp, listener);
+ }
+
+ @Override
+ public long append(
+ TopicPartition tp,
+ VerificationGuard verificationGuard,
+ MemoryRecords batch
+ ) {
+ if (batch.sizeInBytes() > config(tp).maxMessageSize())
+ throw new RecordTooLargeException("Batch is larger than the max
message size");
+
+ // We don't want the coordinator to write empty batches.
+ if (batch.validBytes() <= 0)
+ throw new KafkaException("Coordinator tried to write an empty
batch");
+
+ if (writeCount.incrementAndGet() > maxWrites)
+ throw new KafkaException("Maximum number of writes reached");
+
+ if (failEndMarker && batch.firstBatch().isControlBatch())
+ throw new KafkaException("Couldn't write end marker.");
+
+ time.sleep(10);
+ return super.append(tp, verificationGuard, batch);
+ }
+}
diff --git
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/SnapshottableCoordinatorTest.java
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/SnapshottableCoordinatorTest.java
index 40c23a2759a..b1436a3eff5 100644
---
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/SnapshottableCoordinatorTest.java
+++
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/SnapshottableCoordinatorTest.java
@@ -19,7 +19,6 @@ package org.apache.kafka.coordinator.common.runtime;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
-import
org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeTest.MockCoordinatorShard;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Test;
diff --git
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/StringSerializer.java
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/StringSerializer.java
new file mode 100644
index 00000000000..16b72f15882
--- /dev/null
+++
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/StringSerializer.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.common.runtime;
+
+import java.nio.charset.Charset;
+
+/**
+ * A Serializer for strings.
+ */
+public class StringSerializer implements Serializer<String> {
+ @Override
+ public byte[] serializeKey(String record) {
+ return null;
+ }
+
+ @Override
+ public byte[] serializeValue(String record) {
+ return record.getBytes(Charset.defaultCharset());
+ }
+}
diff --git
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/ThrowingSerializer.java
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/ThrowingSerializer.java
new file mode 100644
index 00000000000..26124c65a13
--- /dev/null
+++
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/ThrowingSerializer.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.common.runtime;
+
+import java.nio.BufferOverflowException;
+
+/**
+ * A Serializer that throws exceptions on demand.
+ */
+public class ThrowingSerializer<T> implements Serializer<T> {
+ private final Serializer<T> serializer;
+ private boolean throwOnNextOperation;
+
+ public ThrowingSerializer(Serializer<T> serializer) {
+ this.serializer = serializer;
+ this.throwOnNextOperation = false;
+ }
+
+ public void throwOnNextOperation() {
+ throwOnNextOperation = true;
+ }
+
+ @Override
+ public byte[] serializeKey(T record) {
+ return serializer.serializeKey(record);
+ }
+
+ @Override
+ public byte[] serializeValue(T record) {
+ if (throwOnNextOperation) {
+ throwOnNextOperation = false;
+ throw new BufferOverflowException();
+ }
+ return serializer.serializeValue(record);
+ }
+}