This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4a5562c3418 KAFKA-19306 Migrate LogCompactionTester to tools module (#19905)
4a5562c3418 is described below

commit 4a5562c3418846b2c63999d98f79daad5d29783b
Author: Yunchi Pang <[email protected]>
AuthorDate: Sun Aug 17 11:49:06 2025 -0700

    KAFKA-19306 Migrate LogCompactionTester to tools module (#19905)
    
    jira: [KAFKA-19306](https://issues.apache.org/jira/browse/KAFKA-19306)
    
    log
    ```
    Producing 1000000 messages..to topics
    log-cleaner-test-849894102467800668-0
    Logging produce requests to
    /tmp/kafka-log-cleaner-produced-6049271649847384547.txt
    Sleeping for 20seconds...
    Consuming messages...
    Logging consumed messages to
    /tmp/kafka-log-cleaner-consumed-7065252868189829937.txt
    1000000 rows of data produced, 120176 rows of data consumed (88.0%
    reduction).
    De-duplicating and validating output files...
    Validated 90057 values, 0 mismatches.
    Data verification is completed
    ```
    result
    ```
    ================================================================================
    SESSION REPORT (ALL TESTS)
    ducktape version: 0.12.0
    session_id:       2025-07-10--001
    run time:         1 minute 2.051 seconds
    tests run:        1
    passed:           1
    flaky:            0
    failed:           0
    ignored:          0
    ================================================================================
    test_id:    kafkatest.tests.tools.log_compaction_test.LogCompactionTest.test_log_compaction.metadata_quorum=ISOLATED_KRAFT
    status:     PASS
    run time:   1 minute 1.809 seconds
    ```
    
    Reviewers: Jhen-Yung Hsu <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../scala/kafka/tools/LogCompactionTester.scala    | 349 ---------------
 tests/kafkatest/services/log_compaction_tester.py  |   2 +-
 .../apache/kafka/tools/LogCompactionTester.java    | 492 +++++++++++++++++++++
 3 files changed, 493 insertions(+), 350 deletions(-)

diff --git a/core/src/test/scala/kafka/tools/LogCompactionTester.scala b/core/src/test/scala/kafka/tools/LogCompactionTester.scala
deleted file mode 100755
index 90be819da4d..00000000000
--- a/core/src/test/scala/kafka/tools/LogCompactionTester.scala
+++ /dev/null
@@ -1,349 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.tools
-
-import java.io._
-import java.nio.ByteBuffer
-import java.nio.charset.StandardCharsets.UTF_8
-import java.nio.file.{Files, Path}
-import java.time.Duration
-import java.util.{Properties, Random}
-
-import joptsimple.OptionParser
-import kafka.utils._
-import org.apache.kafka.clients.admin.{Admin, NewTopic}
-import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, 
KafkaConsumer}
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, 
ProducerRecord}
-import org.apache.kafka.common.config.TopicConfig
-import org.apache.kafka.common.serialization.{ByteArraySerializer, 
StringDeserializer}
-import org.apache.kafka.common.utils.{Exit, AbstractIterator, Utils}
-import org.apache.kafka.server.util.CommandLineUtils
-
-import scala.jdk.CollectionConverters._
-
-/**
- * This is a torture test that runs against an existing broker
- *
- * Here is how it works:
- *
- * It produces a series of specially formatted messages to one or more 
partitions. Each message it produces
- * it logs out to a text file. The messages have a limited set of keys, so 
there is duplication in the key space.
- *
- * The broker will clean its log as the test runs.
- *
- * When the specified number of messages have been produced we create a 
consumer and consume all the messages in the topic
- * and write that out to another text file.
- *
- * Using a stable unix sort we sort both the producer log of what was sent and 
the consumer log of what was retrieved by the message key.
- * Then we compare the final message in both logs for each key. If this final 
message is not the same for all keys we
- * print an error and exit with exit code 1, otherwise we print the size 
reduction and exit with exit code 0.
- */
-object LogCompactionTester {
-
-  //maximum line size while reading produced/consumed record text file
-  private val ReadAheadLimit = 4906
-
-  def main(args: Array[String]): Unit = {
-    val parser = new OptionParser(false)
-    val numMessagesOpt = parser.accepts("messages", "The number of messages to 
send or consume.")
-      .withRequiredArg
-      .describedAs("count")
-      .ofType(classOf[java.lang.Long])
-      .defaultsTo(Long.MaxValue)
-    val messageCompressionOpt = parser.accepts("compression-type", "message 
compression type")
-      .withOptionalArg
-      .describedAs("compressionType")
-      .ofType(classOf[java.lang.String])
-      .defaultsTo("none")
-    val numDupsOpt = parser.accepts("duplicates", "The number of duplicates 
for each key.")
-      .withRequiredArg
-      .describedAs("count")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(5)
-    val brokerOpt = parser.accepts("bootstrap-server", "The server(s) to 
connect to.")
-      .withRequiredArg
-      .describedAs("url")
-      .ofType(classOf[String])
-    val topicsOpt = parser.accepts("topics", "The number of topics to test.")
-      .withRequiredArg
-      .describedAs("count")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(1)
-    val percentDeletesOpt = parser.accepts("percent-deletes", "The percentage 
of updates that are deletes.")
-      .withRequiredArg
-      .describedAs("percent")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(0)
-    val sleepSecsOpt = parser.accepts("sleep", "Time in milliseconds to sleep 
between production and consumption.")
-      .withRequiredArg
-      .describedAs("ms")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(0)
-
-    val options = parser.parse(args: _*)
-
-    if (args.isEmpty)
-      CommandLineUtils.printUsageAndExit(parser, "A tool to test log 
compaction. Valid options are: ")
-
-    CommandLineUtils.checkRequiredArgs(parser, options, brokerOpt, 
numMessagesOpt)
-
-    // parse options
-    val messages = options.valueOf(numMessagesOpt).longValue
-    val compressionType = options.valueOf(messageCompressionOpt)
-    val percentDeletes = options.valueOf(percentDeletesOpt).intValue
-    val dups = options.valueOf(numDupsOpt).intValue
-    val brokerUrl = options.valueOf(brokerOpt)
-    val topicCount = options.valueOf(topicsOpt).intValue
-    val sleepSecs = options.valueOf(sleepSecsOpt).intValue
-
-    val testId = new Random().nextLong
-    val topics = (0 until topicCount).map("log-cleaner-test-" + testId + "-" + 
_).toArray
-    createTopics(brokerUrl, topics.toSeq)
-
-    println(s"Producing $messages messages..to topics ${topics.mkString(",")}")
-    val producedDataFilePath = produceMessages(brokerUrl, topics, messages, 
compressionType, dups, percentDeletes)
-    println(s"Sleeping for $sleepSecs seconds...")
-    Thread.sleep(sleepSecs * 1000)
-    println("Consuming messages...")
-    val consumedDataFilePath = consumeMessages(brokerUrl, topics)
-
-    val producedLines = lineCount(producedDataFilePath)
-    val consumedLines = lineCount(consumedDataFilePath)
-    val reduction = 100 * (1.0 - consumedLines.toDouble / 
producedLines.toDouble)
-    println(f"$producedLines%d rows of data produced, $consumedLines%d rows of 
data consumed ($reduction%.1f%% reduction).")
-
-    println("De-duplicating and validating output files...")
-    validateOutput(producedDataFilePath.toFile, consumedDataFilePath.toFile)
-    Utils.delete(producedDataFilePath.toFile)
-    Utils.delete(consumedDataFilePath.toFile)
-    //if you change this line, we need to update test_log_compaction_tool.py 
system test
-    println("Data verification is completed")
-  }
-
-  def createTopics(brokerUrl: String, topics: Seq[String]): Unit = {
-    val adminConfig = new Properties
-    adminConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
-    val adminClient = Admin.create(adminConfig)
-
-    try {
-      val topicConfigs = java.util.Map.of(TopicConfig.CLEANUP_POLICY_CONFIG, 
TopicConfig.CLEANUP_POLICY_COMPACT)
-      val newTopics = topics.map(name => new NewTopic(name, 1, 
1.toShort).configs(topicConfigs)).asJava
-      adminClient.createTopics(newTopics).all.get
-
-      var pendingTopics: Seq[String] = Seq()
-      TestUtils.waitUntilTrue(() => {
-        val allTopics = adminClient.listTopics.names.get.asScala.toSeq
-        pendingTopics = topics.filter(topicName => 
!allTopics.contains(topicName))
-        pendingTopics.isEmpty
-      }, s"timed out waiting for topics : $pendingTopics")
-
-    } finally adminClient.close()
-  }
-
-  def lineCount(filPath: Path): Int = Files.readAllLines(filPath).size
-
-  def validateOutput(producedDataFile: File, consumedDataFile: File): Unit = {
-    val producedReader = externalSort(producedDataFile)
-    val consumedReader = externalSort(consumedDataFile)
-    val produced = valuesIterator(producedReader)
-    val consumed = valuesIterator(consumedReader)
-
-    val producedDedupedFile = new File(producedDataFile.getAbsolutePath + 
".deduped")
-    val producedDeduped : BufferedWriter = 
Files.newBufferedWriter(producedDedupedFile.toPath, UTF_8)
-
-    val consumedDedupedFile = new File(consumedDataFile.getAbsolutePath + 
".deduped")
-    val consumedDeduped : BufferedWriter = 
Files.newBufferedWriter(consumedDedupedFile.toPath, UTF_8)
-    var total = 0
-    var mismatched = 0
-    while (produced.hasNext && consumed.hasNext) {
-      val p = produced.next()
-      producedDeduped.write(p.toString)
-      producedDeduped.newLine()
-      val c = consumed.next()
-      consumedDeduped.write(c.toString)
-      consumedDeduped.newLine()
-      if (p != c)
-        mismatched += 1
-      total += 1
-    }
-    producedDeduped.close()
-    consumedDeduped.close()
-    println(s"Validated $total values, $mismatched mismatches.")
-    require(!produced.hasNext, "Additional values produced not found in 
consumer log.")
-    require(!consumed.hasNext, "Additional values consumed not found in 
producer log.")
-    require(mismatched == 0, "Non-zero number of row mismatches.")
-    // if all the checks worked out we can delete the deduped files
-    Utils.delete(producedDedupedFile)
-    Utils.delete(consumedDedupedFile)
-  }
-
-  def require(requirement: Boolean, message: => Any): Unit = {
-    if (!requirement) {
-      System.err.println(s"Data validation failed : $message")
-      Exit.exit(1)
-    }
-  }
-
-  def valuesIterator(reader: BufferedReader): Iterator[TestRecord] = {
-    new AbstractIterator[TestRecord] {
-      def makeNext(): TestRecord = {
-        var next = readNext(reader)
-        while (next != null && next.delete)
-          next = readNext(reader)
-        if (next == null)
-          allDone()
-        else
-          next
-      }
-    }.asScala
-  }
-
-  def readNext(reader: BufferedReader): TestRecord = {
-    var line = reader.readLine()
-    if (line == null)
-      return null
-    var curr = TestRecord.parse(line)
-    while (true) {
-      line = peekLine(reader)
-      if (line == null)
-        return curr
-      val next = TestRecord.parse(line)
-      if (next == null || next.topicAndKey != curr.topicAndKey)
-        return curr
-      curr = next
-      reader.readLine()
-    }
-    null
-  }
-
-  def peekLine(reader: BufferedReader) = {
-    reader.mark(ReadAheadLimit)
-    val line = reader.readLine
-    reader.reset()
-    line
-  }
-
-  def externalSort(file: File): BufferedReader = {
-    val builder = new ProcessBuilder("sort", "--key=1,2", "--stable", 
"--buffer-size=20%", "--temporary-directory=" + 
Files.createTempDirectory("log_compaction_test"), file.getAbsolutePath)
-    val process = builder.start
-    new Thread() {
-      override def run(): Unit = {
-        val exitCode = process.waitFor()
-        if (exitCode != 0) {
-          System.err.println("Process exited abnormally.")
-          while (process.getErrorStream.available > 0) {
-            System.err.write(process.getErrorStream.read())
-          }
-        }
-      }
-    }.start()
-    new BufferedReader(new InputStreamReader(process.getInputStream, UTF_8), 
10 * 1024 * 1024)
-  }
-
-  def produceMessages(brokerUrl: String,
-                      topics: Array[String],
-                      messages: Long,
-                      compressionType: String,
-                      dups: Int,
-                      percentDeletes: Int): Path = {
-    val producerProps = new Properties
-    producerProps.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, 
Long.MaxValue.toString)
-    producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
brokerUrl)
-    producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, 
compressionType)
-    val producer = new KafkaProducer(producerProps, new ByteArraySerializer, 
new ByteArraySerializer)
-    try {
-      val rand = new Random(1)
-      val keyCount = (messages / dups).toInt
-      val producedFilePath = 
Files.createTempFile("kafka-log-cleaner-produced-", ".txt")
-      println(s"Logging produce requests to $producedFilePath")
-      val producedWriter: BufferedWriter = 
Files.newBufferedWriter(producedFilePath, UTF_8)
-      for (i <- 0L until (messages * topics.length)) {
-        val topic = topics((i % topics.length).toInt)
-        val key = rand.nextInt(keyCount)
-        val delete = (i % 100) < percentDeletes
-        val msg =
-          if (delete)
-            new ProducerRecord[Array[Byte], Array[Byte]](topic, 
key.toString.getBytes(UTF_8), null)
-          else
-            new ProducerRecord(topic, key.toString.getBytes(UTF_8), 
i.toString.getBytes(UTF_8))
-        producer.send(msg)
-        producedWriter.write(TestRecord(topic, key, i, delete).toString)
-        producedWriter.newLine()
-      }
-      producedWriter.close()
-      producedFilePath
-    } finally {
-      producer.close()
-    }
-  }
-
-  def createConsumer(brokerUrl: String): Consumer[String, String] = {
-    val consumerProps = new Properties
-    consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
"log-cleaner-test-" + new Random().nextInt(Int.MaxValue))
-    consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
brokerUrl)
-    consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest")
-    new KafkaConsumer(consumerProps, new StringDeserializer, new 
StringDeserializer)
-  }
-
-  def consumeMessages(brokerUrl: String, topics: Array[String]): Path = {
-    val consumer = createConsumer(brokerUrl)
-    consumer.subscribe(topics.toSeq.asJava)
-    val consumedFilePath = Files.createTempFile("kafka-log-cleaner-consumed-", 
".txt")
-    println(s"Logging consumed messages to $consumedFilePath")
-    val consumedWriter: BufferedWriter = 
Files.newBufferedWriter(consumedFilePath, UTF_8)
-
-    try {
-      var done = false
-      while (!done) {
-        val consumerRecords = consumer.poll(Duration.ofSeconds(20))
-        if (!consumerRecords.isEmpty) {
-          for (record <- consumerRecords.asScala) {
-            val delete = record.value == null
-            val value = if (delete) -1L else record.value.toLong
-            consumedWriter.write(TestRecord(record.topic, record.key.toInt, 
value, delete).toString)
-            consumedWriter.newLine()
-          }
-        } else {
-          done = true
-        }
-      }
-      consumedFilePath
-    } finally {
-      consumedWriter.close()
-      consumer.close()
-    }
-  }
-
-  def readString(buffer: ByteBuffer): String = {
-    Utils.utf8(buffer)
-  }
-
-}
-
-case class TestRecord(topic: String, key: Int, value: Long, delete: Boolean) {
-  override def toString = topic + "\t" + key + "\t" + value + "\t" + (if 
(delete) "d" else "u")
-  def topicAndKey = topic + key
-}
-
-object TestRecord {
-  def parse(line: String): TestRecord = {
-    val components = line.split("\t")
-    new TestRecord(components(0), components(1).toInt, components(2).toLong, 
components(3) == "d")
-  }
-}
diff --git a/tests/kafkatest/services/log_compaction_tester.py b/tests/kafkatest/services/log_compaction_tester.py
index cc6bf4fc296..332edd2f769 100644
--- a/tests/kafkatest/services/log_compaction_tester.py
+++ b/tests/kafkatest/services/log_compaction_tester.py
@@ -82,7 +82,7 @@ class LogCompactionTester(KafkaPathResolverMixin, BackgroundThreadService):
         node.account.ssh("rm -rf %s" % LogCompactionTester.OUTPUT_DIR, allow_fail=False)
 
     def java_class_name(self):
-        return "kafka.tools.LogCompactionTester"
+        return "org.apache.kafka.tools.LogCompactionTester"
 
     @property
     def is_done(self):
diff --git a/tools/src/main/java/org/apache/kafka/tools/LogCompactionTester.java b/tools/src/main/java/org/apache/kafka/tools/LogCompactionTester.java
new file mode 100644
index 00000000000..accf5241a35
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/LogCompactionTester.java
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+
+
+/**
+ * This is a torture test that runs against an existing broker
+ * <p>
+ * Here is how it works:
+ * <p>
+ * It produces a series of specially formatted messages to one or more 
partitions. Each message it produces
+ * it logs out to a text file. The messages have a limited set of keys, so 
there is duplication in the key space.
+ * <p>
+ * The broker will clean its log as the test runs.
+ * <p>
+ * When the specified number of messages has been produced we create a 
consumer and consume all the messages in the topic
+ * and write that out to another text file.
+ * <p>
+ * Using a stable unix sort we sort both the producer log of what was sent and 
the consumer log of what was retrieved by the message key.
+ * Then we compare the final message in both logs for each key. If this final 
message is not the same for all keys we
+ * print an error and exit with exit code 1, otherwise we print the size 
reduction and exit with exit code 0.
+ */
+public class LogCompactionTester {
+
+    public static class Options {
+        public final OptionSpec<Long> numMessagesOpt;
+        public final OptionSpec<String>  messageCompressionOpt;
+        public final OptionSpec<Integer> numDupsOpt;
+        public final OptionSpec<String>  brokerOpt;
+        public final OptionSpec<Integer> topicsOpt;
+        public final OptionSpec<Integer> percentDeletesOpt;
+        public final OptionSpec<Integer> sleepSecsOpt;
+        public final OptionSpec<Void>    helpOpt;
+
+        public Options(OptionParser parser) {
+            numMessagesOpt = parser
+                    .accepts("messages", "The number of messages to send or 
consume.")
+                    .withRequiredArg()
+                    .describedAs("count")
+                    .ofType(Long.class)
+                    .defaultsTo(Long.MAX_VALUE);
+
+            messageCompressionOpt = parser
+                    .accepts("compression-type", "message compression type")
+                    .withOptionalArg()
+                    .describedAs("compressionType")
+                    .ofType(String.class)
+                    .defaultsTo("none");
+
+            numDupsOpt = parser
+                    .accepts("duplicates", "The number of duplicates for each 
key.")
+                    .withRequiredArg()
+                    .describedAs("count")
+                    .ofType(Integer.class)
+                    .defaultsTo(5);
+
+            brokerOpt = parser
+                    .accepts("bootstrap-server", "The server(s) to connect 
to.")
+                    .withRequiredArg()
+                    .describedAs("url")
+                    .ofType(String.class);
+
+            topicsOpt = parser
+                    .accepts("topics", "The number of topics to test.")
+                    .withRequiredArg()
+                    .describedAs("count")
+                    .ofType(Integer.class)
+                    .defaultsTo(1);
+
+            percentDeletesOpt = parser
+                    .accepts("percent-deletes", "The percentage of updates 
that are deletes.")
+                    .withRequiredArg()
+                    .describedAs("percent")
+                    .ofType(Integer.class)
+                    .defaultsTo(0);
+
+            sleepSecsOpt = parser
+                    .accepts("sleep", "Time in milliseconds to sleep between 
production and consumption.")
+                    .withRequiredArg()
+                    .describedAs("ms")
+                    .ofType(Integer.class)
+                    .defaultsTo(0);
+
+            helpOpt = parser
+                    .acceptsAll(List.of("h", "help"), "Display help 
information");
+        }
+    }
+
+    public record TestRecord(String topic, int key, long value, boolean 
delete) {
+        @Override
+        public String toString() {
+            return topic + "\t" + key + "\t" + value + "\t" + (delete ? "d" : 
"u");
+        }
+
+        public String getTopicAndKey() {
+            return topic + key;
+        }
+
+        public static TestRecord parse(String line) {
+            String[] components = line.split("\t");
+            if (components.length != 4) {
+                throw new IllegalArgumentException("Invalid TestRecord format: 
" + line);
+            }
+
+            return new TestRecord(
+                    components[0],
+                    Integer.parseInt(components[1]),
+                    Long.parseLong(components[2]),
+                    "d".equals(components[3])
+            );
+        }
+    }
+
+    public static class TestRecordUtils {
+        // maximum line size while reading produced/consumed record text file
+        private static final int READ_AHEAD_LIMIT = 4906;
+
+        public static TestRecord readNext(BufferedReader reader) throws 
IOException {
+            String line = reader.readLine();
+            if (line == null) {
+                return null;
+            }
+            TestRecord curr = TestRecord.parse(line);
+            while (true) {
+                String peekedLine = peekLine(reader);
+                if (peekedLine == null) {
+                    return curr;
+                }
+                TestRecord next = TestRecord.parse(peekedLine);
+                if (!next.getTopicAndKey().equals(curr.getTopicAndKey())) {
+                    return curr;
+                }
+                curr = next;
+                reader.readLine();
+            }
+        }
+
+        public static Iterator<TestRecord> valuesIterator(BufferedReader 
reader) {
+            return Spliterators.iterator(new 
Spliterators.AbstractSpliterator<>(
+                    Long.MAX_VALUE, Spliterator.ORDERED) {
+                @Override
+                public boolean tryAdvance(java.util.function.Consumer<? super 
TestRecord> action) {
+                    try {
+                        TestRecord rec;
+                        do {
+                            rec = readNext(reader);
+                        } while (rec != null && rec.delete);
+                        if (rec == null) return false;
+                        action.accept(rec);
+                        return true;
+                    } catch (IOException e) {
+                        throw new UncheckedIOException(e);
+                    }
+                }
+            });
+        }
+
+        public static String peekLine(BufferedReader reader) throws 
IOException {
+            reader.mark(READ_AHEAD_LIMIT);
+            String line = reader.readLine();
+            reader.reset();
+            return line;
+        }
+    }
+
+    private static final Random RANDOM = new Random();
+
+    public static void main(String[] args) throws Exception {
+
+        OptionParser parser = new OptionParser(false);
+        Options options = new Options(parser);
+
+        OptionSet optionSet = parser.parse(args);
+        if (args.length == 0) {
+            CommandLineUtils.printUsageAndExit(parser,
+                    "A tool to test log compaction. Valid options are: ");
+        }
+
+        CommandLineUtils.checkRequiredArgs(parser, optionSet, 
options.brokerOpt, options.numMessagesOpt);
+
+        long messages = optionSet.valueOf(options.numMessagesOpt);
+        String compressionType = 
optionSet.valueOf(options.messageCompressionOpt);
+        int percentDeletes = optionSet.valueOf(options.percentDeletesOpt);
+        int dups = optionSet.valueOf(options.numDupsOpt);
+        String brokerUrl = optionSet.valueOf(options.brokerOpt);
+        int topicCount = optionSet.valueOf(options.topicsOpt);
+        int sleepSecs = optionSet.valueOf(options.sleepSecsOpt);
+
+        long testId = RANDOM.nextLong();
+        String[] topics = IntStream.range(0, topicCount)
+                .mapToObj(i -> "log-cleaner-test-" + testId + "-" + i)
+                .toArray(String[]::new);
+        createTopics(brokerUrl, topics);
+
+        System.out.println("Producing " + messages + " messages..to topics " + 
String.join(",", topics));
+        Path producedDataFilePath = produceMessages(
+                brokerUrl, topics, messages,
+                compressionType, dups, percentDeletes);
+        System.out.println("Sleeping for " + sleepSecs + "seconds...");
+        TimeUnit.MILLISECONDS.sleep(sleepSecs * 1000L);
+        System.out.println("Consuming messages...");
+        Path consumedDataFilePath = consumeMessages(brokerUrl, topics);
+
+        long producedLines = lineCount(producedDataFilePath);
+        long consumedLines = lineCount(consumedDataFilePath);
+        double reduction = 100 * (1.0 - (double) consumedLines / 
producedLines);
+
+        System.out.printf(
+            "%d rows of data produced, %d rows of data consumed (%.1f%% 
reduction).%n",
+            producedLines, consumedLines, reduction);
+
+        System.out.println("De-duplicating and validating output files...");
+        validateOutput(producedDataFilePath.toFile(), 
consumedDataFilePath.toFile());
+
+        Files.deleteIfExists(producedDataFilePath);
+        Files.deleteIfExists(consumedDataFilePath);
+        // if you change this line, we need to update 
test_log_compaction_tool.py system test
+        System.out.println("Data verification is completed");
+    }
+
+
+    private static void createTopics(String brokerUrl, String[] topics) throws 
Exception {
+        Properties adminConfig = new Properties();
+        adminConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
brokerUrl);
+
+        try (Admin adminClient = Admin.create(adminConfig)) {
+            Map<String, String> topicConfigs = Map.of(
+                    TopicConfig.CLEANUP_POLICY_CONFIG, 
TopicConfig.CLEANUP_POLICY_COMPACT
+            );
+            List<NewTopic> newTopics = Arrays.stream(topics)
+                    .map(name -> new NewTopic(name, 1, (short) 
1).configs(topicConfigs)).toList();
+            adminClient.createTopics(newTopics).all().get();
+
+            final List<String> pendingTopics = new ArrayList<>();
+            waitUntilTrue(() -> {
+                try {
+                    Set<String> allTopics = 
adminClient.listTopics().names().get();
+                    pendingTopics.clear();
+                    pendingTopics.addAll(
+                            Arrays.stream(topics)
+                                    .filter(topicName -> 
!allTopics.contains(topicName))
+                                    .toList()
+                    );
+                    return pendingTopics.isEmpty();
+                } catch (InterruptedException | 
java.util.concurrent.ExecutionException e) {
+                    throw new RuntimeException(e);
+                }
+            }, () -> "timed out waiting for topics: " + pendingTopics);
+        }
+    }
+
+    private static void validateOutput(File producedDataFile, File 
consumedDataFile) {
+        try (BufferedReader producedReader = externalSort(producedDataFile);
+             BufferedReader consumedReader = externalSort(consumedDataFile)) {
+            Iterator<TestRecord> produced = 
TestRecordUtils.valuesIterator(producedReader);
+            Iterator<TestRecord> consumed = 
TestRecordUtils.valuesIterator(consumedReader);
+
+            File producedDedupedFile = new 
File(producedDataFile.getAbsolutePath() + ".deduped");
+            File consumedDedupedFile = new 
File(consumedDataFile.getAbsolutePath() + ".deduped");
+
+            try (BufferedWriter producedDeduped = Files.newBufferedWriter(
+                    producedDedupedFile.toPath(), StandardCharsets.UTF_8);
+                 BufferedWriter consumedDeduped = Files.newBufferedWriter(
+                         consumedDedupedFile.toPath(), 
StandardCharsets.UTF_8)) {
+                int total = 0;
+                int mismatched = 0;
+                while (produced.hasNext() && consumed.hasNext()) {
+                    TestRecord p = produced.next();
+                    producedDeduped.write(p.toString());
+                    producedDeduped.newLine();
+
+                    TestRecord c = consumed.next();
+                    consumedDeduped.write(c.toString());
+                    consumedDeduped.newLine();
+
+                    if (!p.equals(c)) {
+                        mismatched++;
+                    }
+                    total++;
+                }
+
+                System.out.printf("Validated %d values, %d mismatches.%n", 
total, mismatched);
+                require(!produced.hasNext(), "Additional values produced not 
found in consumer log.");
+                require(!consumed.hasNext(), "Additional values consumed not 
found in producer log.");
+                require(mismatched == 0, "Non-zero number of row mismatches.");
+                // if all the checks worked out we can delete the deduped files
+                Files.deleteIfExists(producedDedupedFile.toPath());
+                Files.deleteIfExists(consumedDedupedFile.toPath());
+            }
+
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static BufferedReader externalSort(File file) throws IOException {
+        Path tempDir = Files.createTempDirectory("log_compaction_test");
+
+        ProcessBuilder builder = new ProcessBuilder(
+                "sort", "--key=1,2", "--stable", "--buffer-size=20%",
+                "--temporary-directory=" + tempDir.toString(), 
file.getAbsolutePath());
+        builder.redirectError(ProcessBuilder.Redirect.INHERIT);
+        
+        Process process;
+        try {
+            process = builder.start();
+        } catch (IOException e) {
+            // clean up temp directory if process fails to start
+            try {
+                Files.deleteIfExists(tempDir);
+            } catch (IOException cleanupException) {
+                e.addSuppressed(cleanupException);
+            }
+            throw new IOException("Failed to start sort process. Ensure 'sort' 
command is available.", e);
+        }
+
+        return new BufferedReader(
+                new InputStreamReader(process.getInputStream(), 
StandardCharsets.UTF_8),
+                10 * 1024 * 1024
+        );
+    }
+
+    private static long lineCount(Path filePath) throws IOException {
+        try (Stream<String> lines = Files.lines(filePath)) {
+            return lines.count();
+        }
+    }
+
+    private static void require(boolean requirement, String message) {
+        if (!requirement) {
+            System.err.println("Data validation failed : " + message);
+            Exit.exit(1);
+        }
+    }
+
+    private static Path produceMessages(String brokerUrl, String[] topics, 
long messages,
+                                        String compressionType, int dups, int 
percentDeletes) throws IOException {
+        Map<String, Object> producerProps = Map.of(
+                ProducerConfig.MAX_BLOCK_MS_CONFIG, 
String.valueOf(Long.MAX_VALUE),
+                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl,
+                ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType
+        );
+
+        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(
+                producerProps, new ByteArraySerializer(), new 
ByteArraySerializer())) {
+            int keyCount = (int) (messages / dups);
+            Path producedFilePath = 
Files.createTempFile("kafka-log-cleaner-produced-", ".txt");
+            System.out.println("Logging produce requests to " + 
producedFilePath);
+
+            try (BufferedWriter producedWriter = Files.newBufferedWriter(
+                    producedFilePath, StandardCharsets.UTF_8)) {
+                for (long i = 0; i < messages * topics.length; i++) {
+                    String topic = topics[(int) (i % topics.length)];
+                    int key = RANDOM.nextInt(keyCount);
+                    boolean delete = (i % 100) < percentDeletes;
+                    ProducerRecord<byte[], byte[]> record;
+                    if (delete) {
+                        record = new ProducerRecord<>(topic,
+                                
String.valueOf(key).getBytes(StandardCharsets.UTF_8), null);
+                    } else {
+                        record = new ProducerRecord<>(topic,
+                                
String.valueOf(key).getBytes(StandardCharsets.UTF_8),
+                                
String.valueOf(i).getBytes(StandardCharsets.UTF_8));
+                    }
+                    producer.send(record);
+                    producedWriter.write(new TestRecord(topic, key, i, 
delete).toString());
+                    producedWriter.newLine();
+                }
+            }
+            return producedFilePath;
+        }
+    }
+
+    private static Path consumeMessages(String brokerUrl, String[] topics) 
throws IOException {
+
+        Path consumedFilePath = 
Files.createTempFile("kafka-log-cleaner-consumed-", ".txt");
+        System.out.println("Logging consumed messages to " + consumedFilePath);
+
+        try (Consumer<String, String> consumer = createConsumer(brokerUrl);
+             BufferedWriter consumedWriter = 
Files.newBufferedWriter(consumedFilePath, StandardCharsets.UTF_8)) {
+            consumer.subscribe(Arrays.asList(topics));
+            while (true) {
+                ConsumerRecords<String, String> consumerRecords = 
consumer.poll(Duration.ofSeconds(20));
+                if (consumerRecords.isEmpty()) return consumedFilePath;
+                consumerRecords.forEach(
+                    record -> {
+                        try {
+                            boolean delete = record.value() == null;
+                            long value = delete ? -1L : 
Long.parseLong(record.value());
+                            TestRecord testRecord = new TestRecord(
+                                    record.topic(), 
Integer.parseInt(record.key()), value, delete);
+                            consumedWriter.write(testRecord.toString());
+                            consumedWriter.newLine();
+                        } catch (IOException e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                );
+            }
+        }
+    }
+
+    private static Consumer<String, String> createConsumer(String brokerUrl) {
+        Map<String, Object> consumerProps = Map.of(
+                ConsumerConfig.GROUP_ID_CONFIG, "log-cleaner-test-" + 
RANDOM.nextInt(Integer.MAX_VALUE),
+                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl,
+                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"
+        );
+        return new KafkaConsumer<>(consumerProps, new StringDeserializer(), 
new StringDeserializer());
+    }
+
+    /**
+     * Wait for condition to be true for at most 15 seconds, checking every 
100ms
+     */
+    private static void waitUntilTrue(Supplier<Boolean> condition, 
Supplier<String> timeoutMessage) throws InterruptedException {
+        final long defaultMaxWaitMs = 15000; // 15 seconds
+        final long defaultPollIntervalMs = 100; // 100ms
+        long endTime = System.currentTimeMillis() + defaultMaxWaitMs;
+
+        while (System.currentTimeMillis() < endTime) {
+            try {
+                if (condition.get()) {
+                    return;
+                }
+            } catch (Exception e) {
+                // Continue trying until timeout
+            }
+            TimeUnit.MILLISECONDS.sleep(defaultPollIntervalMs);
+        }
+
+        throw new RuntimeException(timeoutMessage.get());
+    }
+}
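
To make the validation step concrete: both the produced and consumed logs use the tab-separated TestRecord format (topic, key, value, d/u flag), and after the external sort validateOutput effectively keeps only the last record for each topic+key, skipping keys whose final record is a delete, before comparing the two sides. The sketch below illustrates that idea only, with made-up sample lines and an in-memory map standing in for the sort-then-scan the tool actually performs.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only -- not part of the patch. It mirrors the idea behind
// validateOutput(): reduce each log to its final value per topic+key
// (dropping keys whose last record is a delete) and compare the results.
public class LastValuePerKeySketch {

    public static void main(String[] args) {
        // Made-up sample lines in the TestRecord format: topic \t key \t value \t (d|u)
        List<String> produced = List.of(
            "t-0\t7\t1\tu",    // key 7 first written with value 1 ...
            "t-0\t7\t42\tu",   // ... then updated to 42 (the value that must survive)
            "t-0\t9\t5\td");   // key 9 ends in a delete, so compaction may drop it entirely
        List<String> consumed = List.of(
            "t-0\t7\t42\tu");  // what a fully compacted topic would return

        System.out.println(lastValuePerKey(produced).equals(lastValuePerKey(consumed))); // true
    }

    static Map<String, String> lastValuePerKey(List<String> lines) {
        Map<String, String> last = new LinkedHashMap<>();
        for (String line : lines) {
            String[] parts = line.split("\t");    // topic, key, value, delete flag
            String topicAndKey = parts[0] + parts[1];
            if ("d".equals(parts[3])) {
                last.remove(topicAndKey);         // a trailing delete removes the key
            } else {
                last.put(topicAndKey, parts[2]);  // later values overwrite earlier ones
            }
        }
        return last;
    }
}
```

The tool itself shells out to a stable unix sort (externalSort above) rather than building a map, so it can handle the million-record runs shown in the commit message without holding everything in memory.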