[ https://issues.apache.org/jira/browse/KAFKA-7210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16585776#comment-16585776 ]

ASF GitHub Bot commented on KAFKA-7210:
---------------------------------------

ijuma closed pull request #5226: KAFKA-7210: Add a system test to verify the log compaction
URL: https://github.com/apache/kafka/pull/5226

This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is reproduced below for the sake of provenance:

diff --git a/build.gradle b/build.gradle
index 0892ed19402..c1387d4ef60 100644
--- a/build.gradle
+++ b/build.gradle
@@ -775,6 +775,9 @@ project(':core') {
       include('*.jar')
     }
     into "$buildDir/dependant-testlibs"
+    // By default, Gradle does not handle test dependencies between sub-projects.
+    // This line includes the clients project's test jar in dependant-testlibs.
+    from (project(':clients').testJar ) { "$buildDir/dependant-testlibs" }
     duplicatesStrategy 'exclude'
   }
 
diff --git a/core/src/test/scala/kafka/tools/LogCompactionTester.scala b/core/src/test/scala/kafka/tools/LogCompactionTester.scala
new file mode 100755
index 00000000000..9f53f664b54
--- /dev/null
+++ b/core/src/test/scala/kafka/tools/LogCompactionTester.scala
@@ -0,0 +1,348 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import java.io._
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.{Files, Path}
+import java.time.Duration
+import java.util.{Properties, Random}
+
+import joptsimple.OptionParser
+import kafka.utils._
+import org.apache.kafka.clients.admin.NewTopic
+import org.apache.kafka.clients.{CommonClientConfigs, admin}
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.config.TopicConfig
+import org.apache.kafka.common.serialization.{ByteArraySerializer, StringDeserializer}
+import org.apache.kafka.common.utils.Utils
+
+import scala.collection.JavaConverters._
+
+/**
+ * This is a torture test that runs against an existing broker.
+ *
+ * Here is how it works:
+ *
+ * It produces a series of specially formatted messages to one or more partitions,
+ * logging each produced message to a text file. The messages use a limited set of
+ * keys, so there is duplication in the key space.
+ *
+ * The broker will clean its log as the test runs.
+ *
+ * Once the specified number of messages has been produced, we create a consumer,
+ * consume all the messages in the topic, and write them out to another text file.
+ *
+ * Using a stable unix sort, we sort both the producer log of what was sent and
+ * the consumer log of what was retrieved, by message key. Then we compare the
+ * final message in both logs for each key. If this final message is not the same
+ * for all keys, we print an error and exit with exit code 1; otherwise we print
+ * the size reduction and exit with exit code 0.
+ */
+object LogCompactionTester {
+
+  // maximum line size when reading the produced/consumed record text files
+  private val ReadAheadLimit = 4906
+
+  def main(args: Array[String]) {
+    val parser = new OptionParser(false)
+    val numMessagesOpt = parser.accepts("messages", "The number of messages to send or consume.")
+      .withRequiredArg
+      .describedAs("count")
+      .ofType(classOf[java.lang.Long])
+      .defaultsTo(Long.MaxValue)
+    val messageCompressionOpt = parser.accepts("compression-type", "message compression type")
+      .withOptionalArg
+      .describedAs("compressionType")
+      .ofType(classOf[java.lang.String])
+      .defaultsTo("none")
+    val numDupsOpt = parser.accepts("duplicates", "The number of duplicates for each key.")
+      .withRequiredArg
+      .describedAs("count")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(5)
+    val brokerOpt = parser.accepts("bootstrap-server", "The server(s) to connect to.")
+      .withRequiredArg
+      .describedAs("url")
+      .ofType(classOf[String])
+    val topicsOpt = parser.accepts("topics", "The number of topics to test.")
+      .withRequiredArg
+      .describedAs("count")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(1)
+    val percentDeletesOpt = parser.accepts("percent-deletes", "The percentage of updates that are deletes.")
+      .withRequiredArg
+      .describedAs("percent")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(0)
+    val sleepSecsOpt = parser.accepts("sleep", "Time in seconds to sleep between production and consumption.")
+      .withRequiredArg
+      .describedAs("seconds")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(0)
+
+    val options = parser.parse(args: _*)
+
+    if (args.length == 0)
+      CommandLineUtils.printUsageAndDie(parser, "A tool to test log compaction. Valid options are: ")
+
+    CommandLineUtils.checkRequiredArgs(parser, options, brokerOpt, numMessagesOpt)
+
+    // parse options
+    val messages = options.valueOf(numMessagesOpt).longValue
+    val compressionType = options.valueOf(messageCompressionOpt)
+    val percentDeletes = options.valueOf(percentDeletesOpt).intValue
+    val dups = options.valueOf(numDupsOpt).intValue
+    val brokerUrl = options.valueOf(brokerOpt)
+    val topicCount = options.valueOf(topicsOpt).intValue
+    val sleepSecs = options.valueOf(sleepSecsOpt).intValue
+
+    val testId = new Random().nextLong
+    val topics = (0 until topicCount).map("log-cleaner-test-" + testId + "-" + _).toArray
+    createTopics(brokerUrl, topics.toSeq)
+
+    println(s"Producing $messages messages..to topics ${topics.mkString(",")}")
+    val producedDataFilePath = produceMessages(brokerUrl, topics, messages, compressionType, dups, percentDeletes)
+    println(s"Sleeping for $sleepSecs seconds...")
+    Thread.sleep(sleepSecs * 1000)
+    println("Consuming messages...")
+    val consumedDataFilePath = consumeMessages(brokerUrl, topics)
+
+    val producedLines = lineCount(producedDataFilePath)
+    val consumedLines = lineCount(consumedDataFilePath)
+    val reduction = 100 * (1.0 - consumedLines.toDouble / producedLines.toDouble)
+    println(f"$producedLines%d rows of data produced, $consumedLines%d rows of data consumed ($reduction%.1f%% reduction).")
+
+    println("De-duplicating and validating output files...")
+    validateOutput(producedDataFilePath.toFile, consumedDataFilePath.toFile)
+    Utils.delete(producedDataFilePath.toFile)
+    Utils.delete(consumedDataFilePath.toFile)
+    // if you change this line, also update VERIFICATION_STRING in the
+    // tests/kafkatest/services/log_compaction_tester.py system test service
+    println("Data verification is completed")
+  }
+
+  def createTopics(brokerUrl: String, topics: Seq[String]): Unit = {
+    val adminConfig = new Properties
+    adminConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
+    val adminClient = admin.AdminClient.create(adminConfig)
+
+    try {
+      val topicConfigs = Map(TopicConfig.CLEANUP_POLICY_CONFIG -> TopicConfig.CLEANUP_POLICY_COMPACT)
+      val newTopics = topics.map(name => new NewTopic(name, 1, 1).configs(topicConfigs.asJava)).asJava
+      adminClient.createTopics(newTopics).all.get
+
+      var pendingTopics: Seq[String] = Seq()
+      TestUtils.waitUntilTrue(() => {
+        val allTopics = adminClient.listTopics.names.get.asScala.toSeq
+        pendingTopics = topics.filter(topicName => !allTopics.contains(topicName))
+        pendingTopics.isEmpty
+      }, s"timed out waiting for topics : $pendingTopics")
+
+    } finally adminClient.close()
+  }
+
+  def lineCount(filePath: Path): Int = Files.readAllLines(filePath).size
+
+  def validateOutput(producedDataFile: File, consumedDataFile: File) {
+    val producedReader = externalSort(producedDataFile)
+    val consumedReader = externalSort(consumedDataFile)
+    val produced = valuesIterator(producedReader)
+    val consumed = valuesIterator(consumedReader)
+
+    val producedDedupedFile = new File(producedDataFile.getAbsolutePath + ".deduped")
+    val producedDeduped: BufferedWriter = Files.newBufferedWriter(producedDedupedFile.toPath, UTF_8)
+
+    val consumedDedupedFile = new File(consumedDataFile.getAbsolutePath + ".deduped")
+    val consumedDeduped: BufferedWriter = Files.newBufferedWriter(consumedDedupedFile.toPath, UTF_8)
+    var total = 0
+    var mismatched = 0
+    while (produced.hasNext && consumed.hasNext) {
+      val p = produced.next()
+      producedDeduped.write(p.toString)
+      producedDeduped.newLine()
+      val c = consumed.next()
+      consumedDeduped.write(c.toString)
+      consumedDeduped.newLine()
+      if (p != c)
+        mismatched += 1
+      total += 1
+    }
+    producedDeduped.close()
+    consumedDeduped.close()
+    println(s"Validated $total values, $mismatched mismatches.")
+    require(!produced.hasNext, "Additional values produced that were not found in the consumer log.")
+    require(!consumed.hasNext, "Additional values consumed that were not found in the producer log.")
+    require(mismatched == 0, "Non-zero number of row mismatches.")
+    // if all the checks worked out we can delete the deduped files
+    Utils.delete(producedDedupedFile)
+    Utils.delete(consumedDedupedFile)
+  }
+
+  def require(requirement: Boolean, message: => Any) {
+    if (!requirement) {
+      System.err.println(s"Data validation failed : $message")
+      Exit.exit(1)
+    }
+  }
+
+  def valuesIterator(reader: BufferedReader) = {
+    new IteratorTemplate[TestRecord] {
+      def makeNext(): TestRecord = {
+        var next = readNext(reader)
+        while (next != null && next.delete)
+          next = readNext(reader)
+        if (next == null)
+          allDone()
+        else
+          next
+      }
+    }
+  }
+
+  def readNext(reader: BufferedReader): TestRecord = {
+    var line = reader.readLine()
+    if (line == null)
+      return null
+    var curr = TestRecord.parse(line)
+    while (true) {
+      line = peekLine(reader)
+      if (line == null)
+        return curr
+      val next = TestRecord.parse(line)
+      if (next == null || next.topicAndKey != curr.topicAndKey)
+        return curr
+      curr = next
+      reader.readLine()
+    }
+    null
+  }
+
+  def peekLine(reader: BufferedReader) = {
+    reader.mark(ReadAheadLimit)
+    val line = reader.readLine
+    reader.reset()
+    line
+  }
+
+  def externalSort(file: File): BufferedReader = {
+    val builder = new ProcessBuilder("sort", "--key=1,2", "--stable", "--buffer-size=20%", "--temporary-directory=" + Files.createTempDirectory("log_compaction_test"), file.getAbsolutePath)
+    val process = builder.start
+    new Thread() {
+      override def run() {
+        val exitCode = process.waitFor()
+        if (exitCode != 0) {
+          System.err.println("Process exited abnormally.")
+          while (process.getErrorStream.available > 0) {
+            System.err.write(process.getErrorStream().read())
+          }
+        }
+      }
+    }.start()
+    new BufferedReader(new InputStreamReader(process.getInputStream(), UTF_8), 10 * 1024 * 1024)
+  }
+
+  def produceMessages(brokerUrl: String,
+                      topics: Array[String],
+                      messages: Long,
+                      compressionType: String,
+                      dups: Int,
+                      percentDeletes: Int): Path = {
+    val producerProps = new Properties
+    producerProps.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.MaxValue.toString)
+    producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
+    producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType)
+    val producer = new KafkaProducer(producerProps, new ByteArraySerializer, new ByteArraySerializer)
+    try {
+      val rand = new Random(1)
+      val keyCount = (messages / dups).toInt
+      val producedFilePath = Files.createTempFile("kafka-log-cleaner-produced-", ".txt")
+      println(s"Logging produce requests to $producedFilePath")
+      val producedWriter: BufferedWriter = Files.newBufferedWriter(producedFilePath, UTF_8)
+      for (i <- 0L until (messages * topics.length)) {
+        val topic = topics((i % topics.length).toInt)
+        val key = rand.nextInt(keyCount)
+        val delete = (i % 100) < percentDeletes
+        val msg =
+          if (delete)
+            new ProducerRecord[Array[Byte], Array[Byte]](topic, key.toString.getBytes(UTF_8), null)
+          else
+            new ProducerRecord(topic, key.toString.getBytes(UTF_8), i.toString.getBytes(UTF_8))
+        producer.send(msg)
+        producedWriter.write(TestRecord(topic, key, i, delete).toString)
+        producedWriter.newLine()
+      }
+      producedWriter.close()
+      producedFilePath
+    } finally {
+      producer.close()
+    }
+  }
+
+  def createConsumer(brokerUrl: String): KafkaConsumer[String, String] = {
+    val consumerProps = new Properties
+    consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "log-cleaner-test-" + new Random().nextInt(Int.MaxValue))
+    consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
+    consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+    new KafkaConsumer(consumerProps, new StringDeserializer, new StringDeserializer)
+  }
+
+  def consumeMessages(brokerUrl: String, topics: Array[String]): Path = {
+    val consumer = createConsumer(brokerUrl)
+    consumer.subscribe(topics.seq.asJava)
+    val consumedFilePath = Files.createTempFile("kafka-log-cleaner-consumed-", ".txt")
+    println(s"Logging consumed messages to $consumedFilePath")
+    val consumedWriter: BufferedWriter = Files.newBufferedWriter(consumedFilePath, UTF_8)
+
+    try {
+      var done = false
+      while (!done) {
+        val consumerRecords = consumer.poll(Duration.ofSeconds(20))
+        if (!consumerRecords.isEmpty) {
+          for (record <- consumerRecords.asScala) {
+            val delete = record.value == null
+            val value = if (delete) -1L else record.value.toLong
+            consumedWriter.write(TestRecord(record.topic, record.key.toInt, value, delete).toString)
+            consumedWriter.newLine()
+          }
+        } else {
+          done = true
+        }
+      }
+      consumedFilePath
+    } finally {
+      consumedWriter.close()
+      consumer.close()
+    }
+  }
+
+  def readString(buffer: ByteBuffer): String = {
+    Utils.utf8(buffer)
+  }
+
+}
+
+case class TestRecord(topic: String, key: Int, value: Long, delete: Boolean) {
+  override def toString = topic + "\t" + key + "\t" + value + "\t" + (if (delete) "d" else "u")
+  def topicAndKey = topic + key
+}
+
+object TestRecord {
+  def parse(line: String): TestRecord = {
+    val components = line.split("\t")
+    new TestRecord(components(0), components(1).toInt, components(2).toLong, components(3) == "d")
+  }
+}
\ No newline at end of file
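
A note on the verification logic above: it reduces to "for each key, only the
final record matters once compaction has run". The following is a minimal
sketch of that dedup-and-compare step, in Python for brevity; the record
tuples and helper names are illustrative only and are not part of the patch:

    # Keep only the last record per (topic, key), then drop keys whose final
    # record is a tombstone, mirroring how valuesIterator skips deletes.
    def last_per_key(records):
        latest = {}
        for topic, key, value, delete in records:
            latest[(topic, key)] = (value, delete)  # later records win
        return {k: value for k, (value, delete) in latest.items() if not delete}

    def verify(produced, consumed):
        return last_per_key(produced) == last_per_key(consumed)

The tool itself avoids holding everything in memory by shelling out to a
stable unix sort and streaming both files, but the per-key comparison it
performs is the same.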
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index 8eee575f4a6..b0a9faacf2e 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -211,21 +211,39 @@ def set_protocol_and_port(self, node):
         self.advertised_listeners = ','.join(advertised_listeners)
 
     def prop_file(self, node):
-        cfg = KafkaConfig(**node.config)
-        cfg[config_property.ADVERTISED_HOSTNAME] = node.account.hostname
-        cfg[config_property.ZOOKEEPER_CONNECT] = self.zk_connect_setting()
+        self.set_protocol_and_port(node)
+
+        # load template configs as a dictionary
+        config_template = self.render('kafka.properties', node=node, broker_id=self.idx(node),
+                                      security_config=self.security_config, num_nodes=self.num_nodes)
+
+        configs = dict(l.rstrip().split('=', 1) for l in config_template.split('\n')
+                       if not l.startswith("#") and "=" in l)
+
+        # load test-specific override configs
+        override_configs = KafkaConfig(**node.config)
+        override_configs[config_property.ADVERTISED_HOSTNAME] = node.account.hostname
+        override_configs[config_property.ZOOKEEPER_CONNECT] = self.zk_connect_setting()
 
         for prop in self.server_prop_overides:
-            cfg[prop[0]] = prop[1]
+            override_configs[prop[0]] = prop[1]
 
-        self.set_protocol_and_port(node)
+        # update template configs with test override configs
+        configs.update(override_configs)
 
-        # TODO - clean up duplicate configuration logic
-        prop_file = cfg.render()
-        prop_file += self.render('kafka.properties', node=node, broker_id=self.idx(node),
-                                 security_config=self.security_config, num_nodes=self.num_nodes)
+        prop_file = self.render_configs(configs)
         return prop_file
 
+    def render_configs(self, configs):
+        """Render the given configs as key=val lines, in a consistent (sorted) order."""
+        keys = sorted(configs.keys())
+
+        s = ""
+        for k in keys:
+            s += "%s=%s\n" % (k, str(configs[k]))
+        return s
+
     def start_cmd(self, node):
         cmd = "export JMX_PORT=%d; " % self.jmx_port
         cmd += "export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % 
self.LOG4J_CONFIG
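
The prop_file change above makes override precedence explicit: the template
defaults are parsed into a dictionary first, and test-specific overrides are
applied on top. A toy illustration of that merge order (the values here are
made up):

    template = {"log.cleaner.enable": "false", "num.partitions": "1"}
    overrides = {"log.cleaner.enable": "true"}

    merged = dict(template)
    merged.update(overrides)  # the later dict wins on key collisions

    assert merged["log.cleaner.enable"] == "true"
    assert merged["num.partitions"] == "1"

This appears to be why several hard-coded defaults could be dropped from the
kafka.properties template in the next hunk: a test that needs a particular
value can now pin it through server_prop_overides without the template and
the rendered config fighting over the same keys.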
diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties
index 8cca14fa66d..dd777f9be23 100644
--- a/tests/kafkatest/services/kafka/templates/kafka.properties
+++ b/tests/kafkatest/services/kafka/templates/kafka.properties
@@ -20,18 +20,6 @@ advertised.host.name={{ node.account.hostname }}
 listeners={{ listeners }}
 advertised.listeners={{ advertised_listeners }}
 
-num.network.threads=3
-num.io.threads=8
-socket.send.buffer.bytes=102400
-socket.receive.buffer.bytes=65536
-socket.request.max.bytes=104857600
-
-num.partitions=1
-num.recovery.threads.per.data.dir=1
-log.retention.hours=168
-log.segment.bytes=1073741824
-log.cleaner.enable=false
-
 security.inter.broker.protocol={{ security_config.interbroker_security_protocol }}
 
 ssl.keystore.location=/mnt/security/test.keystore.jks
diff --git a/tests/kafkatest/services/log_compaction_tester.py b/tests/kafkatest/services/log_compaction_tester.py
new file mode 100644
index 00000000000..4a19650ff2e
--- /dev/null
+++ b/tests/kafkatest/services/log_compaction_tester.py
@@ -0,0 +1,88 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+
+from ducktape.services.background_thread import BackgroundThreadService
+
+from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin, CORE_LIBS_JAR_NAME, CORE_DEPENDANT_TEST_LIBS_JAR_NAME
+from kafkatest.services.security.security_config import SecurityConfig
+from kafkatest.version import DEV_BRANCH
+
+class LogCompactionTester(KafkaPathResolverMixin, BackgroundThreadService):
+
+    OUTPUT_DIR = "/mnt/logcompaction_tester"
+    LOG_PATH = os.path.join(OUTPUT_DIR, "logcompaction_tester_stdout.log")
+    VERIFICATION_STRING = "Data verification is completed"
+
+    logs = {
+        "tool_logs": {
+            "path": LOG_PATH,
+            "collect_default": True}
+    }
+
+    def __init__(self, context, kafka, security_protocol="PLAINTEXT", stop_timeout_sec=30):
+        super(LogCompactionTester, self).__init__(context, 1)
+
+        self.kafka = kafka
+        self.security_protocol = security_protocol
+        self.security_config = SecurityConfig(self.context, security_protocol)
+        self.stop_timeout_sec = stop_timeout_sec
+        self.log_compaction_completed = False
+
+    def _worker(self, idx, node):
+        node.account.ssh("mkdir -p %s" % LogCompactionTester.OUTPUT_DIR)
+        cmd = self.start_cmd(node)
+        self.logger.info("LogCompactionTester %d command: %s" % (idx, cmd))
+        self.security_config.setup_node(node)
+        for line in node.account.ssh_capture(cmd):
+            self.logger.debug("Checking line:{}".format(line))
+
+            if line.startswith(LogCompactionTester.VERIFICATION_STRING):
+                self.log_compaction_completed = True
+
+    def start_cmd(self, node):
+        core_libs_jar = self.path.jar(CORE_LIBS_JAR_NAME, DEV_BRANCH)
+        core_dependant_test_libs_jar = self.path.jar(CORE_DEPENDANT_TEST_LIBS_JAR_NAME, DEV_BRANCH)
+
+        cmd = "for file in %s; do CLASSPATH=$CLASSPATH:$file; done;" % 
core_libs_jar
+        cmd += " for file in %s; do CLASSPATH=$CLASSPATH:$file; done;" % 
core_dependant_test_libs_jar
+        cmd += " export CLASSPATH;"
+        cmd += self.path.script("kafka-run-class.sh", node)
+        cmd += " %s" % self.java_class_name()
+        cmd += " --bootstrap-server %s --messages 1000000 --sleep 20 
--duplicates 10 --percent-deletes 10" % 
(self.kafka.bootstrap_servers(self.security_protocol))
+
+        cmd += " 2>> %s | tee -a %s &" % (self.logs["tool_logs"]["path"], 
self.logs["tool_logs"]["path"])
+        return cmd
+
+    def stop_node(self, node):
+        node.account.kill_java_processes(self.java_class_name(), clean_shutdown=True,
+                                         allow_fail=True)
+
+        stopped = self.wait_node(node, timeout_sec=self.stop_timeout_sec)
+        assert stopped, "Node %s: did not stop within the specified timeout of %s seconds" % \
+                        (str(node.account), str(self.stop_timeout_sec))
+
+    def clean_node(self, node):
+        node.account.kill_java_processes(self.java_class_name(), clean_shutdown=False,
+                                         allow_fail=True)
+        node.account.ssh("rm -rf %s" % LogCompactionTester.OUTPUT_DIR, allow_fail=False)
+
+    def java_class_name(self):
+        return "kafka.tools.LogCompactionTester"
+
+    @property
+    def is_done(self):
+        return self.log_compaction_completed
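
The service treats the tool's final println as a sentinel: _worker streams
stdout over ssh and flips log_compaction_completed once the fixed completion
message appears. The contract in miniature (the helper below is hypothetical,
not part of the patch):

    VERIFICATION_STRING = "Data verification is completed"

    def saw_sentinel(lines):
        # any stdout line starting with the sentinel marks success
        return any(line.startswith(VERIFICATION_STRING) for line in lines)

    assert saw_sentinel(["Consuming messages...", "Data verification is completed"])
    assert not saw_sentinel(["Data validation failed : mismatches"])

This is the coupling the "if you change this line" comment in
LogCompactionTester.scala warns about: the Scala println and this Python
constant must stay in sync.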
diff --git a/tests/kafkatest/tests/tools/log_compaction_test.py b/tests/kafkatest/tests/tools/log_compaction_test.py
new file mode 100644
index 00000000000..338060f7217
--- /dev/null
+++ b/tests/kafkatest/tests/tools/log_compaction_test.py
@@ -0,0 +1,66 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ducktape.utils.util import wait_until
+from ducktape.tests.test import Test
+from ducktape.mark.resource import cluster
+
+from kafkatest.services.kafka import config_property
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.log_compaction_tester import LogCompactionTester
+
+class LogCompactionTest(Test):
+
+    # Configure smaller segment size to create more segments for compaction
+    LOG_SEGMENT_BYTES = "1024000"
+
+    def __init__(self, test_context):
+        super(LogCompactionTest, self).__init__(test_context)
+        self.num_zk = 1
+        self.num_brokers = 1
+
+        self.zk = ZookeeperService(test_context, self.num_zk)
+        self.kafka = None
+        self.compaction_verifier = None
+
+    def setUp(self):
+        self.zk.start()
+
+    def start_kafka(self, security_protocol, interbroker_security_protocol):
+        self.kafka = KafkaService(
+            self.test_context,
+            num_nodes = self.num_brokers,
+            zk = self.zk,
+            security_protocol=security_protocol,
+            interbroker_security_protocol=interbroker_security_protocol,
+            server_prop_overides=[
+                [config_property.LOG_SEGMENT_BYTES, LogCompactionTest.LOG_SEGMENT_BYTES],
+            ])
+        self.kafka.start()
+
+    def start_test_log_compaction_tool(self, security_protocol):
+        self.compaction_verifier = LogCompactionTester(self.test_context, self.kafka, security_protocol=security_protocol)
+        self.compaction_verifier.start()
+
+    @cluster(num_nodes=4)
+    def test_log_compaction(self, security_protocol='PLAINTEXT'):
+
+        self.start_kafka(security_protocol, security_protocol)
+        self.start_test_log_compaction_tool(security_protocol)
+
+        # Wait until LogCompactionTester reports that data verification completed
+        wait_until(lambda: self.compaction_verifier.is_done, timeout_sec=180, err_msg="Timed out waiting to complete compaction")


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add system test for log compaction
> ----------------------------------
>
>                 Key: KAFKA-7210
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7210
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Manikumar
>            Assignee: Manikumar
>            Priority: Major
>             Fix For: 2.1.0
>
>
> Currently we have the TestLogCleaning tool for stress testing log compaction.
> This JIRA is to integrate the tool into the system tests.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
