Re: [PR] KAFKA-15649: Handle directory failure timeout [kafka]
soarez merged PR #15697: URL: https://github.com/apache/kafka/pull/15697 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
viktorsomogyi commented on PR #15697: URL: https://github.com/apache/kafka/pull/15697#issuecomment-2127130632

@soarez thanks for the info. I addressed your comment. Do you have anything more to add, or are we good to go?
soarez commented on code in PR #15697: URL: https://github.com/apache/kafka/pull/15697#discussion_r1593094798

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -2466,7 +2467,6 @@ class ReplicaManager(val config: KafkaConfig,
        s"for partitions ${partitionsWithOfflineFutureReplica.mkString(",")} because they are in the failed log directory $dir.")
    }
    // retrieve the UUID here because logManager.handleLogDirFailure handler removes it

Review Comment: We should move or remove this comment now that the `uuid` declaration has been moved up.
viktorsomogyi commented on PR #15697: URL: https://github.com/apache/kafka/pull/15697#issuecomment-2098821932

@soarez in the end I chose the shortcut regarding detecting leaders before shutdown, because the solution that would be required for this is complex too.

First, the sequence of events is problematic. We update the `LogManager` first and then try to propagate the event to the controller. At that point the metadata is stale, so I can't use it as reliable information to detect whether partitions have leadership or not. A workaround would be to subtract the `LogManager`'s data from the metadata cache (i.e. if there is only a single ISR replica and that is the current one, then we can accept it as offline in reality). I don't really feel that this is a robust solution; it could be prone to race conditions on the network, depending on how requests come from the controller as long as it's alive. I think it's more robust to just fail if we can't contact the controller.

The second reason is a bit technical and can be worked around, although it requires a lot of effort. When trying to extract the replica->logdir information from `LogManager`, the only information about logdirs given by the event is the `Uuid`. Unfortunately `LogManager` doesn't store the `Uuid` of an offline dir (and besides, I don't think `Uuid`s and logdir names are used consistently across the whole module). This problem could be solved by propagating both the logdir and the `Uuid` in the events, or by storing offline dirs' `Uuid`s in `LogManager`. I think the latter is problematic because we can't know how long we should keep information about offline dirs, as they might never come back. The former can be done, although it could be a sizeable refactor, and generally I felt that just choosing the simpler route now could be more robust. Let me know if you think we should try it.
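The "just fail if we can't contact the controller" approach described above can be sketched in plain Java. This is a hypothetical illustration, not Kafka's actual implementation: the names (`DirFailureTracker`, `reportOffline`, `checkTimeout`) are invented for the example, and the real logic lives in `BrokerLifecycleManager`'s event queue.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical sketch: remember when each offline dir was first reported, and
// trigger a shutdown hook if the controller has not acknowledged it in time.
class DirFailureTracker {
    private final long timeoutMs;
    private final LongSupplier clock;     // injectable clock, for testing
    private final Runnable shutdownHook;  // e.g. something like BrokerServer.shutdown()
    private final Map<String, Long> firstReported = new HashMap<>(); // dirUuid -> first failure time

    DirFailureTracker(long timeoutMs, LongSupplier clock, Runnable shutdownHook) {
        this.timeoutMs = timeoutMs;
        this.clock = clock;
        this.shutdownHook = shutdownHook;
    }

    // A log dir failure was observed locally.
    void reportOffline(String dirUuid) {
        firstReported.putIfAbsent(dirUuid, clock.getAsLong());
    }

    // The controller confirmed it knows about this offline dir.
    void acknowledged(String dirUuid) {
        firstReported.remove(dirUuid);
    }

    // Called periodically; shuts the broker down if any dir has been
    // unacknowledged for longer than the timeout. Returns true if it fired.
    boolean checkTimeout() {
        long now = clock.getAsLong();
        for (Map.Entry<String, Long> e : firstReported.entrySet()) {
            if (now - e.getValue() >= timeoutMs) {
                shutdownHook.run();
                return true;
            }
        }
        return false;
    }
}
```

The point of the design choice is visible in the sketch: no attempt is made to reason about stale metadata or leadership; the broker simply self-terminates once propagation has demonstrably failed for too long.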
viktorsomogyi commented on PR #15697: URL: https://github.com/apache/kafka/pull/15697#issuecomment-2064216189

Rebased on latest trunk as there were some conflicts. Addressed some of the comments, but there are two things I still need to investigate:
* `LogDirFailureTest` fails in `@AfterAll`, likely because of an incorrect shutdown; perhaps there's a timing issue
* Check whether we can detect if there are any leaders before shutdown

I'll update on both shortly, hopefully tomorrow.
viktorsomogyi commented on code in PR #15697: URL: https://github.com/apache/kafka/pull/15697#discussion_r1570907841

## core/src/main/scala/kafka/server/KafkaConfig.scala: ##
@@ -528,6 +529,10 @@ object KafkaConfig {
    "If log.message.timestamp.type=CreateTime, the message will be rejected if the difference in timestamps exceeds " +
    "this specified threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime."
+  val LogDirFailureTimeoutMsDoc = "If the broker is unable to successfully communicate to the controller that some log " +
+    "directory has failed for longer than this time, and there's at least one partition with leadership on that directory, " +

Review Comment: I'll do another round with this; there might be a way in `BrokerServer` to extract this information using a combination of `MetadataCache`, `ReplicaManager` and `LogManager`. I'll update you tomorrow about my findings.

## core/src/main/scala/kafka/server/BrokerLifecycleManager.scala: ##
@@ -327,16 +333,25 @@ class BrokerLifecycleManager(
   private class OfflineDirEvent(val dir: Uuid) extends EventQueue.Event {
     override def run(): Unit = {
       if (offlineDirs.isEmpty) {
-        offlineDirs = Set(dir)
+        offlineDirs = Map(dir -> false)
       } else {
-        offlineDirs = offlineDirs + dir
+        offlineDirs += (dir -> false)
       }
       if (registered) {
         scheduleNextCommunicationImmediately()
       }
     }
   }
+
+  private class OfflineDirBrokerFailureEvent(offlineDir: Uuid) extends EventQueue.Event {
+    override def run(): Unit = {
+      if (!offlineDirs.getOrElse(offlineDir, false)) {
+        error(s"Shutting down because couldn't communicate offline log dirs with controllers")

Review Comment: I'll print the UUID here only, and I'll modify the other log statements to contain the UUID so one can pair these log statements when analyzing the logs. Printing the dir path here would be a bit more of a stretch, as we currently don't propagate it down to this level. Let me know if you think it'd be better to print the path here.
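The bookkeeping discussed in these comments, offline dirs tracked as a map from dir `Uuid` to a "successfully communicated to the controller" flag, with the failure event shutting down only when the flag is still false and including the UUID in the error, can be sketched as follows. This is a hedged plain-Java illustration: the names (`OfflineDirs`, `onFailureTimeout`) are invented, and the real code lives in `BrokerLifecycleManager`'s Scala event classes.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch (not Kafka's actual code): each offline dir maps to a
// boolean meaning "already propagated to the controller".
class OfflineDirs {
    private final Map<String, Boolean> dirs = new HashMap<>(); // uuid -> propagated?

    // A new failure starts out unpropagated; re-reporting must not reset a
    // dir that was already propagated.
    void markOffline(String uuid) {
        dirs.putIfAbsent(uuid, false);
    }

    // A successful heartbeat carried this dir to the controller.
    void markPropagated(String uuid) {
        dirs.computeIfPresent(uuid, (k, v) -> true);
    }

    // Invoked when the failure timeout fires. Returns the error message to
    // log (the caller would then shut down), or null if the dir was already
    // propagated in time. The UUID is included so log lines can be paired.
    String onFailureTimeout(String uuid) {
        if (!dirs.getOrDefault(uuid, false)) {
            return "Shutting down because offline log dir " + uuid
                + " could not be communicated to the controller";
        }
        return null;
    }
}
```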
mimaison commented on code in PR #15697: URL: https://github.com/apache/kafka/pull/15697#discussion_r1565973156

## core/src/main/scala/kafka/server/KafkaConfig.scala: ##
@@ -870,6 +875,7 @@ object KafkaConfig {
   .define(CreateTopicPolicyClassNameProp, CLASS, null, LOW, CreateTopicPolicyClassNameDoc)
   .define(AlterConfigPolicyClassNameProp, CLASS, null, LOW, AlterConfigPolicyClassNameDoc)
   .define(LogMessageDownConversionEnableProp, BOOLEAN, LogConfig.DEFAULT_MESSAGE_DOWNCONVERSION_ENABLE, LOW, LogMessageDownConversionEnableDoc)
+  .define(LogDirFailureTimeoutMsProp, LONG, Defaults.LOG_DIR_FAILURE_TIMEOUT_MS, atLeast(0), MEDIUM, LogDirFailureTimeoutMsDoc)

Review Comment: In the KIP the accepted value range is defined as >= 1. I wonder if values below 1s actually make much sense. Also, the importance was defined as LOW.
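For context, the `atLeast(...)` argument above is a configuration-time range validator. A simplified stand-in (not Kafka's actual `ConfigDef.Range` class, which this only imitates) shows why the KIP's lower bound of 1 would reject a zero timeout while the PR's `atLeast(0)` accepts it:

```java
// Simplified stand-in for a ConfigDef-style range validator, shown only to
// illustrate the atLeast(0) vs atLeast(1) distinction raised in the review.
class AtLeastValidator {
    private final long min;

    AtLeastValidator(long min) {
        this.min = min;
    }

    // Throws if the configured value is below the lower bound.
    void ensureValid(String name, long value) {
        if (value < min) {
            throw new IllegalArgumentException("Invalid value " + value
                + " for configuration " + name + ": value must be at least " + min);
        }
    }
}
```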
soarez commented on code in PR #15697: URL: https://github.com/apache/kafka/pull/15697#discussion_r1562612304

## core/src/main/scala/kafka/server/BrokerLifecycleManager.scala: ##
@@ -507,6 +522,7 @@ class BrokerLifecycleManager(
   if (errorCode == Errors.NONE) {
     val responseData = message.data()
     failedAttempts = 0
+    offlineDirs = offlineDirs.map(kv => kv._1 -> true)

Review Comment: I think this is incorrect. If a new failed directory is added to `offlineDirs` in between a heartbeat request-response, then we'll clear it here before knowing whether it will be propagated to the controller. One idea is to hand down the offline dirs set in the request in `sendBrokerHeartBeat()` to `BrokerHeartbeatResponseEvent` through `BrokerHeartbeatResponseHandler` as a new constructor argument.

## server/src/main/java/org/apache/kafka/server/config/Defaults.java: ##
@@ -94,6 +94,7 @@ public class Defaults {
   public static final int LOG_FLUSH_START_OFFSET_CHECKPOINT_INTERVAL_MS = 60000;
   public static final int NUM_RECOVERY_THREADS_PER_DATA_DIR = 1;
   public static final boolean AUTO_CREATE_TOPICS_ENABLE = true;
+  public static final long LOG_DIR_FAILURE_TIMEOUT_MS = 30000L;

Review Comment: This default seems reasonable to me.

## core/src/main/scala/kafka/server/BrokerServer.scala: ##
@@ -211,7 +211,8 @@ class BrokerServer(
   time,
   s"broker-${config.nodeId}-",
   isZkBroker = false,
-  logDirs = logManager.directoryIdsSet)
+  logDirs = logManager.directoryIdsSet,
+  () => kafkaScheduler.schedule("shutdown", () => shutdown(), 0, -1))

Review Comment: There's a `scheduleOnce` alternative which sets `periodMs` to `-1`.

## core/src/main/scala/kafka/server/KafkaConfig.scala: ##
@@ -528,6 +529,10 @@ object KafkaConfig {
    "If log.message.timestamp.type=CreateTime, the message will be rejected if the difference in timestamps exceeds " +
    "this specified threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime."
+  val LogDirFailureTimeoutMsDoc = "If the broker is unable to successfully communicate to the controller that some log " +
+    "directory has failed for longer than this time, and there's at least one partition with leadership on that directory, " +

Review Comment: > and there's at least one partition with leadership

We aren't checking for this condition. We can either a) implement it, or b) keep it simple and drop this from the configuration description.

## core/src/main/scala/kafka/server/BrokerLifecycleManager.scala: ##
@@ -327,16 +333,25 @@ class BrokerLifecycleManager(
   private class OfflineDirEvent(val dir: Uuid) extends EventQueue.Event {
     override def run(): Unit = {
       if (offlineDirs.isEmpty) {
-        offlineDirs = Set(dir)
+        offlineDirs = Map(dir -> false)
       } else {
-        offlineDirs = offlineDirs + dir
+        offlineDirs += (dir -> false)
       }
       if (registered) {
         scheduleNextCommunicationImmediately()
       }
     }
   }
+
+  private class OfflineDirBrokerFailureEvent(offlineDir: Uuid) extends EventQueue.Event {
+    override def run(): Unit = {
+      if (!offlineDirs.getOrElse(offlineDir, false)) {
+        error(s"Shutting down because couldn't communicate offline log dirs with controllers")

Review Comment: We should include the directory in the error. It might also be helpful to resolve the directory ID to its path. Perhaps something like `dirIdToPath` in `AssignmentsManager` should be made available here as well.
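The race soarez points out, a dir failing between a heartbeat request and the response handler blanket-marking every entry as propagated, and the suggested fix of handing the request's dir set down to the response handler, can be sketched as follows. The class and method names here are illustrative, not the actual `BrokerLifecycleManager` API.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hedged sketch of the suggested fix: on a successful heartbeat response,
// mark as propagated only the dirs that were actually in that request.
// A dir that failed while the heartbeat was in flight stays unpropagated.
class HeartbeatDirState {
    private final Map<String, Boolean> offlineDirs = new HashMap<>(); // uuid -> propagated?

    void dirFailed(String uuid) {
        offlineDirs.putIfAbsent(uuid, false);
    }

    // Called when building the heartbeat request: snapshot what we are sending,
    // to be handed to the response handler (the "new constructor argument" idea).
    Set<String> buildRequest() {
        return new HashSet<>(offlineDirs.keySet());
    }

    // Called from the response handler with the snapshot taken at request time.
    void onSuccessfulResponse(Set<String> dirsInRequest) {
        for (String uuid : dirsInRequest) {
            offlineDirs.computeIfPresent(uuid, (k, v) -> true);
        }
    }

    boolean isPropagated(String uuid) {
        return offlineDirs.getOrDefault(uuid, false);
    }
}
```

Compare this with the reviewed line `offlineDirs = offlineDirs.map(kv => kv._1 -> true)`, which marks every entry, including dirs the controller never heard about.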
viktorsomogyi commented on PR #15697: URL: https://github.com/apache/kafka/pull/15697#issuecomment-2049303060

Rebased it due to conflicts.