showuon commented on code in PR #12347:
URL: https://github.com/apache/kafka/pull/12347#discussion_r920087197


##########
core/src/main/scala/kafka/log/LogManager.scala:
##########
@@ -366,25 +392,31 @@ class LogManager(logDirs: Seq[File],
         val numLogsLoaded = new AtomicInteger(0)
         numTotalLogs += logsToLoad.length
 
+        updateNumRemainingLogs(numRemainingLogs, dir.getAbsolutePath, 
logsToLoad.length)
+
         val jobsForDir = logsToLoad.map { logDir =>
           val runnable: Runnable = () => {
+            debug(s"Loading log $logDir")
+            var log = None: Option[UnifiedLog]
+            val logLoadStartMs = time.hiResClockMs()
             try {
-              debug(s"Loading log $logDir")
-
-              val logLoadStartMs = time.hiResClockMs()
-              val log = loadLog(logDir, hadCleanShutdown, recoveryPoints, 
logStartOffsets,
-                defaultConfig, topicConfigOverrides)
-              val logLoadDurationMs = time.hiResClockMs() - logLoadStartMs
-              val currentNumLoaded = numLogsLoaded.incrementAndGet()
-
-              info(s"Completed load of $log with ${log.numberOfSegments} 
segments in ${logLoadDurationMs}ms " +
-                s"($currentNumLoaded/${logsToLoad.length} loaded in 
$logDirAbsolutePath)")
+              log = Some(loadLog(logDir, hadCleanShutdown, recoveryPoints, 
logStartOffsets,
+                defaultConfig, topicConfigOverrides, numRemainingSegments))
             } catch {
               case e: IOException =>
                 handleIOException(logDirAbsolutePath, e)
               case e: KafkaStorageException if 
e.getCause.isInstanceOf[IOException] =>
                 // KafkaStorageException might be thrown, ex: during writing 
LeaderEpochFileCache
                 // And while converting IOException to KafkaStorageException, 
we've already handled the exception. So we can ignore it here.
+            } finally {
+              val logLoadDurationMs = time.hiResClockMs() - logLoadStartMs
+              val currentNumLoaded = numLogsLoaded.incrementAndGet()
+              updateNumRemainingLogs(numRemainingLogs, dir.getAbsolutePath, 
logsToLoad.length - currentNumLoaded)
+              log match {
+                case Some(loadedLog) => info(s"Completed load of $loadedLog 
with ${loadedLog.numberOfSegments} segments in ${logLoadDurationMs}ms " +
+                  s"($currentNumLoaded/${logsToLoad.length} completed in 
$logDirAbsolutePath)")
+                case None => info(s"Error while loading logs in $logDir in 
${logLoadDurationMs}ms ($currentNumLoaded/${logsToLoad.length} completed in 
$logDirAbsolutePath)")
+              }

Review Comment:
   Move `currentNumLoaded` and the logging logic into the `finally` block, so that we can 
make sure we also account for "failed" cases in the `currentNumLoaded` result and log.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to