showuon commented on code in PR #12347:
URL: https://github.com/apache/kafka/pull/12347#discussion_r921127854
##########
core/src/test/scala/unit/kafka/log/LogManagerTest.scala:
##########
@@ -638,6 +641,221 @@ class LogManagerTest {
     assertTrue(logManager.partitionsInitializing.isEmpty)
   }

+  private def appendRecordsToLog(time: MockTime, parentLogDir: File, partitionId: Int, brokerTopicStats: BrokerTopicStats, expectedSegmentsPerLog: Int): Unit = {
+    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
+    val tpFile = new File(parentLogDir, s"$name-$partitionId")
+
+    val log = LogTestUtils.createLog(tpFile, logConfig, brokerTopicStats, time.scheduler, time, 0, 0,
+      5 * 60 * 1000, 60 * 60 * 1000, LogManager.ProducerIdExpirationCheckIntervalMs)
+
+    val numMessages = 20
+    try {
+      for (_ <- 0 until numMessages) {
+        log.appendAsLeader(createRecords, leaderEpoch = 0)
+      }
+
+      assertEquals(expectedSegmentsPerLog, log.numberOfSegments)
+    } finally {
+      log.close()
+    }
+  }
+
+  private def verifyRemainingLogsToRecoverMetric(spyLogManager: LogManager, expectedParams: Map[String, Int]): Unit = {
+    val spyLogManagerClassName = spyLogManager.getClass().getSimpleName
+    // get all `remainingLogsToRecover` metrics
+    val logMetrics: ArrayBuffer[Gauge[Int]] = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
+      .filter { case (metric, _) => metric.getType == s"$spyLogManagerClassName" && metric.getName == "remainingLogsToRecover" }
+      .map { case (_, gauge) => gauge }
+      .asInstanceOf[ArrayBuffer[Gauge[Int]]]
+
+    assertEquals(expectedParams.size, logMetrics.size)
+
+    val capturedPath: ArgumentCaptor[String] = ArgumentCaptor.forClass(classOf[String])
+    val capturedNumRemainingLogs: ArgumentCaptor[Int] = ArgumentCaptor.forClass(classOf[Int])
+
+    // Since we update numRemainingLogs from totalLogs down to 0 for each log dir, we need to add 1 here
+    val expectedCallTimes = expectedParams.values.map(num => num + 1).sum
+    verify(spyLogManager, times(expectedCallTimes)).updateNumRemainingLogs(any, capturedPath.capture(), capturedNumRemainingLogs.capture())
+
+    val paths = capturedPath.getAllValues
+    val numRemainingLogs = capturedNumRemainingLogs.getAllValues
+
+    // expect the end value to be 0
+    logMetrics.foreach { gauge => assertEquals(0, gauge.value()) }
+
+    expectedParams.foreach {
+      case (path, totalLogs) =>
+        // make sure we update numRemainingLogs from totalLogs down to 0, in order, for each log dir
+        var expectedCurRemainingLogs = totalLogs + 1
+        for (i <- 0 until paths.size()) {
+          if (paths.get(i).contains(path)) {
+            expectedCurRemainingLogs -= 1
+            assertEquals(expectedCurRemainingLogs, numRemainingLogs.get(i))
+          }
+        }
+        assertEquals(0, expectedCurRemainingLogs)
+    }
+  }
+
+  private def verifyRemainingSegmentsToRecoverMetric(spyLogManager: LogManager,
+                                                     logDirs: Seq[File],
+                                                     recoveryThreadsPerDataDir: Int,
+                                                     mockMap: ConcurrentHashMap[String, Int],
+                                                     expectedParams: Map[String, Int]): Unit = {
+    val spyLogManagerClassName = spyLogManager.getClass().getSimpleName
+    // get all `remainingSegmentsToRecover` metrics
+    val logSegmentMetrics: ArrayBuffer[Gauge[Int]] = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
+      .filter { case (metric, _) => metric.getType == s"$spyLogManagerClassName" && metric.getName == "remainingSegmentsToRecover" }
+      .map { case (_, gauge) => gauge }
+      .asInstanceOf[ArrayBuffer[Gauge[Int]]]
+
+    // expect each log dir to have 2 metrics for each thread

Review Comment:
   Good catch! Updated.
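[Editor's note] To make the `expectedCallTimes` arithmetic in `verifyRemainingLogsToRecoverMetric` concrete: a log dir that starts with N logs reports the remaining count N, N-1, ..., 1, 0, which is N + 1 calls to `updateNumRemainingLogs` per dir. Below is a minimal, self-contained sketch of that counting; the directory names, counts, and the object name are hypothetical, for illustration only, not taken from the PR:

object RemainingLogsCountdownSketch extends App {
  // hypothetical log dirs and their log counts (not from the PR)
  val expectedParams: Map[String, Int] = Map("/tmp/dir1" -> 2, "/tmp/dir2" -> 3)

  // simulate the (path, numRemainingLogs) updates recovery would make:
  // each dir with n logs emits the countdown n, n-1, ..., 0
  val updates: Seq[(String, Int)] = expectedParams.toSeq.flatMap {
    case (dir, n) => (n to 0 by -1).map(dir -> _)
  }

  // one extra call per dir, matching the "add 1 here" comment in the test
  val expectedCallTimes = expectedParams.values.map(_ + 1).sum
  assert(updates.size == expectedCallTimes)  // (2 + 1) + (3 + 1) = 7 calls
  println(s"total updateNumRemainingLogs calls: ${updates.size}")
}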