junrao commented on code in PR #12136:
URL: https://github.com/apache/kafka/pull/12136#discussion_r871593948


##########
core/src/main/scala/kafka/log/LogManager.scala:
##########
@@ -376,8 +381,10 @@ class LogManager(logDirs: Seq[File],
                 s"($currentNumLoaded/${logsToLoad.length} loaded in 
$logDirAbsolutePath)")
             } catch {
               case e: IOException =>
-                offlineDirs.add((logDirAbsolutePath, e))
-                error(s"Error while loading log dir $logDirAbsolutePath", e)
+                handleIOException(logDirAbsolutePath, e)
+              case e: KafkaStorageException if 
e.getCause.isInstanceOf[IOException] =>
+                // KafkaStorageException might be thrown, ex: during writing 
LeaderEpochFileCache
+                handleIOException(logDirAbsolutePath, 
e.getCause.asInstanceOf[IOException])

Review Comment:
   If we hit a KafkaStorageException, the failed disk has already been 
reported to the logDirFailureChannel at the point where the 
KafkaStorageException was generated. So we probably don't need to track it in 
offlineDirs again.



##########
core/src/test/scala/unit/kafka/log/LogLoaderTest.scala:
##########
@@ -158,22 +197,54 @@ class LogLoaderTest {
     }
 
     locally {
-      simulateError.hasError = true
-      val logManager: LogManager = interceptedLogManager(logConfig, logDirs, 
simulateError)
-      log = logManager.getOrCreateLog(topicPartition, isNew = true, topicId = 
None)
+      val logDirFailureChannel = new LogDirFailureChannel(logDirs.size)
+      val (logManager, runLoadLogs) = 
initializeLogManagerForSimulatingErrorTest(true, ErrorTypes.RuntimeException, 
logDirFailureChannel)
 
-      // Simulate error
-      assertThrows(classOf[RuntimeException], () => {
-        val defaultConfig = logManager.currentDefaultConfig
-        logManager.loadLogs(defaultConfig, 
logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty))
-      })
+      // Simulate Runtime error
+      assertThrows(classOf[RuntimeException], runLoadLogs)
       assertFalse(cleanShutdownFile.exists(), "Clean shutdown file must not 
have existed")
+      
assertFalse(logDirFailureChannel.hasOfflineLogDir(logDir.getAbsolutePath), "log 
dir should not turn offline when Runtime Exception thrown")
+
       // Do not simulate error on next call to LogManager#loadLogs. LogManager 
must understand that log had unclean shutdown the last time.
       simulateError.hasError = false
       cleanShutdownInterceptedValue = true
       val defaultConfig = logManager.currentDefaultConfig
       logManager.loadLogs(defaultConfig, 
logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty))
       assertFalse(cleanShutdownInterceptedValue, "Unexpected value for clean 
shutdown flag")
+      logManager.shutdown()
+    }
+
+    locally {
+      val logDirFailureChannel = new LogDirFailureChannel(logDirs.size)
+      val (logManager, runLoadLogs) = 
initializeLogManagerForSimulatingErrorTest(true, ErrorTypes.IOException, 
logDirFailureChannel)
+
+      // Simulate IO error
+      assertDoesNotThrow(runLoadLogs, "IOException should be caught and 
handled")
+
+      
assertTrue(logDirFailureChannel.hasOfflineLogDir(logDir.getAbsolutePath), "the 
log dir should turn offline after IOException thrown")
+      logManager.shutdown()
+    }
+
+    locally {

Review Comment:
   Could we avoid duplicating this code, e.g. by iterating over the cases and 
running the same logic for each one?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to