junrao commented on code in PR #12347:
URL: https://github.com/apache/kafka/pull/12347#discussion_r919529062
##########
core/src/main/scala/kafka/log/LogManager.scala:
##########
@@ -366,25 +392,30 @@ class LogManager(logDirs: Seq[File],
         val numLogsLoaded = new AtomicInteger(0)
         numTotalLogs += logsToLoad.length
+        updateNumRemainingLogs(dir.getAbsolutePath, logsToLoad.length)
+
         val jobsForDir = logsToLoad.map { logDir =>
           val runnable: Runnable = () => {
             try {
               debug(s"Loading log $logDir")
               val logLoadStartMs = time.hiResClockMs()
               val log = loadLog(logDir, hadCleanShutdown, recoveryPoints, logStartOffsets,
-                defaultConfig, topicConfigOverrides)
+                defaultConfig, topicConfigOverrides, numRemainingSegments)
               val logLoadDurationMs = time.hiResClockMs() - logLoadStartMs
               val currentNumLoaded = numLogsLoaded.incrementAndGet()
               info(s"Completed load of $log with ${log.numberOfSegments} segments in ${logLoadDurationMs}ms " +
                 s"($currentNumLoaded/${logsToLoad.length} loaded in $logDirAbsolutePath)")
+              println("!!! numRemainingSegments:" + numRemainingSegments + ";" + log)
             } catch {
               case e: IOException =>
                 handleIOException(logDirAbsolutePath, e)
               case e: KafkaStorageException if e.getCause.isInstanceOf[IOException] =>
                 // KafkaStorageException might be thrown, ex: during writing LeaderEpochFileCache
                 // And while converting IOException to KafkaStorageException, we've already handled the exception. So we can ignore it here.
+            } finally {
+              updateNumRemainingLogs(dir.getAbsolutePath, logsToLoad.length - numLogsLoaded.get())

Review Comment:
   Hmm, how do we handle concurrency here? It's possible that one thread completes and calls numLogsLoaded.get(), but before it can call updateNumRemainingLogs(), another thread completes and calls updateNumRemainingLogs(). The first thread then calls updateNumRemainingLogs() as well, overwriting numRemainingLogs with an outdated count. (One race-free alternative is sketched at the end of this thread.)

##########
core/src/main/scala/kafka/log/LogManager.scala:
##########
@@ -90,6 +90,10 @@ class LogManager(logDirs: Seq[File],
   private val _liveLogDirs: ConcurrentLinkedQueue[File] = createAndValidateLogDirs(logDirs, initialOfflineDirs)
   @volatile private var _currentDefaultConfig = initialDefaultConfig
   @volatile private var numRecoveryThreadsPerDataDir = recoveryThreadsPerDataDir
+  // log dir path -> number of Remaining logs map for remainingLogsToRecover metric
+  private val numRemainingLogs: ConcurrentMap[String, Int] = new ConcurrentHashMap[String, Int]
+  // log recovery thread name -> number of remaining segments map for remainingSegmentsToRecover metric
+  private val numRemainingSegments: ConcurrentMap[String, Int] = new ConcurrentHashMap[String, Int]

Review Comment:
   Could we make these two maps local in loadLogs()?

##########
core/src/main/scala/kafka/log/LogLoader.scala:
##########
@@ -424,9 +432,11 @@ class LogLoader(
         // we had an invalid message, delete all remaining log
         warn(s"Corruption found in segment ${segment.baseOffset}," +
           s" truncating to offset ${segment.readNextOffset}")
-        removeAndDeleteSegmentsAsync(unflushed.toList)
+        removeAndDeleteSegmentsAsync(unflushedIter.toList)

Review Comment:
   This will consume unflushedIter. If we don't account for that, the remaining-segments count won't be accurate.
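   To make the iterator point concrete, here is a standalone Scala sketch (toy names, nothing from the Kafka classes): `toList` drains an iterator, so any traversal or count taken from it afterwards sees nothing, and a "remaining segments" figure derived from the drained iterator would come out wrong. This is presumably why the next hunk below captures the size from the collection rather than from an iterator.

```scala
object IteratorDrain extends App {
  val segments = Seq("segment-0", "segment-100", "segment-200")

  val iter = segments.iterator
  val materialized = iter.toList    // toList consumes the iterator...
  println(materialized.size)        // 3
  println(iter.hasNext)             // false: ...nothing left to walk or count

  // Capturing the size from the underlying collection keeps the
  // "remaining" arithmetic accurate even after an iterator is drained:
  val numUnflushed = segments.size
  var numRecovered = 0
  segments.iterator.foreach(_ => numRecovered += 1)
  println(s"remaining = ${numUnflushed - numRecovered}") // remaining = 0
}
```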
##########
core/src/main/scala/kafka/log/LogLoader.scala:
##########
@@ -404,12 +406,18 @@ class LogLoader(
     // If we have the clean shutdown marker, skip recovery.
     if (!hadCleanShutdown) {
-      val unflushed = segments.values(recoveryPointCheckpoint, Long.MaxValue).iterator
+      val unflushed = segments.values(recoveryPointCheckpoint, Long.MaxValue)
+      val unflushedSize = unflushed.size

Review Comment:
   To be consistent with numFlushed, this probably should be named numUnflushed?

##########
core/src/main/scala/kafka/log/LogLoader.scala:
##########
@@ -77,7 +78,8 @@ class LogLoader(
   logStartOffsetCheckpoint: Long,
   recoveryPointCheckpoint: Long,
   leaderEpochCache: Option[LeaderEpochFileCache],
-  producerStateManager: ProducerStateManager
+  producerStateManager: ProducerStateManager,
+  numRemainingSegments: ConcurrentMap[String, Int] = new ConcurrentHashMap[String, Int]

Review Comment:
   Could we add the new param to the javadoc?

##########
core/src/main/scala/kafka/log/LogManager.scala:
##########
@@ -366,25 +392,30 @@ class LogManager(logDirs: Seq[File],
         val numLogsLoaded = new AtomicInteger(0)
         numTotalLogs += logsToLoad.length
+        updateNumRemainingLogs(dir.getAbsolutePath, logsToLoad.length)
+
         val jobsForDir = logsToLoad.map { logDir =>
           val runnable: Runnable = () => {
             try {
               debug(s"Loading log $logDir")
               val logLoadStartMs = time.hiResClockMs()
               val log = loadLog(logDir, hadCleanShutdown, recoveryPoints, logStartOffsets,
-                defaultConfig, topicConfigOverrides)
+                defaultConfig, topicConfigOverrides, numRemainingSegments)
               val logLoadDurationMs = time.hiResClockMs() - logLoadStartMs
               val currentNumLoaded = numLogsLoaded.incrementAndGet()
               info(s"Completed load of $log with ${log.numberOfSegments} segments in ${logLoadDurationMs}ms " +
                 s"($currentNumLoaded/${logsToLoad.length} loaded in $logDirAbsolutePath)")
+              println("!!! numRemainingSegments:" + numRemainingSegments + ";" + log)

Review Comment:
   Is this needed?
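   On the concurrency question in the first comment: below is a minimal sketch of one race-free shape, with hypothetical names (`RemainingLogsCounter`, `startLoading`, `logLoaded`) rather than anything from the PR. Instead of recomputing `logsToLoad.length - numLogsLoaded.get()` and publishing it to the map as a second step, each completing thread decrements a shared per-dir counter, so the decrement and the publish are a single atomic operation:

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical sketch, not the PR's code: per-dir AtomicInteger counters
// remove the read-then-write window the review comment describes.
object RemainingLogsCounter {
  private val numRemainingLogs = new ConcurrentHashMap[String, AtomicInteger]()

  // Called once per log dir before its recovery jobs are submitted.
  def startLoading(dirPath: String, totalLogs: Int): Unit =
    numRemainingLogs.put(dirPath, new AtomicInteger(totalLogs))

  // Called by each recovery thread as it finishes one log. decrementAndGet()
  // is atomic, so concurrent completions can never publish a stale count.
  def logLoaded(dirPath: String): Unit =
    Option(numRemainingLogs.get(dirPath)).foreach(_.decrementAndGet())

  // What a remainingLogsToRecover-style gauge would read.
  def remaining(dirPath: String): Int =
    Option(numRemainingLogs.get(dirPath)).fold(0)(_.get())
}
```

   ConcurrentHashMap.merge would achieve the same thing on a plain Int-valued map, since it applies its remapping function atomically per key.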