This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 12eca09fb46 KAFKA-20178 Move LogCleanerIntegrationTest from core to 
storage (#21575)
12eca09fb46 is described below

commit 12eca09fb4615154b91bb84afdef768aef2866ac
Author: Maros Orsak <[email protected]>
AuthorDate: Thu Mar 5 15:04:09 2026 +0100

    KAFKA-20178 Move LogCleanerIntegrationTest from core to storage (#21575)
    
    This PR is a follow-up to
    https://github.com/apache/kafka/pull/21406. The last test remaining to
    be migrated is `LogCleanerParameterizedIntegrationTest`, after which we
    can simply remove the abstract Scala class as well.
    
    I have also done a bit of refactoring of local variables that were used
    across multiple tests, e.g., MAX_COMPACTION_LAG, MIN_CLEANABLE_DIRTY_RATIO,
    etc., but it is mostly a 1:1 port (where I tried to merge it into the
    existing class as previously approved).
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../unit/kafka/log/LogCleanerIntegrationTest.scala | 235 -------------------
 .../log/LogCleanerLagIntegrationTest.java          | 255 +++++++++++++++++++++
 2 files changed, 255 insertions(+), 235 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 
b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
deleted file mode 100644
index b3f00af2b86..00000000000
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ /dev/null
@@ -1,235 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  *    http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package kafka.log
-
-import java.io.PrintWriter
-import com.yammer.metrics.core.{Gauge, MetricName}
-import kafka.utils.TestUtils
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.compress.Compression
-import org.apache.kafka.common.record.internal.RecordBatch
-import org.apache.kafka.server.metrics.KafkaYammerMetrics
-import org.apache.kafka.server.util.MockTime
-import org.apache.kafka.storage.internals.log.{LogCleanerManager, UnifiedLog}
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, Test}
-
-import scala.collection.{Iterable, Seq}
-import scala.jdk.CollectionConverters._
-
-/**
-  * This is an integration test that tests the fully integrated log cleaner
-  */
-class LogCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest {
-
-  val codec: Compression = Compression.lz4().build()
-
-  val time = new MockTime()
-  val topicPartitions = Array(new TopicPartition("log", 0), new 
TopicPartition("log", 1), new TopicPartition("log", 2))
-
-  @AfterEach
-  def cleanup(): Unit = {
-    TestUtils.clearYammerMetrics()
-  }
-
-  @Test
-  def testMarksPartitionsAsOfflineAndPopulatesUncleanableMetrics(): Unit = {
-    val largeMessageKey = 20
-    val (_, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, 
RecordBatch.CURRENT_MAGIC_VALUE, codec)
-    val maxMessageSize = largeMessageSet.sizeInBytes
-    cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = 
maxMessageSize, backoffMs = 100)
-
-    def breakPartitionLog(tp: TopicPartition): Unit = {
-      val log = cleaner.logs.get(tp)
-      writeDups(numKeys = 20, numDups = 3, log = log, codec = codec)
-
-      val partitionFile = log.logSegments.asScala.last.log.file()
-      val writer = new PrintWriter(partitionFile)
-      writer.write("jogeajgoea")
-      writer.close()
-
-      writeDups(numKeys = 20, numDups = 3, log = log, codec = codec)
-    }
-
-    breakPartitionLog(topicPartitions(0))
-    breakPartitionLog(topicPartitions(1))
-
-    cleaner.startup()
-
-    val log = cleaner.logs.get(topicPartitions(0))
-    val log2 = cleaner.logs.get(topicPartitions(1))
-    val uncleanableDirectory = log.dir.getParent
-    val uncleanablePartitionsCountGauge = 
getGauge[Int]("uncleanable-partitions-count", uncleanableDirectory)
-    val uncleanableBytesGauge = getGauge[Long]("uncleanable-bytes", 
uncleanableDirectory)
-
-    TestUtils.waitUntilTrue(() => uncleanablePartitionsCountGauge.value() == 
2, "There should be 2 uncleanable partitions", 2000L)
-    val expectedTotalUncleanableBytes = 
LogCleanerManager.calculateCleanableBytes(log, 0, 
log.logSegments.asScala.last.baseOffset).getValue +
-      LogCleanerManager.calculateCleanableBytes(log2, 0, 
log2.logSegments.asScala.last.baseOffset).getValue
-    TestUtils.waitUntilTrue(() => uncleanableBytesGauge.value() == 
expectedTotalUncleanableBytes,
-      s"There should be $expectedTotalUncleanableBytes uncleanable bytes", 
1000L)
-
-    val uncleanablePartitions = 
cleaner.cleanerManager.uncleanablePartitions(uncleanableDirectory)
-    assertTrue(uncleanablePartitions.contains(topicPartitions(0)))
-    assertTrue(uncleanablePartitions.contains(topicPartitions(1)))
-    assertFalse(uncleanablePartitions.contains(topicPartitions(2)))
-
-    // Delete one partition
-    cleaner.logs.remove(topicPartitions(0))
-    TestUtils.waitUntilTrue(
-      () => {
-        time.sleep(1000)
-        uncleanablePartitionsCountGauge.value() == 1
-      },
-      "There should be 1 uncleanable partitions",
-      2000L)
-
-    val uncleanablePartitions2 = 
cleaner.cleanerManager.uncleanablePartitions(uncleanableDirectory)
-    assertFalse(uncleanablePartitions2.contains(topicPartitions(0)))
-    assertTrue(uncleanablePartitions2.contains(topicPartitions(1)))
-    assertFalse(uncleanablePartitions2.contains(topicPartitions(2)))
-  }
-
-  private def getGauge[T](filter: MetricName => Boolean): Gauge[T] = {
-    KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
-      .filter { case (k, _) => filter(k) }
-      .headOption
-      .getOrElse { fail(s"Unable to find metric") }
-      .asInstanceOf[(Any, Gauge[Any])]
-      ._2
-      .asInstanceOf[Gauge[T]]
-  }
-
-  private def getGauge[T](metricName: String): Gauge[T] = {
-    getGauge(mName => mName.getName.endsWith(metricName) && mName.getScope == 
null)
-  }
-
-  private def getGauge[T](metricName: String, metricScope: String): Gauge[T] = 
{
-    getGauge(k => k.getName.endsWith(metricName) && 
k.getScope.endsWith(metricScope))
-  }
-
-  @Test
-  def testMaxLogCompactionLag(): Unit = {
-    val msPerHour = 60 * 60 * 1000
-
-    val minCompactionLagMs = 1 * msPerHour
-    val maxCompactionLagMs = 6 * msPerHour
-
-    val cleanerBackOffMs = 200L
-    val segmentSize = 512
-    val topicPartitions = Array(new TopicPartition("log", 0), new 
TopicPartition("log", 1), new TopicPartition("log", 2))
-    val minCleanableDirtyRatio = 1.0F
-
-    cleaner = makeCleaner(partitions = topicPartitions,
-      backoffMs = cleanerBackOffMs,
-      minCompactionLagMs = minCompactionLagMs,
-      segmentSize = segmentSize,
-      maxCompactionLagMs= maxCompactionLagMs,
-      minCleanableDirtyRatio = minCleanableDirtyRatio)
-    val log = cleaner.logs.get(topicPartitions(0))
-
-    val T0 = time.milliseconds
-    writeKeyDups(numKeys = 100, numDups = 3, log, Compression.NONE, timestamp 
= T0, startValue = 0, step = 1)
-
-    val startSizeBlock0 = log.size
-
-    val activeSegAtT0 = log.activeSegment
-
-    cleaner.startup()
-
-    // advance to a time still less than maxCompactionLagMs from start
-    time.sleep(maxCompactionLagMs/2)
-    Thread.sleep(5 * cleanerBackOffMs) // give cleaning thread a chance to 
_not_ clean
-    assertEquals(startSizeBlock0, log.size, "There should be no cleaning until 
the max compaction lag has passed")
-
-    // advance to time a bit more than one maxCompactionLagMs from start
-    time.sleep(maxCompactionLagMs/2 + 1)
-    val T1 = time.milliseconds
-
-    // write the second block of data: all zero keys
-    val appends1 = writeKeyDups(numKeys = 100, numDups = 1, log, 
Compression.NONE, timestamp = T1, startValue = 0, step = 0)
-
-    // roll the active segment
-    log.roll()
-    val activeSegAtT1 = log.activeSegment
-    val firstBlockCleanableSegmentOffset = activeSegAtT0.baseOffset
-
-    // the first block should get cleaned
-    cleaner.awaitCleaned(new TopicPartition("log", 0), 
firstBlockCleanableSegmentOffset, 60000L)
-
-    val read1 = readFromLog(log)
-    val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints.get(new 
TopicPartition("log", 0))
-    assertTrue(lastCleaned >= firstBlockCleanableSegmentOffset,
-      s"log cleaner should have processed at least to offset 
$firstBlockCleanableSegmentOffset, but lastCleaned=$lastCleaned")
-
-    //minCleanableDirtyRatio  will prevent second block of data from compacting
-    assertNotEquals(appends1, read1, s"log should still contain non-zero keys")
-
-    time.sleep(maxCompactionLagMs + 1)
-    // the second block should get cleaned. only zero keys left
-    cleaner.awaitCleaned(new TopicPartition("log", 0), 
activeSegAtT1.baseOffset, 60000L)
-
-    val read2 = readFromLog(log)
-
-    assertEquals(appends1, read2, s"log should only contains zero keys now")
-
-    val lastCleaned2 = cleaner.cleanerManager.allCleanerCheckpoints.get(new 
TopicPartition("log", 0))
-    val secondBlockCleanableSegmentOffset = activeSegAtT1.baseOffset
-    assertTrue(lastCleaned2 >= secondBlockCleanableSegmentOffset,
-      s"log cleaner should have processed at least to offset 
$secondBlockCleanableSegmentOffset, but lastCleaned=$lastCleaned2")
-  }
-
-  private def readFromLog(log: UnifiedLog): Iterable[(Int, Int)] = {
-    for (segment <- log.logSegments.asScala; record <- 
segment.log.records.asScala) yield {
-      val key = TestUtils.readString(record.key).toInt
-      val value = TestUtils.readString(record.value).toInt
-      key -> value
-    }
-  }
-
-  private def writeKeyDups(numKeys: Int, numDups: Int, log: UnifiedLog, codec: 
Compression, timestamp: Long,
-                           startValue: Int, step: Int): Seq[(Int, Int)] = {
-    var valCounter = startValue
-    for (_ <- 0 until numDups; key <- 0 until numKeys) yield {
-      val curValue = valCounter
-      log.appendAsLeader(TestUtils.singletonRecords(value = 
curValue.toString.getBytes, codec = codec,
-        key = key.toString.getBytes, timestamp = timestamp), 0)
-      // move LSO forward to increase compaction bound
-      log.updateHighWatermark(log.logEndOffset)
-      valCounter += step
-      (key, curValue)
-    }
-  }
-
-  @Test
-  def testIsThreadFailed(): Unit = {
-    val metricName = "DeadThreadCount"
-    cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = 
100000, backoffMs = 100)
-    cleaner.startup()
-    assertEquals(0, cleaner.deadThreadCount)
-    // we simulate the unexpected error with an interrupt
-    cleaner.cleaners.forEach(_.interrupt())
-    // wait until interruption is propagated to all the threads
-    TestUtils.waitUntilTrue(
-      () => cleaner.cleaners.asScala.foldLeft(true)((result, thread) => {
-        thread.isThreadFailed && result
-      }), "Threads didn't terminate unexpectedly"
-    )
-    assertEquals(cleaner.cleaners.size, getGauge[Int](metricName).value())
-    assertEquals(cleaner.cleaners.size, cleaner.deadThreadCount)
-  }
-}
diff --git 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogCleanerLagIntegrationTest.java
 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogCleanerLagIntegrationTest.java
index a444bbc9e95..0195017bb2d 100644
--- 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogCleanerLagIntegrationTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogCleanerLagIntegrationTest.java
@@ -26,11 +26,18 @@ import org.apache.kafka.common.record.internal.RecordBatch;
 import org.apache.kafka.common.record.internal.RecordVersion;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
 import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.server.util.ShutdownableThread;
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
 import org.apache.kafka.test.TestUtils;
 
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Metric;
+import com.yammer.metrics.core.MetricName;
+
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 import org.slf4j.Logger;
@@ -38,6 +45,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.PrintWriter;
 import java.nio.file.Files;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -47,10 +55,13 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
@@ -69,10 +80,14 @@ public class LogCleanerLagIntegrationTest {
     private static final long DEFAULT_MIN_COMPACTION_LAG_MS = 0L;
     private static final long DEFAULT_MAX_COMPACTION_LAG_MS = Long.MAX_VALUE;
     private static final long MIN_COMPACTION_LAG = 
Duration.ofHours(1).toMillis();
+    private static final long MAX_COMPACTION_LAG = 
Duration.ofHours(6).toMillis();
     private static final long CLEANER_BACKOFF_MS = 200L;
     private static final float DEFAULT_MIN_CLEANABLE_DIRTY_RATIO = 0.0F;
+    private static final float MIN_CLEANABLE_DIRTY_RATIO = 1.0F;
     private static final int SEGMENT_SIZE = 512;
 
+    private final Compression codec = Compression.lz4().build();
+
     private int counter = 0;
 
     private final MockTime time = new MockTime(1400000000000L, 1000L);  // Tue 
May 13 16:53:20 UTC 2014
@@ -142,6 +157,175 @@ public class LogCleanerLagIntegrationTest {
                 sizeUpToActiveSegmentAtT0, compactedSize));
     }
 
+    @Test
+    public void testMarksPartitionsAsOfflineAndPopulatesUncleanableMetrics() 
throws Exception {
+        int largeMessageKey = 20;
+        ValueAndRecords largeMessage = 
createLargeSingleMessageSet(largeMessageKey, RecordBatch.CURRENT_MAGIC_VALUE, 
codec);
+        int maxMessageSize = largeMessage.records().sizeInBytes();
+        cleaner = makeCleaner(TOPIC_PARTITIONS, maxMessageSize, 100L);
+
+        breakPartitionLog(TOPIC_PARTITIONS.get(0));
+        breakPartitionLog(TOPIC_PARTITIONS.get(1));
+
+        cleaner.startup();
+
+        UnifiedLog theLog = cleaner.logs().get(TOPIC_PARTITIONS.get(0));
+        UnifiedLog theLog2 = cleaner.logs().get(TOPIC_PARTITIONS.get(1));
+        String uncleanableDirectory = theLog.dir().getParent();
+        Gauge<Integer> uncleanablePartitionsCountGauge = 
getGauge("uncleanable-partitions-count", uncleanableDirectory);
+        Gauge<Long> uncleanableBytesGauge = getGauge("uncleanable-bytes", 
uncleanableDirectory);
+
+        TestUtils.waitForCondition(
+            () -> uncleanablePartitionsCountGauge.value() == 2,
+            2000L,
+            "There should be 2 uncleanable partitions");
+
+        List<LogSegment> logSegments = theLog.logSegments();
+        LogSegment lastLogSegment = logSegments.get(logSegments.size() - 1);
+        List<LogSegment> log2Segments = theLog2.logSegments();
+        LogSegment lastLog2Segment = log2Segments.get(log2Segments.size() - 1);
+
+        long expectedTotalUncleanableBytes =
+            LogCleanerManager.calculateCleanableBytes(theLog, 0, 
lastLogSegment.baseOffset()).getValue() +
+            LogCleanerManager.calculateCleanableBytes(theLog2, 0, 
lastLog2Segment.baseOffset()).getValue();
+        TestUtils.waitForCondition(
+            () -> uncleanableBytesGauge.value() == 
expectedTotalUncleanableBytes,
+            1000L,
+            "There should be " + expectedTotalUncleanableBytes + " uncleanable 
bytes");
+
+        Set<TopicPartition> uncleanablePartitions = 
cleaner.cleanerManager().uncleanablePartitions(uncleanableDirectory);
+        assertTrue(uncleanablePartitions.contains(TOPIC_PARTITIONS.get(0)));
+        assertTrue(uncleanablePartitions.contains(TOPIC_PARTITIONS.get(1)));
+        assertFalse(uncleanablePartitions.contains(TOPIC_PARTITIONS.get(2)));
+
+        // Delete one partition
+        cleaner.logs().remove(TOPIC_PARTITIONS.get(0));
+        TestUtils.waitForCondition(
+            () -> {
+                time.sleep(1000);
+                return uncleanablePartitionsCountGauge.value() == 1;
+            },
+            2000L,
+            "There should be 1 uncleanable partition");
+
+        Set<TopicPartition> uncleanablePartitions2 = 
cleaner.cleanerManager().uncleanablePartitions(uncleanableDirectory);
+        assertFalse(uncleanablePartitions2.contains(TOPIC_PARTITIONS.get(0)));
+        assertTrue(uncleanablePartitions2.contains(TOPIC_PARTITIONS.get(1)));
+        assertFalse(uncleanablePartitions2.contains(TOPIC_PARTITIONS.get(2)));
+    }
+
+    @Test
+    public void testMaxLogCompactionLag() throws Exception {
+        cleaner = makeCleaner(TOPIC_PARTITIONS, CLEANER_BACKOFF_MS, 
MIN_COMPACTION_LAG, SEGMENT_SIZE,
+            MAX_COMPACTION_LAG, MIN_CLEANABLE_DIRTY_RATIO);
+        UnifiedLog theLog = cleaner.logs().get(TOPIC_PARTITIONS.get(0));
+
+        long t0 = time.milliseconds();
+        writeKeyDups(100, 3, theLog, Compression.NONE, t0, 0, 1);
+
+        long startSizeBlock0 = theLog.size();
+
+        LogSegment activeSegAtT0 = theLog.activeSegment();
+
+        cleaner.startup();
+
+        // advance to a time still less than MAX_COMPACTION_LAG from start
+        time.sleep(MAX_COMPACTION_LAG / 2);
+        Thread.sleep(5 * CLEANER_BACKOFF_MS); // give cleaning thread a chance 
to _not_ clean
+        assertEquals(startSizeBlock0, theLog.size(), "There should be no 
cleaning until the max compaction lag has passed");
+
+        // advance to time a bit more than one MAX_COMPACTION_LAG from start
+        time.sleep(MAX_COMPACTION_LAG / 2 + 1);
+        long t1 = time.milliseconds();
+
+        // write the second block of data: all zero keys
+        List<int[]> appends1 = writeKeyDups(100, 1, theLog, Compression.NONE, 
t1, 0, 0);
+
+        // roll the active segment
+        theLog.roll();
+        LogSegment activeSegAtT1 = theLog.activeSegment();
+        long firstBlockCleanableSegmentOffset = activeSegAtT0.baseOffset();
+
+        // the first block should get cleaned
+        cleaner.awaitCleaned(new TopicPartition("log", 0), 
firstBlockCleanableSegmentOffset, 60000L);
+
+        List<int[]> read1 = readKeyValuePairsFromLog(theLog);
+        Long lastCleaned = 
cleaner.cleanerManager().allCleanerCheckpoints().get(new TopicPartition("log", 
0));
+        assertTrue(lastCleaned >= firstBlockCleanableSegmentOffset,
+            "log cleaner should have processed at least to offset " + 
firstBlockCleanableSegmentOffset + ", but lastCleaned=" + lastCleaned);
+
+        // minCleanableDirtyRatio will prevent second block of data from 
compacting
+        assertNotEquals(appends1.size(), read1.size(), "log should still 
contain non-zero keys");
+
+        time.sleep(MAX_COMPACTION_LAG + 1);
+        // the second block should get cleaned. only zero keys left
+        cleaner.awaitCleaned(new TopicPartition("log", 0), 
activeSegAtT1.baseOffset(), 60000L);
+
+        List<int[]> read2 = readKeyValuePairsFromLog(theLog);
+
+        assertEquals(appends1.size(), read2.size(), "log should only contain 
zero keys now");
+        for (int i = 0; i < appends1.size(); i++) {
+            assertEquals(appends1.get(i)[0], read2.get(i)[0], "key mismatch at 
index " + i);
+            assertEquals(appends1.get(i)[1], read2.get(i)[1], "value mismatch 
at index " + i);
+        }
+
+        Long lastCleaned2 = 
cleaner.cleanerManager().allCleanerCheckpoints().get(new TopicPartition("log", 
0));
+        long secondBlockCleanableSegmentOffset = activeSegAtT1.baseOffset();
+        assertTrue(lastCleaned2 >= secondBlockCleanableSegmentOffset,
+            "log cleaner should have processed at least to offset " + 
secondBlockCleanableSegmentOffset + ", but lastCleaned=" + lastCleaned2);
+    }
+
+    @Test
+    public void testIsThreadFailed() throws Exception {
+        cleaner = makeCleaner(TOPIC_PARTITIONS, 100000, 100L);
+        cleaner.startup();
+        assertEquals(0, cleaner.deadThreadCount());
+        // we simulate the unexpected error with an interrupt
+        cleaner.cleaners().forEach(Thread::interrupt);
+        // wait until interruption is propagated to all the threads
+        TestUtils.waitForCondition(
+            () -> 
cleaner.cleaners().stream().allMatch(ShutdownableThread::isThreadFailed),
+            "Threads didn't terminate unexpectedly");
+        assertEquals(cleaner.cleaners().size(), 
getGauge("DeadThreadCount").value());
+        assertEquals(cleaner.cleaners().size(), cleaner.deadThreadCount());
+    }
+
+    private void breakPartitionLog(TopicPartition tp) throws IOException {
+        UnifiedLog theLog = cleaner.logs().get(tp);
+        writeDups(20, 3, theLog, codec);
+
+        List<LogSegment> segments = theLog.logSegments();
+        LogSegment lastSegment = segments.get(segments.size() - 1);
+        File partitionFile = lastSegment.log().file();
+        try (PrintWriter writer = new PrintWriter(partitionFile)) {
+            writer.write("jogeajgoea");
+        }
+
+        writeDups(20, 3, theLog, codec);
+    }
+
+    @SuppressWarnings("unchecked")
+    private <T> Gauge<T> getGauge(String metricName) {
+        for (Map.Entry<MetricName, Metric> entry : 
KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet()) {
+            MetricName name = entry.getKey();
+            if (name.getName().endsWith(metricName) && name.getScope() == 
null) {
+                return (Gauge<T>) entry.getValue();
+            }
+        }
+        throw new AssertionError("Unable to find metric: " + metricName);
+    }
+
+    @SuppressWarnings("unchecked")
+    private <T> Gauge<T> getGauge(String metricName, String metricScope) {
+        for (Map.Entry<MetricName, Metric> entry : 
KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet()) {
+            MetricName name = entry.getKey();
+            if (name.getName().endsWith(metricName) && name.getScope() != null 
&& name.getScope().endsWith(metricScope)) {
+                return (Gauge<T>) entry.getValue();
+            }
+        }
+        throw new AssertionError("Unable to find metric: " + metricName + " 
with scope ending in " + metricScope);
+    }
+
     private long calculateSizeUpToOffset(UnifiedLog log, long offset) {
         long size = 0;
         for (LogSegment segment : log.logSegments(0L, offset)) {
@@ -162,6 +346,41 @@ public class LogCleanerLagIntegrationTest {
         return result;
     }
 
+    private List<int[]> readKeyValuePairsFromLog(UnifiedLog log) {
+        List<int[]> result = new ArrayList<>();
+        for (LogSegment segment : log.logSegments()) {
+            for (Record record : segment.log().records()) {
+                int key = 
Integer.parseInt(LogTestUtils.readString(record.key()));
+                int value = 
Integer.parseInt(LogTestUtils.readString(record.value()));
+                result.add(new int[]{key, value});
+            }
+        }
+        return result;
+    }
+
+    private List<int[]> writeKeyDups(int numKeys, int numDups, UnifiedLog log, 
Compression codec,
+                                     long timestamp, int startValue, int step) 
throws IOException {
+        List<int[]> result = new ArrayList<>();
+        int valCounter = startValue;
+        for (int i = 0; i < numDups; i++) {
+            for (int key = 0; key < numKeys; key++) {
+                int curValue = valCounter;
+                log.appendAsLeader(
+                    LogTestUtils.singletonRecords(
+                        String.valueOf(curValue).getBytes(),
+                        codec,
+                        String.valueOf(key).getBytes(),
+                        timestamp),
+                    0);
+                // move LSO forward to increase compaction bound
+                log.updateHighWatermark(log.logEndOffset());
+                valCounter += step;
+                result.add(new int[]{key, curValue});
+            }
+        }
+        return result;
+    }
+
     private Map<Integer, Integer> writeDupsWithTimestamp(int numKeys, int 
numDups, UnifiedLog log,
                                                           Compression codec, 
long timestamp) throws IOException {
         Map<Integer, Integer> result = new HashMap<>();
@@ -289,6 +508,41 @@ public class LogCleanerLagIntegrationTest {
             new Properties());
     }
 
+    private LogCleaner makeCleaner(Iterable<TopicPartition> partitions,
+                                   int maxMessageSize,
+                                   long backoffMs) throws IOException {
+        return makeCleaner(partitions,
+            DEFAULT_MIN_CLEANABLE_DIRTY_RATIO,
+            1,
+            backoffMs,
+            maxMessageSize,
+            DEFAULT_MIN_COMPACTION_LAG_MS,
+            DEFAULT_DELETE_DELAY,
+            DEFAULT_SEGMENT_SIZE,
+            DEFAULT_MAX_COMPACTION_LAG_MS,
+            null,
+            new Properties());
+    }
+
+    private LogCleaner makeCleaner(Iterable<TopicPartition> partitions,
+                                   long backoffMs,
+                                   long minCompactionLagMs,
+                                   int segmentSize,
+                                   long maxCompactionLagMs,
+                                   float minCleanableDirtyRatio) throws 
IOException {
+        return makeCleaner(partitions,
+            minCleanableDirtyRatio,
+            1,
+            backoffMs,
+            DEFAULT_MAX_MESSAGE_SIZE,
+            minCompactionLagMs,
+            DEFAULT_DELETE_DELAY,
+            segmentSize,
+            maxCompactionLagMs,
+            null,
+            new Properties());
+    }
+
     private int counter() {
         return counter;
     }
@@ -347,6 +601,7 @@ public class LogCleanerLagIntegrationTest {
 
     @AfterEach
     public void teardown() throws IOException, InterruptedException {
+        kafka.utils.TestUtils.clearYammerMetrics();
         if (cleaner != null) {
             cleaner.shutdown();
         }

Reply via email to