showuon commented on code in PR #12347: URL: https://github.com/apache/kafka/pull/12347#discussion_r920082431
########## core/src/main/scala/kafka/log/LogManager.scala: ########## @@ -366,25 +392,30 @@ class LogManager(logDirs: Seq[File], val numLogsLoaded = new AtomicInteger(0) numTotalLogs += logsToLoad.length + updateNumRemainingLogs(dir.getAbsolutePath, logsToLoad.length) + val jobsForDir = logsToLoad.map { logDir => val runnable: Runnable = () => { try { debug(s"Loading log $logDir") val logLoadStartMs = time.hiResClockMs() val log = loadLog(logDir, hadCleanShutdown, recoveryPoints, logStartOffsets, - defaultConfig, topicConfigOverrides) + defaultConfig, topicConfigOverrides, numRemainingSegments) val logLoadDurationMs = time.hiResClockMs() - logLoadStartMs val currentNumLoaded = numLogsLoaded.incrementAndGet() info(s"Completed load of $log with ${log.numberOfSegments} segments in ${logLoadDurationMs}ms " + s"($currentNumLoaded/${logsToLoad.length} loaded in $logDirAbsolutePath)") + println("!!! numRemainingSegments:" + numRemainingSegments + ";" + log) } catch { case e: IOException => handleIOException(logDirAbsolutePath, e) case e: KafkaStorageException if e.getCause.isInstanceOf[IOException] => // KafkaStorageException might be thrown, ex: during writing LeaderEpochFileCache // And while converting IOException to KafkaStorageException, we've already handled the exception. So we can ignore it here. + } finally { + updateNumRemainingLogs(dir.getAbsolutePath, logsToLoad.length - numLogsLoaded.get()) Review Comment: Good catch! Updated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org