showuon commented on code in PR #12347:
URL: https://github.com/apache/kafka/pull/12347#discussion_r921127854


##########
core/src/test/scala/unit/kafka/log/LogManagerTest.scala:
##########
@@ -638,6 +641,221 @@ class LogManagerTest {
     assertTrue(logManager.partitionsInitializing.isEmpty)
   }
 
+  private def appendRecordsToLog(time: MockTime, parentLogDir: File, 
partitionId: Int, brokerTopicStats: BrokerTopicStats, expectedSegmentsPerLog: 
Int): Unit = {
+    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, 
timestamp = time.milliseconds)
+    val tpFile = new File(parentLogDir, s"$name-$partitionId")
+
+    val log = LogTestUtils.createLog(tpFile, logConfig, brokerTopicStats, 
time.scheduler, time, 0, 0,
+      5 * 60 * 1000, 60 * 60 * 1000, 
LogManager.ProducerIdExpirationCheckIntervalMs)
+
+    val numMessages = 20
+    try {
+      for (_ <- 0 until numMessages) {
+        log.appendAsLeader(createRecords, leaderEpoch = 0)
+      }
+
+      assertEquals(expectedSegmentsPerLog, log.numberOfSegments)
+    } finally {
+      log.close()
+    }
+  }
+
+  private def verifyRemainingLogsToRecoverMetric(spyLogManager: LogManager, 
expectedParams: Map[String, Int]): Unit = {
+    val spyLogManagerClassName = spyLogManager.getClass().getSimpleName
+    // get all `remainingLogsToRecover` metrics
+    val logMetrics: ArrayBuffer[Gauge[Int]] = 
KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
+      .filter { case (metric, _) => metric.getType == 
s"$spyLogManagerClassName" && metric.getName == "remainingLogsToRecover" }
+      .map { case (_, gauge) => gauge }
+      .asInstanceOf[ArrayBuffer[Gauge[Int]]]
+
+    assertEquals(expectedParams.size, logMetrics.size)
+
+    val capturedPath: ArgumentCaptor[String] = 
ArgumentCaptor.forClass(classOf[String])
+    val capturedNumRemainingLogs: ArgumentCaptor[Int] = 
ArgumentCaptor.forClass(classOf[Int])
+
+    // numRemainingLogs is updated from totalLogs down to 0 (inclusive) for each 
log dir, i.e. totalLogs + 1 calls per dir, so we add 1 here
+    val expectedCallTimes = expectedParams.values.map( num => num + 1 ).sum
+    verify(spyLogManager, 
times(expectedCallTimes)).updateNumRemainingLogs(any, capturedPath.capture(), 
capturedNumRemainingLogs.capture());
+
+    val paths = capturedPath.getAllValues
+    val numRemainingLogs = capturedNumRemainingLogs.getAllValues
+
+    // the final value of every gauge is expected to be 0
+    logMetrics.foreach { gauge => assertEquals(0, gauge.value()) }
+
+    expectedParams.foreach {
+      case (path, totalLogs) =>
+        // make sure we update the numRemainingLogs from totalLogs to 0 in 
order for each log dir
+        var expectedCurRemainingLogs = totalLogs + 1
+        for (i <- 0 until paths.size()) {
+          if (paths.get(i).contains(path)) {
+            expectedCurRemainingLogs -= 1
+            assertEquals(expectedCurRemainingLogs, numRemainingLogs.get(i))
+          }
+        }
+        assertEquals(0, expectedCurRemainingLogs)
+    }
+  }
+
+  private def verifyRemainingSegmentsToRecoverMetric(spyLogManager: LogManager,
+                                                     logDirs: Seq[File],
+                                                     
recoveryThreadsPerDataDir: Int,
+                                                     mockMap: 
ConcurrentHashMap[String, Int],
+                                                     expectedParams: 
Map[String, Int]): Unit = {
+    val spyLogManagerClassName = spyLogManager.getClass().getSimpleName
+    // get all `remainingSegmentsToRecover` metrics
+    val logSegmentMetrics: ArrayBuffer[Gauge[Int]] = 
KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
+          .filter { case (metric, _) => metric.getType == 
s"$spyLogManagerClassName" && metric.getName == "remainingSegmentsToRecover" }
+          .map { case (_, gauge) => gauge }
+          .asInstanceOf[ArrayBuffer[Gauge[Int]]]
+
+    // expected each log dir has 2 metrics for each thread

Review Comment:
   Good catch! Updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to