This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 12eca09fb46 KAFKA-20178 Move LogCleanerIntegrationTest from core to
storage (#21575)
12eca09fb46 is described below
commit 12eca09fb4615154b91bb84afdef768aef2866ac
Author: Maros Orsak <[email protected]>
AuthorDate: Thu Mar 5 15:04:09 2026 +0100
KAFKA-20178 Move LogCleanerIntegrationTest from core to storage (#21575)
This PR is a follow-up to
https://github.com/apache/kafka/pull/21406. The last remaining test to
migrate is `LogCleanerParameterizedIntegrationTest`, after which we can
also remove the abstract Scala class.
I have also done a bit of refactoring of some local variables that were
used in multiple tests, i.e., MAX_COMPACTION_LAG, MIN_CLEANABLE_DIRTY_RATIO,
etc., but it is mostly a 1:1 port (where I tried to merge it into the
existing class, as previously approved).
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../unit/kafka/log/LogCleanerIntegrationTest.scala | 235 -------------------
.../log/LogCleanerLagIntegrationTest.java | 255 +++++++++++++++++++++
2 files changed, 255 insertions(+), 235 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
deleted file mode 100644
index b3f00af2b86..00000000000
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ /dev/null
@@ -1,235 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.log
-
-import java.io.PrintWriter
-import com.yammer.metrics.core.{Gauge, MetricName}
-import kafka.utils.TestUtils
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.compress.Compression
-import org.apache.kafka.common.record.internal.RecordBatch
-import org.apache.kafka.server.metrics.KafkaYammerMetrics
-import org.apache.kafka.server.util.MockTime
-import org.apache.kafka.storage.internals.log.{LogCleanerManager, UnifiedLog}
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, Test}
-
-import scala.collection.{Iterable, Seq}
-import scala.jdk.CollectionConverters._
-
-/**
- * This is an integration test that tests the fully integrated log cleaner
- */
-class LogCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest {
-
- val codec: Compression = Compression.lz4().build()
-
- val time = new MockTime()
- val topicPartitions = Array(new TopicPartition("log", 0), new
TopicPartition("log", 1), new TopicPartition("log", 2))
-
- @AfterEach
- def cleanup(): Unit = {
- TestUtils.clearYammerMetrics()
- }
-
- @Test
- def testMarksPartitionsAsOfflineAndPopulatesUncleanableMetrics(): Unit = {
- val largeMessageKey = 20
- val (_, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey,
RecordBatch.CURRENT_MAGIC_VALUE, codec)
- val maxMessageSize = largeMessageSet.sizeInBytes
- cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize =
maxMessageSize, backoffMs = 100)
-
- def breakPartitionLog(tp: TopicPartition): Unit = {
- val log = cleaner.logs.get(tp)
- writeDups(numKeys = 20, numDups = 3, log = log, codec = codec)
-
- val partitionFile = log.logSegments.asScala.last.log.file()
- val writer = new PrintWriter(partitionFile)
- writer.write("jogeajgoea")
- writer.close()
-
- writeDups(numKeys = 20, numDups = 3, log = log, codec = codec)
- }
-
- breakPartitionLog(topicPartitions(0))
- breakPartitionLog(topicPartitions(1))
-
- cleaner.startup()
-
- val log = cleaner.logs.get(topicPartitions(0))
- val log2 = cleaner.logs.get(topicPartitions(1))
- val uncleanableDirectory = log.dir.getParent
- val uncleanablePartitionsCountGauge =
getGauge[Int]("uncleanable-partitions-count", uncleanableDirectory)
- val uncleanableBytesGauge = getGauge[Long]("uncleanable-bytes",
uncleanableDirectory)
-
- TestUtils.waitUntilTrue(() => uncleanablePartitionsCountGauge.value() ==
2, "There should be 2 uncleanable partitions", 2000L)
- val expectedTotalUncleanableBytes =
LogCleanerManager.calculateCleanableBytes(log, 0,
log.logSegments.asScala.last.baseOffset).getValue +
- LogCleanerManager.calculateCleanableBytes(log2, 0,
log2.logSegments.asScala.last.baseOffset).getValue
- TestUtils.waitUntilTrue(() => uncleanableBytesGauge.value() ==
expectedTotalUncleanableBytes,
- s"There should be $expectedTotalUncleanableBytes uncleanable bytes",
1000L)
-
- val uncleanablePartitions =
cleaner.cleanerManager.uncleanablePartitions(uncleanableDirectory)
- assertTrue(uncleanablePartitions.contains(topicPartitions(0)))
- assertTrue(uncleanablePartitions.contains(topicPartitions(1)))
- assertFalse(uncleanablePartitions.contains(topicPartitions(2)))
-
- // Delete one partition
- cleaner.logs.remove(topicPartitions(0))
- TestUtils.waitUntilTrue(
- () => {
- time.sleep(1000)
- uncleanablePartitionsCountGauge.value() == 1
- },
- "There should be 1 uncleanable partitions",
- 2000L)
-
- val uncleanablePartitions2 =
cleaner.cleanerManager.uncleanablePartitions(uncleanableDirectory)
- assertFalse(uncleanablePartitions2.contains(topicPartitions(0)))
- assertTrue(uncleanablePartitions2.contains(topicPartitions(1)))
- assertFalse(uncleanablePartitions2.contains(topicPartitions(2)))
- }
-
- private def getGauge[T](filter: MetricName => Boolean): Gauge[T] = {
- KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
- .filter { case (k, _) => filter(k) }
- .headOption
- .getOrElse { fail(s"Unable to find metric") }
- .asInstanceOf[(Any, Gauge[Any])]
- ._2
- .asInstanceOf[Gauge[T]]
- }
-
- private def getGauge[T](metricName: String): Gauge[T] = {
- getGauge(mName => mName.getName.endsWith(metricName) && mName.getScope ==
null)
- }
-
- private def getGauge[T](metricName: String, metricScope: String): Gauge[T] =
{
- getGauge(k => k.getName.endsWith(metricName) &&
k.getScope.endsWith(metricScope))
- }
-
- @Test
- def testMaxLogCompactionLag(): Unit = {
- val msPerHour = 60 * 60 * 1000
-
- val minCompactionLagMs = 1 * msPerHour
- val maxCompactionLagMs = 6 * msPerHour
-
- val cleanerBackOffMs = 200L
- val segmentSize = 512
- val topicPartitions = Array(new TopicPartition("log", 0), new
TopicPartition("log", 1), new TopicPartition("log", 2))
- val minCleanableDirtyRatio = 1.0F
-
- cleaner = makeCleaner(partitions = topicPartitions,
- backoffMs = cleanerBackOffMs,
- minCompactionLagMs = minCompactionLagMs,
- segmentSize = segmentSize,
- maxCompactionLagMs= maxCompactionLagMs,
- minCleanableDirtyRatio = minCleanableDirtyRatio)
- val log = cleaner.logs.get(topicPartitions(0))
-
- val T0 = time.milliseconds
- writeKeyDups(numKeys = 100, numDups = 3, log, Compression.NONE, timestamp
= T0, startValue = 0, step = 1)
-
- val startSizeBlock0 = log.size
-
- val activeSegAtT0 = log.activeSegment
-
- cleaner.startup()
-
- // advance to a time still less than maxCompactionLagMs from start
- time.sleep(maxCompactionLagMs/2)
- Thread.sleep(5 * cleanerBackOffMs) // give cleaning thread a chance to
_not_ clean
- assertEquals(startSizeBlock0, log.size, "There should be no cleaning until
the max compaction lag has passed")
-
- // advance to time a bit more than one maxCompactionLagMs from start
- time.sleep(maxCompactionLagMs/2 + 1)
- val T1 = time.milliseconds
-
- // write the second block of data: all zero keys
- val appends1 = writeKeyDups(numKeys = 100, numDups = 1, log,
Compression.NONE, timestamp = T1, startValue = 0, step = 0)
-
- // roll the active segment
- log.roll()
- val activeSegAtT1 = log.activeSegment
- val firstBlockCleanableSegmentOffset = activeSegAtT0.baseOffset
-
- // the first block should get cleaned
- cleaner.awaitCleaned(new TopicPartition("log", 0),
firstBlockCleanableSegmentOffset, 60000L)
-
- val read1 = readFromLog(log)
- val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints.get(new
TopicPartition("log", 0))
- assertTrue(lastCleaned >= firstBlockCleanableSegmentOffset,
- s"log cleaner should have processed at least to offset
$firstBlockCleanableSegmentOffset, but lastCleaned=$lastCleaned")
-
- //minCleanableDirtyRatio will prevent second block of data from compacting
- assertNotEquals(appends1, read1, s"log should still contain non-zero keys")
-
- time.sleep(maxCompactionLagMs + 1)
- // the second block should get cleaned. only zero keys left
- cleaner.awaitCleaned(new TopicPartition("log", 0),
activeSegAtT1.baseOffset, 60000L)
-
- val read2 = readFromLog(log)
-
- assertEquals(appends1, read2, s"log should only contains zero keys now")
-
- val lastCleaned2 = cleaner.cleanerManager.allCleanerCheckpoints.get(new
TopicPartition("log", 0))
- val secondBlockCleanableSegmentOffset = activeSegAtT1.baseOffset
- assertTrue(lastCleaned2 >= secondBlockCleanableSegmentOffset,
- s"log cleaner should have processed at least to offset
$secondBlockCleanableSegmentOffset, but lastCleaned=$lastCleaned2")
- }
-
- private def readFromLog(log: UnifiedLog): Iterable[(Int, Int)] = {
- for (segment <- log.logSegments.asScala; record <-
segment.log.records.asScala) yield {
- val key = TestUtils.readString(record.key).toInt
- val value = TestUtils.readString(record.value).toInt
- key -> value
- }
- }
-
- private def writeKeyDups(numKeys: Int, numDups: Int, log: UnifiedLog, codec:
Compression, timestamp: Long,
- startValue: Int, step: Int): Seq[(Int, Int)] = {
- var valCounter = startValue
- for (_ <- 0 until numDups; key <- 0 until numKeys) yield {
- val curValue = valCounter
- log.appendAsLeader(TestUtils.singletonRecords(value =
curValue.toString.getBytes, codec = codec,
- key = key.toString.getBytes, timestamp = timestamp), 0)
- // move LSO forward to increase compaction bound
- log.updateHighWatermark(log.logEndOffset)
- valCounter += step
- (key, curValue)
- }
- }
-
- @Test
- def testIsThreadFailed(): Unit = {
- val metricName = "DeadThreadCount"
- cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize =
100000, backoffMs = 100)
- cleaner.startup()
- assertEquals(0, cleaner.deadThreadCount)
- // we simulate the unexpected error with an interrupt
- cleaner.cleaners.forEach(_.interrupt())
- // wait until interruption is propagated to all the threads
- TestUtils.waitUntilTrue(
- () => cleaner.cleaners.asScala.foldLeft(true)((result, thread) => {
- thread.isThreadFailed && result
- }), "Threads didn't terminate unexpectedly"
- )
- assertEquals(cleaner.cleaners.size, getGauge[Int](metricName).value())
- assertEquals(cleaner.cleaners.size, cleaner.deadThreadCount)
- }
-}
diff --git
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogCleanerLagIntegrationTest.java
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogCleanerLagIntegrationTest.java
index a444bbc9e95..0195017bb2d 100644
---
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogCleanerLagIntegrationTest.java
+++
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogCleanerLagIntegrationTest.java
@@ -26,11 +26,18 @@ import org.apache.kafka.common.record.internal.RecordBatch;
import org.apache.kafka.common.record.internal.RecordVersion;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.server.util.ShutdownableThread;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
import org.apache.kafka.test.TestUtils;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Metric;
+import com.yammer.metrics.core.MetricName;
+
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.slf4j.Logger;
@@ -38,6 +45,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.io.PrintWriter;
import java.nio.file.Files;
import java.time.Duration;
import java.util.ArrayList;
@@ -47,10 +55,13 @@ import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
@@ -69,10 +80,14 @@ public class LogCleanerLagIntegrationTest {
private static final long DEFAULT_MIN_COMPACTION_LAG_MS = 0L;
private static final long DEFAULT_MAX_COMPACTION_LAG_MS = Long.MAX_VALUE;
private static final long MIN_COMPACTION_LAG =
Duration.ofHours(1).toMillis();
+ private static final long MAX_COMPACTION_LAG =
Duration.ofHours(6).toMillis();
private static final long CLEANER_BACKOFF_MS = 200L;
private static final float DEFAULT_MIN_CLEANABLE_DIRTY_RATIO = 0.0F;
+ private static final float MIN_CLEANABLE_DIRTY_RATIO = 1.0F;
private static final int SEGMENT_SIZE = 512;
+ private final Compression codec = Compression.lz4().build();
+
private int counter = 0;
private final MockTime time = new MockTime(1400000000000L, 1000L); // Tue
May 13 16:53:20 UTC 2014
@@ -142,6 +157,175 @@ public class LogCleanerLagIntegrationTest {
sizeUpToActiveSegmentAtT0, compactedSize));
}
+ @Test
+ public void testMarksPartitionsAsOfflineAndPopulatesUncleanableMetrics()
throws Exception {
+ int largeMessageKey = 20;
+ ValueAndRecords largeMessage =
createLargeSingleMessageSet(largeMessageKey, RecordBatch.CURRENT_MAGIC_VALUE,
codec);
+ int maxMessageSize = largeMessage.records().sizeInBytes();
+ cleaner = makeCleaner(TOPIC_PARTITIONS, maxMessageSize, 100L);
+
+ breakPartitionLog(TOPIC_PARTITIONS.get(0));
+ breakPartitionLog(TOPIC_PARTITIONS.get(1));
+
+ cleaner.startup();
+
+ UnifiedLog theLog = cleaner.logs().get(TOPIC_PARTITIONS.get(0));
+ UnifiedLog theLog2 = cleaner.logs().get(TOPIC_PARTITIONS.get(1));
+ String uncleanableDirectory = theLog.dir().getParent();
+ Gauge<Integer> uncleanablePartitionsCountGauge =
getGauge("uncleanable-partitions-count", uncleanableDirectory);
+ Gauge<Long> uncleanableBytesGauge = getGauge("uncleanable-bytes",
uncleanableDirectory);
+
+ TestUtils.waitForCondition(
+ () -> uncleanablePartitionsCountGauge.value() == 2,
+ 2000L,
+ "There should be 2 uncleanable partitions");
+
+ List<LogSegment> logSegments = theLog.logSegments();
+ LogSegment lastLogSegment = logSegments.get(logSegments.size() - 1);
+ List<LogSegment> log2Segments = theLog2.logSegments();
+ LogSegment lastLog2Segment = log2Segments.get(log2Segments.size() - 1);
+
+ long expectedTotalUncleanableBytes =
+ LogCleanerManager.calculateCleanableBytes(theLog, 0,
lastLogSegment.baseOffset()).getValue() +
+ LogCleanerManager.calculateCleanableBytes(theLog2, 0,
lastLog2Segment.baseOffset()).getValue();
+ TestUtils.waitForCondition(
+ () -> uncleanableBytesGauge.value() ==
expectedTotalUncleanableBytes,
+ 1000L,
+ "There should be " + expectedTotalUncleanableBytes + " uncleanable
bytes");
+
+ Set<TopicPartition> uncleanablePartitions =
cleaner.cleanerManager().uncleanablePartitions(uncleanableDirectory);
+ assertTrue(uncleanablePartitions.contains(TOPIC_PARTITIONS.get(0)));
+ assertTrue(uncleanablePartitions.contains(TOPIC_PARTITIONS.get(1)));
+ assertFalse(uncleanablePartitions.contains(TOPIC_PARTITIONS.get(2)));
+
+ // Delete one partition
+ cleaner.logs().remove(TOPIC_PARTITIONS.get(0));
+ TestUtils.waitForCondition(
+ () -> {
+ time.sleep(1000);
+ return uncleanablePartitionsCountGauge.value() == 1;
+ },
+ 2000L,
+ "There should be 1 uncleanable partition");
+
+ Set<TopicPartition> uncleanablePartitions2 =
cleaner.cleanerManager().uncleanablePartitions(uncleanableDirectory);
+ assertFalse(uncleanablePartitions2.contains(TOPIC_PARTITIONS.get(0)));
+ assertTrue(uncleanablePartitions2.contains(TOPIC_PARTITIONS.get(1)));
+ assertFalse(uncleanablePartitions2.contains(TOPIC_PARTITIONS.get(2)));
+ }
+
+ @Test
+ public void testMaxLogCompactionLag() throws Exception {
+ cleaner = makeCleaner(TOPIC_PARTITIONS, CLEANER_BACKOFF_MS,
MIN_COMPACTION_LAG, SEGMENT_SIZE,
+ MAX_COMPACTION_LAG, MIN_CLEANABLE_DIRTY_RATIO);
+ UnifiedLog theLog = cleaner.logs().get(TOPIC_PARTITIONS.get(0));
+
+ long t0 = time.milliseconds();
+ writeKeyDups(100, 3, theLog, Compression.NONE, t0, 0, 1);
+
+ long startSizeBlock0 = theLog.size();
+
+ LogSegment activeSegAtT0 = theLog.activeSegment();
+
+ cleaner.startup();
+
+ // advance to a time still less than MAX_COMPACTION_LAG from start
+ time.sleep(MAX_COMPACTION_LAG / 2);
+ Thread.sleep(5 * CLEANER_BACKOFF_MS); // give cleaning thread a chance
to _not_ clean
+ assertEquals(startSizeBlock0, theLog.size(), "There should be no
cleaning until the max compaction lag has passed");
+
+ // advance to time a bit more than one MAX_COMPACTION_LAG from start
+ time.sleep(MAX_COMPACTION_LAG / 2 + 1);
+ long t1 = time.milliseconds();
+
+ // write the second block of data: all zero keys
+ List<int[]> appends1 = writeKeyDups(100, 1, theLog, Compression.NONE,
t1, 0, 0);
+
+ // roll the active segment
+ theLog.roll();
+ LogSegment activeSegAtT1 = theLog.activeSegment();
+ long firstBlockCleanableSegmentOffset = activeSegAtT0.baseOffset();
+
+ // the first block should get cleaned
+ cleaner.awaitCleaned(new TopicPartition("log", 0),
firstBlockCleanableSegmentOffset, 60000L);
+
+ List<int[]> read1 = readKeyValuePairsFromLog(theLog);
+ Long lastCleaned =
cleaner.cleanerManager().allCleanerCheckpoints().get(new TopicPartition("log",
0));
+ assertTrue(lastCleaned >= firstBlockCleanableSegmentOffset,
+ "log cleaner should have processed at least to offset " +
firstBlockCleanableSegmentOffset + ", but lastCleaned=" + lastCleaned);
+
+ // minCleanableDirtyRatio will prevent second block of data from
compacting
+ assertNotEquals(appends1.size(), read1.size(), "log should still
contain non-zero keys");
+
+ time.sleep(MAX_COMPACTION_LAG + 1);
+ // the second block should get cleaned. only zero keys left
+ cleaner.awaitCleaned(new TopicPartition("log", 0),
activeSegAtT1.baseOffset(), 60000L);
+
+ List<int[]> read2 = readKeyValuePairsFromLog(theLog);
+
+ assertEquals(appends1.size(), read2.size(), "log should only contain
zero keys now");
+ for (int i = 0; i < appends1.size(); i++) {
+ assertEquals(appends1.get(i)[0], read2.get(i)[0], "key mismatch at
index " + i);
+ assertEquals(appends1.get(i)[1], read2.get(i)[1], "value mismatch
at index " + i);
+ }
+
+ Long lastCleaned2 =
cleaner.cleanerManager().allCleanerCheckpoints().get(new TopicPartition("log",
0));
+ long secondBlockCleanableSegmentOffset = activeSegAtT1.baseOffset();
+ assertTrue(lastCleaned2 >= secondBlockCleanableSegmentOffset,
+ "log cleaner should have processed at least to offset " +
secondBlockCleanableSegmentOffset + ", but lastCleaned=" + lastCleaned2);
+ }
+
+ @Test
+ public void testIsThreadFailed() throws Exception {
+ cleaner = makeCleaner(TOPIC_PARTITIONS, 100000, 100L);
+ cleaner.startup();
+ assertEquals(0, cleaner.deadThreadCount());
+ // we simulate the unexpected error with an interrupt
+ cleaner.cleaners().forEach(Thread::interrupt);
+ // wait until interruption is propagated to all the threads
+ TestUtils.waitForCondition(
+ () ->
cleaner.cleaners().stream().allMatch(ShutdownableThread::isThreadFailed),
+ "Threads didn't terminate unexpectedly");
+ assertEquals(cleaner.cleaners().size(),
getGauge("DeadThreadCount").value());
+ assertEquals(cleaner.cleaners().size(), cleaner.deadThreadCount());
+ }
+
+ private void breakPartitionLog(TopicPartition tp) throws IOException {
+ UnifiedLog theLog = cleaner.logs().get(tp);
+ writeDups(20, 3, theLog, codec);
+
+ List<LogSegment> segments = theLog.logSegments();
+ LogSegment lastSegment = segments.get(segments.size() - 1);
+ File partitionFile = lastSegment.log().file();
+ try (PrintWriter writer = new PrintWriter(partitionFile)) {
+ writer.write("jogeajgoea");
+ }
+
+ writeDups(20, 3, theLog, codec);
+ }
+
+ @SuppressWarnings("unchecked")
+ private <T> Gauge<T> getGauge(String metricName) {
+ for (Map.Entry<MetricName, Metric> entry :
KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet()) {
+ MetricName name = entry.getKey();
+ if (name.getName().endsWith(metricName) && name.getScope() ==
null) {
+ return (Gauge<T>) entry.getValue();
+ }
+ }
+ throw new AssertionError("Unable to find metric: " + metricName);
+ }
+
+ @SuppressWarnings("unchecked")
+ private <T> Gauge<T> getGauge(String metricName, String metricScope) {
+ for (Map.Entry<MetricName, Metric> entry :
KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet()) {
+ MetricName name = entry.getKey();
+ if (name.getName().endsWith(metricName) && name.getScope() != null
&& name.getScope().endsWith(metricScope)) {
+ return (Gauge<T>) entry.getValue();
+ }
+ }
+ throw new AssertionError("Unable to find metric: " + metricName + "
with scope ending in " + metricScope);
+ }
+
private long calculateSizeUpToOffset(UnifiedLog log, long offset) {
long size = 0;
for (LogSegment segment : log.logSegments(0L, offset)) {
@@ -162,6 +346,41 @@ public class LogCleanerLagIntegrationTest {
return result;
}
+ private List<int[]> readKeyValuePairsFromLog(UnifiedLog log) {
+ List<int[]> result = new ArrayList<>();
+ for (LogSegment segment : log.logSegments()) {
+ for (Record record : segment.log().records()) {
+ int key =
Integer.parseInt(LogTestUtils.readString(record.key()));
+ int value =
Integer.parseInt(LogTestUtils.readString(record.value()));
+ result.add(new int[]{key, value});
+ }
+ }
+ return result;
+ }
+
+ private List<int[]> writeKeyDups(int numKeys, int numDups, UnifiedLog log,
Compression codec,
+ long timestamp, int startValue, int step)
throws IOException {
+ List<int[]> result = new ArrayList<>();
+ int valCounter = startValue;
+ for (int i = 0; i < numDups; i++) {
+ for (int key = 0; key < numKeys; key++) {
+ int curValue = valCounter;
+ log.appendAsLeader(
+ LogTestUtils.singletonRecords(
+ String.valueOf(curValue).getBytes(),
+ codec,
+ String.valueOf(key).getBytes(),
+ timestamp),
+ 0);
+ // move LSO forward to increase compaction bound
+ log.updateHighWatermark(log.logEndOffset());
+ valCounter += step;
+ result.add(new int[]{key, curValue});
+ }
+ }
+ return result;
+ }
+
private Map<Integer, Integer> writeDupsWithTimestamp(int numKeys, int
numDups, UnifiedLog log,
Compression codec,
long timestamp) throws IOException {
Map<Integer, Integer> result = new HashMap<>();
@@ -289,6 +508,41 @@ public class LogCleanerLagIntegrationTest {
new Properties());
}
+ private LogCleaner makeCleaner(Iterable<TopicPartition> partitions,
+ int maxMessageSize,
+ long backoffMs) throws IOException {
+ return makeCleaner(partitions,
+ DEFAULT_MIN_CLEANABLE_DIRTY_RATIO,
+ 1,
+ backoffMs,
+ maxMessageSize,
+ DEFAULT_MIN_COMPACTION_LAG_MS,
+ DEFAULT_DELETE_DELAY,
+ DEFAULT_SEGMENT_SIZE,
+ DEFAULT_MAX_COMPACTION_LAG_MS,
+ null,
+ new Properties());
+ }
+
+ private LogCleaner makeCleaner(Iterable<TopicPartition> partitions,
+ long backoffMs,
+ long minCompactionLagMs,
+ int segmentSize,
+ long maxCompactionLagMs,
+ float minCleanableDirtyRatio) throws
IOException {
+ return makeCleaner(partitions,
+ minCleanableDirtyRatio,
+ 1,
+ backoffMs,
+ DEFAULT_MAX_MESSAGE_SIZE,
+ minCompactionLagMs,
+ DEFAULT_DELETE_DELAY,
+ segmentSize,
+ maxCompactionLagMs,
+ null,
+ new Properties());
+ }
+
private int counter() {
return counter;
}
@@ -347,6 +601,7 @@ public class LogCleanerLagIntegrationTest {
@AfterEach
public void teardown() throws IOException, InterruptedException {
+ kafka.utils.TestUtils.clearYammerMetrics();
if (cleaner != null) {
cleaner.shutdown();
}