This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9992e4cfa76 MINOR: Move record helper methods out of CoordinatorRuntimeTest (#19279)
9992e4cfa76 is described below

commit 9992e4cfa76bbad63523f57c04208237ea665420
Author: Sean Quah <[email protected]>
AuthorDate: Tue Mar 25 10:56:15 2025 +0000

    MINOR: Move record helper methods out of CoordinatorRuntimeTest (#19279)
    
    These methods are generally useful for constructing records in
    coordinator tests.
    
    Reviewers: David Jacot <[email protected]>
---
 .../common/runtime/CoordinatorRuntimeTest.java     | 130 +-------------------
 .../kafka/coordinator/common/runtime/TestUtil.java | 132 +++++++++++++++++++++
 2 files changed, 135 insertions(+), 127 deletions(-)

diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
index fb5b4a68572..74ebecc0989 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
@@ -18,22 +18,14 @@ package org.apache.kafka.coordinator.common.runtime;
 
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.compress.Compression;
 import org.apache.kafka.common.errors.NotCoordinatorException;
 import org.apache.kafka.common.errors.NotEnoughReplicasException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.record.AbstractRecords;
-import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.ControlRecordType;
-import org.apache.kafka.common.record.EndTransactionMarker;
 import org.apache.kafka.common.record.MemoryRecords;
-import org.apache.kafka.common.record.MemoryRecordsBuilder;
 import org.apache.kafka.common.record.RecordBatch;
-import org.apache.kafka.common.record.RecordVersion;
-import org.apache.kafka.common.record.SimpleRecord;
-import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.requests.TransactionResult;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
@@ -52,8 +44,6 @@ import org.junit.jupiter.params.provider.EnumSource;
 import org.mockito.ArgumentMatcher;
 
 import java.nio.BufferOverflowException;
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -77,6 +67,9 @@ import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.Coo
 import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.CoordinatorState.LOADING;
 import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.HighWatermarkListener.NO_OFFSET;
 import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.MIN_BUFFER_SIZE;
+import static org.apache.kafka.coordinator.common.runtime.TestUtil.endTransactionMarker;
+import static org.apache.kafka.coordinator.common.runtime.TestUtil.records;
+import static org.apache.kafka.coordinator.common.runtime.TestUtil.transactionalRecords;
 import static org.apache.kafka.test.TestUtils.assertFutureThrows;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -103,123 +96,6 @@ public class CoordinatorRuntimeTest {
 
     private static final short TXN_OFFSET_COMMIT_LATEST_VERSION = ApiKeys.TXN_OFFSET_COMMIT.latestVersion();
 
-    private static MemoryRecords records(
-        long timestamp,
-        String... records
-    ) {
-        return records(timestamp, Arrays.stream(records).collect(Collectors.toList()));
-    }
-
-    private static MemoryRecords records(
-        long timestamp,
-        List<String> records
-    ) {
-        if (records.isEmpty())
-            return MemoryRecords.EMPTY;
-
-        List<SimpleRecord> simpleRecords = records.stream().map(record ->
-            new SimpleRecord(timestamp, record.getBytes(Charset.defaultCharset()))
-        ).collect(Collectors.toList());
-
-        int sizeEstimate = AbstractRecords.estimateSizeInBytes(
-            RecordVersion.current().value,
-            CompressionType.NONE,
-            simpleRecords
-        );
-
-        ByteBuffer buffer = ByteBuffer.allocate(sizeEstimate);
-
-        MemoryRecordsBuilder builder = MemoryRecords.builder(
-            buffer,
-            RecordVersion.current().value,
-            Compression.NONE,
-            TimestampType.CREATE_TIME,
-            0L,
-            timestamp,
-            RecordBatch.NO_PRODUCER_ID,
-            RecordBatch.NO_PRODUCER_EPOCH,
-            0,
-            false,
-            RecordBatch.NO_PARTITION_LEADER_EPOCH
-        );
-
-        simpleRecords.forEach(builder::append);
-
-        return builder.build();
-    }
-
-    private static MemoryRecords transactionalRecords(
-        long producerId,
-        short producerEpoch,
-        long timestamp,
-        String... records
-    ) {
-        return transactionalRecords(
-            producerId,
-            producerEpoch,
-            timestamp,
-            Arrays.stream(records).collect(Collectors.toList())
-        );
-    }
-
-    private static MemoryRecords transactionalRecords(
-        long producerId,
-        short producerEpoch,
-        long timestamp,
-        List<String> records
-    ) {
-        if (records.isEmpty())
-            return MemoryRecords.EMPTY;
-
-        List<SimpleRecord> simpleRecords = records.stream().map(record ->
-            new SimpleRecord(timestamp, record.getBytes(Charset.defaultCharset()))
-        ).collect(Collectors.toList());
-
-        int sizeEstimate = AbstractRecords.estimateSizeInBytes(
-            RecordVersion.current().value,
-            CompressionType.NONE,
-            simpleRecords
-        );
-
-        ByteBuffer buffer = ByteBuffer.allocate(sizeEstimate);
-
-        MemoryRecordsBuilder builder = MemoryRecords.builder(
-            buffer,
-            RecordVersion.current().value,
-            Compression.NONE,
-            TimestampType.CREATE_TIME,
-            0L,
-            timestamp,
-            producerId,
-            producerEpoch,
-            0,
-            true,
-            RecordBatch.NO_PARTITION_LEADER_EPOCH
-        );
-
-        simpleRecords.forEach(builder::append);
-
-        return builder.build();
-    }
-
-    private static MemoryRecords endTransactionMarker(
-        long producerId,
-        short producerEpoch,
-        long timestamp,
-        int coordinatorEpoch,
-        ControlRecordType result
-    ) {
-        return MemoryRecords.withEndTransactionMarker(
-            timestamp,
-            producerId,
-            producerEpoch,
-            new EndTransactionMarker(
-                result,
-                coordinatorEpoch
-            )
-        );
-    }
-
     @Test
     public void testScheduleLoading() {
         MockTimer timer = new MockTimer();
diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/TestUtil.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/TestUtil.java
index 3acd3599e2d..c3eda174671 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/TestUtil.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/TestUtil.java
@@ -16,17 +16,149 @@
  */
 package org.apache.kafka.coordinator.common.runtime;
 
+import org.apache.kafka.common.compress.Compression;
 import org.apache.kafka.common.network.ClientInformation;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.record.AbstractRecords;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.EndTransactionMarker;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.RecordVersion;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.requests.RequestContext;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 
 import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.List;
 
 public class TestUtil {
+    public static MemoryRecords records(
+        long timestamp,
+        String... records
+    ) {
+        return records(timestamp, Arrays.stream(records).toList());
+    }
+
+    public static MemoryRecords records(
+        long timestamp,
+        List<String> records
+    ) {
+        if (records.isEmpty())
+            return MemoryRecords.EMPTY;
+
+        List<SimpleRecord> simpleRecords = records.stream().map(record ->
+            new SimpleRecord(timestamp, record.getBytes(Charset.defaultCharset()))
+        ).toList();
+
+        int sizeEstimate = AbstractRecords.estimateSizeInBytes(
+            RecordVersion.current().value,
+            CompressionType.NONE,
+            simpleRecords
+        );
+
+        ByteBuffer buffer = ByteBuffer.allocate(sizeEstimate);
+
+        MemoryRecordsBuilder builder = MemoryRecords.builder(
+            buffer,
+            RecordVersion.current().value,
+            Compression.NONE,
+            TimestampType.CREATE_TIME,
+            0L,
+            timestamp,
+            RecordBatch.NO_PRODUCER_ID,
+            RecordBatch.NO_PRODUCER_EPOCH,
+            0,
+            false,
+            RecordBatch.NO_PARTITION_LEADER_EPOCH
+        );
+
+        simpleRecords.forEach(builder::append);
+
+        return builder.build();
+    }
+
+    public static MemoryRecords transactionalRecords(
+        long producerId,
+        short producerEpoch,
+        long timestamp,
+        String... records
+    ) {
+        return transactionalRecords(
+            producerId,
+            producerEpoch,
+            timestamp,
+            Arrays.stream(records).toList()
+        );
+    }
+
+    public static MemoryRecords transactionalRecords(
+        long producerId,
+        short producerEpoch,
+        long timestamp,
+        List<String> records
+    ) {
+        if (records.isEmpty())
+            return MemoryRecords.EMPTY;
+
+        List<SimpleRecord> simpleRecords = records.stream().map(record ->
+            new SimpleRecord(timestamp, record.getBytes(Charset.defaultCharset()))
+        ).toList();
+
+        int sizeEstimate = AbstractRecords.estimateSizeInBytes(
+            RecordVersion.current().value,
+            CompressionType.NONE,
+            simpleRecords
+        );
+
+        ByteBuffer buffer = ByteBuffer.allocate(sizeEstimate);
+
+        MemoryRecordsBuilder builder = MemoryRecords.builder(
+            buffer,
+            RecordVersion.current().value,
+            Compression.NONE,
+            TimestampType.CREATE_TIME,
+            0L,
+            timestamp,
+            producerId,
+            producerEpoch,
+            0,
+            true,
+            RecordBatch.NO_PARTITION_LEADER_EPOCH
+        );
+
+        simpleRecords.forEach(builder::append);
+
+        return builder.build();
+    }
+
+    public static MemoryRecords endTransactionMarker(
+        long producerId,
+        short producerEpoch,
+        long timestamp,
+        int coordinatorEpoch,
+        ControlRecordType result
+    ) {
+        return MemoryRecords.withEndTransactionMarker(
+            timestamp,
+            producerId,
+            producerEpoch,
+            new EndTransactionMarker(
+                result,
+                coordinatorEpoch
+            )
+        );
+    }
+
     public static RequestContext requestContext(
         ApiKeys apiKey
     ) {
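
As a rough illustration (not part of the commit), a coordinator test in the same package could call the now-public TestUtil helpers along the lines of the sketch below. The class name, timestamps, producer id/epoch, and coordinator epoch are made up for the example; only the helper signatures come from the diff above.

    import static org.apache.kafka.coordinator.common.runtime.TestUtil.endTransactionMarker;
    import static org.apache.kafka.coordinator.common.runtime.TestUtil.records;
    import static org.apache.kafka.coordinator.common.runtime.TestUtil.transactionalRecords;

    import org.apache.kafka.common.record.ControlRecordType;
    import org.apache.kafka.common.record.MemoryRecords;

    // Hypothetical test class; exists only to show how the shared helpers are invoked.
    public class ExampleCoordinatorTest {
        public void buildBatches() {
            // Non-transactional batch with two string payloads at an arbitrary timestamp.
            MemoryRecords batch = records(1000L, "record-1", "record-2");

            // Transactional batch attributed to producer id 5, producer epoch 1.
            MemoryRecords txnBatch = transactionalRecords(5L, (short) 1, 1000L, "txn-record");

            // COMMIT control marker ending that producer's transaction (coordinator epoch 100).
            MemoryRecords marker = endTransactionMarker(5L, (short) 1, 1000L, 100, ControlRecordType.COMMIT);
        }
    }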
