soarez commented on code in PR #15697: URL: https://github.com/apache/kafka/pull/15697#discussion_r1562612304
########## core/src/main/scala/kafka/server/BrokerLifecycleManager.scala: ##########
@@ -507,6 +522,7 @@ class BrokerLifecycleManager(
     if (errorCode == Errors.NONE) {
       val responseData = message.data()
       failedAttempts = 0
+      offlineDirs = offlineDirs.map(kv => kv._1 -> true)

Review Comment:
   I think this is incorrect. If a new failed directory is added to `offlineDirs` in between a heartbeat request and its response, we will clear it here before knowing whether it was propagated to the controller. One idea is to hand the offline dirs set from the request in `sendBrokerHeartBeat()` down to `BrokerHeartbeatResponseEvent` through `BrokerHeartbeatResponseHandler` as a new constructor argument.

########## server/src/main/java/org/apache/kafka/server/config/Defaults.java: ##########
@@ -94,6 +94,7 @@ public class Defaults {
     public static final int LOG_FLUSH_START_OFFSET_CHECKPOINT_INTERVAL_MS = 60000;
     public static final int NUM_RECOVERY_THREADS_PER_DATA_DIR = 1;
     public static final boolean AUTO_CREATE_TOPICS_ENABLE = true;
+    public static final long LOG_DIR_FAILURE_TIMEOUT_MS = 30000L;

Review Comment:
   This default seems reasonable to me.

########## core/src/main/scala/kafka/server/BrokerServer.scala: ##########
@@ -211,7 +211,8 @@ class BrokerServer(
       time,
       s"broker-${config.nodeId}-",
       isZkBroker = false,
-      logDirs = logManager.directoryIdsSet)
+      logDirs = logManager.directoryIdsSet,
+      () => kafkaScheduler.schedule("shutdown", () => shutdown(), 0, -1))

Review Comment:
   There's a `scheduleOnce` alternative which sets `periodMs` to `-1`.

########## core/src/main/scala/kafka/server/KafkaConfig.scala: ##########
@@ -528,6 +529,10 @@ object KafkaConfig {
     "If log.message.timestamp.type=CreateTime, the message will be rejected if the difference in timestamps exceeds " +
     "this specified threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime."
+  val LogDirFailureTimeoutMsDoc = "If the broker is unable to successfully communicate to the controller that some log " +
+    "directory has failed for longer than this time, and there's at least one partition with leadership on that directory, " +

Review Comment:
   > and there's at least one partition with leadership
   
   We aren't checking for this condition. We can either a) implement it; or b) keep it simple and drop this from the configuration description.

########## core/src/main/scala/kafka/server/BrokerLifecycleManager.scala: ##########
@@ -327,16 +333,25 @@ class BrokerLifecycleManager(
   private class OfflineDirEvent(val dir: Uuid) extends EventQueue.Event {
     override def run(): Unit = {
       if (offlineDirs.isEmpty) {
-        offlineDirs = Set(dir)
+        offlineDirs = Map(dir -> false)
       } else {
-        offlineDirs = offlineDirs + dir
+        offlineDirs += (dir -> false)
       }
       if (registered) {
         scheduleNextCommunicationImmediately()
       }
     }
   }

+  private class OfflineDirBrokerFailureEvent(offlineDir: Uuid) extends EventQueue.Event {
+    override def run(): Unit = {
+      if (!offlineDirs.getOrElse(offlineDir, false)) {
+        error(s"Shutting down because couldn't communicate offline log dirs with controllers")

Review Comment:
   We should include the directory in the error message. It might also be helpful to resolve the directory ID to its path. Perhaps something like `dirIdToPath` in `AssignmentsManager` should be made available here as well.
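The two suggestions about `offlineDirs` handling — snapshot the directories actually sent in each heartbeat so a directory that fails while a request is in flight is not marked as propagated, and resolve the directory ID to its path in the shutdown error — can be sketched together. This is a hypothetical illustration, not code from the PR; all names (`OfflineDirTracker`, `markOffline`, `snapshotForRequest`, `timeoutMessage`) are invented for the sketch:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical sketch of the review suggestions: per-directory acknowledgement
// tracking plus id-to-path resolution for error messages.
class OfflineDirTracker {
    // directory id -> has the controller acknowledged it in a heartbeat response?
    private final Map<String, Boolean> offlineDirs = new HashMap<>();
    // e.g. backed by something like dirIdToPath in AssignmentsManager
    private final Function<String, String> dirIdToPath;

    OfflineDirTracker(Function<String, String> dirIdToPath) {
        this.dirIdToPath = dirIdToPath;
    }

    void markOffline(String dirId) {
        offlineDirs.putIfAbsent(dirId, false);
    }

    // Called when building a heartbeat request: capture exactly which dirs are sent.
    Set<String> snapshotForRequest() {
        return new HashSet<>(offlineDirs.keySet());
    }

    // Called from the response handler with the snapshot taken at send time, so a
    // directory that failed while the request was in flight stays unacknowledged.
    void onHeartbeatSuccess(Set<String> sentDirs) {
        for (String dir : sentDirs) {
            offlineDirs.computeIfPresent(dir, (id, acked) -> true);
        }
    }

    boolean acknowledged(String dirId) {
        return offlineDirs.getOrDefault(dirId, false);
    }

    // Error text naming the directory, with its id resolved to a path when known.
    String timeoutMessage(String dirId) {
        String path = dirIdToPath.apply(dirId);
        String desc = (path != null) ? dirId + " (" + path + ")" : dirId;
        return "Shutting down because offline log directory " + desc
            + " could not be communicated to the controllers before the timeout";
    }
}
```

With this shape, the response handler only flips entries it knows were in the request, which is the essence of handing the request's dir set down to `BrokerHeartbeatResponseEvent`.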