soarez commented on code in PR #15697:
URL: https://github.com/apache/kafka/pull/15697#discussion_r1562612304


##########
core/src/main/scala/kafka/server/BrokerLifecycleManager.scala:
##########
@@ -507,6 +522,7 @@ class BrokerLifecycleManager(
         if (errorCode == Errors.NONE) {
           val responseData = message.data()
           failedAttempts = 0
+          offlineDirs = offlineDirs.map(kv => kv._1 -> true)

Review Comment:
   I think this is incorrect. If a new failed directory is added to 
`offlineDirs` between a heartbeat request and its response, then we'll clear it 
here before knowing whether it will be propagated to the controller.
   
   One idea is to hand down the offline dirs set in the request in 
`sendBrokerHeartBeat()` to `BrokerHeartbeatResponseEvent` through 
`BrokerHeartbeatResponseHandler` as a new constructor argument.
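
   A minimal sketch of that idea, not the actual `BrokerLifecycleManager` 
code: the object and method names below are hypothetical, and `java.util.UUID` 
stands in for Kafka's `Uuid`. The point is that only the directories captured 
when the request was built get marked as propagated.

```scala
import java.util.UUID

// Hypothetical, simplified stand-in for the heartbeat bookkeeping.
// java.util.UUID replaces Kafka's Uuid so the sketch is self-contained.
object OfflineDirTrackingSketch {
  // Directory ID -> whether the controller has acknowledged the failure.
  var offlineDirs: Map[UUID, Boolean] = Map.empty

  // Mirrors OfflineDirEvent: a newly failed dir starts as unacknowledged.
  def markOffline(dir: UUID): Unit =
    offlineDirs += (dir -> false)

  // Snapshot of the dirs included in an outgoing heartbeat request.
  def dirsForRequest(): Set[UUID] = offlineDirs.keySet

  // Called on a successful response with the snapshot taken at send time.
  // Dirs that failed after the request was built stay unacknowledged.
  def onHeartbeatSuccess(sentDirs: Set[UUID]): Unit =
    offlineDirs = offlineDirs.map { case (dir, acked) =>
      dir -> (acked || sentDirs.contains(dir))
    }
}
```

   With this shape, a directory that fails while a heartbeat is in flight is 
only acknowledged by the response to a later request that actually carried it.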



##########
server/src/main/java/org/apache/kafka/server/config/Defaults.java:
##########
@@ -94,6 +94,7 @@ public class Defaults {
     public static final int LOG_FLUSH_START_OFFSET_CHECKPOINT_INTERVAL_MS = 60000;
     public static final int NUM_RECOVERY_THREADS_PER_DATA_DIR = 1;
     public static final boolean AUTO_CREATE_TOPICS_ENABLE = true;
+    public static final long LOG_DIR_FAILURE_TIMEOUT_MS = 30000L;

Review Comment:
   This default seems reasonable to me.



##########
core/src/main/scala/kafka/server/BrokerServer.scala:
##########
@@ -211,7 +211,8 @@ class BrokerServer(
         time,
         s"broker-${config.nodeId}-",
         isZkBroker = false,
-        logDirs = logManager.directoryIdsSet)
+        logDirs = logManager.directoryIdsSet,
+        () => kafkaScheduler.schedule("shutdown", () => shutdown(), 0, -1))

Review Comment:
   There's a `scheduleOnce` alternative which sets `periodMs` to `-1`.
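
   To illustrate the relationship between the two calls, here is a sketch 
using a hypothetical `MiniScheduler` built on `java.util.concurrent`. It is 
not Kafka's scheduler; it only assumes, as described above, that a negative 
`periodMs` means "run once".

```scala
import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, TimeUnit}

// Hypothetical miniature scheduler used only for illustration.
class MiniScheduler(executor: ScheduledExecutorService) {
  // A non-positive periodMs is treated as "run once after delayMs".
  def schedule(name: String, task: Runnable, delayMs: Long, periodMs: Long): ScheduledFuture[_] =
    if (periodMs > 0)
      executor.scheduleAtFixedRate(task, delayMs, periodMs, TimeUnit.MILLISECONDS)
    else
      executor.schedule(task, delayMs, TimeUnit.MILLISECONDS)

  // Convenience wrapper so callers need not pass the -1 sentinel themselves.
  def scheduleOnce(name: String, task: Runnable, delayMs: Long = 0L): ScheduledFuture[_] =
    schedule(name, task, delayMs, -1L)
}
```

   At the call site, the one-shot variant states the intent directly and 
avoids the magic `0, -1` arguments.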



##########
core/src/main/scala/kafka/server/KafkaConfig.scala:
##########
@@ -528,6 +529,10 @@ object KafkaConfig {
     "If log.message.timestamp.type=CreateTime, the message will be rejected if the difference in timestamps exceeds " +
     "this specified threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime."
 
+  val LogDirFailureTimeoutMsDoc = "If the broker is unable to successfully communicate to the controller that some log " +
+    "directory has failed for longer than this time, and there's at least one partition with leadership on that directory, " +

Review Comment:
   > and there's at least one partition with leadership
   
   We aren't checking for this condition. We can either a) implement it; or b) 
keep it simple and drop this out of the configuration description.



##########
core/src/main/scala/kafka/server/BrokerLifecycleManager.scala:
##########
@@ -327,16 +333,25 @@ class BrokerLifecycleManager(
   private class OfflineDirEvent(val dir: Uuid) extends EventQueue.Event {
     override def run(): Unit = {
       if (offlineDirs.isEmpty) {
-        offlineDirs = Set(dir)
+        offlineDirs = Map(dir -> false)
       } else {
-        offlineDirs = offlineDirs + dir
+        offlineDirs += (dir -> false)
       }
       if (registered) {
         scheduleNextCommunicationImmediately()
       }
     }
   }
 
+  private class OfflineDirBrokerFailureEvent(offlineDir: Uuid) extends EventQueue.Event {
+    override def run(): Unit = {
+      if (!offlineDirs.getOrElse(offlineDir, false)) {
+        error(s"Shutting down because couldn't communicate offline log dirs with controllers")

Review Comment:
   We should include the directory in the error. It might also be helpful to 
resolve the directory ID to its path. Perhaps something like `dirIdToPath` in 
`AssignmentsManager` should be made available here as well.
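
   A rough sketch of what the message could look like. The helper name and the 
`Map`-based lookup are assumptions made for illustration (the real 
`dirIdToPath` may have a different shape), and `java.util.UUID` again stands in 
for Kafka's `Uuid`.

```scala
import java.util.UUID

object OfflineDirErrorSketch {
  // Builds the shutdown message, naming the directory ID and, when it can be
  // resolved, its path. dirIdToPath is a hypothetical lookup table.
  def shutdownMessage(dirId: UUID, dirIdToPath: Map[UUID, String]): String = {
    val path = dirIdToPath.getOrElse(dirId, "<unknown path>")
    s"Shutting down because couldn't communicate offline log dir $dirId ($path) with controllers"
  }
}
```

   Falling back to a placeholder keeps the message useful even if the ID 
cannot be resolved to a path.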



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
