junrao commented on code in PR #12136: URL: https://github.com/apache/kafka/pull/12136#discussion_r871593948
########## core/src/main/scala/kafka/log/LogManager.scala: ########## @@ -376,8 +381,10 @@ class LogManager(logDirs: Seq[File], s"($currentNumLoaded/${logsToLoad.length} loaded in $logDirAbsolutePath)") } catch { case e: IOException => - offlineDirs.add((logDirAbsolutePath, e)) - error(s"Error while loading log dir $logDirAbsolutePath", e) + handleIOException(logDirAbsolutePath, e) + case e: KafkaStorageException if e.getCause.isInstanceOf[IOException] => + // KafkaStorageException might be thrown, ex: during writing LeaderEpochFileCache + handleIOException(logDirAbsolutePath, e.getCause.asInstanceOf[IOException]) Review Comment: If we hit KafkaStorageException, that means the failed disk has already been reported to the logDirFailureChannel when KafkaStorageException was generated. So, we probably don't need to track it in offlineDirs again. ########## core/src/test/scala/unit/kafka/log/LogLoaderTest.scala: ########## @@ -158,22 +197,54 @@ class LogLoaderTest { } locally { - simulateError.hasError = true - val logManager: LogManager = interceptedLogManager(logConfig, logDirs, simulateError) - log = logManager.getOrCreateLog(topicPartition, isNew = true, topicId = None) + val logDirFailureChannel = new LogDirFailureChannel(logDirs.size) + val (logManager, runLoadLogs) = initializeLogManagerForSimulatingErrorTest(true, ErrorTypes.RuntimeException, logDirFailureChannel) - // Simulate error - assertThrows(classOf[RuntimeException], () => { - val defaultConfig = logManager.currentDefaultConfig - logManager.loadLogs(defaultConfig, logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty)) - }) + // Simulate Runtime error + assertThrows(classOf[RuntimeException], runLoadLogs) assertFalse(cleanShutdownFile.exists(), "Clean shutdown file must not have existed") + assertFalse(logDirFailureChannel.hasOfflineLogDir(logDir.getAbsolutePath), "log dir should not turn offline when Runtime Exception thrown") + // Do not simulate error on next call to LogManager#loadLogs. 
LogManager must understand that log had unclean shutdown the last time. simulateError.hasError = false cleanShutdownInterceptedValue = true val defaultConfig = logManager.currentDefaultConfig logManager.loadLogs(defaultConfig, logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty)) assertFalse(cleanShutdownInterceptedValue, "Unexpected value for clean shutdown flag") + logManager.shutdown() + } + + locally { + val logDirFailureChannel = new LogDirFailureChannel(logDirs.size) + val (logManager, runLoadLogs) = initializeLogManagerForSimulatingErrorTest(true, ErrorTypes.IOException, logDirFailureChannel) + + // Simulate IO error + assertDoesNotThrow(runLoadLogs, "IOException should be caught and handled") + + assertTrue(logDirFailureChannel.hasOfflineLogDir(logDir.getAbsolutePath), "the log dir should turn offline after IOException thrown") + logManager.shutdown() + } + + locally { Review Comment: Could we avoid duplicating this code by running the same logic twice in a loop instead of copying it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org