This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 87777846d68 KAFKA-20124 Move LogCleanerLagIntegrationTest to storage
module (#21406)
87777846d68 is described below
commit 87777846d6835d871791856538dfb7f77f6a9be2
Author: Maros Orsak <[email protected]>
AuthorDate: Wed Feb 25 08:26:41 2026 +0100
KAFKA-20124 Move LogCleanerLagIntegrationTest to storage module (#21406)
This PR ports `LogCleanerLagIntegrationTest` from core module into
storage. I went through each line and ported the 1:1 way as much as
possible. Moreover I have ported `AbstractLogCleanerIntegrationTest`
with additional methods, which would be needed for next ports (i.e.,
`LogCleanerParameterizedIntegrationTest`) and other integration classes.
Tests for `storage` module are passing:
```java
BUILD SUCCESSFUL in 10m 39s
88 actionable tasks: 21 executed, 67 up-to-date
```
Reviewers: Mickael Maison <[email protected]>, Chia-Ping
Tsai <[email protected]>
---
.../kafka/log/LogCleanerLagIntegrationTest.scala | 130 --------
.../log/LogCleanerLagIntegrationTest.java | 359 +++++++++++++++++++++
.../kafka/storage/internals/log/LogTestUtils.java | 42 +++
3 files changed, 401 insertions(+), 130 deletions(-)
diff --git
a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
deleted file mode 100644
index c8aafe9199b..00000000000
--- a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
+++ /dev/null
@@ -1,130 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.log
-
-import kafka.utils._
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.compress.Compression
-import org.apache.kafka.common.record.internal.CompressionType
-import org.apache.kafka.server.util.MockTime
-import org.apache.kafka.storage.internals.log.UnifiedLog
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{Arguments, MethodSource}
-
-import scala.collection._
-import scala.jdk.CollectionConverters._
-
-/**
- * This is an integration test that tests the fully integrated log cleaner
- */
-class LogCleanerLagIntegrationTest extends AbstractLogCleanerIntegrationTest
with Logging {
- val msPerHour = 60 * 60 * 1000
-
- val minCompactionLag = 1 * msPerHour
- assertTrue(minCompactionLag % 2 == 0, "compactionLag must be divisible by 2
for this test")
-
- val time = new MockTime(1400000000000L, 1000L) // Tue May 13 16:53:20 UTC
2014 for `currentTimeMs`
- val cleanerBackOffMs = 200L
- val segmentSize = 512
-
- val topicPartitions = Array(new TopicPartition("log", 0), new
TopicPartition("log", 1), new TopicPartition("log", 2))
-
- @ParameterizedTest
- @MethodSource(Array("parameters"))
- def cleanerTest(compressionType: CompressionType): Unit = {
- val codec: Compression = Compression.of(compressionType).build()
- cleaner = makeCleaner(partitions = topicPartitions,
- backoffMs = cleanerBackOffMs,
- minCompactionLagMs = minCompactionLag,
- segmentSize = segmentSize)
- val log = cleaner.logs.get(topicPartitions(0))
-
- // t = T0
- val T0 = time.milliseconds
- val appends0 = writeDups(numKeys = 100, numDups = 3, log, codec, timestamp
= T0)
- val startSizeBlock0 = log.size
- debug(s"total log size at T0: $startSizeBlock0")
-
- val activeSegAtT0 = log.activeSegment
- debug(s"active segment at T0 has base offset: ${activeSegAtT0.baseOffset}")
- val sizeUpToActiveSegmentAtT0 = log.logSegments(0L,
activeSegAtT0.baseOffset).asScala.map(_.size).sum
- debug(s"log size up to base offset of active segment at T0:
$sizeUpToActiveSegmentAtT0")
-
- cleaner.startup()
-
- // T0 < t < T1
- // advance to a time still less than one compaction lag from start
- time.sleep(minCompactionLag/2)
- Thread.sleep(5 * cleanerBackOffMs) // give cleaning thread a chance to
_not_ clean
- assertEquals(startSizeBlock0, log.size, "There should be no cleaning until
the compaction lag has passed")
-
- // t = T1 > T0 + compactionLag
- // advance to time a bit more than one compaction lag from start
- time.sleep(minCompactionLag/2 + 1)
- val T1 = time.milliseconds
-
- // write another block of data
- val appends1 = appends0 ++ writeDups(numKeys = 100, numDups = 3, log,
codec, timestamp = T1)
- val firstBlock1SegmentBaseOffset = activeSegAtT0.baseOffset
-
- // the first block should get cleaned
- cleaner.awaitCleaned(new TopicPartition("log", 0),
activeSegAtT0.baseOffset, 60000L)
-
- // check the data is the same
- val read1 = readFromLog(log)
- assertEquals(appends1.toMap, read1.toMap, "Contents of the map shouldn't
change.")
-
- val compactedSize = log.logSegments(0L,
activeSegAtT0.baseOffset).asScala.map(_.size).sum
- debug(s"after cleaning the compacted size up to active segment at T0:
$compactedSize")
- val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints.get(new
TopicPartition("log", 0))
- assertTrue(lastCleaned >= firstBlock1SegmentBaseOffset, s"log cleaner
should have processed up to offset $firstBlock1SegmentBaseOffset, but
lastCleaned=$lastCleaned")
- assertTrue(sizeUpToActiveSegmentAtT0 > compactedSize, s"log should have
been compacted: size up to offset of active segment at
T0=$sizeUpToActiveSegmentAtT0 compacted size=$compactedSize")
- }
-
- private def readFromLog(log: UnifiedLog): Iterable[(Int, Int)] = {
- for (segment <- log.logSegments.asScala; record <-
segment.log.records.asScala) yield {
- val key = TestUtils.readString(record.key).toInt
- val value = TestUtils.readString(record.value).toInt
- key -> value
- }
- }
-
- private def writeDups(numKeys: Int, numDups: Int, log: UnifiedLog, codec:
Compression, timestamp: Long): Seq[(Int, Int)] = {
- for (_ <- 0 until numDups; key <- 0 until numKeys) yield {
- val count = counter
- log.appendAsLeader(TestUtils.singletonRecords(value =
counter.toString.getBytes, codec = codec,
- key = key.toString.getBytes, timestamp = timestamp), 0)
- // move LSO forward to increase compaction bound
- log.updateHighWatermark(log.logEndOffset)
- incCounter()
- (key, count)
- }
- }
-}
-
-object LogCleanerLagIntegrationTest {
- def oneParameter: java.util.Collection[Array[String]] = {
- val l = new java.util.ArrayList[Array[String]]()
- l.add(Array("NONE"))
- l
- }
-
- def parameters: java.util.stream.Stream[Arguments] =
- java.util.Arrays.stream(CompressionType.values.map(codec =>
Arguments.of(codec)))
-}
diff --git
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogCleanerLagIntegrationTest.java
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogCleanerLagIntegrationTest.java
new file mode 100644
index 00000000000..a444bbc9e95
--- /dev/null
+++
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogCleanerLagIntegrationTest.java
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.record.internal.CompressionType;
+import org.apache.kafka.common.record.internal.MemoryRecords;
+import org.apache.kafka.common.record.internal.Record;
+import org.apache.kafka.common.record.internal.RecordBatch;
+import org.apache.kafka.common.record.internal.RecordVersion;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
+import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * This is an integration test that tests the fully integrated log cleaner
+ */
+public class LogCleanerLagIntegrationTest {
+ private static final Logger log =
LoggerFactory.getLogger(LogCleanerLagIntegrationTest.class);
+
+ protected LogCleaner cleaner;
+ protected final File logDir = TestUtils.tempDirectory();
+
+ private final List<UnifiedLog> logs = new ArrayList<>();
+ private static final int DEFAULT_MAX_MESSAGE_SIZE = 128;
+ private static final int DEFAULT_DELETE_DELAY = 1000;
+ private static final int DEFAULT_SEGMENT_SIZE = 2048;
+ private static final long DEFAULT_MIN_COMPACTION_LAG_MS = 0L;
+ private static final long DEFAULT_MAX_COMPACTION_LAG_MS = Long.MAX_VALUE;
+ private static final long MIN_COMPACTION_LAG =
Duration.ofHours(1).toMillis();
+ private static final long CLEANER_BACKOFF_MS = 200L;
+ private static final float DEFAULT_MIN_CLEANABLE_DIRTY_RATIO = 0.0F;
+ private static final int SEGMENT_SIZE = 512;
+
+ private int counter = 0;
+
+ private final MockTime time = new MockTime(1400000000000L, 1000L); // Tue
May 13 16:53:20 UTC 2014
+ private static final List<TopicPartition> TOPIC_PARTITIONS = List.of(
+ new TopicPartition("log", 0),
+ new TopicPartition("log", 1),
+ new TopicPartition("log", 2)
+ );
+
+ public record KeyValueOffset(int key, String value, long firstOffset) { }
+ public record ValueAndRecords(String value, MemoryRecords records) { }
+
+ @ParameterizedTest
+ @EnumSource(CompressionType.class)
+ public void cleanerTest(CompressionType compressionType) throws
IOException, InterruptedException {
+ Compression codec = Compression.of(compressionType).build();
+ cleaner = makeCleaner(TOPIC_PARTITIONS,
+ CLEANER_BACKOFF_MS,
+ MIN_COMPACTION_LAG,
+ SEGMENT_SIZE);
+ UnifiedLog theLog = cleaner.logs().get(TOPIC_PARTITIONS.get(0));
+
+ // t = T0
+ long t0 = time.milliseconds();
+ Map<Integer, Integer> appends0 = writeDupsWithTimestamp(100, 3,
theLog, codec, t0);
+ long startSizeBlock0 = theLog.size();
+ log.debug("total log size at T0: {}", startSizeBlock0);
+
+ LogSegment activeSegAtT0 = theLog.activeSegment();
+ log.debug("active segment at T0 has base offset: {}",
activeSegAtT0.baseOffset());
+ long sizeUpToActiveSegmentAtT0 = calculateSizeUpToOffset(theLog,
activeSegAtT0.baseOffset());
+ log.debug("log size up to base offset of active segment at T0: {}",
sizeUpToActiveSegmentAtT0);
+
+ cleaner.startup();
+
+ // T0 < t < T1
+ // advance to a time still less than one compaction lag from start
+ time.sleep(MIN_COMPACTION_LAG / 2);
+ Thread.sleep(5 * CLEANER_BACKOFF_MS); // give cleaning thread a chance
to _not_ clean
+ assertEquals(startSizeBlock0, theLog.size(), "There should be no
cleaning until the compaction lag has passed");
+
+ // t = T1 > T0 + compactionLag
+ // advance to time a bit more than one compaction lag from start
+ time.sleep(MIN_COMPACTION_LAG / 2 + 1);
+ long t1 = time.milliseconds();
+
+ // write another block of data
+ Map<Integer, Integer> appends1 = new HashMap<>(appends0);
+ appends1.putAll(writeDupsWithTimestamp(100, 3, theLog, codec, t1));
+ long firstBlock1SegmentBaseOffset = activeSegAtT0.baseOffset();
+
+ // the first block should get cleaned
+ cleaner.awaitCleaned(new TopicPartition("log", 0),
activeSegAtT0.baseOffset(), 60000L);
+
+ // check the data is the same
+ Map<Integer, Integer> read1 = readFromLog(theLog);
+ assertEquals(appends1, read1, "Contents of the map shouldn't change.");
+
+ long compactedSize = calculateSizeUpToOffset(theLog,
activeSegAtT0.baseOffset());
+ log.debug("after cleaning the compacted size up to active segment at
T0: {}", compactedSize);
+ Long lastCleaned =
cleaner.cleanerManager().allCleanerCheckpoints().get(new TopicPartition("log",
0));
+ assertTrue(lastCleaned >= firstBlock1SegmentBaseOffset,
+ String.format("log cleaner should have processed up to offset %d,
but lastCleaned=%d",
+ firstBlock1SegmentBaseOffset, lastCleaned));
+ assertTrue(sizeUpToActiveSegmentAtT0 > compactedSize,
+ String.format("log should have been compacted: size up to offset
of active segment at T0=%d compacted size=%d",
+ sizeUpToActiveSegmentAtT0, compactedSize));
+ }
+
+ private long calculateSizeUpToOffset(UnifiedLog log, long offset) {
+ long size = 0;
+ for (LogSegment segment : log.logSegments(0L, offset)) {
+ size += segment.size();
+ }
+ return size;
+ }
+
+ private Map<Integer, Integer> readFromLog(UnifiedLog log) {
+ Map<Integer, Integer> result = new HashMap<>();
+ for (LogSegment segment : log.logSegments()) {
+ for (Record record : segment.log().records()) {
+ int key =
Integer.parseInt(LogTestUtils.readString(record.key()));
+ int value =
Integer.parseInt(LogTestUtils.readString(record.value()));
+ result.put(key, value);
+ }
+ }
+ return result;
+ }
+
+ private Map<Integer, Integer> writeDupsWithTimestamp(int numKeys, int
numDups, UnifiedLog log,
+ Compression codec,
long timestamp) throws IOException {
+ Map<Integer, Integer> result = new HashMap<>();
+ for (int i = 0; i < numDups; i++) {
+ for (int key = 0; key < numKeys; key++) {
+ int count = counter();
+ log.appendAsLeader(
+ LogTestUtils.singletonRecords(
+ String.valueOf(count).getBytes(),
+ codec,
+ String.valueOf(key).getBytes(),
+ timestamp),
+ 0);
+ // move LSO forward to increase compaction bound
+ log.updateHighWatermark(log.logEndOffset());
+ incCounter();
+ result.put(key, count);
+ }
+ }
+ return result;
+ }
+
+ private Properties logConfigProperties(Properties propertyOverrides,
+ int maxMessageSize,
+ float minCleanableDirtyRatio,
+ long minCompactionLagMs,
+ int deleteDelay,
+ int segmentSize,
+ long maxCompactionLagMs) {
+ Properties props = new Properties();
+ props.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, maxMessageSize);
+ props.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, segmentSize);
+ props.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 100 * 1024);
+ props.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, deleteDelay);
+ props.put(TopicConfig.CLEANUP_POLICY_CONFIG,
TopicConfig.CLEANUP_POLICY_COMPACT);
+ props.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG,
minCleanableDirtyRatio);
+ props.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG,
minCompactionLagMs);
+ props.put(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG,
maxCompactionLagMs);
+ props.putAll(propertyOverrides);
+ return props;
+ }
+
+ private Properties logConfigProperties(int maxMessageSize) {
+ return logConfigProperties(new Properties(), maxMessageSize,
+ DEFAULT_MIN_CLEANABLE_DIRTY_RATIO, DEFAULT_MIN_COMPACTION_LAG_MS,
+ DEFAULT_DELETE_DELAY, DEFAULT_SEGMENT_SIZE,
DEFAULT_MAX_COMPACTION_LAG_MS);
+ }
+
+ private LogCleaner makeCleaner(Iterable<TopicPartition> partitions,
+ float minCleanableDirtyRatio,
+ int numThreads,
+ long backoffMs,
+ int maxMessageSize,
+ long minCompactionLagMs,
+ int deleteDelay,
+ int segmentSize,
+ long maxCompactionLagMs,
+ Integer cleanerIoBufferSize,
+ Properties propertyOverrides) throws
IOException {
+
+ ConcurrentMap<TopicPartition, UnifiedLog> logMap = new
ConcurrentHashMap<>();
+ for (TopicPartition partition : partitions) {
+ File dir = new File(logDir, partition.topic() + "-" +
partition.partition());
+ Files.createDirectories(dir.toPath());
+
+ Properties props = logConfigProperties(propertyOverrides,
+ maxMessageSize,
+ minCleanableDirtyRatio,
+ minCompactionLagMs,
+ deleteDelay,
+ segmentSize,
+ maxCompactionLagMs);
+ LogConfig logConfig = new LogConfig(props);
+
+ UnifiedLog log = UnifiedLog.create(
+ dir,
+ logConfig,
+ 0L,
+ 0L,
+ time.scheduler,
+ new BrokerTopicStats(),
+ time,
+ 5 * 60 * 1000,
+ new
ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
false),
+
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+ new LogDirFailureChannel(10),
+ true,
+ Optional.empty());
+ logMap.put(partition, log);
+ logs.add(log);
+ }
+
+ int ioBufferSize = cleanerIoBufferSize != null ? cleanerIoBufferSize :
maxMessageSize / 2;
+ CleanerConfig cleanerConfig = new CleanerConfig(
+ numThreads,
+ 4 * 1024 * 1024L,
+ 0.9,
+ ioBufferSize,
+ maxMessageSize,
+ Double.MAX_VALUE,
+ backoffMs,
+ true);
+
+ return new LogCleaner(cleanerConfig,
+ List.of(logDir),
+ logMap,
+ new LogDirFailureChannel(1),
+ time);
+ }
+
+ private LogCleaner makeCleaner(Iterable<TopicPartition> partitions,
+ long backoffMs,
+ long minCompactionLagMs,
+ int segmentSize) throws IOException {
+ return makeCleaner(partitions,
+ DEFAULT_MIN_CLEANABLE_DIRTY_RATIO,
+ 1,
+ backoffMs,
+ DEFAULT_MAX_MESSAGE_SIZE,
+ minCompactionLagMs,
+ DEFAULT_DELETE_DELAY,
+ segmentSize,
+ DEFAULT_MAX_COMPACTION_LAG_MS,
+ null,
+ new Properties());
+ }
+
+ private int counter() {
+ return counter;
+ }
+
+ private void incCounter() {
+ counter++;
+ }
+
+ private List<KeyValueOffset> writeDups(int numKeys, int numDups,
UnifiedLog log, Compression codec,
+ int startKey, byte magicValue)
throws IOException {
+ List<KeyValueOffset> results = new ArrayList<>();
+ for (int i = 0; i < numDups; i++) {
+ for (int key = startKey; key < startKey + numKeys; key++) {
+ String value = String.valueOf(counter());
+ MemoryRecords records = LogTestUtils.singletonRecords(
+ value.getBytes(),
+ codec,
+ String.valueOf(key).getBytes(),
+ RecordBatch.NO_TIMESTAMP,
+ magicValue);
+ LogAppendInfo appendInfo = log.appendAsLeaderWithRecordVersion(
+ records, 0, RecordVersion.lookup(magicValue));
+ // move LSO forward to increase compaction bound
+ log.updateHighWatermark(log.logEndOffset());
+ results.add(new KeyValueOffset(key, value,
appendInfo.firstOffset()));
+ incCounter();
+ }
+ }
+ return results;
+ }
+
+ private List<KeyValueOffset> writeDups(int numKeys, int numDups,
UnifiedLog log, Compression codec) throws IOException {
+ return writeDups(numKeys, numDups, log, codec, 0,
RecordBatch.CURRENT_MAGIC_VALUE);
+ }
+
+ private ValueAndRecords createLargeSingleMessageSet(int key, byte
messageFormatVersion, Compression codec) {
+ Random random = new Random(0);
+ StringBuilder sb = new StringBuilder(128);
+ for (int i = 0; i < 128; i++) {
+ sb.append((char) ('a' + random.nextInt(26)));
+ }
+ String value = sb.toString();
+ MemoryRecords records = LogTestUtils.singletonRecords(
+ value.getBytes(),
+ codec,
+ String.valueOf(key).getBytes(),
+ RecordBatch.NO_TIMESTAMP,
+ messageFormatVersion);
+ return new ValueAndRecords(value, records);
+ }
+
+ private void closeLog(UnifiedLog log) throws IOException {
+ log.close();
+ logs.remove(log);
+ }
+
+ @AfterEach
+ public void teardown() throws IOException, InterruptedException {
+ if (cleaner != null) {
+ cleaner.shutdown();
+ }
+ time.scheduler.shutdown();
+ for (UnifiedLog log : logs) {
+ log.close();
+ }
+ Utils.delete(logDir);
+ }
+}
diff --git
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
index cd1f2458c8d..974913b91be 100644
---
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
+++
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
@@ -97,6 +97,48 @@ public class LogTestUtils {
);
}
+ /**
+ * Create a single record batch with the specified compression and
timestamp.
+ */
+ public static MemoryRecords singletonRecords(byte[] value, Compression
codec, byte[] key, long timestamp) {
+ return records(
+ List.of(new SimpleRecord(timestamp, key, value)),
+ RecordBatch.CURRENT_MAGIC_VALUE,
+ codec,
+ RecordBatch.NO_PRODUCER_ID,
+ RecordBatch.NO_PRODUCER_EPOCH,
+ RecordBatch.NO_SEQUENCE,
+ 0L,
+ RecordBatch.NO_PARTITION_LEADER_EPOCH
+ );
+ }
+
+ /**
+ * Create a single record batch with the specified compression, timestamp,
and magic value.
+ */
+ public static MemoryRecords singletonRecords(byte[] value, Compression
codec, byte[] key,
+ long timestamp, byte
magicValue) {
+ return records(
+ List.of(new SimpleRecord(timestamp, key, value)),
+ magicValue,
+ codec,
+ RecordBatch.NO_PRODUCER_ID,
+ RecordBatch.NO_PRODUCER_EPOCH,
+ RecordBatch.NO_SEQUENCE,
+ 0L,
+ RecordBatch.NO_PARTITION_LEADER_EPOCH
+ );
+ }
+
+ /**
+ * Read a string from a ByteBuffer using the default charset.
+ */
+ public static String readString(ByteBuffer buffer) {
+ byte[] bytes = new byte[buffer.remaining()];
+ buffer.get(bytes);
+ return new String(bytes);
+ }
+
public static MemoryRecords records(List<SimpleRecord> records,
byte magicValue,
Compression codec,