This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 15ec0536654 KAFKA-18656 Backport KAFKA-18597 to 4.0 (#20026)
15ec0536654 is described below
commit 15ec0536654f36bea73945f594f640b2592d689e
Author: Logan Zhu <[email protected]>
AuthorDate: Wed Jun 25 19:19:30 2025 +0800
KAFKA-18656 Backport KAFKA-18597 to 4.0 (#20026)
Backport of [KAFKA-18597](https://github.com/apache/kafka/pull/18627) to
the 4.0 branch.
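
The gist of the fix: `maxOverCleanerThreads` previously truncated the
maximum to an `Int` before the gauges scaled it, so a fractional value
such as a 0.85 buffer utilization was reported as 0. The backported
change keeps the `Double` maximum and converts to `Int` inside each
gauge. A minimal, self-contained sketch of the behavior (not the Kafka
source; the stats class and values here are illustrative):

```scala
object MaxOverCleanerThreadsSketch {
  // Stand-in for CleanerThread.lastStats; values are hypothetical.
  final case class Stats(bufferUtilization: Double, elapsedSecs: Double)

  val cleaners = Seq(Stats(0.75, 1.0), Stats(0.85, 2.0), Stats(0.65, 3.0))

  // After the fix: return the raw Double maximum (0.0 when no cleaners exist).
  def maxOverCleanerThreads(f: Stats => Double): Double =
    cleaners.map(f).maxOption.getOrElse(0.0d)

  def main(args: Array[String]): Unit = {
    // Convert to Int only after scaling: 0.85 * 100 yields 85, not 0.
    println((maxOverCleanerThreads(_.bufferUtilization) * 100).toInt) // 85
    println(maxOverCleanerThreads(_.elapsedSecs).toInt)               // 3
  }
}
```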
Reviewers: Chia-Ping Tsai <[email protected]>
---
core/src/main/scala/kafka/log/LogCleaner.scala | 19 +--
.../test/scala/unit/kafka/log/LogCleanerTest.scala | 168 ++++++++++++++++++---
2 files changed, 153 insertions(+), 34 deletions(-)
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 5d7ee518963..e1f4d4afa2d 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -117,14 +117,14 @@ class LogCleaner(initialConfig: CleanerConfig,
/**
* @param f to compute the result
- * @return the max value (int value) or 0 if there is no cleaner
+ * @return the max value or 0 if there is no cleaner
*/
- private[log] def maxOverCleanerThreads(f: CleanerThread => Double): Int =
- cleaners.map(f).maxOption.getOrElse(0.0d).toInt
+ private[log] def maxOverCleanerThreads(f: CleanerThread => Double): Double =
+ cleaners.map(f).maxOption.getOrElse(0.0d)
/* a metric to track the maximum utilization of any thread's buffer in the last cleaning */
metricsGroup.newGauge(MaxBufferUtilizationPercentMetricName,
- () => maxOverCleanerThreads(_.lastStats.bufferUtilization) * 100)
+ () => (maxOverCleanerThreads(_.lastStats.bufferUtilization) * 100).toInt)
/* a metric to track the recopy rate of each thread's last cleaning */
metricsGroup.newGauge(CleanerRecopyPercentMetricName, () => {
@@ -134,12 +134,12 @@ class LogCleaner(initialConfig: CleanerConfig,
})
/* a metric to track the maximum cleaning time for the last cleaning from each thread */
- metricsGroup.newGauge(MaxCleanTimeMetricName, () => maxOverCleanerThreads(_.lastStats.elapsedSecs))
+ metricsGroup.newGauge(MaxCleanTimeMetricName, () => maxOverCleanerThreads(_.lastStats.elapsedSecs).toInt)
// a metric to track delay between the time when a log is required to be compacted
// as determined by max compaction lag and the time of last cleaner run.
metricsGroup.newGauge(MaxCompactionDelayMetricsName,
- () => maxOverCleanerThreads(_.lastPreCleanStats.maxCompactionDelayMs.toDouble) / 1000)
+ () => (maxOverCleanerThreads(_.lastPreCleanStats.maxCompactionDelayMs.toDouble) / 1000).toInt)
metricsGroup.newGauge(DeadThreadCountMetricName, () => deadThreadCount)
@@ -523,10 +523,11 @@ object LogCleaner {
}
- private val MaxBufferUtilizationPercentMetricName = "max-buffer-utilization-percent"
+ // Visible for testing.
+ private[log] val MaxBufferUtilizationPercentMetricName = "max-buffer-utilization-percent"
private val CleanerRecopyPercentMetricName = "cleaner-recopy-percent"
- private val MaxCleanTimeMetricName = "max-clean-time-secs"
- private val MaxCompactionDelayMetricsName = "max-compaction-delay-secs"
+ private[log] val MaxCleanTimeMetricName = "max-clean-time-secs"
+ private[log] val MaxCompactionDelayMetricsName = "max-compaction-delay-secs"
private val DeadThreadCountMetricName = "DeadThreadCount"
// package private for testing
private[log] val MetricNames = Set(
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 106a4a78a93..9ca39163194 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -18,6 +18,7 @@
package kafka.log
import kafka.common._
+import kafka.log.LogCleaner.{MaxBufferUtilizationPercentMetricName, MaxCleanTimeMetricName, MaxCompactionDelayMetricsName}
import kafka.server.KafkaConfig
import kafka.utils.{CoreUtils, Logging, Pool, TestUtils}
import org.apache.kafka.common.TopicPartition
@@ -2048,42 +2049,159 @@ class LogCleanerTest extends Logging {
}
@Test
- def testMaxOverCleanerThreads(): Unit = {
- val logCleaner = new LogCleaner(new CleanerConfig(true),
+ def testMaxBufferUtilizationPercentMetric(): Unit = {
+ val logCleaner = new LogCleaner(
+ new CleanerConfig(true),
logDirs = Array(TestUtils.tempDir(), TestUtils.tempDir()),
logs = new Pool[TopicPartition, UnifiedLog](),
logDirFailureChannel = new LogDirFailureChannel(1),
- time = time)
+ time = time
+ )
+
+ def assertMaxBufferUtilizationPercent(expected: Int): Unit = {
+ val gauge = logCleaner.metricsGroup.newGauge(MaxBufferUtilizationPercentMetricName,
+ () => (logCleaner.maxOverCleanerThreads(_.lastStats.bufferUtilization) * 100).toInt)
+ assertEquals(expected, gauge.value())
+ }
+
+ try {
+ // No CleanerThreads
+ assertMaxBufferUtilizationPercent(0)
+
+ val cleaners = logCleaner.cleaners
+
+ val cleaner1 = new logCleaner.CleanerThread(1)
+ cleaner1.lastStats = new CleanerStats(time)
+ cleaner1.lastStats.bufferUtilization = 0.75
+ cleaners += cleaner1
+
+ val cleaner2 = new logCleaner.CleanerThread(2)
+ cleaner2.lastStats = new CleanerStats(time)
+ cleaner2.lastStats.bufferUtilization = 0.85
+ cleaners += cleaner2
+
+ val cleaner3 = new logCleaner.CleanerThread(3)
+ cleaner3.lastStats = new CleanerStats(time)
+ cleaner3.lastStats.bufferUtilization = 0.65
+ cleaners += cleaner3
+
+ // expect the gauge value to reflect the maximum bufferUtilization
+ assertMaxBufferUtilizationPercent(85)
+
+ // Update bufferUtilization and verify the gauge value updates
+ cleaner1.lastStats.bufferUtilization = 0.9
+ assertMaxBufferUtilizationPercent(90)
+
+ // All CleanerThreads have the same bufferUtilization
+ cleaners.foreach(_.lastStats.bufferUtilization = 0.5)
+ assertMaxBufferUtilizationPercent(50)
+ } finally {
+ logCleaner.shutdown()
+ }
+ }
+
+ @Test
+ def testMaxCleanTimeMetric(): Unit = {
+ val logCleaner = new LogCleaner(
+ new CleanerConfig(true),
+ logDirs = Array(TestUtils.tempDir(), TestUtils.tempDir()),
+ logs = new Pool[TopicPartition, UnifiedLog](),
+ logDirFailureChannel = new LogDirFailureChannel(1),
+ time = time
+ )
+
+ def assertMaxCleanTime(expected: Int): Unit = {
+ val gauge = logCleaner.metricsGroup.newGauge(MaxCleanTimeMetricName,
+ () => logCleaner.maxOverCleanerThreads(_.lastStats.elapsedSecs).toInt)
+ assertEquals(expected, gauge.value())
+ }
- val cleaners = logCleaner.cleaners
+ try {
+ // No CleanerThreads
+ assertMaxCleanTime(0)
- val cleaner1 = new logCleaner.CleanerThread(1)
- cleaner1.lastStats = new CleanerStats(time)
- cleaner1.lastStats.bufferUtilization = 0.75
- cleaners += cleaner1
+ val cleaners = logCleaner.cleaners
- val cleaner2 = new logCleaner.CleanerThread(2)
- cleaner2.lastStats = new CleanerStats(time)
- cleaner2.lastStats.bufferUtilization = 0.85
- cleaners += cleaner2
+ val cleaner1 = new logCleaner.CleanerThread(1)
+ cleaner1.lastStats = new CleanerStats(time)
+ cleaner1.lastStats.endTime = cleaner1.lastStats.startTime + 1_000L
+ cleaners += cleaner1
- val cleaner3 = new logCleaner.CleanerThread(3)
- cleaner3.lastStats = new CleanerStats(time)
- cleaner3.lastStats.bufferUtilization = 0.65
- cleaners += cleaner3
+ val cleaner2 = new logCleaner.CleanerThread(2)
+ cleaner2.lastStats = new CleanerStats(time)
+ cleaner2.lastStats.endTime = cleaner2.lastStats.startTime + 2_000L
+ cleaners += cleaner2
- assertEquals(0, logCleaner.maxOverCleanerThreads(_.lastStats.bufferUtilization))
+ val cleaner3 = new logCleaner.CleanerThread(3)
+ cleaner3.lastStats = new CleanerStats(time)
+ cleaner3.lastStats.endTime = cleaner3.lastStats.startTime + 3_000L
+ cleaners += cleaner3
- cleaners.clear()
+ // expect the gauge value to reflect the maximum cleanTime
+ assertMaxCleanTime(3)
- cleaner1.lastStats.bufferUtilization = 5d
- cleaners += cleaner1
- cleaner2.lastStats.bufferUtilization = 6d
- cleaners += cleaner2
- cleaner3.lastStats.bufferUtilization = 7d
- cleaners += cleaner3
+ // Update cleanTime and verify the gauge value updates
+ cleaner1.lastStats.endTime = cleaner1.lastStats.startTime + 4_000L
+ assertMaxCleanTime(4)
- assertEquals(7, logCleaner.maxOverCleanerThreads(_.lastStats.bufferUtilization))
+ // All CleanerThreads have the same cleanTime
+ cleaners.foreach(cleaner => cleaner.lastStats.endTime = cleaner.lastStats.startTime + 1_500L)
+ assertMaxCleanTime(1)
+ } finally {
+ logCleaner.shutdown()
+ }
+ }
+
+ @Test
+ def testMaxCompactionDelayMetrics(): Unit = {
+ val logCleaner = new LogCleaner(
+ new CleanerConfig(true),
+ logDirs = Array(TestUtils.tempDir(), TestUtils.tempDir()),
+ logs = new Pool[TopicPartition, UnifiedLog](),
+ logDirFailureChannel = new LogDirFailureChannel(1),
+ time = time
+ )
+
+ def assertMaxCompactionDelay(expected: Int): Unit = {
+ val gauge = logCleaner.metricsGroup.newGauge(MaxCompactionDelayMetricsName,
+ () => (logCleaner.maxOverCleanerThreads(_.lastPreCleanStats.maxCompactionDelayMs.toDouble) / 1000).toInt)
+ assertEquals(expected, gauge.value())
+ }
+
+ try {
+ // No CleanerThreads
+ assertMaxCompactionDelay(0)
+
+ val cleaners = logCleaner.cleaners
+
+ val cleaner1 = new logCleaner.CleanerThread(1)
+ cleaner1.lastStats = new CleanerStats(time)
+ cleaner1.lastPreCleanStats.maxCompactionDelayMs = 1_000L
+ cleaners += cleaner1
+
+ val cleaner2 = new logCleaner.CleanerThread(2)
+ cleaner2.lastStats = new CleanerStats(time)
+ cleaner2.lastPreCleanStats.maxCompactionDelayMs = 2_000L
+ cleaners += cleaner2
+
+ val cleaner3 = new logCleaner.CleanerThread(3)
+ cleaner3.lastStats = new CleanerStats(time)
+ cleaner3.lastPreCleanStats.maxCompactionDelayMs = 3_000L
+ cleaners += cleaner3
+
+ // expect the gauge value to reflect the maximum CompactionDelay
+ assertMaxCompactionDelay(3)
+
+ // Update CompactionDelay and verify the gauge value updates
+ cleaner1.lastPreCleanStats.maxCompactionDelayMs = 4_000L
+ assertMaxCompactionDelay(4)
+
+ // All CleanerThreads have the same CompactionDelay
+ cleaners.foreach(_.lastPreCleanStats.maxCompactionDelayMs = 1_500L)
+ assertMaxCompactionDelay(1)
+ } finally {
+ logCleaner.shutdown()
+ }
}
private def writeToLog(log: UnifiedLog, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = {