[GitHub] [kafka] dajac commented on a diff in pull request #13623: KAFKA-14926: Remove metrics on Log Cleaner shutdown
dajac commented on code in PR #13623:
URL: https://github.com/apache/kafka/pull/13623#discussion_r1184723565

## core/src/test/scala/unit/kafka/log/LogCleanerTest.scala: ##
@@ -62,6 +65,39 @@ class LogCleanerTest {
     Utils.delete(tmpdir)
   }
 
+  @Test
+  def testRemoveMetricsOnClose(): Unit = {
+    val mockMetricsGroupCtor = mockConstruction(classOf[KafkaMetricsGroup])
+    try {
+      val logCleaner = new LogCleaner(new CleanerConfig(true),
+        logDirs = Array(TestUtils.tempDir()),
+        logs = new Pool[TopicPartition, UnifiedLog](),
+        logDirFailureChannel = new LogDirFailureChannel(1),
+        time = time)
+
+      // shutdown logCleaner so that metrics are removed
+      logCleaner.shutdown()
+
+      val mockMetricsGroup = mockMetricsGroupCtor.constructed.get(0)
+      val numMetricsRegistered = 5
+      verify(mockMetricsGroup, times(numMetricsRegistered)).newGauge(anyString(), any())

Review Comment:
That makes sense. Thanks for the clarification. I wonder if we could use `LogCleaner.MetricNames.size` instead of hardcoding `5`. I suppose it would have similar semantics, wouldn't it?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
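The suggestion to derive the expected count from the set of names, rather than hardcoding `5`, can be illustrated with a minimal sketch in plain Scala. Everything below is a hypothetical stand-in: `CleanerMetricNames`, `RecordingMetricsGroup`, and `CountFromSetExample` are not Kafka classes, and the recorder only imitates what a mocked metrics group would capture. If a gauge were registered without its name being added to the set, the count would no longer match, which is roughly the same guard the hardcoded constant provided.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for the companion object discussed in the review.
// The names mirror the diff, but this is not the Kafka source.
object CleanerMetricNames {
  val MetricNames: Set[String] = Set(
    "max-buffer-utilization-percent",
    "cleaner-recopy-percent",
    "max-clean-time-secs",
    "max-compaction-delay-secs",
    "DeadThreadCount"
  )
}

// Hypothetical recorder that plays the role of a mocked metrics group.
final class RecordingMetricsGroup {
  private val registered = ListBuffer.empty[String]

  def newGauge(name: String): Unit = {
    registered += name
  }

  def registeredNames: List[String] = registered.toList
}

object CountFromSetExample {
  // Registers one gauge per known metric name.
  def registerAll(group: RecordingMetricsGroup): Unit =
    CleanerMetricNames.MetricNames.foreach(group.newGauge)

  // The expected count comes from the set itself, not from a literal.
  def registrationCountMatches(): Boolean = {
    val group = new RecordingMetricsGroup
    registerAll(group)
    group.registeredNames.size == CleanerMetricNames.MetricNames.size
  }
}
```

With a mocking library, the same idea would amount to passing the size of the name set to the call-count verification instead of a literal.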
[GitHub] [kafka] dajac commented on a diff in pull request #13623: KAFKA-14926: Remove metrics on Log Cleaner shutdown
dajac commented on code in PR #13623:
URL: https://github.com/apache/kafka/pull/13623#discussion_r1183264038

## core/src/main/scala/kafka/log/LogCleaner.scala: ##
@@ -466,6 +472,17 @@ object LogCleaner {
       config.logCleanerEnable)
   }
+
+  private val MaxBufferUtilizationPercentMetricName = "max-buffer-utilization-percent"
+  private val CleanerRecopyPercentMetricName = "cleaner-recopy-percent"
+  private val MaxCleanTimeMetricName = "max-clean-time-secs"
+  private val MaxCompactionDelayMetricsName = "max-compaction-delay-secs"
+  private val DeadThreadCountMetricName = "DeadThreadCount"
+  private[log] val MetricNames = Set(MaxBufferUtilizationPercentMetricName,

Review Comment:
nit 1: I suppose that this needs to be package private so that tests can access it. We usually add a comment such as `// package private for testing` in this case.
nit 2: Could we format the `Set` like `ReconfigurableConfigs` at L448?

## core/src/test/scala/unit/kafka/log/LogCleanerTest.scala: ##
@@ -62,6 +65,39 @@ class LogCleanerTest {
     Utils.delete(tmpdir)
   }
 
+  @Test
+  def testRemoveMetricsOnClose(): Unit = {
+    val mockMetricsGroupCtor = mockConstruction(classOf[KafkaMetricsGroup])
+    try {
+      val logCleaner = new LogCleaner(new CleanerConfig(true),
+        logDirs = Array(TestUtils.tempDir()),
+        logs = new Pool[TopicPartition, UnifiedLog](),
+        logDirFailureChannel = new LogDirFailureChannel(1),
+        time = time)
+
+      // shutdown logCleaner so that metrics are removed
+      logCleaner.shutdown()
+
+      val mockMetricsGroup = mockMetricsGroupCtor.constructed.get(0)
+      val numMetricsRegistered = 5
+      verify(mockMetricsGroup, times(numMetricsRegistered)).newGauge(anyString(), any())

Review Comment:
nit: I wonder if we should also verify the expected names here, as we did for `removeMetric`. What do you think?

## core/src/test/scala/unit/kafka/log/LogCleanerTest.scala: ##
@@ -62,6 +65,39 @@ class LogCleanerTest {
     Utils.delete(tmpdir)
   }
 
+  @Test
+  def testRemoveMetricsOnClose(): Unit = {
+    val mockMetricsGroupCtor = mockConstruction(classOf[KafkaMetricsGroup])
+    try {
+      val logCleaner = new LogCleaner(new CleanerConfig(true),
+        logDirs = Array(TestUtils.tempDir()),
+        logs = new Pool[TopicPartition, UnifiedLog](),
+        logDirFailureChannel = new LogDirFailureChannel(1),
+        time = time)
+
+      // shutdown logCleaner so that metrics are removed
+      logCleaner.shutdown()
+
+      val mockMetricsGroup = mockMetricsGroupCtor.constructed.get(0)
+      val numMetricsRegistered = 5
+      verify(mockMetricsGroup, times(numMetricsRegistered)).newGauge(anyString(), any())
+
+      // verify that all metrics are added to the list of metric name
+      assertEquals(LogCleaner.MetricNames.size, numMetricsRegistered,
+        "All metrics are not part of MetricNames collections")
+
+      // verify that each metric is removed
+      LogCleaner.MetricNames.foreach(verify(mockMetricsGroup).removeMetric(_))
+
+      // assert that we have verified all invocations on
+      verifyNoMoreInteractions(mockMetricsGroup)
+    } finally {
+      if (mockMetricsGroupCtor != null) {

Review Comment:
Is this check needed? By the time we reach the `try` block, `mockMetricsGroupCtor` should be non-null.
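Two points from the comments above can be sketched together in plain Scala: checking the names passed to both the register and remove calls (rather than only counting calls), and why a null check in `finally` is redundant when the resource is assigned before `try`. The classes below are hypothetical stand-ins, not Kafka or Mockito types, and `ExpectedNames` is an illustrative subset of the metric names in the diff.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical recorder standing in for a mocked metrics group.
final class RecordingGroup extends AutoCloseable {
  var closed: Boolean = false
  private val calls = ListBuffer.empty[(String, String)]

  def newGauge(name: String): Unit = {
    calls += (("newGauge", name))
  }

  def removeMetric(name: String): Unit = {
    calls += (("removeMetric", name))
  }

  // Names seen for one operation, so a test can compare them to the expected set.
  def namesFor(op: String): Set[String] =
    calls.filter(_._1 == op).map(_._2).toSet

  override def close(): Unit = {
    closed = true
  }
}

object VerifyNamesExample {
  // Illustrative subset of names; an assumption made for this sketch.
  val ExpectedNames: Set[String] = Set("max-clean-time-secs", "DeadThreadCount")

  def run(): RecordingGroup = {
    // Assigned before `try`: if construction threw, we would never enter the
    // block, so `group` cannot be null inside `finally`.
    val group = new RecordingGroup
    try {
      ExpectedNames.foreach(group.newGauge)
      ExpectedNames.foreach(group.removeMetric)
      group
    } finally {
      group.close() // no null check needed
    }
  }
}
```

Comparing name sets catches a case that a pure call count misses: the right number of calls made with the wrong metric names.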
[GitHub] [kafka] dajac commented on a diff in pull request #13623: KAFKA-14926: Remove metrics on Log Cleaner shutdown
dajac commented on code in PR #13623:
URL: https://github.com/apache/kafka/pull/13623#discussion_r1182595446

## core/src/test/scala/unit/kafka/log/LogCleanerTest.scala: ##
@@ -62,6 +65,33 @@ class LogCleanerTest {
     Utils.delete(tmpdir)
   }
 
+  @Test
+  def testRemoveMetricsOnClose(): Unit = {
+    val mockMetricsGroupCtor = mockConstruction(classOf[KafkaMetricsGroup])
+    try {
+      val logCleaner = new LogCleaner(new CleanerConfig(true),
+        logDirs = Array(TestUtils.tempDir()),
+        logs = new Pool[TopicPartition, UnifiedLog](),
+        logDirFailureChannel = new LogDirFailureChannel(1),
+        time = time)
+
+      // shutdown logCleaner so that metrics are removed
+      logCleaner.shutdown()
+
+      val mockMetricsGroup = mockMetricsGroupCtor.constructed.get(0)
+      val numMetricsRegistered = 5
+      verify(mockMetricsGroup, times(numMetricsRegistered)).newGauge(anyString(), any())
+      verify(mockMetricsGroup, times(numMetricsRegistered)).removeMetric(anyString())

Review Comment:
nit: Should we actually verify that `removeMetric` was called for the metric name that we expected?

## core/src/main/scala/kafka/log/LogCleaner.scala: ##
@@ -124,29 +124,37 @@ class LogCleaner(initialConfig: CleanerConfig,
   private def maxOverCleanerThreads(f: CleanerThread => Double): Int =
     cleaners.foldLeft(0.0d)((max: Double, thread: CleanerThread) => math.max(max, f(thread))).toInt
 
+  private val maxBufferUtilizationPercentMetricName = "max-buffer-utilization-percent"
+  private val cleanerRecopyPercentMetricName = "cleaner-recopy-percent"
+  private val maxCleanTimeMetricName = "max-clean-time-secs"
+  private val maxCompactionDelayMetricsName = "max-compaction-delay-secs"
+  private val deadThreadCountMetricName = "DeadThreadCount"
+  private val metricNames = Set.apply(maxBufferUtilizationPercentMetricName,

Review Comment:
Could we move those to the companion object? Moreover, as they are constants, they should start with a capital letter. There is also an extra space after `=` for some of them.
You can also use `Set(..)` instead of `Set.apply()`.
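As a rough illustration of the requested shape (constants in the companion object, capitalized names, and `Set(...)` rather than `Set.apply(...)`), here is a minimal sketch. `Cleaner` is a hypothetical, trimmed-down class, not `LogCleaner`. In the PR the collection is declared `private[log]` for test access; this sketch leaves it public so that it compiles outside a `log` package.

```scala
// Hypothetical class standing in for the real one; only the layout matters.
class Cleaner {
  // A class may read the private members of its companion object.
  def firstMetricName: String = Cleaner.MaxBufferUtilizationPercentMetricName
}

object Cleaner {
  // Constants live in the companion object and start with a capital letter.
  private val MaxBufferUtilizationPercentMetricName = "max-buffer-utilization-percent"
  private val CleanerRecopyPercentMetricName = "cleaner-recopy-percent"
  private val MaxCleanTimeMetricName = "max-clean-time-secs"
  private val MaxCompactionDelayMetricsName = "max-compaction-delay-secs"
  private val DeadThreadCountMetricName = "DeadThreadCount"

  // `Set(...)` is sugar for `Set.apply(...)`, so the explicit call adds nothing.
  // Public here only so the sketch runs outside a package (an assumption).
  val MetricNames: Set[String] = Set(
    MaxBufferUtilizationPercentMetricName,
    CleanerRecopyPercentMetricName,
    MaxCleanTimeMetricName,
    MaxCompactionDelayMetricsName,
    DeadThreadCountMetricName
  )
}
```

Keeping the names in the companion object means they are allocated once rather than per instance, and tests can reference the collection without constructing the class.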