showuon commented on code in PR #12347: URL: https://github.com/apache/kafka/pull/12347#discussion_r921127366
########## core/src/test/scala/unit/kafka/log/LogManagerTest.scala: ########## @@ -638,6 +641,221 @@ class LogManagerTest { assertTrue(logManager.partitionsInitializing.isEmpty) } + private def appendRecordsToLog(time: MockTime, parentLogDir: File, partitionId: Int, brokerTopicStats: BrokerTopicStats, expectedSegmentsPerLog: Int): Unit = { + def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds) + val tpFile = new File(parentLogDir, s"$name-$partitionId") + + val log = LogTestUtils.createLog(tpFile, logConfig, brokerTopicStats, time.scheduler, time, 0, 0, + 5 * 60 * 1000, 60 * 60 * 1000, LogManager.ProducerIdExpirationCheckIntervalMs) + + val numMessages = 20 + try { + for (_ <- 0 until numMessages) { + log.appendAsLeader(createRecords, leaderEpoch = 0) + } + + assertEquals(expectedSegmentsPerLog, log.numberOfSegments) + } finally { + log.close() + } + } + + private def verifyRemainingLogsToRecoverMetric(spyLogManager: LogManager, expectedParams: Map[String, Int]): Unit = { + val spyLogManagerClassName = spyLogManager.getClass().getSimpleName + // get all `remainingLogsToRecover` metrics + val logMetrics: ArrayBuffer[Gauge[Int]] = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala + .filter { case (metric, _) => metric.getType == s"$spyLogManagerClassName" && metric.getName == "remainingLogsToRecover" } + .map { case (_, gauge) => gauge } + .asInstanceOf[ArrayBuffer[Gauge[Int]]] + + assertEquals(expectedParams.size, logMetrics.size) + + val capturedPath: ArgumentCaptor[String] = ArgumentCaptor.forClass(classOf[String]) + val capturedNumRemainingLogs: ArgumentCaptor[Int] = ArgumentCaptor.forClass(classOf[Int]) + + // Since we'll update numRemainingLogs from totalLogs to 0 for each log dir, so we need to add 1 here + val expectedCallTimes = expectedParams.values.map( num => num + 1 ).sum + verify(spyLogManager, times(expectedCallTimes)).updateNumRemainingLogs(any, capturedPath.capture(), capturedNumRemainingLogs.capture()); + + val paths = capturedPath.getAllValues + val numRemainingLogs = capturedNumRemainingLogs.getAllValues + + // expected the end value is 0 + logMetrics.foreach { gauge => assertEquals(0, gauge.value()) } + + expectedParams.foreach { + case (path, totalLogs) => + // make sure we update the numRemainingLogs from totalLogs to 0 in order for each log dir + var expectedCurRemainingLogs = totalLogs + 1 + for (i <- 0 until paths.size()) { + if (paths.get(i).contains(path)) { + expectedCurRemainingLogs -= 1 + assertEquals(expectedCurRemainingLogs, numRemainingLogs.get(i)) + } + } + assertEquals(0, expectedCurRemainingLogs) + } + } + + private def verifyRemainingSegmentsToRecoverMetric(spyLogManager: LogManager, + logDirs: Seq[File], + recoveryThreadsPerDataDir: Int, + mockMap: ConcurrentHashMap[String, Int], + expectedParams: Map[String, Int]): Unit = { + val spyLogManagerClassName = spyLogManager.getClass().getSimpleName + // get all `remainingSegmentsToRecover` metrics + val logSegmentMetrics: ArrayBuffer[Gauge[Int]] = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala + .filter { case (metric, _) => metric.getType == s"$spyLogManagerClassName" && metric.getName == "remainingSegmentsToRecover" } + .map { case (_, gauge) => gauge } + .asInstanceOf[ArrayBuffer[Gauge[Int]]] + + // expected each log dir has 2 metrics for each thread + assertEquals(recoveryThreadsPerDataDir * logDirs.size, logSegmentMetrics.size) + + val capturedThreadName: ArgumentCaptor[String] = ArgumentCaptor.forClass(classOf[String]) + val capturedNumRemainingSegments: ArgumentCaptor[Int] = ArgumentCaptor.forClass(classOf[Int]) + + // Since we'll update numRemainingSegments from totalSegments to 0 for each thread, so we need to add 1 here + val expectedCallTimes = expectedParams.values.map( num => num + 1 ).sum + verify(mockMap, times(expectedCallTimes)).put(capturedThreadName.capture(), capturedNumRemainingSegments.capture()); + + // expected the end value is 0 + logSegmentMetrics.foreach { gauge => assertEquals(0, gauge.value()) } + + val threadNames = capturedThreadName.getAllValues + val numRemainingSegments = capturedNumRemainingSegments.getAllValues + + expectedParams.foreach { + case (threadName, totalSegments) => + // make sure we update the numRemainingSegments from totalSegments to 0 in order for each thread + var expectedCurRemainingSegments = totalSegments + 1 + for (i <- 0 until threadNames.size) { + if (threadNames.get(i).contains(threadName)) { + expectedCurRemainingSegments -= 1 + assertEquals(expectedCurRemainingSegments, numRemainingSegments.get(i)) + } + } + assertEquals(0, expectedCurRemainingSegments) + } + } + + private def verifyLogRecoverMetricsRemoved(spyLogManager: LogManager): Unit = { + val spyLogManagerClassName = spyLogManager.getClass().getSimpleName + // get all `remainingLogsToRecover` metrics + def logMetrics: mutable.Set[MetricName] = KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala + .filter { metric => metric.getType == s"$spyLogManagerClassName" && metric.getName == "remainingLogsToRecover" } + + assertTrue(logMetrics.isEmpty) + + // get all `remainingSegmentsToRecover` metrics + val logSegmentMetrics: mutable.Set[MetricName] = KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala + .filter { metric => metric.getType == s"$spyLogManagerClassName" && metric.getName == "remainingSegmentsToRecover" } + + assertTrue(logSegmentMetrics.isEmpty) + } + + @Test + def testLogRecoveryMetrics(): Unit = { + logManager.shutdown() Review Comment: Yes, we need this. In `setUp`, we'll create a default logManager, but that doesn't fit our test. So I have to shutdown it first, and then create another one below. And the shutdown in `tearDown` will shutdown my new created logManger. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org