[GitHub] [kafka] junrao commented on a diff in pull request #12136: KAFKA-13773: catch kafkaStorageException to avoid broker shutdown directly

2022-05-31 Thread GitBox


junrao commented on code in PR #12136:
URL: https://github.com/apache/kafka/pull/12136#discussion_r886228739


##
core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala:
##
@@ -158,14 +158,29 @@ class ServerShutdownTest extends KafkaServerTestHarness {
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testCleanShutdownAfterFailedStartupDueToCorruptLogs(quorum: String): 
Unit = {
+  def testNoCleanShutdownAfterFailedStartupDueToCorruptLogs(quorum: String): 
Unit = {
 createTopic(topic)
 shutdownBroker()
 config.logDirs.foreach { dirName =>
   val partitionDir = new File(dirName, s"$topic-0")
   partitionDir.listFiles.foreach(f => TestUtils.appendNonsenseToFile(f, 
TestUtils.random.nextInt(1024) + 1))
 }
-verifyCleanShutdownAfterFailedStartup[KafkaStorageException]
+
+val expectedStatusCode = Some(1)
+var receivedStatusCode = Option.empty[Int]

Review Comment:
   Should receivedStatusCode be volatile since it's set in a different thread?



##
core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala:
##
@@ -158,14 +158,29 @@ class ServerShutdownTest extends KafkaServerTestHarness {
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testCleanShutdownAfterFailedStartupDueToCorruptLogs(quorum: String): 
Unit = {
+  def testNoCleanShutdownAfterFailedStartupDueToCorruptLogs(quorum: String): 
Unit = {
 createTopic(topic)
 shutdownBroker()
 config.logDirs.foreach { dirName =>
   val partitionDir = new File(dirName, s"$topic-0")
   partitionDir.listFiles.foreach(f => TestUtils.appendNonsenseToFile(f, 
TestUtils.random.nextInt(1024) + 1))
 }
-verifyCleanShutdownAfterFailedStartup[KafkaStorageException]
+
+val expectedStatusCode = Some(1)
+var receivedStatusCode = Option.empty[Int]
+Exit.setHaltProcedure((statusCode, _) => {
+  receivedStatusCode = Some(statusCode)
+}.asInstanceOf[Nothing])
+
+try {
+  val recreateBrokerExec: Executable = () => recreateBroker(true)
+  // this startup should fail with no online log dir (due to corrupted 
log), and exit directly without throwing exception
+  assertDoesNotThrow(recreateBrokerExec)
+  // JVM should exit with status code 1
+  assertEquals(expectedStatusCode, receivedStatusCode)
+} finally {
+  Exit.resetExitProcedure()

Review Comment:
   Should this be resetHaltProcedure?



##
core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala:
##
@@ -158,14 +158,29 @@ class ServerShutdownTest extends KafkaServerTestHarness {
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testCleanShutdownAfterFailedStartupDueToCorruptLogs(quorum: String): 
Unit = {
+  def testNoCleanShutdownAfterFailedStartupDueToCorruptLogs(quorum: String): 
Unit = {
 createTopic(topic)
 shutdownBroker()
 config.logDirs.foreach { dirName =>
   val partitionDir = new File(dirName, s"$topic-0")
   partitionDir.listFiles.foreach(f => TestUtils.appendNonsenseToFile(f, 
TestUtils.random.nextInt(1024) + 1))
 }
-verifyCleanShutdownAfterFailedStartup[KafkaStorageException]
+
+val expectedStatusCode = Some(1)
+var receivedStatusCode = Option.empty[Int]
+Exit.setHaltProcedure((statusCode, _) => {
+  receivedStatusCode = Some(statusCode)
+}.asInstanceOf[Nothing])
+
+try {
+  val recreateBrokerExec: Executable = () => recreateBroker(true)
+  // this startup should fail with no online log dir (due to corrupted 
log), and exit directly without throwing exception
+  assertDoesNotThrow(recreateBrokerExec)
+  // JVM should exit with status code 1
+  assertEquals(expectedStatusCode, receivedStatusCode)

Review Comment:
   LogDirFailureHandler sets the exit flag but runs in a separate thread. 
Should we do this in a waitUntil() block?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] junrao commented on a diff in pull request #12136: KAFKA-13773: catch kafkaStorageException to avoid broker shutdown directly

2022-05-24 Thread GitBox


junrao commented on code in PR #12136:
URL: https://github.com/apache/kafka/pull/12136#discussion_r880990528


##
core/src/test/scala/unit/kafka/log/LogLoaderTest.scala:
##
@@ -158,22 +192,37 @@ class LogLoaderTest {
 }
 
 locally {
+  val (logManager, runLoadLogs) = 
initializeLogManagerForSimulatingErrorTest(logDirFailureChannel)
   simulateError.hasError = true
-  val logManager: LogManager = interceptedLogManager(logConfig, logDirs, 
simulateError)
-  log = logManager.getOrCreateLog(topicPartition, isNew = true, topicId = 
None)
+  simulateError.errorType = ErrorTypes.RuntimeException
 
-  // Simulate error
-  assertThrows(classOf[RuntimeException], () => {
-val defaultConfig = logManager.currentDefaultConfig
-logManager.loadLogs(defaultConfig, 
logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty))
-  })
+  // Simulate Runtime error

Review Comment:
   Should this comment be moved to before line 196?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] junrao commented on a diff in pull request #12136: KAFKA-13773: catch kafkaStorageException to avoid broker shutdown directly

2022-05-23 Thread GitBox


junrao commented on code in PR #12136:
URL: https://github.com/apache/kafka/pull/12136#discussion_r879690436


##
core/src/test/scala/unit/kafka/log/LogLoaderTest.scala:
##
@@ -134,10 +154,29 @@ class LogLoaderTest {
   }
 }
 
+def initializeLogManagerForSimulatingErrorTest(hasError: Boolean = false,
+ errorType: ErrorTypes.Errors 
= null,
+ logDirFailureChannel: 
LogDirFailureChannel = new LogDirFailureChannel(logDirs.size)
+): (LogManager, Executable) = {
+  simulateError.hasError = hasError

Review Comment:
   We are changing simulateError after initializing LogManager. Perhaps it's 
clearer to just do that for the first test and avoid passing in hasError and 
errorType to initializeLogManagerForSimulatingErrorTest()?



##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -376,7 +381,9 @@ class LogManager(logDirs: Seq[File],
 s"($currentNumLoaded/${logsToLoad.length} loaded in 
$logDirAbsolutePath)")
 } catch {
   case e: IOException =>
-offlineDirs.add((logDirAbsolutePath, e))
+handleIOException(logDirAbsolutePath, e)
+  case e: KafkaStorageException if 
e.getCause.isInstanceOf[IOException] =>
+// KafkaStorageException might be thrown, ex: during writing 
LeaderEpochFileCache

Review Comment:
   When converting an IOException to a KafkaStorageException, we already log 
an error. So there is probably no need to log it again below. Could we also 
add a comment explaining why it is safe to swallow the KafkaStorageException here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] junrao commented on a diff in pull request #12136: KAFKA-13773: catch kafkaStorageException to avoid broker shutdown directly

2022-05-12 Thread GitBox


junrao commented on code in PR #12136:
URL: https://github.com/apache/kafka/pull/12136#discussion_r871593948


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -376,8 +381,10 @@ class LogManager(logDirs: Seq[File],
 s"($currentNumLoaded/${logsToLoad.length} loaded in 
$logDirAbsolutePath)")
 } catch {
   case e: IOException =>
-offlineDirs.add((logDirAbsolutePath, e))
-error(s"Error while loading log dir $logDirAbsolutePath", e)
+handleIOException(logDirAbsolutePath, e)
+  case e: KafkaStorageException if 
e.getCause.isInstanceOf[IOException] =>
+// KafkaStorageException might be thrown, ex: during writing 
LeaderEpochFileCache
+handleIOException(logDirAbsolutePath, 
e.getCause.asInstanceOf[IOException])

Review Comment:
   If we hit KafkaStorageException, that means the failed disk has already been 
reported to the logDirFailureChannel when KafkaStorageException was generated. 
So, we probably don't need to track it in offlineDirs again.



##
core/src/test/scala/unit/kafka/log/LogLoaderTest.scala:
##
@@ -158,22 +197,54 @@ class LogLoaderTest {
 }
 
 locally {
-  simulateError.hasError = true
-  val logManager: LogManager = interceptedLogManager(logConfig, logDirs, 
simulateError)
-  log = logManager.getOrCreateLog(topicPartition, isNew = true, topicId = 
None)
+  val logDirFailureChannel = new LogDirFailureChannel(logDirs.size)
+  val (logManager, runLoadLogs) = 
initializeLogManagerForSimulatingErrorTest(true, ErrorTypes.RuntimeException, 
logDirFailureChannel)
 
-  // Simulate error
-  assertThrows(classOf[RuntimeException], () => {
-val defaultConfig = logManager.currentDefaultConfig
-logManager.loadLogs(defaultConfig, 
logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty))
-  })
+  // Simulate Runtime error
+  assertThrows(classOf[RuntimeException], runLoadLogs)
   assertFalse(cleanShutdownFile.exists(), "Clean shutdown file must not 
have existed")
+  
assertFalse(logDirFailureChannel.hasOfflineLogDir(logDir.getAbsolutePath), "log 
dir should not turn offline when Runtime Exception thrown")
+
   // Do not simulate error on next call to LogManager#loadLogs. LogManager 
must understand that log had unclean shutdown the last time.
   simulateError.hasError = false
   cleanShutdownInterceptedValue = true
   val defaultConfig = logManager.currentDefaultConfig
   logManager.loadLogs(defaultConfig, 
logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty))
   assertFalse(cleanShutdownInterceptedValue, "Unexpected value for clean 
shutdown flag")
+  logManager.shutdown()
+}
+
+locally {
+  val logDirFailureChannel = new LogDirFailureChannel(logDirs.size)
+  val (logManager, runLoadLogs) = 
initializeLogManagerForSimulatingErrorTest(true, ErrorTypes.IOException, 
logDirFailureChannel)
+
+  // Simulate IO error
+  assertDoesNotThrow(runLoadLogs, "IOException should be caught and 
handled")
+
+  
assertTrue(logDirFailureChannel.hasOfflineLogDir(logDir.getAbsolutePath), "the 
log dir should turn offline after IOException thrown")
+  logManager.shutdown()
+}
+
+locally {

Review Comment:
   Could we avoid duplicating the code by running the same logic in a loop over both cases?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org