showuon commented on code in PR #12347:
URL: https://github.com/apache/kafka/pull/12347#discussion_r921127366


##########
core/src/test/scala/unit/kafka/log/LogManagerTest.scala:
##########
@@ -638,6 +641,221 @@ class LogManagerTest {
     assertTrue(logManager.partitionsInitializing.isEmpty)
   }
 
+  private def appendRecordsToLog(time: MockTime, parentLogDir: File, partitionId: Int, brokerTopicStats: BrokerTopicStats, expectedSegmentsPerLog: Int): Unit = {
+    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
+    val tpFile = new File(parentLogDir, s"$name-$partitionId")
+
+    val log = LogTestUtils.createLog(tpFile, logConfig, brokerTopicStats, time.scheduler, time, 0, 0,
+      5 * 60 * 1000, 60 * 60 * 1000, LogManager.ProducerIdExpirationCheckIntervalMs)
+
+    val numMessages = 20
+    try {
+      for (_ <- 0 until numMessages) {
+        log.appendAsLeader(createRecords, leaderEpoch = 0)
+      }
+
+      assertEquals(expectedSegmentsPerLog, log.numberOfSegments)
+    } finally {
+      log.close()
+    }
+  }
+
+  private def verifyRemainingLogsToRecoverMetric(spyLogManager: LogManager, expectedParams: Map[String, Int]): Unit = {
+    val spyLogManagerClassName = spyLogManager.getClass().getSimpleName
+    // get all `remainingLogsToRecover` metrics
+    val logMetrics: ArrayBuffer[Gauge[Int]] = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
+      .filter { case (metric, _) => metric.getType == s"$spyLogManagerClassName" && metric.getName == "remainingLogsToRecover" }
+      .map { case (_, gauge) => gauge }
+      .asInstanceOf[ArrayBuffer[Gauge[Int]]]
+
+    assertEquals(expectedParams.size, logMetrics.size)
+
+    val capturedPath: ArgumentCaptor[String] = ArgumentCaptor.forClass(classOf[String])
+    val capturedNumRemainingLogs: ArgumentCaptor[Int] = ArgumentCaptor.forClass(classOf[Int])
+
+    // Since we'll update numRemainingLogs from totalLogs to 0 for each log dir, we need to add 1 here
+    val expectedCallTimes = expectedParams.values.map( num => num + 1 ).sum
+    verify(spyLogManager, times(expectedCallTimes)).updateNumRemainingLogs(any, capturedPath.capture(), capturedNumRemainingLogs.capture());
+
+    val paths = capturedPath.getAllValues
+    val numRemainingLogs = capturedNumRemainingLogs.getAllValues
+
+    // expected the end value is 0
+    logMetrics.foreach { gauge => assertEquals(0, gauge.value()) }
+
+    expectedParams.foreach {
+      case (path, totalLogs) =>
+        // make sure we update the numRemainingLogs from totalLogs to 0 in order for each log dir
+        var expectedCurRemainingLogs = totalLogs + 1
+        for (i <- 0 until paths.size()) {
+          if (paths.get(i).contains(path)) {
+            expectedCurRemainingLogs -= 1
+            assertEquals(expectedCurRemainingLogs, numRemainingLogs.get(i))
+          }
+        }
+        assertEquals(0, expectedCurRemainingLogs)
+    }
+  }
+
+  private def verifyRemainingSegmentsToRecoverMetric(spyLogManager: LogManager,
+                                                     logDirs: Seq[File],
+                                                     recoveryThreadsPerDataDir: Int,
+                                                     mockMap: ConcurrentHashMap[String, Int],
+                                                     expectedParams: Map[String, Int]): Unit = {
+    val spyLogManagerClassName = spyLogManager.getClass().getSimpleName
+    // get all `remainingSegmentsToRecover` metrics
+    val logSegmentMetrics: ArrayBuffer[Gauge[Int]] = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
+          .filter { case (metric, _) => metric.getType == s"$spyLogManagerClassName" && metric.getName == "remainingSegmentsToRecover" }
+          .map { case (_, gauge) => gauge }
+          .asInstanceOf[ArrayBuffer[Gauge[Int]]]
+
+    // expect each log dir to have one metric per recovery thread
+    assertEquals(recoveryThreadsPerDataDir * logDirs.size, logSegmentMetrics.size)
+
+    val capturedThreadName: ArgumentCaptor[String] = ArgumentCaptor.forClass(classOf[String])
+    val capturedNumRemainingSegments: ArgumentCaptor[Int] = ArgumentCaptor.forClass(classOf[Int])
+
+    // Since we'll update numRemainingSegments from totalSegments to 0 for each thread, we need to add 1 here
+    val expectedCallTimes = expectedParams.values.map( num => num + 1 ).sum
+    verify(mockMap, times(expectedCallTimes)).put(capturedThreadName.capture(), capturedNumRemainingSegments.capture());
+
+    // expected the end value is 0
+    logSegmentMetrics.foreach { gauge => assertEquals(0, gauge.value()) }
+
+    val threadNames = capturedThreadName.getAllValues
+    val numRemainingSegments = capturedNumRemainingSegments.getAllValues
+
+    expectedParams.foreach {
+      case (threadName, totalSegments) =>
+        // make sure we update the numRemainingSegments from totalSegments to 0 in order for each thread
+        var expectedCurRemainingSegments = totalSegments + 1
+        for (i <- 0 until threadNames.size) {
+          if (threadNames.get(i).contains(threadName)) {
+            expectedCurRemainingSegments -= 1
+            assertEquals(expectedCurRemainingSegments, numRemainingSegments.get(i))
+          }
+        }
+        assertEquals(0, expectedCurRemainingSegments)
+    }
+  }
+
+  private def verifyLogRecoverMetricsRemoved(spyLogManager: LogManager): Unit = {
+    val spyLogManagerClassName = spyLogManager.getClass().getSimpleName
+    // get all `remainingLogsToRecover` metrics
+    def logMetrics: mutable.Set[MetricName] = KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala
+      .filter { metric => metric.getType == s"$spyLogManagerClassName" && metric.getName == "remainingLogsToRecover" }
+
+    assertTrue(logMetrics.isEmpty)
+
+    // get all `remainingSegmentsToRecover` metrics
+    val logSegmentMetrics: mutable.Set[MetricName] = KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala
+      .filter { metric => metric.getType == s"$spyLogManagerClassName" && metric.getName == "remainingSegmentsToRecover" }
+
+    assertTrue(logSegmentMetrics.isEmpty)
+  }
+
+  @Test
+  def testLogRecoveryMetrics(): Unit = {
+    logManager.shutdown()

Review Comment:
   Yes, we need this. In `setUp`, we create a default logManager, but that doesn't fit this test, so I have to shut it down first and then create another one below. The shutdown in `tearDown` will shut down the newly created logManager.
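   Roughly the pattern I mean is sketched below; names like `createLogManager` and the Mockito `spy` wrapper are only illustrative here, not necessarily the exact code in this PR:
   
   ```scala
     @Test
     def testLogRecoveryMetrics(): Unit = {
       // the default logManager from setUp doesn't fit this test, so shut it down first
       logManager.shutdown()
   
       // illustrative only: build a fresh LogManager for this test and wrap it in a
       // Mockito spy so that calls such as updateNumRemainingLogs() can be verified later
       val spyLogManager = Mockito.spy(createLogManager(logDirs))
   
       // ... trigger log loading/recovery, then check the gauges with the helpers above ...
   
       // no explicit shutdown here: tearDown will shut down this newly created logManager
     }
   ```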


