This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9992e4cfa76 MINOR: Move record helper methods out of
CoordinatorRuntimeTest (#19279)
9992e4cfa76 is described below
commit 9992e4cfa76bbad63523f57c04208237ea665420
Author: Sean Quah <[email protected]>
AuthorDate: Tue Mar 25 10:56:15 2025 +0000
MINOR: Move record helper methods out of CoordinatorRuntimeTest (#19279)
These methods are generally useful for constructing records in
coordinator tests.
Reviewers: David Jacot <[email protected]>
---
.../common/runtime/CoordinatorRuntimeTest.java | 130 +-------------------
.../kafka/coordinator/common/runtime/TestUtil.java | 132 +++++++++++++++++++++
2 files changed, 135 insertions(+), 127 deletions(-)
diff --git
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
index fb5b4a68572..74ebecc0989 100644
---
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
+++
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
@@ -18,22 +18,14 @@ package org.apache.kafka.coordinator.common.runtime;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.NotCoordinatorException;
import org.apache.kafka.common.errors.NotEnoughReplicasException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.record.AbstractRecords;
-import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.ControlRecordType;
-import org.apache.kafka.common.record.EndTransactionMarker;
import org.apache.kafka.common.record.MemoryRecords;
-import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.RecordBatch;
-import org.apache.kafka.common.record.RecordVersion;
-import org.apache.kafka.common.record.SimpleRecord;
-import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
@@ -52,8 +44,6 @@ import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.ArgumentMatcher;
import java.nio.BufferOverflowException;
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
@@ -77,6 +67,9 @@ import static
org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.Coo
import static
org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.CoordinatorState.LOADING;
import static
org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.HighWatermarkListener.NO_OFFSET;
import static
org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.MIN_BUFFER_SIZE;
+import static
org.apache.kafka.coordinator.common.runtime.TestUtil.endTransactionMarker;
+import static org.apache.kafka.coordinator.common.runtime.TestUtil.records;
+import static
org.apache.kafka.coordinator.common.runtime.TestUtil.transactionalRecords;
import static org.apache.kafka.test.TestUtils.assertFutureThrows;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -103,123 +96,6 @@ public class CoordinatorRuntimeTest {
private static final short TXN_OFFSET_COMMIT_LATEST_VERSION =
ApiKeys.TXN_OFFSET_COMMIT.latestVersion();
- private static MemoryRecords records(
- long timestamp,
- String... records
- ) {
- return records(timestamp,
Arrays.stream(records).collect(Collectors.toList()));
- }
-
- private static MemoryRecords records(
- long timestamp,
- List<String> records
- ) {
- if (records.isEmpty())
- return MemoryRecords.EMPTY;
-
- List<SimpleRecord> simpleRecords = records.stream().map(record ->
- new SimpleRecord(timestamp,
record.getBytes(Charset.defaultCharset()))
- ).collect(Collectors.toList());
-
- int sizeEstimate = AbstractRecords.estimateSizeInBytes(
- RecordVersion.current().value,
- CompressionType.NONE,
- simpleRecords
- );
-
- ByteBuffer buffer = ByteBuffer.allocate(sizeEstimate);
-
- MemoryRecordsBuilder builder = MemoryRecords.builder(
- buffer,
- RecordVersion.current().value,
- Compression.NONE,
- TimestampType.CREATE_TIME,
- 0L,
- timestamp,
- RecordBatch.NO_PRODUCER_ID,
- RecordBatch.NO_PRODUCER_EPOCH,
- 0,
- false,
- RecordBatch.NO_PARTITION_LEADER_EPOCH
- );
-
- simpleRecords.forEach(builder::append);
-
- return builder.build();
- }
-
- private static MemoryRecords transactionalRecords(
- long producerId,
- short producerEpoch,
- long timestamp,
- String... records
- ) {
- return transactionalRecords(
- producerId,
- producerEpoch,
- timestamp,
- Arrays.stream(records).collect(Collectors.toList())
- );
- }
-
- private static MemoryRecords transactionalRecords(
- long producerId,
- short producerEpoch,
- long timestamp,
- List<String> records
- ) {
- if (records.isEmpty())
- return MemoryRecords.EMPTY;
-
- List<SimpleRecord> simpleRecords = records.stream().map(record ->
- new SimpleRecord(timestamp,
record.getBytes(Charset.defaultCharset()))
- ).collect(Collectors.toList());
-
- int sizeEstimate = AbstractRecords.estimateSizeInBytes(
- RecordVersion.current().value,
- CompressionType.NONE,
- simpleRecords
- );
-
- ByteBuffer buffer = ByteBuffer.allocate(sizeEstimate);
-
- MemoryRecordsBuilder builder = MemoryRecords.builder(
- buffer,
- RecordVersion.current().value,
- Compression.NONE,
- TimestampType.CREATE_TIME,
- 0L,
- timestamp,
- producerId,
- producerEpoch,
- 0,
- true,
- RecordBatch.NO_PARTITION_LEADER_EPOCH
- );
-
- simpleRecords.forEach(builder::append);
-
- return builder.build();
- }
-
- private static MemoryRecords endTransactionMarker(
- long producerId,
- short producerEpoch,
- long timestamp,
- int coordinatorEpoch,
- ControlRecordType result
- ) {
- return MemoryRecords.withEndTransactionMarker(
- timestamp,
- producerId,
- producerEpoch,
- new EndTransactionMarker(
- result,
- coordinatorEpoch
- )
- );
- }
-
@Test
public void testScheduleLoading() {
MockTimer timer = new MockTimer();
diff --git
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/TestUtil.java
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/TestUtil.java
index 3acd3599e2d..c3eda174671 100644
---
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/TestUtil.java
+++
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/TestUtil.java
@@ -16,17 +16,149 @@
*/
package org.apache.kafka.coordinator.common.runtime;
+import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.network.ClientInformation;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.record.AbstractRecords;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.EndTransactionMarker;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.RecordVersion;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.List;
public class TestUtil {
+ public static MemoryRecords records(
+ long timestamp,
+ String... records
+ ) {
+ return records(timestamp, Arrays.stream(records).toList());
+ }
+
+ public static MemoryRecords records(
+ long timestamp,
+ List<String> records
+ ) {
+ if (records.isEmpty())
+ return MemoryRecords.EMPTY;
+
+ List<SimpleRecord> simpleRecords = records.stream().map(record ->
+ new SimpleRecord(timestamp,
record.getBytes(Charset.defaultCharset()))
+ ).toList();
+
+ int sizeEstimate = AbstractRecords.estimateSizeInBytes(
+ RecordVersion.current().value,
+ CompressionType.NONE,
+ simpleRecords
+ );
+
+ ByteBuffer buffer = ByteBuffer.allocate(sizeEstimate);
+
+ MemoryRecordsBuilder builder = MemoryRecords.builder(
+ buffer,
+ RecordVersion.current().value,
+ Compression.NONE,
+ TimestampType.CREATE_TIME,
+ 0L,
+ timestamp,
+ RecordBatch.NO_PRODUCER_ID,
+ RecordBatch.NO_PRODUCER_EPOCH,
+ 0,
+ false,
+ RecordBatch.NO_PARTITION_LEADER_EPOCH
+ );
+
+ simpleRecords.forEach(builder::append);
+
+ return builder.build();
+ }
+
+ public static MemoryRecords transactionalRecords(
+ long producerId,
+ short producerEpoch,
+ long timestamp,
+ String... records
+ ) {
+ return transactionalRecords(
+ producerId,
+ producerEpoch,
+ timestamp,
+ Arrays.stream(records).toList()
+ );
+ }
+
+ public static MemoryRecords transactionalRecords(
+ long producerId,
+ short producerEpoch,
+ long timestamp,
+ List<String> records
+ ) {
+ if (records.isEmpty())
+ return MemoryRecords.EMPTY;
+
+ List<SimpleRecord> simpleRecords = records.stream().map(record ->
+ new SimpleRecord(timestamp,
record.getBytes(Charset.defaultCharset()))
+ ).toList();
+
+ int sizeEstimate = AbstractRecords.estimateSizeInBytes(
+ RecordVersion.current().value,
+ CompressionType.NONE,
+ simpleRecords
+ );
+
+ ByteBuffer buffer = ByteBuffer.allocate(sizeEstimate);
+
+ MemoryRecordsBuilder builder = MemoryRecords.builder(
+ buffer,
+ RecordVersion.current().value,
+ Compression.NONE,
+ TimestampType.CREATE_TIME,
+ 0L,
+ timestamp,
+ producerId,
+ producerEpoch,
+ 0,
+ true,
+ RecordBatch.NO_PARTITION_LEADER_EPOCH
+ );
+
+ simpleRecords.forEach(builder::append);
+
+ return builder.build();
+ }
+
+ public static MemoryRecords endTransactionMarker(
+ long producerId,
+ short producerEpoch,
+ long timestamp,
+ int coordinatorEpoch,
+ ControlRecordType result
+ ) {
+ return MemoryRecords.withEndTransactionMarker(
+ timestamp,
+ producerId,
+ producerEpoch,
+ new EndTransactionMarker(
+ result,
+ coordinatorEpoch
+ )
+ );
+ }
+
public static RequestContext requestContext(
ApiKeys apiKey
) {