divijvaidya commented on code in PR #13924: URL: https://github.com/apache/kafka/pull/13924#discussion_r1247863305
########## core/src/test/scala/unit/kafka/log/LogCleanerTest.scala: ########## @@ -83,11 +84,19 @@ class LogCleanerTest { val numMetricsRegistered = LogCleaner.MetricNames.size verify(mockMetricsGroup, times(numMetricsRegistered)).newGauge(anyString(), any()) - // verify that each metric is removed + // verify that each metric in `LogCleaner` is removed LogCleaner.MetricNames.foreach(verify(mockMetricsGroup).removeMetric(_)) + // verify that each metric in `LogCleanerManager` is removed + val mockLogCleanerManagerMetricsGroup = mockMetricsGroupCtor.constructed.get(1) + LogCleanerManager.GaugeMetricNameNoTag.foreach(metricName => verify(mockLogCleanerManagerMetricsGroup).newGauge(ArgumentMatchers.eq(metricName), any())) + LogCleanerManager.GaugeMetricNameWithTag.keySet().asScala.foreach(metricName => verify(mockLogCleanerManagerMetricsGroup).newGauge(ArgumentMatchers.eq(metricName), any(), any())) Review Comment: does this work? In our case we will call this multiple times with same metric Name. The any() will match all such occurrences whereas we are saying that this invocation occurs only 1 time (default is times(1), when not specified) Please help me understand why it's working. ########## core/src/main/scala/kafka/log/LogCleanerManager.scala: ########## @@ -88,17 +88,21 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], /* for coordinating the pausing and the cleaning of a partition */ private val pausedCleaningCond = lock.newCondition() + // Avoid adding legacy tags for a metric when initializing `LogCleanerManager` + GaugeMetricNameWithTag.clear() /* gauges for tracking the number of partitions marked as uncleanable for each log directory */ for (dir <- logDirs) { - metricsGroup.newGauge("uncleanable-partitions-count", + metricsGroup.newGauge(UncleanablePartitionsCountMetricName, () => inLock(lock) { uncleanablePartitions.get(dir.getAbsolutePath).map(_.size).getOrElse(0) }, Map("logDirectory" -> dir.getAbsolutePath).asJava ) + GaugeMetricNameWithTag.computeIfAbsent(UncleanablePartitionsCountMetricName, k => new java.util.ArrayList[java.util.Map[String, String]]()) + .add(Map("logDirectory" -> dir.getAbsolutePath).asJava) Review Comment: could we move the `Map("logDirectory" -> dir.getAbsolutePath` part to a val metricTag and then re-use it at both locations. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org