This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ec12d360a1a MINOR: Move inner test classes out of 
CoordinatorRuntimeTest (#19258)
ec12d360a1a is described below

commit ec12d360a1ad21229a125153f0982d68fed06aa8
Author: Sean Quah <[email protected]>
AuthorDate: Fri Mar 21 15:36:23 2025 +0000

    MINOR: Move inner test classes out of CoordinatorRuntimeTest (#19258)
    
    Some of these classes are generally useful for testing.
    MockCoordinatorShard is already shared by SnapshottableCoordinatorTest.
    
    Also do some minor refactors.
    
    Reviewers: TengYao Chi <[email protected]>, Chia-Ping Tsai 
<[email protected]>, David Jacot <[email protected]>
---
 .../common/runtime/CoordinatorRuntimeTest.java     | 445 +--------------------
 .../common/runtime/DirectEventProcessor.java       |  46 +++
 .../common/runtime/ManualEventProcessor.java       |  59 +++
 .../common/runtime/MockCoordinatorLoader.java      |  58 +++
 .../common/runtime/MockCoordinatorShard.java       | 140 +++++++
 .../runtime/MockCoordinatorShardBuilder.java       |  92 +++++
 .../MockCoordinatorShardBuilderSupplier.java       |  27 ++
 .../common/runtime/MockPartitionWriter.java        |  89 +++++
 .../runtime/SnapshottableCoordinatorTest.java      |   1 -
 .../common/runtime/StringSerializer.java           |  34 ++
 .../common/runtime/ThrowingSerializer.java         |  50 +++
 11 files changed, 598 insertions(+), 443 deletions(-)

diff --git 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
index 72f31b0a7d8..fb5b4a68572 100644
--- 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
+++ 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
@@ -36,7 +36,6 @@ import org.apache.kafka.common.record.SimpleRecord;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.requests.TransactionResult;
 import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
@@ -46,8 +45,6 @@ import org.apache.kafka.server.util.timer.MockTimer;
 import org.apache.kafka.storage.internals.log.LogConfig;
 import org.apache.kafka.storage.internals.log.VerificationGuard;
 import org.apache.kafka.timeline.SnapshotRegistry;
-import org.apache.kafka.timeline.TimelineHashMap;
-import org.apache.kafka.timeline.TimelineHashSet;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -60,18 +57,13 @@ import java.nio.charset.Charset;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Comparator;
-import java.util.Deque;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.OptionalInt;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -111,437 +103,6 @@ public class CoordinatorRuntimeTest {
 
     private static final short TXN_OFFSET_COMMIT_LATEST_VERSION = 
ApiKeys.TXN_OFFSET_COMMIT.latestVersion();
 
-    private static class StringSerializer implements Serializer<String> {
-        @Override
-        public byte[] serializeKey(String record) {
-            return null;
-        }
-
-        @Override
-        public byte[] serializeValue(String record) {
-            return record.getBytes(Charset.defaultCharset());
-        }
-    }
-
-    private static class ThrowingSerializer<T> implements Serializer<T> {
-        private final Serializer<T> serializer;
-        private boolean throwOnNextOperation;
-
-        public ThrowingSerializer(Serializer<T> serializer) {
-            this.serializer = serializer;
-            this.throwOnNextOperation = false;
-        }
-
-        public void throwOnNextOperation() {
-            throwOnNextOperation = true;
-        }
-
-        @Override
-        public byte[] serializeKey(T record) {
-            return serializer.serializeKey(record);
-        }
-
-        @Override
-        public byte[] serializeValue(T record) {
-            if (throwOnNextOperation) {
-                throwOnNextOperation = false;
-                throw new BufferOverflowException();
-            }
-            return serializer.serializeValue(record);
-        }
-    }
-
-    /**
-     * A CoordinatorEventProcessor that directly executes the operations. This 
is
-     * useful in unit tests where execution in threads is not required.
-     */
-    private static class DirectEventProcessor implements 
CoordinatorEventProcessor {
-        @Override
-        public void enqueueLast(CoordinatorEvent event) throws 
RejectedExecutionException {
-            try {
-                event.run();
-            } catch (Throwable ex) {
-                event.complete(ex);
-            }
-        }
-
-        @Override
-        public void enqueueFirst(CoordinatorEvent event) throws 
RejectedExecutionException {
-            try {
-                event.run();
-            } catch (Throwable ex) {
-                event.complete(ex);
-            }
-        }
-
-        @Override
-        public void close() {}
-    }
-
-    /**
-     * A CoordinatorEventProcessor that queues event and execute the next one
-     * when poll() is called.
-     */
-    private static class ManualEventProcessor implements 
CoordinatorEventProcessor {
-        private final Deque<CoordinatorEvent> queue = new LinkedList<>();
-
-        @Override
-        public void enqueueLast(CoordinatorEvent event) throws 
RejectedExecutionException {
-            queue.addLast(event);
-        }
-
-        @Override
-        public void enqueueFirst(CoordinatorEvent event) throws 
RejectedExecutionException {
-            queue.addFirst(event);
-        }
-
-        public boolean poll() {
-            CoordinatorEvent event = queue.poll();
-            if (event == null) return false;
-
-            try {
-                event.run();
-            } catch (Throwable ex) {
-                event.complete(ex);
-            }
-
-            return true;
-        }
-
-        public int size() {
-            return queue.size();
-        }
-
-        @Override
-        public void close() {
-
-        }
-    }
-
-    /**
-     * A CoordinatorLoader that always succeeds.
-     */
-    private static class MockCoordinatorLoader implements 
CoordinatorLoader<String> {
-        private final LoadSummary summary;
-        private final List<Long> lastWrittenOffsets;
-        private final List<Long> lastCommittedOffsets;
-
-        public MockCoordinatorLoader(
-            LoadSummary summary,
-            List<Long> lastWrittenOffsets,
-            List<Long> lastCommittedOffsets
-        ) {
-            this.summary = summary;
-            this.lastWrittenOffsets = lastWrittenOffsets;
-            this.lastCommittedOffsets = lastCommittedOffsets;
-        }
-
-        public MockCoordinatorLoader() {
-            this(null, List.of(), List.of());
-        }
-
-        @Override
-        public CompletableFuture<LoadSummary> load(
-            TopicPartition tp,
-            CoordinatorPlayback<String> replayable
-        ) {
-            lastWrittenOffsets.forEach(replayable::updateLastWrittenOffset);
-            
lastCommittedOffsets.forEach(replayable::updateLastCommittedOffset);
-            return CompletableFuture.completedFuture(summary);
-        }
-
-        @Override
-        public void close() { }
-    }
-
-    /**
-     * An in-memory partition writer that accepts a maximum number of writes.
-     */
-    private static class MockPartitionWriter extends InMemoryPartitionWriter {
-        private final Time time;
-        private final int maxWrites;
-        private final boolean failEndMarker;
-        private final AtomicInteger writeCount = new AtomicInteger(0);
-
-        public MockPartitionWriter() {
-            this(new MockTime(), Integer.MAX_VALUE, false);
-        }
-
-        public MockPartitionWriter(int maxWrites) {
-            this(new MockTime(), maxWrites, false);
-        }
-
-        public MockPartitionWriter(boolean failEndMarker) {
-            this(new MockTime(), Integer.MAX_VALUE, failEndMarker);
-        }
-
-        public MockPartitionWriter(Time time, int maxWrites, boolean 
failEndMarker) {
-            super(false);
-            this.time = time;
-            this.maxWrites = maxWrites;
-            this.failEndMarker = failEndMarker;
-        }
-
-        @Override
-        public void registerListener(TopicPartition tp, Listener listener) {
-            super.registerListener(tp, listener);
-        }
-
-        @Override
-        public void deregisterListener(TopicPartition tp, Listener listener) {
-            super.deregisterListener(tp, listener);
-        }
-
-        @Override
-        public long append(
-            TopicPartition tp,
-            VerificationGuard verificationGuard,
-            MemoryRecords batch
-        ) {
-            if (batch.sizeInBytes() > config(tp).maxMessageSize())
-                throw new RecordTooLargeException("Batch is larger than the 
max message size");
-
-            // We don't want the coordinator to write empty batches.
-            if (batch.validBytes() <= 0)
-                throw new KafkaException("Coordinator tried to write an empty 
batch");
-
-            if (writeCount.incrementAndGet() > maxWrites)
-                throw new KafkaException("Maximum number of writes reached");
-
-            if (failEndMarker && batch.firstBatch().isControlBatch())
-                throw new KafkaException("Couldn't write end marker.");
-
-            time.sleep(10);
-            return super.append(tp, verificationGuard, batch);
-        }
-    }
-
-    /**
-     * A simple Coordinator implementation that stores the records into a set.
-     */
-    static class MockCoordinatorShard implements CoordinatorShard<String> {
-        static class RecordAndMetadata {
-            public final long offset;
-            public final long producerId;
-            public final short producerEpoch;
-            public final String record;
-
-            public RecordAndMetadata(
-                long offset,
-                String record
-            ) {
-                this(
-                    offset,
-                    RecordBatch.NO_PRODUCER_ID,
-                    RecordBatch.NO_PRODUCER_EPOCH,
-                    record
-                );
-            }
-
-            public RecordAndMetadata(
-                long offset,
-                long producerId,
-                short producerEpoch,
-                String record
-            ) {
-                this.offset = offset;
-                this.producerId = producerId;
-                this.producerEpoch = producerEpoch;
-                this.record = record;
-            }
-
-            @Override
-            public boolean equals(Object o) {
-                if (this == o) return true;
-                if (o == null || getClass() != o.getClass()) return false;
-
-                RecordAndMetadata that = (RecordAndMetadata) o;
-
-                if (offset != that.offset) return false;
-                if (producerId != that.producerId) return false;
-                if (producerEpoch != that.producerEpoch) return false;
-                return Objects.equals(record, that.record);
-            }
-
-            @Override
-            public int hashCode() {
-                int result = (int) (offset ^ (offset >>> 32));
-                result = 31 * result + (int) (producerId ^ (producerId >>> 
32));
-                result = 31 * result + (int) producerEpoch;
-                result = 31 * result + (record != null ? record.hashCode() : 
0);
-                return result;
-            }
-
-            @Override
-            public String toString() {
-                return "RecordAndMetadata(" +
-                    "offset=" + offset +
-                    ", producerId=" + producerId +
-                    ", producerEpoch=" + producerEpoch +
-                    ", record='" + record.substring(0, Math.min(10, 
record.length())) + '\'' +
-                    ')';
-            }
-        }
-
-        private final SnapshotRegistry snapshotRegistry;
-        private final TimelineHashSet<RecordAndMetadata> records;
-        private final TimelineHashMap<Long, 
TimelineHashSet<RecordAndMetadata>> pendingRecords;
-        private final CoordinatorTimer<Void, String> timer;
-        private final CoordinatorExecutor<String> executor;
-
-        MockCoordinatorShard(
-            SnapshotRegistry snapshotRegistry,
-            CoordinatorTimer<Void, String> timer
-        ) {
-            this(snapshotRegistry, timer, null);
-        }
-
-        MockCoordinatorShard(
-            SnapshotRegistry snapshotRegistry,
-            CoordinatorTimer<Void, String> timer,
-            CoordinatorExecutor<String> executor
-        ) {
-            this.snapshotRegistry = snapshotRegistry;
-            this.records = new TimelineHashSet<>(snapshotRegistry, 0);
-            this.pendingRecords = new TimelineHashMap<>(snapshotRegistry, 0);
-            this.timer = timer;
-            this.executor = executor;
-        }
-
-        @Override
-        public void replay(
-            long offset,
-            long producerId,
-            short producerEpoch,
-            String record
-        ) throws RuntimeException {
-            RecordAndMetadata recordAndMetadata = new RecordAndMetadata(
-                offset,
-                producerId,
-                producerEpoch,
-                record
-            );
-
-            if (producerId == RecordBatch.NO_PRODUCER_ID) {
-                records.add(recordAndMetadata);
-            } else {
-                pendingRecords
-                    .computeIfAbsent(producerId, __ -> new 
TimelineHashSet<>(snapshotRegistry, 0))
-                    .add(recordAndMetadata);
-            }
-        }
-
-        @Override
-        public void replayEndTransactionMarker(
-            long producerId,
-            short producerEpoch,
-            TransactionResult result
-        ) throws RuntimeException {
-            if (result == TransactionResult.COMMIT) {
-                TimelineHashSet<RecordAndMetadata> pending = 
pendingRecords.remove(producerId);
-                if (pending == null) return;
-                records.addAll(pending);
-            } else {
-                pendingRecords.remove(producerId);
-            }
-        }
-
-        Set<String> pendingRecords(long producerId) {
-            TimelineHashSet<RecordAndMetadata> pending = 
pendingRecords.get(producerId);
-            if (pending == null) return Set.of();
-            return pending.stream().map(record -> 
record.record).collect(Collectors.toUnmodifiableSet());
-        }
-
-        Set<String> records() {
-            return records.stream().map(record -> 
record.record).collect(Collectors.toUnmodifiableSet());
-        }
-
-        List<RecordAndMetadata> fullRecords() {
-            return records
-                .stream()
-                .sorted(Comparator.comparingLong(record -> record.offset))
-                .collect(Collectors.toList());
-        }
-    }
-
-    /**
-     * A CoordinatorBuilder that creates a MockCoordinator.
-     */
-    private static class MockCoordinatorShardBuilder implements 
CoordinatorShardBuilder<MockCoordinatorShard, String> {
-        private SnapshotRegistry snapshotRegistry;
-        private CoordinatorTimer<Void, String> timer;
-        private CoordinatorExecutor<String> executor;
-
-        @Override
-        public CoordinatorShardBuilder<MockCoordinatorShard, String> 
withSnapshotRegistry(
-            SnapshotRegistry snapshotRegistry
-        ) {
-            this.snapshotRegistry = snapshotRegistry;
-            return this;
-        }
-
-        @Override
-        public CoordinatorShardBuilder<MockCoordinatorShard, String> 
withLogContext(
-            LogContext logContext
-        ) {
-            return this;
-        }
-
-        @Override
-        public CoordinatorShardBuilder<MockCoordinatorShard, String> withTime(
-            Time time
-        ) {
-            return this;
-        }
-
-        @Override
-        public CoordinatorShardBuilder<MockCoordinatorShard, String> 
withExecutor(
-            CoordinatorExecutor<String> executor
-        ) {
-            this.executor = executor;
-            return this;
-        }
-
-        @Override
-        public CoordinatorShardBuilder<MockCoordinatorShard, String> withTimer(
-            CoordinatorTimer<Void, String> timer
-        ) {
-            this.timer = timer;
-            return this;
-        }
-
-        @Override
-        public CoordinatorShardBuilder<MockCoordinatorShard, String> 
withCoordinatorMetrics(CoordinatorMetrics coordinatorMetrics) {
-            return this;
-        }
-
-        @Override
-        public CoordinatorShardBuilder<MockCoordinatorShard, String> 
withTopicPartition(
-            TopicPartition topicPartition
-        ) {
-            return this;
-        }
-
-        @Override
-        public MockCoordinatorShard build() {
-            return new MockCoordinatorShard(
-                Objects.requireNonNull(this.snapshotRegistry),
-                Objects.requireNonNull(this.timer),
-                Objects.requireNonNull(this.executor)
-            );
-        }
-    }
-
-    /**
-     * A CoordinatorBuilderSupplier that returns a MockCoordinatorBuilder.
-     */
-    private static class MockCoordinatorShardBuilderSupplier implements 
CoordinatorShardBuilderSupplier<MockCoordinatorShard, String> {
-        @Override
-        public CoordinatorShardBuilder<MockCoordinatorShard, String> get() {
-            return new MockCoordinatorShardBuilder();
-        }
-    }
-
     private static MemoryRecords records(
         long timestamp,
         String... records
@@ -4917,7 +4478,7 @@ public class CoordinatorRuntimeTest {
         // Schedule a write which schedules an async tasks.
         CompletableFuture<String> write1 = 
runtime.scheduleWriteOperation("write#1", TP, writeTimeout,
             state -> {
-                state.executor.schedule(
+                state.executor().schedule(
                     "write#1#task",
                     () -> "task result",
                     (result, exception) -> {
@@ -4935,7 +4496,7 @@ public class CoordinatorRuntimeTest {
 
         // We should have a new write event in the queue as a result of the
         // task being executed immediately.
-        assertEquals(1, processor.queue.size());
+        assertEquals(1, processor.size());
 
         // Verify the state.
         CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
@@ -4949,7 +4510,7 @@ public class CoordinatorRuntimeTest {
         processor.poll();
 
         // The processor must be empty now.
-        assertEquals(0, processor.queue.size());
+        assertEquals(0, processor.size());
 
         // Verify the state.
         assertEquals(2L, ctx.coordinator.lastWrittenOffset());
diff --git 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/DirectEventProcessor.java
 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/DirectEventProcessor.java
new file mode 100644
index 00000000000..60b74c3a12f
--- /dev/null
+++ 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/DirectEventProcessor.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.common.runtime;
+
+import java.util.concurrent.RejectedExecutionException;
+
+/**
+ * A CoordinatorEventProcessor that directly executes the operations. This is
+ * useful in unit tests where execution in threads is not required.
+ */
+public class DirectEventProcessor implements CoordinatorEventProcessor {
+    @Override
+    public void enqueueLast(CoordinatorEvent event) throws 
RejectedExecutionException {
+        try {
+            event.run();
+        } catch (Throwable ex) {
+            event.complete(ex);
+        }
+    }
+
+    @Override
+    public void enqueueFirst(CoordinatorEvent event) throws 
RejectedExecutionException {
+        try {
+            event.run();
+        } catch (Throwable ex) {
+            event.complete(ex);
+        }
+    }
+
+    @Override
+    public void close() {}
+}
diff --git 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/ManualEventProcessor.java
 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/ManualEventProcessor.java
new file mode 100644
index 00000000000..0a55bcfafa5
--- /dev/null
+++ 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/ManualEventProcessor.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.common.runtime;
+
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.concurrent.RejectedExecutionException;
+
+/**
+ * A CoordinatorEventProcessor that queues events and executes the next one
+ * when poll() is called.
+ */
+public class ManualEventProcessor implements CoordinatorEventProcessor {
+    private final Deque<CoordinatorEvent> queue = new LinkedList<>();
+
+    @Override
+    public void enqueueLast(CoordinatorEvent event) throws 
RejectedExecutionException {
+        queue.addLast(event);
+    }
+
+    @Override
+    public void enqueueFirst(CoordinatorEvent event) throws 
RejectedExecutionException {
+        queue.addFirst(event);
+    }
+
+    public boolean poll() {
+        CoordinatorEvent event = queue.poll();
+        if (event == null) return false;
+
+        try {
+            event.run();
+        } catch (Throwable ex) {
+            event.complete(ex);
+        }
+
+        return true;
+    }
+
+    public int size() {
+        return queue.size();
+    }
+
+    @Override
+    public void close() {}
+}
diff --git 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorLoader.java
 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorLoader.java
new file mode 100644
index 00000000000..54fae743274
--- /dev/null
+++ 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorLoader.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.common.runtime;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A CoordinatorLoader that always succeeds.
+ */
+public class MockCoordinatorLoader implements CoordinatorLoader<String> {
+    private final LoadSummary summary;
+    private final List<Long> lastWrittenOffsets;
+    private final List<Long> lastCommittedOffsets;
+
+    public MockCoordinatorLoader(
+        LoadSummary summary,
+        List<Long> lastWrittenOffsets,
+        List<Long> lastCommittedOffsets
+    ) {
+        this.summary = summary;
+        this.lastWrittenOffsets = lastWrittenOffsets;
+        this.lastCommittedOffsets = lastCommittedOffsets;
+    }
+
+    public MockCoordinatorLoader() {
+        this(null, List.of(), List.of());
+    }
+
+    @Override
+    public CompletableFuture<LoadSummary> load(
+        TopicPartition tp,
+        CoordinatorPlayback<String> replayable
+    ) {
+        lastWrittenOffsets.forEach(replayable::updateLastWrittenOffset);
+        lastCommittedOffsets.forEach(replayable::updateLastCommittedOffset);
+        return CompletableFuture.completedFuture(summary);
+    }
+
+    @Override
+    public void close() {}
+}
diff --git 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorShard.java
 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorShard.java
new file mode 100644
index 00000000000..1fec7a9e000
--- /dev/null
+++ 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorShard.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.common.runtime;
+
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineHashSet;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * A simple CoordinatorShard implementation that stores the records in a set.
+ */
+public class MockCoordinatorShard implements CoordinatorShard<String> {
+    static record RecordAndMetadata(
+        long offset,
+        long producerId,
+        short producerEpoch,
+        String record
+    ) {
+        public RecordAndMetadata(
+            long offset,
+            String record
+        ) {
+            this(
+                offset,
+                RecordBatch.NO_PRODUCER_ID,
+                RecordBatch.NO_PRODUCER_EPOCH,
+                record
+            );
+        }
+    }
+
+    private final SnapshotRegistry snapshotRegistry;
+    private final TimelineHashSet<RecordAndMetadata> records;
+    private final TimelineHashMap<Long, TimelineHashSet<RecordAndMetadata>> 
pendingRecords;
+    private final CoordinatorTimer<Void, String> timer;
+    private final CoordinatorExecutor<String> executor;
+
+    MockCoordinatorShard(
+        SnapshotRegistry snapshotRegistry,
+        CoordinatorTimer<Void, String> timer
+    ) {
+        this(snapshotRegistry, timer, null);
+    }
+
+    MockCoordinatorShard(
+        SnapshotRegistry snapshotRegistry,
+        CoordinatorTimer<Void, String> timer,
+        CoordinatorExecutor<String> executor
+    ) {
+        this.snapshotRegistry = snapshotRegistry;
+        this.records = new TimelineHashSet<>(snapshotRegistry, 0);
+        this.pendingRecords = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.timer = timer;
+        this.executor = executor;
+    }
+
+    @Override
+    public void replay(
+        long offset,
+        long producerId,
+        short producerEpoch,
+        String record
+    ) throws RuntimeException {
+        RecordAndMetadata recordAndMetadata = new RecordAndMetadata(
+            offset,
+            producerId,
+            producerEpoch,
+            record
+        );
+
+        if (producerId == RecordBatch.NO_PRODUCER_ID) {
+            records.add(recordAndMetadata);
+        } else {
+            pendingRecords
+                .computeIfAbsent(producerId, __ -> new 
TimelineHashSet<>(snapshotRegistry, 0))
+                .add(recordAndMetadata);
+        }
+    }
+
+    @Override
+    public void replayEndTransactionMarker(
+        long producerId,
+        short producerEpoch,
+        TransactionResult result
+    ) throws RuntimeException {
+        if (result == TransactionResult.COMMIT) {
+            TimelineHashSet<RecordAndMetadata> pending = 
pendingRecords.remove(producerId);
+            if (pending == null) return;
+            records.addAll(pending);
+        } else {
+            pendingRecords.remove(producerId);
+        }
+    }
+
+    Set<String> pendingRecords(long producerId) {
+        TimelineHashSet<RecordAndMetadata> pending = 
pendingRecords.get(producerId);
+        if (pending == null) return Set.of();
+        return pending.stream().map(record -> 
record.record).collect(Collectors.toUnmodifiableSet());
+    }
+
+    Set<String> records() {
+        return records.stream().map(record -> 
record.record).collect(Collectors.toUnmodifiableSet());
+    }
+
+    List<RecordAndMetadata> fullRecords() {
+        return records
+            .stream()
+            .sorted(Comparator.comparingLong(record -> record.offset))
+            .toList();
+    }
+
+    CoordinatorTimer<Void, String> timer() {
+        return timer;
+    }
+
+    CoordinatorExecutor<String> executor() {
+        return executor;
+    }
+}
diff --git 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorShardBuilder.java
 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorShardBuilder.java
new file mode 100644
index 00000000000..dea2dcc1c17
--- /dev/null
+++ 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorShardBuilder.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.common.runtime;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.timeline.SnapshotRegistry;
+
+import java.util.Objects;
+
+/**
+ * A CoordinatorShardBuilder that creates a MockCoordinatorShard.
+ *
+ * Only the snapshot registry, the timer and the executor are retained and passed to
+ * the shard. The log context, time, coordinator metrics and topic partition are
+ * accepted and ignored.
+ */
+public class MockCoordinatorShardBuilder implements CoordinatorShardBuilder<MockCoordinatorShard, String> {
+    private SnapshotRegistry snapshotRegistry;
+    private CoordinatorTimer<Void, String> timer;
+    private CoordinatorExecutor<String> executor;
+
+    /**
+     * Stores the snapshot registry that is passed to the shard.
+     *
+     * @param snapshotRegistry The snapshot registry.
+     * @return This builder.
+     */
+    @Override
+    public CoordinatorShardBuilder<MockCoordinatorShard, String> withSnapshotRegistry(
+        SnapshotRegistry snapshotRegistry
+    ) {
+        this.snapshotRegistry = snapshotRegistry;
+        return this;
+    }
+
+    /**
+     * Ignored by this builder.
+     *
+     * @param logContext The log context; not retained.
+     * @return This builder.
+     */
+    @Override
+    public CoordinatorShardBuilder<MockCoordinatorShard, String> withLogContext(
+        LogContext logContext
+    ) {
+        return this;
+    }
+
+    /**
+     * Ignored by this builder.
+     *
+     * @param time The time; not retained.
+     * @return This builder.
+     */
+    @Override
+    public CoordinatorShardBuilder<MockCoordinatorShard, String> withTime(
+        Time time
+    ) {
+        return this;
+    }
+
+    /**
+     * Stores the executor that is passed to the shard.
+     *
+     * @param executor The executor.
+     * @return This builder.
+     */
+    @Override
+    public CoordinatorShardBuilder<MockCoordinatorShard, String> withExecutor(
+        CoordinatorExecutor<String> executor
+    ) {
+        this.executor = executor;
+        return this;
+    }
+
+    /**
+     * Stores the timer that is passed to the shard.
+     *
+     * @param timer The timer.
+     * @return This builder.
+     */
+    @Override
+    public CoordinatorShardBuilder<MockCoordinatorShard, String> withTimer(
+        CoordinatorTimer<Void, String> timer
+    ) {
+        this.timer = timer;
+        return this;
+    }
+
+    /**
+     * Ignored by this builder.
+     *
+     * @param coordinatorMetrics The coordinator metrics; not retained.
+     * @return This builder.
+     */
+    @Override
+    public CoordinatorShardBuilder<MockCoordinatorShard, String> withCoordinatorMetrics(
+        CoordinatorMetrics coordinatorMetrics
+    ) {
+        return this;
+    }
+
+    /**
+     * Ignored by this builder.
+     *
+     * @param topicPartition The topic partition; not retained.
+     * @return This builder.
+     */
+    @Override
+    public CoordinatorShardBuilder<MockCoordinatorShard, String> withTopicPartition(
+        TopicPartition topicPartition
+    ) {
+        return this;
+    }
+
+    /**
+     * Creates the shard from the retained snapshot registry, timer and executor.
+     *
+     * @return A new MockCoordinatorShard.
+     * @throws NullPointerException If the snapshot registry, the timer or the executor
+     *                              has not been set; the message names the missing one.
+     */
+    @Override
+    public MockCoordinatorShard build() {
+        return new MockCoordinatorShard(
+            Objects.requireNonNull(this.snapshotRegistry, "snapshotRegistry must be set before build()"),
+            Objects.requireNonNull(this.timer, "timer must be set before build()"),
+            Objects.requireNonNull(this.executor, "executor must be set before build()")
+        );
+    }
+}
diff --git 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorShardBuilderSupplier.java
 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorShardBuilderSupplier.java
new file mode 100644
index 00000000000..09efdba5e81
--- /dev/null
+++ 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorShardBuilderSupplier.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.common.runtime;
+
+/**
+ * A CoordinatorShardBuilderSupplier that hands out MockCoordinatorShardBuilder instances.
+ */
+public class MockCoordinatorShardBuilderSupplier implements CoordinatorShardBuilderSupplier<MockCoordinatorShard, String> {
+    /**
+     * @return A new MockCoordinatorShardBuilder on every call.
+     */
+    @Override
+    public CoordinatorShardBuilder<MockCoordinatorShard, String> get() {
+        MockCoordinatorShardBuilder builder = new MockCoordinatorShardBuilder();
+        return builder;
+    }
+}
diff --git 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockPartitionWriter.java
 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockPartitionWriter.java
new file mode 100644
index 00000000000..19564093bb4
--- /dev/null
+++ 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockPartitionWriter.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.common.runtime;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.storage.internals.log.VerificationGuard;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * An in-memory partition writer for tests that can be told to start rejecting writes.
+ *
+ * Appends are refused once more than {@code maxWrites} of them have been counted and,
+ * when {@code failEndMarker} is set, whenever the first batch of the appended records
+ * is a control batch.
+ */
+public class MockPartitionWriter extends InMemoryPartitionWriter {
+    private final Time time;
+    private final int maxWrites;
+    private final boolean failEndMarker;
+    private final AtomicInteger writeCount = new AtomicInteger(0);
+
+    /**
+     * Creates a writer with no write limit that does not fail end markers.
+     */
+    public MockPartitionWriter() {
+        this(Integer.MAX_VALUE);
+    }
+
+    /**
+     * Creates a writer that does not fail end markers.
+     *
+     * @param maxWrites The number of counted appends after which appends are rejected.
+     */
+    public MockPartitionWriter(int maxWrites) {
+        this(new MockTime(), maxWrites, false);
+    }
+
+    /**
+     * Creates a writer with no write limit.
+     *
+     * @param failEndMarker Whether appends starting with a control batch are rejected.
+     */
+    public MockPartitionWriter(boolean failEndMarker) {
+        this(new MockTime(), Integer.MAX_VALUE, failEndMarker);
+    }
+
+    /**
+     * @param time          The time on which {@code sleep(10)} is called before each
+     *                      accepted append.
+     * @param maxWrites     The number of counted appends after which appends are rejected.
+     * @param failEndMarker Whether appends starting with a control batch are rejected.
+     */
+    public MockPartitionWriter(Time time, int maxWrites, boolean failEndMarker) {
+        super(false);
+        this.time = time;
+        this.maxWrites = maxWrites;
+        this.failEndMarker = failEndMarker;
+    }
+
+    /**
+     * Delegates to the parent implementation.
+     */
+    @Override
+    public void registerListener(TopicPartition tp, Listener listener) {
+        super.registerListener(tp, listener);
+    }
+
+    /**
+     * Delegates to the parent implementation.
+     */
+    @Override
+    public void deregisterListener(TopicPartition tp, Listener listener) {
+        super.deregisterListener(tp, listener);
+    }
+
+    /**
+     * Validates the batch and then appends it through the parent implementation.
+     *
+     * The checks run in a fixed order: size, emptiness, write limit, end marker. The
+     * write counter is incremented before the end-marker check, so a rejected end
+     * marker still counts towards {@code maxWrites}.
+     *
+     * @param tp                The partition to append to.
+     * @param verificationGuard The verification guard, passed through to the parent.
+     * @param batch             The records to append.
+     * @return The value returned by the parent's append.
+     * @throws RecordTooLargeException If the batch exceeds the partition's max message size.
+     * @throws KafkaException          If the batch has no valid bytes, the write limit
+     *                                 is exceeded, or an end marker is configured to fail.
+     */
+    @Override
+    public long append(
+        TopicPartition tp,
+        VerificationGuard verificationGuard,
+        MemoryRecords batch
+    ) {
+        if (batch.sizeInBytes() > config(tp).maxMessageSize()) {
+            throw new RecordTooLargeException("Batch is larger than the max message size");
+        }
+
+        // We don't want the coordinator to write empty batches.
+        if (batch.validBytes() <= 0) {
+            throw new KafkaException("Coordinator tried to write an empty batch");
+        }
+
+        int attempt = writeCount.incrementAndGet();
+        if (attempt > maxWrites) {
+            throw new KafkaException("Maximum number of writes reached");
+        }
+
+        if (failEndMarker && batch.firstBatch().isControlBatch()) {
+            throw new KafkaException("Couldn't write end marker.");
+        }
+
+        time.sleep(10);
+        return super.append(tp, verificationGuard, batch);
+    }
+}
diff --git 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/SnapshottableCoordinatorTest.java
 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/SnapshottableCoordinatorTest.java
index 40c23a2759a..b1436a3eff5 100644
--- 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/SnapshottableCoordinatorTest.java
+++ 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/SnapshottableCoordinatorTest.java
@@ -19,7 +19,6 @@ package org.apache.kafka.coordinator.common.runtime;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
-import 
org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeTest.MockCoordinatorShard;
 import org.apache.kafka.timeline.SnapshotRegistry;
 
 import org.junit.jupiter.api.Test;
diff --git 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/StringSerializer.java
 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/StringSerializer.java
new file mode 100644
index 00000000000..16b72f15882
--- /dev/null
+++ 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/StringSerializer.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.common.runtime;
+
+import java.nio.charset.Charset;
+
+/**
+ * A Serializer for plain strings: no key, and the value is the encoded string.
+ */
+public class StringSerializer implements Serializer<String> {
+    /**
+     * @param record The record; not read.
+     * @return Always {@code null}.
+     */
+    @Override
+    public byte[] serializeKey(String record) {
+        return null;
+    }
+
+    /**
+     * @param record The string to encode.
+     * @return The bytes of the string in the JVM's default charset.
+     */
+    @Override
+    public byte[] serializeValue(String record) {
+        Charset platformCharset = Charset.defaultCharset();
+        return record.getBytes(platformCharset);
+    }
+}
diff --git 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/ThrowingSerializer.java
 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/ThrowingSerializer.java
new file mode 100644
index 00000000000..26124c65a13
--- /dev/null
+++ 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/ThrowingSerializer.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.common.runtime;
+
+import java.nio.BufferOverflowException;
+
+/**
+ * A Serializer that wraps another one and can be armed to fail once.
+ *
+ * After {@link #throwOnNextOperation()} is called, the next call to
+ * {@link #serializeValue(Object)} throws a BufferOverflowException and disarms the
+ * serializer again. Key serialization is always delegated and never throws on demand.
+ */
+public class ThrowingSerializer<T> implements Serializer<T> {
+    private final Serializer<T> serializer;
+    private boolean throwOnNextOperation = false;
+
+    /**
+     * @param serializer The serializer that does the actual work.
+     */
+    public ThrowingSerializer(Serializer<T> serializer) {
+        this.serializer = serializer;
+    }
+
+    /**
+     * Arms the serializer so that the next value serialization throws.
+     */
+    public void throwOnNextOperation() {
+        throwOnNextOperation = true;
+    }
+
+    /**
+     * @param record The record.
+     * @return The key produced by the wrapped serializer.
+     */
+    @Override
+    public byte[] serializeKey(T record) {
+        return serializer.serializeKey(record);
+    }
+
+    /**
+     * @param record The record.
+     * @return The value produced by the wrapped serializer.
+     * @throws BufferOverflowException If the serializer was armed; the flag is cleared
+     *                                 before throwing, so only one call fails.
+     */
+    @Override
+    public byte[] serializeValue(T record) {
+        if (!throwOnNextOperation) {
+            return serializer.serializeValue(record);
+        }
+        throwOnNextOperation = false;
+        throw new BufferOverflowException();
+    }
+}


Reply via email to