This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 4a5562c3418 KAFKA-19306 Migrate LogCompactionTester to tools module (#19905)
4a5562c3418 is described below
commit 4a5562c3418846b2c63999d98f79daad5d29783b
Author: Yunchi Pang <[email protected]>
AuthorDate: Sun Aug 17 11:49:06 2025 -0700
KAFKA-19306 Migrate LogCompactionTester to tools module (#19905)
jira: [KAFKA-19306](https://issues.apache.org/jira/browse/KAFKA-19306)
Log from a local run of the tool:
```
Producing 1000000 messages..to topics log-cleaner-test-849894102467800668-0
Logging produce requests to /tmp/kafka-log-cleaner-produced-6049271649847384547.txt
Sleeping for 20seconds...
Consuming messages...
Logging consumed messages to /tmp/kafka-log-cleaner-consumed-7065252868189829937.txt
1000000 rows of data produced, 120176 rows of data consumed (88.0% reduction).
De-duplicating and validating output files...
Validated 90057 values, 0 mismatches.
Data verification is completed
```
Ducktape system test result:
```
================================================================================
SESSION REPORT (ALL TESTS)
ducktape version: 0.12.0
session_id: 2025-07-10--001
run time: 1 minute 2.051 seconds
tests run: 1
passed: 1
flaky: 0
failed: 0
ignored: 0
================================================================================
test_id:    kafkatest.tests.tools.log_compaction_test.LogCompactionTest.test_log_compaction.metadata_quorum=ISOLATED_KRAFT
status: PASS
run time: 1 minute 1.809 seconds
```
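
For reference, a minimal sketch of driving the migrated class directly, outside of ducktape. The option names come from the new Java tool in this diff; the bootstrap server and the counts below are illustrative assumptions only:
```java
public class RunLogCompactionTester {
    public static void main(String[] args) throws Exception {
        // Hypothetical local run; adjust the assumed broker address and counts as needed.
        org.apache.kafka.tools.LogCompactionTester.main(new String[] {
            "--bootstrap-server", "localhost:9092",
            "--messages", "1000000",
            "--duplicates", "5",
            "--percent-deletes", "10",
            "--sleep", "20"
        });
    }
}
```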
Reviewers: Jhen-Yung Hsu <[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../scala/kafka/tools/LogCompactionTester.scala | 349 ---------------
tests/kafkatest/services/log_compaction_tester.py | 2 +-
.../apache/kafka/tools/LogCompactionTester.java | 492 +++++++++++++++++++++
3 files changed, 493 insertions(+), 350 deletions(-)
diff --git a/core/src/test/scala/kafka/tools/LogCompactionTester.scala b/core/src/test/scala/kafka/tools/LogCompactionTester.scala
deleted file mode 100755
index 90be819da4d..00000000000
--- a/core/src/test/scala/kafka/tools/LogCompactionTester.scala
+++ /dev/null
@@ -1,349 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.tools
-
-import java.io._
-import java.nio.ByteBuffer
-import java.nio.charset.StandardCharsets.UTF_8
-import java.nio.file.{Files, Path}
-import java.time.Duration
-import java.util.{Properties, Random}
-
-import joptsimple.OptionParser
-import kafka.utils._
-import org.apache.kafka.clients.admin.{Admin, NewTopic}
-import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsumer}
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
-import org.apache.kafka.common.config.TopicConfig
-import org.apache.kafka.common.serialization.{ByteArraySerializer, StringDeserializer}
-import org.apache.kafka.common.utils.{Exit, AbstractIterator, Utils}
-import org.apache.kafka.server.util.CommandLineUtils
-
-import scala.jdk.CollectionConverters._
-
-/**
- * This is a torture test that runs against an existing broker
- *
- * Here is how it works:
- *
- * It produces a series of specially formatted messages to one or more partitions. Each message it produces
- * it logs out to a text file. The messages have a limited set of keys, so there is duplication in the key space.
- *
- * The broker will clean its log as the test runs.
- *
- * When the specified number of messages have been produced we create a consumer and consume all the messages in the topic
- * and write that out to another text file.
- *
- * Using a stable unix sort we sort both the producer log of what was sent and the consumer log of what was retrieved by the message key.
- * Then we compare the final message in both logs for each key. If this final message is not the same for all keys we
- * print an error and exit with exit code 1, otherwise we print the size reduction and exit with exit code 0.
- */
-object LogCompactionTester {
-
- //maximum line size while reading produced/consumed record text file
- private val ReadAheadLimit = 4906
-
- def main(args: Array[String]): Unit = {
- val parser = new OptionParser(false)
-    val numMessagesOpt = parser.accepts("messages", "The number of messages to send or consume.")
- .withRequiredArg
- .describedAs("count")
- .ofType(classOf[java.lang.Long])
- .defaultsTo(Long.MaxValue)
-    val messageCompressionOpt = parser.accepts("compression-type", "message compression type")
- .withOptionalArg
- .describedAs("compressionType")
- .ofType(classOf[java.lang.String])
- .defaultsTo("none")
-    val numDupsOpt = parser.accepts("duplicates", "The number of duplicates for each key.")
- .withRequiredArg
- .describedAs("count")
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(5)
-    val brokerOpt = parser.accepts("bootstrap-server", "The server(s) to connect to.")
- .withRequiredArg
- .describedAs("url")
- .ofType(classOf[String])
- val topicsOpt = parser.accepts("topics", "The number of topics to test.")
- .withRequiredArg
- .describedAs("count")
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(1)
-    val percentDeletesOpt = parser.accepts("percent-deletes", "The percentage of updates that are deletes.")
- .withRequiredArg
- .describedAs("percent")
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(0)
-    val sleepSecsOpt = parser.accepts("sleep", "Time in milliseconds to sleep between production and consumption.")
- .withRequiredArg
- .describedAs("ms")
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(0)
-
- val options = parser.parse(args: _*)
-
- if (args.isEmpty)
-      CommandLineUtils.printUsageAndExit(parser, "A tool to test log compaction. Valid options are: ")
-
-    CommandLineUtils.checkRequiredArgs(parser, options, brokerOpt, numMessagesOpt)
-
- // parse options
- val messages = options.valueOf(numMessagesOpt).longValue
- val compressionType = options.valueOf(messageCompressionOpt)
- val percentDeletes = options.valueOf(percentDeletesOpt).intValue
- val dups = options.valueOf(numDupsOpt).intValue
- val brokerUrl = options.valueOf(brokerOpt)
- val topicCount = options.valueOf(topicsOpt).intValue
- val sleepSecs = options.valueOf(sleepSecsOpt).intValue
-
- val testId = new Random().nextLong
-    val topics = (0 until topicCount).map("log-cleaner-test-" + testId + "-" + _).toArray
- createTopics(brokerUrl, topics.toSeq)
-
- println(s"Producing $messages messages..to topics ${topics.mkString(",")}")
-    val producedDataFilePath = produceMessages(brokerUrl, topics, messages, compressionType, dups, percentDeletes)
- println(s"Sleeping for $sleepSecs seconds...")
- Thread.sleep(sleepSecs * 1000)
- println("Consuming messages...")
- val consumedDataFilePath = consumeMessages(brokerUrl, topics)
-
- val producedLines = lineCount(producedDataFilePath)
- val consumedLines = lineCount(consumedDataFilePath)
-    val reduction = 100 * (1.0 - consumedLines.toDouble / producedLines.toDouble)
-    println(f"$producedLines%d rows of data produced, $consumedLines%d rows of data consumed ($reduction%.1f%% reduction).")
-
- println("De-duplicating and validating output files...")
- validateOutput(producedDataFilePath.toFile, consumedDataFilePath.toFile)
- Utils.delete(producedDataFilePath.toFile)
- Utils.delete(consumedDataFilePath.toFile)
-    //if you change this line, we need to update test_log_compaction_tool.py system test
- println("Data verification is completed")
- }
-
- def createTopics(brokerUrl: String, topics: Seq[String]): Unit = {
- val adminConfig = new Properties
- adminConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
- val adminClient = Admin.create(adminConfig)
-
- try {
-      val topicConfigs = java.util.Map.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
-      val newTopics = topics.map(name => new NewTopic(name, 1, 1.toShort).configs(topicConfigs)).asJava
- adminClient.createTopics(newTopics).all.get
-
- var pendingTopics: Seq[String] = Seq()
- TestUtils.waitUntilTrue(() => {
- val allTopics = adminClient.listTopics.names.get.asScala.toSeq
-        pendingTopics = topics.filter(topicName => !allTopics.contains(topicName))
- pendingTopics.isEmpty
- }, s"timed out waiting for topics : $pendingTopics")
-
- } finally adminClient.close()
- }
-
- def lineCount(filPath: Path): Int = Files.readAllLines(filPath).size
-
- def validateOutput(producedDataFile: File, consumedDataFile: File): Unit = {
- val producedReader = externalSort(producedDataFile)
- val consumedReader = externalSort(consumedDataFile)
- val produced = valuesIterator(producedReader)
- val consumed = valuesIterator(consumedReader)
-
-    val producedDedupedFile = new File(producedDataFile.getAbsolutePath + ".deduped")
-    val producedDeduped : BufferedWriter = Files.newBufferedWriter(producedDedupedFile.toPath, UTF_8)
-
-    val consumedDedupedFile = new File(consumedDataFile.getAbsolutePath + ".deduped")
-    val consumedDeduped : BufferedWriter = Files.newBufferedWriter(consumedDedupedFile.toPath, UTF_8)
- var total = 0
- var mismatched = 0
- while (produced.hasNext && consumed.hasNext) {
- val p = produced.next()
- producedDeduped.write(p.toString)
- producedDeduped.newLine()
- val c = consumed.next()
- consumedDeduped.write(c.toString)
- consumedDeduped.newLine()
- if (p != c)
- mismatched += 1
- total += 1
- }
- producedDeduped.close()
- consumedDeduped.close()
- println(s"Validated $total values, $mismatched mismatches.")
-    require(!produced.hasNext, "Additional values produced not found in consumer log.")
-    require(!consumed.hasNext, "Additional values consumed not found in producer log.")
- require(mismatched == 0, "Non-zero number of row mismatches.")
- // if all the checks worked out we can delete the deduped files
- Utils.delete(producedDedupedFile)
- Utils.delete(consumedDedupedFile)
- }
-
- def require(requirement: Boolean, message: => Any): Unit = {
- if (!requirement) {
- System.err.println(s"Data validation failed : $message")
- Exit.exit(1)
- }
- }
-
- def valuesIterator(reader: BufferedReader): Iterator[TestRecord] = {
- new AbstractIterator[TestRecord] {
- def makeNext(): TestRecord = {
- var next = readNext(reader)
- while (next != null && next.delete)
- next = readNext(reader)
- if (next == null)
- allDone()
- else
- next
- }
- }.asScala
- }
-
- def readNext(reader: BufferedReader): TestRecord = {
- var line = reader.readLine()
- if (line == null)
- return null
- var curr = TestRecord.parse(line)
- while (true) {
- line = peekLine(reader)
- if (line == null)
- return curr
- val next = TestRecord.parse(line)
- if (next == null || next.topicAndKey != curr.topicAndKey)
- return curr
- curr = next
- reader.readLine()
- }
- null
- }
-
- def peekLine(reader: BufferedReader) = {
- reader.mark(ReadAheadLimit)
- val line = reader.readLine
- reader.reset()
- line
- }
-
- def externalSort(file: File): BufferedReader = {
-    val builder = new ProcessBuilder("sort", "--key=1,2", "--stable", "--buffer-size=20%", "--temporary-directory=" + Files.createTempDirectory("log_compaction_test"), file.getAbsolutePath)
- val process = builder.start
- new Thread() {
- override def run(): Unit = {
- val exitCode = process.waitFor()
- if (exitCode != 0) {
- System.err.println("Process exited abnormally.")
- while (process.getErrorStream.available > 0) {
- System.err.write(process.getErrorStream.read())
- }
- }
- }
- }.start()
-    new BufferedReader(new InputStreamReader(process.getInputStream, UTF_8), 10 * 1024 * 1024)
- }
-
- def produceMessages(brokerUrl: String,
- topics: Array[String],
- messages: Long,
- compressionType: String,
- dups: Int,
- percentDeletes: Int): Path = {
- val producerProps = new Properties
-    producerProps.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.MaxValue.toString)
-    producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
-    producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType)
-    val producer = new KafkaProducer(producerProps, new ByteArraySerializer, new ByteArraySerializer)
- try {
- val rand = new Random(1)
- val keyCount = (messages / dups).toInt
-      val producedFilePath = Files.createTempFile("kafka-log-cleaner-produced-", ".txt")
-      println(s"Logging produce requests to $producedFilePath")
-      val producedWriter: BufferedWriter = Files.newBufferedWriter(producedFilePath, UTF_8)
- for (i <- 0L until (messages * topics.length)) {
- val topic = topics((i % topics.length).toInt)
- val key = rand.nextInt(keyCount)
- val delete = (i % 100) < percentDeletes
- val msg =
- if (delete)
-            new ProducerRecord[Array[Byte], Array[Byte]](topic, key.toString.getBytes(UTF_8), null)
-          else
-            new ProducerRecord(topic, key.toString.getBytes(UTF_8), i.toString.getBytes(UTF_8))
- producer.send(msg)
- producedWriter.write(TestRecord(topic, key, i, delete).toString)
- producedWriter.newLine()
- }
- producedWriter.close()
- producedFilePath
- } finally {
- producer.close()
- }
- }
-
- def createConsumer(brokerUrl: String): Consumer[String, String] = {
- val consumerProps = new Properties
-    consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "log-cleaner-test-" + new Random().nextInt(Int.MaxValue))
-    consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
-    consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
-    new KafkaConsumer(consumerProps, new StringDeserializer, new StringDeserializer)
- }
-
- def consumeMessages(brokerUrl: String, topics: Array[String]): Path = {
- val consumer = createConsumer(brokerUrl)
- consumer.subscribe(topics.toSeq.asJava)
-    val consumedFilePath = Files.createTempFile("kafka-log-cleaner-consumed-", ".txt")
-    println(s"Logging consumed messages to $consumedFilePath")
-    val consumedWriter: BufferedWriter = Files.newBufferedWriter(consumedFilePath, UTF_8)
-
- try {
- var done = false
- while (!done) {
- val consumerRecords = consumer.poll(Duration.ofSeconds(20))
- if (!consumerRecords.isEmpty) {
- for (record <- consumerRecords.asScala) {
- val delete = record.value == null
- val value = if (delete) -1L else record.value.toLong
-            consumedWriter.write(TestRecord(record.topic, record.key.toInt, value, delete).toString)
- consumedWriter.newLine()
- }
- } else {
- done = true
- }
- }
- consumedFilePath
- } finally {
- consumedWriter.close()
- consumer.close()
- }
- }
-
- def readString(buffer: ByteBuffer): String = {
- Utils.utf8(buffer)
- }
-
-}
-
-case class TestRecord(topic: String, key: Int, value: Long, delete: Boolean) {
-  override def toString = topic + "\t" + key + "\t" + value + "\t" + (if (delete) "d" else "u")
- def topicAndKey = topic + key
-}
-
-object TestRecord {
- def parse(line: String): TestRecord = {
- val components = line.split("\t")
-    new TestRecord(components(0), components(1).toInt, components(2).toLong, components(3) == "d")
- }
-}
diff --git a/tests/kafkatest/services/log_compaction_tester.py b/tests/kafkatest/services/log_compaction_tester.py
index cc6bf4fc296..332edd2f769 100644
--- a/tests/kafkatest/services/log_compaction_tester.py
+++ b/tests/kafkatest/services/log_compaction_tester.py
@@ -82,7 +82,7 @@ class LogCompactionTester(KafkaPathResolverMixin, BackgroundThreadService):
         node.account.ssh("rm -rf %s" % LogCompactionTester.OUTPUT_DIR, allow_fail=False)
def java_class_name(self):
- return "kafka.tools.LogCompactionTester"
+ return "org.apache.kafka.tools.LogCompactionTester"
@property
def is_done(self):
diff --git a/tools/src/main/java/org/apache/kafka/tools/LogCompactionTester.java b/tools/src/main/java/org/apache/kafka/tools/LogCompactionTester.java
new file mode 100644
index 00000000000..accf5241a35
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/LogCompactionTester.java
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+
+
+/**
+ * This is a torture test that runs against an existing broker
+ * <p>
+ * Here is how it works:
+ * <p>
+ * It produces a series of specially formatted messages to one or more partitions. Each message it produces
+ * it logs out to a text file. The messages have a limited set of keys, so there is duplication in the key space.
+ * <p>
+ * The broker will clean its log as the test runs.
+ * <p>
+ * When the specified number of messages has been produced we create a consumer and consume all the messages in the topic
+ * and write that out to another text file.
+ * <p>
+ * Using a stable unix sort we sort both the producer log of what was sent and the consumer log of what was retrieved by the message key.
+ * Then we compare the final message in both logs for each key. If this final message is not the same for all keys we
+ * print an error and exit with exit code 1, otherwise we print the size reduction and exit with exit code 0.
+ */
+public class LogCompactionTester {
+
+ public static class Options {
+ public final OptionSpec<Long> numMessagesOpt;
+ public final OptionSpec<String> messageCompressionOpt;
+ public final OptionSpec<Integer> numDupsOpt;
+ public final OptionSpec<String> brokerOpt;
+ public final OptionSpec<Integer> topicsOpt;
+ public final OptionSpec<Integer> percentDeletesOpt;
+ public final OptionSpec<Integer> sleepSecsOpt;
+ public final OptionSpec<Void> helpOpt;
+
+ public Options(OptionParser parser) {
+ numMessagesOpt = parser
+                .accepts("messages", "The number of messages to send or consume.")
+ .withRequiredArg()
+ .describedAs("count")
+ .ofType(Long.class)
+ .defaultsTo(Long.MAX_VALUE);
+
+ messageCompressionOpt = parser
+ .accepts("compression-type", "message compression type")
+ .withOptionalArg()
+ .describedAs("compressionType")
+ .ofType(String.class)
+ .defaultsTo("none");
+
+ numDupsOpt = parser
+                .accepts("duplicates", "The number of duplicates for each key.")
+ .withRequiredArg()
+ .describedAs("count")
+ .ofType(Integer.class)
+ .defaultsTo(5);
+
+ brokerOpt = parser
+                .accepts("bootstrap-server", "The server(s) to connect to.")
+ .withRequiredArg()
+ .describedAs("url")
+ .ofType(String.class);
+
+ topicsOpt = parser
+ .accepts("topics", "The number of topics to test.")
+ .withRequiredArg()
+ .describedAs("count")
+ .ofType(Integer.class)
+ .defaultsTo(1);
+
+ percentDeletesOpt = parser
+                .accepts("percent-deletes", "The percentage of updates that are deletes.")
+ .withRequiredArg()
+ .describedAs("percent")
+ .ofType(Integer.class)
+ .defaultsTo(0);
+
+ sleepSecsOpt = parser
+                .accepts("sleep", "Time in milliseconds to sleep between production and consumption.")
+ .withRequiredArg()
+ .describedAs("ms")
+ .ofType(Integer.class)
+ .defaultsTo(0);
+
+ helpOpt = parser
+                .acceptsAll(List.of("h", "help"), "Display help information");
+ }
+ }
+
+    public record TestRecord(String topic, int key, long value, boolean delete) {
+ @Override
+ public String toString() {
+            return topic + "\t" + key + "\t" + value + "\t" + (delete ? "d" : "u");
+ }
+
+ public String getTopicAndKey() {
+ return topic + key;
+ }
+
+ public static TestRecord parse(String line) {
+ String[] components = line.split("\t");
+ if (components.length != 4) {
+                throw new IllegalArgumentException("Invalid TestRecord format: " + line);
+ }
+
+ return new TestRecord(
+ components[0],
+ Integer.parseInt(components[1]),
+ Long.parseLong(components[2]),
+ "d".equals(components[3])
+ );
+ }
+ }
+
+ public static class TestRecordUtils {
+ // maximum line size while reading produced/consumed record text file
+ private static final int READ_AHEAD_LIMIT = 4906;
+
+        public static TestRecord readNext(BufferedReader reader) throws IOException {
+ String line = reader.readLine();
+ if (line == null) {
+ return null;
+ }
+ TestRecord curr = TestRecord.parse(line);
+ while (true) {
+ String peekedLine = peekLine(reader);
+ if (peekedLine == null) {
+ return curr;
+ }
+ TestRecord next = TestRecord.parse(peekedLine);
+ if (!next.getTopicAndKey().equals(curr.getTopicAndKey())) {
+ return curr;
+ }
+ curr = next;
+ reader.readLine();
+ }
+ }
+
+        public static Iterator<TestRecord> valuesIterator(BufferedReader reader) {
+            return Spliterators.iterator(new Spliterators.AbstractSpliterator<>(
+                Long.MAX_VALUE, Spliterator.ORDERED) {
+                @Override
+                public boolean tryAdvance(java.util.function.Consumer<? super TestRecord> action) {
+ try {
+ TestRecord rec;
+ do {
+ rec = readNext(reader);
+ } while (rec != null && rec.delete);
+ if (rec == null) return false;
+ action.accept(rec);
+ return true;
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+ });
+ }
+
+        public static String peekLine(BufferedReader reader) throws IOException {
+ reader.mark(READ_AHEAD_LIMIT);
+ String line = reader.readLine();
+ reader.reset();
+ return line;
+ }
+ }
+
+ private static final Random RANDOM = new Random();
+
+ public static void main(String[] args) throws Exception {
+
+ OptionParser parser = new OptionParser(false);
+ Options options = new Options(parser);
+
+ OptionSet optionSet = parser.parse(args);
+ if (args.length == 0) {
+ CommandLineUtils.printUsageAndExit(parser,
+ "A tool to test log compaction. Valid options are: ");
+ }
+
+        CommandLineUtils.checkRequiredArgs(parser, optionSet, options.brokerOpt, options.numMessagesOpt);
+
+ long messages = optionSet.valueOf(options.numMessagesOpt);
+        String compressionType = optionSet.valueOf(options.messageCompressionOpt);
+ int percentDeletes = optionSet.valueOf(options.percentDeletesOpt);
+ int dups = optionSet.valueOf(options.numDupsOpt);
+ String brokerUrl = optionSet.valueOf(options.brokerOpt);
+ int topicCount = optionSet.valueOf(options.topicsOpt);
+ int sleepSecs = optionSet.valueOf(options.sleepSecsOpt);
+
+ long testId = RANDOM.nextLong();
+ String[] topics = IntStream.range(0, topicCount)
+ .mapToObj(i -> "log-cleaner-test-" + testId + "-" + i)
+ .toArray(String[]::new);
+ createTopics(brokerUrl, topics);
+
+        System.out.println("Producing " + messages + " messages..to topics " + String.join(",", topics));
+        Path producedDataFilePath = produceMessages(
+            brokerUrl, topics, messages, compressionType, dups, percentDeletes);
+        System.out.println("Sleeping for " + sleepSecs + " seconds...");
+        TimeUnit.MILLISECONDS.sleep(sleepSecs * 1000L);
+        System.out.println("Consuming messages...");
+        Path consumedDataFilePath = consumeMessages(brokerUrl, topics);
+
+ long producedLines = lineCount(producedDataFilePath);
+ long consumedLines = lineCount(consumedDataFilePath);
+        double reduction = 100 * (1.0 - (double) consumedLines / producedLines);
+
+        System.out.printf(
+            "%d rows of data produced, %d rows of data consumed (%.1f%% reduction).%n",
+            producedLines, consumedLines, reduction);
+
+ System.out.println("De-duplicating and validating output files...");
+        validateOutput(producedDataFilePath.toFile(), consumedDataFilePath.toFile());
+
+ Files.deleteIfExists(producedDataFilePath);
+ Files.deleteIfExists(consumedDataFilePath);
+        // if you change this line, we need to update test_log_compaction_tool.py system test
+ System.out.println("Data verification is completed");
+ }
+
+
+    private static void createTopics(String brokerUrl, String[] topics) throws Exception {
+ Properties adminConfig = new Properties();
+        adminConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
+
+ try (Admin adminClient = Admin.create(adminConfig)) {
+ Map<String, String> topicConfigs = Map.of(
+                TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT
+ );
+            List<NewTopic> newTopics = Arrays.stream(topics)
+                .map(name -> new NewTopic(name, 1, (short) 1).configs(topicConfigs)).toList();
+ adminClient.createTopics(newTopics).all().get();
+
+ final List<String> pendingTopics = new ArrayList<>();
+ waitUntilTrue(() -> {
+ try {
+                    Set<String> allTopics = adminClient.listTopics().names().get();
+ pendingTopics.clear();
+ pendingTopics.addAll(
+                        Arrays.stream(topics)
+                            .filter(topicName -> !allTopics.contains(topicName))
+                            .toList()
+ );
+ return pendingTopics.isEmpty();
+                } catch (InterruptedException | java.util.concurrent.ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }, () -> "timed out waiting for topics: " + pendingTopics);
+ }
+ }
+
+    private static void validateOutput(File producedDataFile, File consumedDataFile) {
+ try (BufferedReader producedReader = externalSort(producedDataFile);
+ BufferedReader consumedReader = externalSort(consumedDataFile)) {
+            Iterator<TestRecord> produced = TestRecordUtils.valuesIterator(producedReader);
+            Iterator<TestRecord> consumed = TestRecordUtils.valuesIterator(consumedReader);
+
+            File producedDedupedFile = new File(producedDataFile.getAbsolutePath() + ".deduped");
+            File consumedDedupedFile = new File(consumedDataFile.getAbsolutePath() + ".deduped");
+
+ try (BufferedWriter producedDeduped = Files.newBufferedWriter(
+ producedDedupedFile.toPath(), StandardCharsets.UTF_8);
+ BufferedWriter consumedDeduped = Files.newBufferedWriter(
+                    consumedDedupedFile.toPath(), StandardCharsets.UTF_8)) {
+ int total = 0;
+ int mismatched = 0;
+ while (produced.hasNext() && consumed.hasNext()) {
+ TestRecord p = produced.next();
+ producedDeduped.write(p.toString());
+ producedDeduped.newLine();
+
+ TestRecord c = consumed.next();
+ consumedDeduped.write(c.toString());
+ consumedDeduped.newLine();
+
+ if (!p.equals(c)) {
+ mismatched++;
+ }
+ total++;
+ }
+
+                System.out.printf("Validated %d values, %d mismatches.%n", total, mismatched);
+                require(!produced.hasNext(), "Additional values produced not found in consumer log.");
+                require(!consumed.hasNext(), "Additional values consumed not found in producer log.");
+                require(mismatched == 0, "Non-zero number of row mismatches.");
+ // if all the checks worked out we can delete the deduped files
+ Files.deleteIfExists(producedDedupedFile.toPath());
+ Files.deleteIfExists(consumedDedupedFile.toPath());
+ }
+
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static BufferedReader externalSort(File file) throws IOException {
+ Path tempDir = Files.createTempDirectory("log_compaction_test");
+
+ ProcessBuilder builder = new ProcessBuilder(
+ "sort", "--key=1,2", "--stable", "--buffer-size=20%",
+            "--temporary-directory=" + tempDir.toString(), file.getAbsolutePath());
+ builder.redirectError(ProcessBuilder.Redirect.INHERIT);
+
+ Process process;
+ try {
+ process = builder.start();
+ } catch (IOException e) {
+ // clean up temp directory if process fails to start
+ try {
+ Files.deleteIfExists(tempDir);
+ } catch (IOException cleanupException) {
+ e.addSuppressed(cleanupException);
+ }
+            throw new IOException("Failed to start sort process. Ensure 'sort' command is available.", e);
+ }
+
+ return new BufferedReader(
+            new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8),
+ 10 * 1024 * 1024
+ );
+ }
+
+ private static long lineCount(Path filePath) throws IOException {
+ try (Stream<String> lines = Files.lines(filePath)) {
+ return lines.count();
+ }
+ }
+
+ private static void require(boolean requirement, String message) {
+ if (!requirement) {
+ System.err.println("Data validation failed : " + message);
+ Exit.exit(1);
+ }
+ }
+
+    private static Path produceMessages(String brokerUrl, String[] topics, long messages,
+                                        String compressionType, int dups, int percentDeletes) throws IOException {
+        Map<String, Object> producerProps = Map.of(
+            ProducerConfig.MAX_BLOCK_MS_CONFIG, String.valueOf(Long.MAX_VALUE),
+ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl,
+ ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType
+ );
+
+        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(
+                producerProps, new ByteArraySerializer(), new ByteArraySerializer())) {
+            int keyCount = (int) (messages / dups);
+            Path producedFilePath = Files.createTempFile("kafka-log-cleaner-produced-", ".txt");
+            System.out.println("Logging produce requests to " + producedFilePath);
+
+ try (BufferedWriter producedWriter = Files.newBufferedWriter(
+ producedFilePath, StandardCharsets.UTF_8)) {
+ for (long i = 0; i < messages * topics.length; i++) {
+ String topic = topics[(int) (i % topics.length)];
+ int key = RANDOM.nextInt(keyCount);
+ boolean delete = (i % 100) < percentDeletes;
+ ProducerRecord<byte[], byte[]> record;
+ if (delete) {
+                        record = new ProducerRecord<>(topic,
+                                String.valueOf(key).getBytes(StandardCharsets.UTF_8), null);
+                    } else {
+                        record = new ProducerRecord<>(topic,
+                                String.valueOf(key).getBytes(StandardCharsets.UTF_8),
+                                String.valueOf(i).getBytes(StandardCharsets.UTF_8));
+                    }
+                    producer.send(record);
+                    producedWriter.write(new TestRecord(topic, key, i, delete).toString());
+ producedWriter.newLine();
+ }
+ }
+ return producedFilePath;
+ }
+ }
+
+    private static Path consumeMessages(String brokerUrl, String[] topics) throws IOException {
+
+        Path consumedFilePath = Files.createTempFile("kafka-log-cleaner-consumed-", ".txt");
+ System.out.println("Logging consumed messages to " + consumedFilePath);
+
+ try (Consumer<String, String> consumer = createConsumer(brokerUrl);
+             BufferedWriter consumedWriter = Files.newBufferedWriter(consumedFilePath, StandardCharsets.UTF_8)) {
+ consumer.subscribe(Arrays.asList(topics));
+ while (true) {
+                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(20));
+                if (consumerRecords.isEmpty()) return consumedFilePath;
+                consumerRecords.forEach(
+                    record -> {
+                        try {
+                            boolean delete = record.value() == null;
+                            long value = delete ? -1L : Long.parseLong(record.value());
+                            TestRecord testRecord = new TestRecord(
+                                record.topic(), Integer.parseInt(record.key()), value, delete);
+ consumedWriter.write(testRecord.toString());
+ consumedWriter.newLine();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ );
+ }
+ }
+ }
+
+ private static Consumer<String, String> createConsumer(String brokerUrl) {
+ Map<String, Object> consumerProps = Map.of(
+            ConsumerConfig.GROUP_ID_CONFIG, "log-cleaner-test-" + RANDOM.nextInt(Integer.MAX_VALUE),
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl,
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"
+ );
+        return new KafkaConsumer<>(consumerProps, new StringDeserializer(), new StringDeserializer());
+ }
+
+ /**
+     * Wait for condition to be true for at most 15 seconds, checking every 100ms
+     */
+    private static void waitUntilTrue(Supplier<Boolean> condition, Supplier<String> timeoutMessage) throws InterruptedException {
+ final long defaultMaxWaitMs = 15000; // 15 seconds
+ final long defaultPollIntervalMs = 100; // 100ms
+ long endTime = System.currentTimeMillis() + defaultMaxWaitMs;
+
+ while (System.currentTimeMillis() < endTime) {
+ try {
+ if (condition.get()) {
+ return;
+ }
+ } catch (Exception e) {
+ // Continue trying until timeout
+ }
+ TimeUnit.MILLISECONDS.sleep(defaultPollIntervalMs);
+ }
+
+ throw new RuntimeException(timeoutMessage.get());
+ }
+}
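
For readers skimming the diff, the validation step reduces to "keep the last value per topic+key on both sides and compare". A self-contained sketch of that idea, not the tool's actual code (which streams the files through a stable unix sort instead of holding them in memory):
```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CompactionCheckSketch {
    public static void main(String[] args) {
        // Lines in the "topic\tkey\tvalue\td|u" format written by TestRecord.toString()
        String[] produced = {"t\t1\t10\tu", "t\t1\t42\tu", "t\t2\t7\tu"};
        String[] consumed = {"t\t1\t42\tu", "t\t2\t7\tu"};
        // Compaction is considered correct when the surviving value per key matches
        System.out.println(lastValuePerKey(produced).equals(lastValuePerKey(consumed))); // true
    }

    private static Map<String, String> lastValuePerKey(String[] lines) {
        Map<String, String> last = new LinkedHashMap<>();
        for (String line : lines) {
            String[] parts = line.split("\t");
            last.put(parts[0] + "\t" + parts[1], parts[2]); // topic+key -> latest value
        }
        return last;
    }
}
```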