[GitHub] [kafka] junrao commented on a diff in pull request #12136: KAFKA-13773: catch kafkaStorageException to avoid broker shutdown directly
junrao commented on code in PR #12136: URL: https://github.com/apache/kafka/pull/12136#discussion_r886228739 ## core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala: ## @@ -158,14 +158,29 @@ class ServerShutdownTest extends KafkaServerTestHarness { @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) @ValueSource(strings = Array("zk", "kraft")) - def testCleanShutdownAfterFailedStartupDueToCorruptLogs(quorum: String): Unit = { + def testNoCleanShutdownAfterFailedStartupDueToCorruptLogs(quorum: String): Unit = { createTopic(topic) shutdownBroker() config.logDirs.foreach { dirName => val partitionDir = new File(dirName, s"$topic-0") partitionDir.listFiles.foreach(f => TestUtils.appendNonsenseToFile(f, TestUtils.random.nextInt(1024) + 1)) } -verifyCleanShutdownAfterFailedStartup[KafkaStorageException] + +val expectedStatusCode = Some(1) +var receivedStatusCode = Option.empty[Int] Review Comment: Should receivedStatusCode be volatile since it's set in a different thread? 
## core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala: ## @@ -158,14 +158,29 @@ class ServerShutdownTest extends KafkaServerTestHarness { @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) @ValueSource(strings = Array("zk", "kraft")) - def testCleanShutdownAfterFailedStartupDueToCorruptLogs(quorum: String): Unit = { + def testNoCleanShutdownAfterFailedStartupDueToCorruptLogs(quorum: String): Unit = { createTopic(topic) shutdownBroker() config.logDirs.foreach { dirName => val partitionDir = new File(dirName, s"$topic-0") partitionDir.listFiles.foreach(f => TestUtils.appendNonsenseToFile(f, TestUtils.random.nextInt(1024) + 1)) } -verifyCleanShutdownAfterFailedStartup[KafkaStorageException] + +val expectedStatusCode = Some(1) +var receivedStatusCode = Option.empty[Int] +Exit.setHaltProcedure((statusCode, _) => { + receivedStatusCode = Some(statusCode) +}.asInstanceOf[Nothing]) + +try { + val recreateBrokerExec: Executable = () => recreateBroker(true) + // this startup should fail with no online log dir (due to corrupted log), and exit directly without throwing exception + assertDoesNotThrow(recreateBrokerExec) + // JVM should exit with status code 1 + assertEquals(expectedStatusCode, receivedStatusCode) +} finally { + Exit.resetExitProcedure() Review Comment: Should this be resetHaltProcedure? 
## core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala: ## @@ -158,14 +158,29 @@ class ServerShutdownTest extends KafkaServerTestHarness { @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) @ValueSource(strings = Array("zk", "kraft")) - def testCleanShutdownAfterFailedStartupDueToCorruptLogs(quorum: String): Unit = { + def testNoCleanShutdownAfterFailedStartupDueToCorruptLogs(quorum: String): Unit = { createTopic(topic) shutdownBroker() config.logDirs.foreach { dirName => val partitionDir = new File(dirName, s"$topic-0") partitionDir.listFiles.foreach(f => TestUtils.appendNonsenseToFile(f, TestUtils.random.nextInt(1024) + 1)) } -verifyCleanShutdownAfterFailedStartup[KafkaStorageException] + +val expectedStatusCode = Some(1) +var receivedStatusCode = Option.empty[Int] +Exit.setHaltProcedure((statusCode, _) => { + receivedStatusCode = Some(statusCode) +}.asInstanceOf[Nothing]) + +try { + val recreateBrokerExec: Executable = () => recreateBroker(true) + // this startup should fail with no online log dir (due to corrupted log), and exit directly without throwing exception + assertDoesNotThrow(recreateBrokerExec) + // JVM should exit with status code 1 + assertEquals(expectedStatusCode, receivedStatusCode) Review Comment: LogDirFailureHandler sets the exit flag but runs in a separate thread. Should we do this in a waitUntil() block? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on a diff in pull request #12136: KAFKA-13773: catch kafkaStorageException to avoid broker shutdown directly
junrao commented on code in PR #12136: URL: https://github.com/apache/kafka/pull/12136#discussion_r880990528 ## core/src/test/scala/unit/kafka/log/LogLoaderTest.scala: ## @@ -158,22 +192,37 @@ class LogLoaderTest { } locally { + val (logManager, runLoadLogs) = initializeLogManagerForSimulatingErrorTest(logDirFailureChannel) simulateError.hasError = true - val logManager: LogManager = interceptedLogManager(logConfig, logDirs, simulateError) - log = logManager.getOrCreateLog(topicPartition, isNew = true, topicId = None) + simulateError.errorType = ErrorTypes.RuntimeException - // Simulate error - assertThrows(classOf[RuntimeException], () => { -val defaultConfig = logManager.currentDefaultConfig -logManager.loadLogs(defaultConfig, logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty)) - }) + // Simulate Runtime error Review Comment: Should this comment be moved to before line 196? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on a diff in pull request #12136: KAFKA-13773: catch kafkaStorageException to avoid broker shutdown directly
junrao commented on code in PR #12136: URL: https://github.com/apache/kafka/pull/12136#discussion_r879690436 ## core/src/test/scala/unit/kafka/log/LogLoaderTest.scala: ## @@ -134,10 +154,29 @@ class LogLoaderTest { } } +def initializeLogManagerForSimulatingErrorTest(hasError: Boolean = false, + errorType: ErrorTypes.Errors = null, + logDirFailureChannel: LogDirFailureChannel = new LogDirFailureChannel(logDirs.size) +): (LogManager, Executable) = { + simulateError.hasError = hasError Review Comment: We are changing simulateError after initializing LogManager. Perhaps it's clearer to just do that for the first test and avoid passing in hasError and errorType to initializeLogManagerForSimulatingErrorTest()? ## core/src/main/scala/kafka/log/LogManager.scala: ## @@ -376,7 +381,9 @@ class LogManager(logDirs: Seq[File], s"($currentNumLoaded/${logsToLoad.length} loaded in $logDirAbsolutePath)") } catch { case e: IOException => -offlineDirs.add((logDirAbsolutePath, e)) +handleIOException(logDirAbsolutePath, e) + case e: KafkaStorageException if e.getCause.isInstanceOf[IOException] => +// KafkaStorageException might be thrown, ex: during writing LeaderEpochFileCache Review Comment: When converting IOException to KafkaStorageException, we already add an error log. So, there is probably no need to log it below again. Could we also add a comment explaining why we can just let go of KafkaStorageException? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on a diff in pull request #12136: KAFKA-13773: catch kafkaStorageException to avoid broker shutdown directly
junrao commented on code in PR #12136: URL: https://github.com/apache/kafka/pull/12136#discussion_r871593948 ## core/src/main/scala/kafka/log/LogManager.scala: ## @@ -376,8 +381,10 @@ class LogManager(logDirs: Seq[File], s"($currentNumLoaded/${logsToLoad.length} loaded in $logDirAbsolutePath)") } catch { case e: IOException => -offlineDirs.add((logDirAbsolutePath, e)) -error(s"Error while loading log dir $logDirAbsolutePath", e) +handleIOException(logDirAbsolutePath, e) + case e: KafkaStorageException if e.getCause.isInstanceOf[IOException] => +// KafkaStorageException might be thrown, ex: during writing LeaderEpochFileCache +handleIOException(logDirAbsolutePath, e.getCause.asInstanceOf[IOException]) Review Comment: If we hit KafkaStorageException, that means the failed disk has already been reported to the logDirFailureChannel when KafkaStorageException was generated. So, we probably don't need to track it in offlineDirs again. ## core/src/test/scala/unit/kafka/log/LogLoaderTest.scala: ## @@ -158,22 +197,54 @@ class LogLoaderTest { } locally { - simulateError.hasError = true - val logManager: LogManager = interceptedLogManager(logConfig, logDirs, simulateError) - log = logManager.getOrCreateLog(topicPartition, isNew = true, topicId = None) + val logDirFailureChannel = new LogDirFailureChannel(logDirs.size) + val (logManager, runLoadLogs) = initializeLogManagerForSimulatingErrorTest(true, ErrorTypes.RuntimeException, logDirFailureChannel) - // Simulate error - assertThrows(classOf[RuntimeException], () => { -val defaultConfig = logManager.currentDefaultConfig -logManager.loadLogs(defaultConfig, logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty)) - }) + // Simulate Runtime error + assertThrows(classOf[RuntimeException], runLoadLogs) assertFalse(cleanShutdownFile.exists(), "Clean shutdown file must not have existed") + assertFalse(logDirFailureChannel.hasOfflineLogDir(logDir.getAbsolutePath), "log dir should not turn offline when Runtime Exception thrown") + // Do not simulate error on next call to LogManager#loadLogs. LogManager must understand that log had unclean shutdown the last time. simulateError.hasError = false cleanShutdownInterceptedValue = true val defaultConfig = logManager.currentDefaultConfig logManager.loadLogs(defaultConfig, logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty)) assertFalse(cleanShutdownInterceptedValue, "Unexpected value for clean shutdown flag") + logManager.shutdown() +} + +locally { + val logDirFailureChannel = new LogDirFailureChannel(logDirs.size) + val (logManager, runLoadLogs) = initializeLogManagerForSimulatingErrorTest(true, ErrorTypes.IOException, logDirFailureChannel) + + // Simulate IO error + assertDoesNotThrow(runLoadLogs, "IOException should be caught and handled") + + assertTrue(logDirFailureChannel.hasOfflineLogDir(logDir.getAbsolutePath), "the log dir should turn offline after IOException thrown") + logManager.shutdown() +} + +locally { Review Comment: Could we avoid duplicating the code by iterating the same logic twice? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org