junrao commented on code in PR #12347:
URL: https://github.com/apache/kafka/pull/12347#discussion_r919529062


##########
core/src/main/scala/kafka/log/LogManager.scala:
##########
@@ -366,25 +392,30 @@ class LogManager(logDirs: Seq[File],
         val numLogsLoaded = new AtomicInteger(0)
         numTotalLogs += logsToLoad.length
 
+        updateNumRemainingLogs(dir.getAbsolutePath, logsToLoad.length)
+
         val jobsForDir = logsToLoad.map { logDir =>
           val runnable: Runnable = () => {
             try {
               debug(s"Loading log $logDir")
 
               val logLoadStartMs = time.hiResClockMs()
              val log = loadLog(logDir, hadCleanShutdown, recoveryPoints, logStartOffsets,
-                defaultConfig, topicConfigOverrides)
+                defaultConfig, topicConfigOverrides, numRemainingSegments)
               val logLoadDurationMs = time.hiResClockMs() - logLoadStartMs
               val currentNumLoaded = numLogsLoaded.incrementAndGet()
 
               info(s"Completed load of $log with ${log.numberOfSegments} 
segments in ${logLoadDurationMs}ms " +
                 s"($currentNumLoaded/${logsToLoad.length} loaded in 
$logDirAbsolutePath)")
+              println("!!! numRemainingSegments:" + numRemainingSegments + ";" 
+ log)
             } catch {
               case e: IOException =>
                 handleIOException(logDirAbsolutePath, e)
              case e: KafkaStorageException if e.getCause.isInstanceOf[IOException] =>
                // A KafkaStorageException might be thrown here, e.g. while writing the LeaderEpochFileCache.
                // The IOException was already handled when it was converted to a KafkaStorageException, so we can ignore it here.
+            } finally {
+              updateNumRemainingLogs(dir.getAbsolutePath, logsToLoad.length - numLogsLoaded.get())

Review Comment:
   Hmm, how do we handle concurrency here? It's possible that one thread completes and calls numLogsLoaded.get(), but before it can call updateNumRemainingLogs(), another thread completes and calls updateNumRemainingLogs(). The first thread then calls updateNumRemainingLogs() with its stale count, overwriting numRemainingLogs with an outdated value.



##########
core/src/main/scala/kafka/log/LogManager.scala:
##########
@@ -90,6 +90,10 @@ class LogManager(logDirs: Seq[File],
   private val _liveLogDirs: ConcurrentLinkedQueue[File] = createAndValidateLogDirs(logDirs, initialOfflineDirs)
   @volatile private var _currentDefaultConfig = initialDefaultConfig
   @volatile private var numRecoveryThreadsPerDataDir = recoveryThreadsPerDataDir
+  // log dir path -> number of remaining logs, for the remainingLogsToRecover metric
+  private val numRemainingLogs: ConcurrentMap[String, Int] = new ConcurrentHashMap[String, Int]
+  // log recovery thread name -> number of remaining segments, for the remainingSegmentsToRecover metric
+  private val numRemainingSegments: ConcurrentMap[String, Int] = new ConcurrentHashMap[String, Int]

Review Comment:
   Could we make these two maps local in loadLogs()?
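   
   For illustration, the local shape might look roughly like this (a sketch only; the real loadLogs() signature and plumbing differ):
   
   ```scala
   import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
   
   def loadLogs(dirs: Seq[String]): Unit = {
     // Scoped to this call instead of living as LogManager fields: both maps
     // become unreachable (and the memory reclaimable) once recovery finishes.
     val numRemainingLogs: ConcurrentMap[String, Int] = new ConcurrentHashMap[String, Int]
     val numRemainingSegments: ConcurrentMap[String, Int] = new ConcurrentHashMap[String, Int]
     dirs.foreach(dir => numRemainingLogs.put(dir, 0))
     // ...pass both maps down to the per-dir recovery jobs and the metric gauges...
   }
   ```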



##########
core/src/main/scala/kafka/log/LogLoader.scala:
##########
@@ -424,9 +432,11 @@ class LogLoader(
           // we had an invalid message, delete all remaining log
           warn(s"Corruption found in segment ${segment.baseOffset}," +
             s" truncating to offset ${segment.readNextOffset}")
-          removeAndDeleteSegmentsAsync(unflushed.toList)
+          removeAndDeleteSegmentsAsync(unflushedIter.toList)

Review Comment:
   This will iterate (and consume) unflushedIter. If we don't account for that, the remaining-segments count won't be accurate.
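   
   The underlying pitfall in isolation (a toy example, not code from this PR): a Scala Iterator can be traversed only once, so any count taken after toList sees an exhausted iterator:
   
   ```scala
   object IteratorPitfall extends App {
     val it = Iterator("seg1", "seg2", "seg3")
     println(it.toList) // List(seg1, seg2, seg3) -- this traversal consumes it
     println(it.size)   // 0: the iterator is now empty, so a later count is stale
   }
   ```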



##########
core/src/main/scala/kafka/log/LogLoader.scala:
##########
@@ -404,12 +406,18 @@ class LogLoader(
 
     // If we have the clean shutdown marker, skip recovery.
     if (!hadCleanShutdown) {
-      val unflushed = segments.values(recoveryPointCheckpoint, Long.MaxValue).iterator
+      val unflushed = segments.values(recoveryPointCheckpoint, Long.MaxValue)
+      val unflushedSize = unflushed.size

Review Comment:
   To be consistent with numFlushed, this probably should be named numUnflushed?



##########
core/src/main/scala/kafka/log/LogLoader.scala:
##########
@@ -77,7 +78,8 @@ class LogLoader(
   logStartOffsetCheckpoint: Long,
   recoveryPointCheckpoint: Long,
   leaderEpochCache: Option[LeaderEpochFileCache],
-  producerStateManager: ProducerStateManager
+  producerStateManager: ProducerStateManager,
+  numRemainingSegments: ConcurrentMap[String, Int] = new ConcurrentHashMap[String, Int]

Review Comment:
   Could we add the new param to javadoc?
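   
   Something along these lines, perhaps (wording is mine, not from the PR):
   
   ```scala
   /**
    * @param numRemainingSegments Map of log recovery thread name to the number of
    *                             remaining segments to be recovered for that thread,
    *                             used to report the remainingSegmentsToRecover metric
    */
   ```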



##########
core/src/main/scala/kafka/log/LogManager.scala:
##########
@@ -366,25 +392,30 @@ class LogManager(logDirs: Seq[File],
         val numLogsLoaded = new AtomicInteger(0)
         numTotalLogs += logsToLoad.length
 
+        updateNumRemainingLogs(dir.getAbsolutePath, logsToLoad.length)
+
         val jobsForDir = logsToLoad.map { logDir =>
           val runnable: Runnable = () => {
             try {
               debug(s"Loading log $logDir")
 
               val logLoadStartMs = time.hiResClockMs()
              val log = loadLog(logDir, hadCleanShutdown, recoveryPoints, logStartOffsets,
-                defaultConfig, topicConfigOverrides)
+                defaultConfig, topicConfigOverrides, numRemainingSegments)
               val logLoadDurationMs = time.hiResClockMs() - logLoadStartMs
               val currentNumLoaded = numLogsLoaded.incrementAndGet()
 
               info(s"Completed load of $log with ${log.numberOfSegments} 
segments in ${logLoadDurationMs}ms " +
                 s"($currentNumLoaded/${logsToLoad.length} loaded in 
$logDirAbsolutePath)")
+              println("!!! numRemainingSegments:" + numRemainingSegments + ";" 
+ log)

Review Comment:
   Is this needed?


