divijvaidya commented on code in PR #13924:
URL: https://github.com/apache/kafka/pull/13924#discussion_r1247863305


##########
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##########
@@ -83,11 +84,19 @@ class LogCleanerTest {
       val numMetricsRegistered = LogCleaner.MetricNames.size
       verify(mockMetricsGroup, 
times(numMetricsRegistered)).newGauge(anyString(), any())
       
-      // verify that each metric is removed
+      // verify that each metric in `LogCleaner` is removed
       LogCleaner.MetricNames.foreach(verify(mockMetricsGroup).removeMetric(_))
 
+      // verify that each metric in `LogCleanerManager` is removed
+      val mockLogCleanerManagerMetricsGroup = 
mockMetricsGroupCtor.constructed.get(1)
+      LogCleanerManager.GaugeMetricNameNoTag.foreach(metricName => 
verify(mockLogCleanerManagerMetricsGroup).newGauge(ArgumentMatchers.eq(metricName),
 any()))
+      
LogCleanerManager.GaugeMetricNameWithTag.keySet().asScala.foreach(metricName => 
verify(mockLogCleanerManagerMetricsGroup).newGauge(ArgumentMatchers.eq(metricName),
 any(), any()))

Review Comment:
   does this work? In our case we will call this multiple times with same 
metric Name. The any() will match all such occurrences whereas we are saying 
that this invocation occurs only 1 time (default is times(1), when not 
specified)
   
   Please help me understand why it's working.



##########
core/src/main/scala/kafka/log/LogCleanerManager.scala:
##########
@@ -88,17 +88,21 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
   /* for coordinating the pausing and the cleaning of a partition */
   private val pausedCleaningCond = lock.newCondition()
 
+  // Avoid adding legacy tags for a metric when initializing 
`LogCleanerManager`
+  GaugeMetricNameWithTag.clear()
   /* gauges for tracking the number of partitions marked as uncleanable for 
each log directory */
   for (dir <- logDirs) {
-    metricsGroup.newGauge("uncleanable-partitions-count",
+    metricsGroup.newGauge(UncleanablePartitionsCountMetricName,
       () => inLock(lock) { 
uncleanablePartitions.get(dir.getAbsolutePath).map(_.size).getOrElse(0) },
       Map("logDirectory" -> dir.getAbsolutePath).asJava
     )
+    
GaugeMetricNameWithTag.computeIfAbsent(UncleanablePartitionsCountMetricName, k 
=> new java.util.ArrayList[java.util.Map[String, String]]())
+      .add(Map("logDirectory" -> dir.getAbsolutePath).asJava)

Review Comment:
   could we move the `Map("logDirectory" -> dir.getAbsolutePath` part to a val 
metricTag and then re-use it at both locations.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to