Re: [PR] KAFKA-16463 Delete metadata log on ZK broker startup [kafka]

2024-04-12 Thread via GitHub


cmccabe merged PR #15648:
URL: https://github.com/apache/kafka/pull/15648


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16463 Delete metadata log on ZK broker startup [kafka]

2024-04-11 Thread via GitHub


mumrah commented on PR #15648:
URL: https://github.com/apache/kafka/pull/15648#issuecomment-2050028734

   Thanks for taking a look @chia7712. I've updated the PR with your 
suggestions and added a few more test cases. 
   
   cc @soarez and @jsancio in case you'd like to take another pass





Re: [PR] KAFKA-16463 Delete metadata log on ZK broker startup [kafka]

2024-04-03 Thread via GitHub


chia7712 commented on code in PR #15648:
URL: https://github.com/apache/kafka/pull/15648#discussion_r1549840889


##
core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala:
##
@@ -177,6 +201,78 @@ class RaftManagerTest {
 assertFalse(fileLocked(lockPath))
   }
 
+  @Test
+  def testMigratingZkBrokerDeletesMetadataLog(): Unit = {
+val logDir = Some(TestUtils.tempDir().toPath)
+val metadataDir = Some(TestUtils.tempDir().toPath)
+val nodeId = 1
+val config = createZkBrokerConfig(migrationEnabled = true, nodeId, logDir, 
metadataDir)
+val raftManager = createRaftManager(
+  new TopicPartition("__cluster_metadata", 0),
+  config
+)
+raftManager.shutdown()
+
+try {
+  KafkaRaftManager.maybeDeleteMetadataLogDir(config)
+  
assertFalse(Files.exists(metadataDir.get.resolve("__cluster_metadata-0")))
+} catch {
+  case err: Throwable => fail("Failed to delete metadata log", err)
+}
+assertTrue(Files.exists(metadataDir.get))
+  }
+
+  @Test
+  def testNonMigratingZkBrokerDeletesMetadataLog(): Unit = {
+val logDir = Some(TestUtils.tempDir().toPath)
+val metadataDir = Some(TestUtils.tempDir().toPath)
+val nodeId = 1
+// Use this config to create the directory
+val config1 = createZkBrokerConfig(migrationEnabled = true, nodeId, 
logDir, metadataDir)
+val raftManager = createRaftManager(
+  new TopicPartition("__cluster_metadata", 0),
+  config1
+)
+raftManager.shutdown()
+
+val config2 = createZkBrokerConfig(migrationEnabled = false, nodeId, 
logDir, metadataDir)
+try {
+  KafkaRaftManager.maybeDeleteMetadataLogDir(config2)
+  fail("Should have not deleted the metadata log")
+} catch {
+  case err: Throwable =>
+assertEquals("Not deleting metadata log dir since migrations are not 
enabled.", err.getMessage)
+
assertTrue(Files.exists(metadataDir.get.resolve("__cluster_metadata-0")))
+}
+assertTrue(Files.exists(metadataDir.get))
+  }
+
+  @Test
+  def testKRaftBrokerDoesNotDeleteMetadataLog(): Unit = {
+val logDir = Some(TestUtils.tempDir().toPath)
+val metadataDir = Some(TestUtils.tempDir().toPath)
+val nodeId = 1
+val config = createConfig(
+  Set(ProcessRole.BrokerRole),
+  nodeId,
+  logDir,
+  metadataDir
+)
+val raftManager = createRaftManager(
+  new TopicPartition("__cluster_metadata", 0),
+  config
+)
+raftManager.shutdown()
+
+try {

Review Comment:
   We can use `assertThrows` to simplify the code. For example:
   ```scala
   assertThrows(classOf[RuntimeException], () => KafkaRaftManager.maybeDeleteMetadataLogDir(config))
   assertTrue(Files.exists(metadataDir.get.resolve("__cluster_metadata-0")))
   assertTrue(Files.exists(metadataDir.get))
   ```



##
core/src/main/scala/kafka/raft/RaftManager.scala:
##
@@ -69,6 +70,51 @@ object KafkaRaftManager {
 
 lock
   }
+
+  /**
+   * Test if the configured metadata log dir is one of the data log dirs.
+   */
+  def hasDifferentLogDir(config: KafkaConfig): Boolean = {
+!config
+  .logDirs
+  .map(Paths.get(_).toAbsolutePath)
+  .contains(Paths.get(config.metadataLogDir).toAbsolutePath)
+  }
+
+  /**
+   * Obtain the file lock and delete the metadata log directory completely.
+   *
+   * This is only used by ZK brokers that are in pre-migration or hybrid mode 
of the ZK to KRaft migration.
+   * The rationale for deleting the metadata log in these cases is that it is 
safe to do on brokers and it
+   * it makes recovery from a failed migration much easier. See KAFKA-16463.
+   *
+   * @param config  The broker config
+   */
+  def maybeDeleteMetadataLogDir(config: KafkaConfig): Unit = {
+// These constraints are enforced in KafkaServer, but repeating them here 
to guard against future callers
+if (config.processRoles.nonEmpty) {
+  throw new RuntimeException("Not deleting metadata log dir since this 
node is in KRaft mode.")
+} else if (!config.migrationEnabled) {
+  throw new RuntimeException("Not deleting metadata log dir since 
migrations are not enabled.")
+} else {
+  val metadataDir = new File(config.metadataLogDir)
+  val logDirName = 
UnifiedLog.logDirName(Topic.CLUSTER_METADATA_TOPIC_PARTITION)
+  val metadataPartitionDir = KafkaRaftManager.createLogDirectory(new 
File(config.metadataLogDir), logDirName)

Review Comment:
   `new File(config.metadataLogDir)` can be replaced by `metadataDir`



##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -480,6 +480,81 @@ class ZkMigrationIntegrationTest {
 }
   }
 
+  @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = 
MetadataVersion.IBP_3_8_IV0, serverProperties = Array(
+new ClusterConfigProperty(key = "inter.broker.listener.name", value = 
"EXTERNAL"),
+new ClusterConfigProperty(key = "listeners", value = 

Re: [PR] KAFKA-16463 Delete metadata log on ZK broker startup [kafka]

2024-04-03 Thread via GitHub


mumrah commented on PR #15648:
URL: https://github.com/apache/kafka/pull/15648#issuecomment-2034709602

   I'm going to work on a ducktape test as well. Hopefully I can get that done 
today





Re: [PR] KAFKA-16463 Delete metadata log on ZK broker startup [kafka]

2024-04-03 Thread via GitHub


mumrah commented on code in PR #15648:
URL: https://github.com/apache/kafka/pull/15648#discussion_r1549818474


##
core/src/main/scala/kafka/server/KafkaServer.scala:
##
@@ -420,6 +420,12 @@ class KafkaServer(
 isZkBroker = true,
 logManager.directoryIdsSet)
 
+  // For ZK brokers in migration mode, always delete the metadata 
partition on startup.
+  KafkaRaftManager.maybeDeleteMetadataLogDir(config) match {
+case Some(err) => logger.error("Could not delete local metadata 
log dir. This is non-fatal, so continuing with startup.", err)

Review Comment:
   I updated this to let maybeDeleteMetadataLogDir throw and fail startup
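
   A minimal sketch of the two error-handling shapes discussed in this thread, under stated assumptions: `DeleteErrorHandling`, `deleteDir`, `deleteReturningError`, and `deleteOrThrow` are hypothetical names, and the I/O failure is simulated with a flag. It is not the Kafka implementation; it only contrasts returning an `Option[Throwable]` (the caller may log and continue) with letting the exception propagate (the caller's startup fails).

```scala
import java.io.IOException
import scala.util.control.NonFatal

object DeleteErrorHandling {
  // Hypothetical stand-in for the real deletion step.
  // `shouldFail` simulates an underlying I/O problem.
  private def deleteDir(shouldFail: Boolean): Unit =
    if (shouldFail) throw new IOException("simulated I/O failure")

  // Earlier shape: the failure is captured as a value, so the caller
  // decides whether to log it and continue.
  def deleteReturningError(shouldFail: Boolean): Option[Throwable] =
    try {
      deleteDir(shouldFail)
      None
    } catch {
      case NonFatal(t) => Some(t)
    }

  // Updated shape: the exception propagates, so a caller that does not
  // catch it (for example, a startup sequence) fails.
  def deleteOrThrow(shouldFail: Boolean): Unit = deleteDir(shouldFail)
}
```

   With the second shape, an I/O failure on the metadata log dir cannot be silently ignored, which matches the concern raised in review.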






Re: [PR] KAFKA-16463 Delete metadata log on ZK broker startup [kafka]

2024-04-03 Thread via GitHub


mumrah commented on code in PR #15648:
URL: https://github.com/apache/kafka/pull/15648#discussion_r1549817561


##
core/src/main/scala/kafka/raft/RaftManager.scala:
##
@@ -69,6 +69,36 @@ object KafkaRaftManager {
 
 lock
   }
+
+  /**
+   * Obtain the file lock and delete the metadata log directory completely.
+   *
+   * This is only used by ZK brokers that are in pre-migration or hybrid mode 
of the ZK to KRaft migration.
+   * The rationale for deleting the metadata log in these cases is that it is 
safe to do on brokers and it
+   * it makes recovery from a failed migration much easier. See KAFKA-16463.
+   *
+   * @param config  The broker config
+   * @return An error wrapped as an Option, if an error occurred. None 
otherwise
+   */
+  def maybeDeleteMetadataLogDir(config: KafkaConfig): Option[Throwable] = {
+// These constraints are enforced in KafkaServer, but repeating them here 
to guard against future callers
+if (config.processRoles.nonEmpty) {
+  Some(new RuntimeException("Not deleting metadata log dir since this node 
is in KRaft mode."))
+} else if (!config.migrationEnabled) {
+  Some(new RuntimeException("Not deleting metadata log dir since 
migrations are not enabled."))
+} else {
+  val metadataDir = new File(config.metadataLogDir)
+  val deletionLock = KafkaRaftManager.lockDataDir(metadataDir)
+  try {
+Utils.delete(metadataDir)

Review Comment:
   Ok, code has been updated to just delete the `__cluster_metadata-0` 
directory. I got confused by our naming.
   
   `metadataLogDir` is actually the directory in which the metadata log (which is 
a directory) exists :)
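
   A small sketch of that layout, assuming only the directory name quoted in this thread. `MetadataDirLayout`, `metadataPartitionDir`, and `sharesDataLogDir` are hypothetical names for illustration; the second helper mirrors the absolute-path comparison shown in the `hasDifferentLogDir` diff, but takes plain strings instead of a `KafkaConfig`.

```scala
import java.nio.file.{Path, Paths}

object MetadataDirLayout {
  // Directory name of the metadata partition, as quoted in this thread.
  val MetadataPartitionDirName = "__cluster_metadata-0"

  // The metadata log is itself a directory that lives inside metadataLogDir.
  def metadataPartitionDir(metadataLogDir: String): Path =
    Paths.get(metadataLogDir).toAbsolutePath.resolve(MetadataPartitionDirName)

  // True when metadataLogDir is also one of the data log dirs, in which
  // case it contains regular topic partitions as well.
  def sharesDataLogDir(metadataLogDir: String, logDirs: Seq[String]): Boolean =
    logDirs
      .map(Paths.get(_).toAbsolutePath)
      .contains(Paths.get(metadataLogDir).toAbsolutePath)
}
```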






Re: [PR] KAFKA-16463 Delete metadata log on ZK broker startup [kafka]

2024-04-03 Thread via GitHub


mumrah commented on code in PR #15648:
URL: https://github.com/apache/kafka/pull/15648#discussion_r1549817561


##
core/src/main/scala/kafka/raft/RaftManager.scala:
##
@@ -69,6 +69,36 @@ object KafkaRaftManager {
 
 lock
   }
+
+  /**
+   * Obtain the file lock and delete the metadata log directory completely.
+   *
+   * This is only used by ZK brokers that are in pre-migration or hybrid mode 
of the ZK to KRaft migration.
+   * The rationale for deleting the metadata log in these cases is that it is 
safe to do on brokers and it
+   * it makes recovery from a failed migration much easier. See KAFKA-16463.
+   *
+   * @param config  The broker config
+   * @return An error wrapped as an Option, if an error occurred. None 
otherwise
+   */
+  def maybeDeleteMetadataLogDir(config: KafkaConfig): Option[Throwable] = {
+// These constraints are enforced in KafkaServer, but repeating them here 
to guard against future callers
+if (config.processRoles.nonEmpty) {
+  Some(new RuntimeException("Not deleting metadata log dir since this node 
is in KRaft mode."))
+} else if (!config.migrationEnabled) {
+  Some(new RuntimeException("Not deleting metadata log dir since 
migrations are not enabled."))
+} else {
+  val metadataDir = new File(config.metadataLogDir)
+  val deletionLock = KafkaRaftManager.lockDataDir(metadataDir)
+  try {
+Utils.delete(metadataDir)

Review Comment:
   Ok, code has been updated to just delete the `__cluster_metadata-0` 
directory. I got confused by our naming: `metadataLogDir` is actually the 
directory in which the metadata log (which is a directory) exists.






Re: [PR] KAFKA-16463 Delete metadata log on ZK broker startup [kafka]

2024-04-03 Thread via GitHub


mumrah commented on code in PR #15648:
URL: https://github.com/apache/kafka/pull/15648#discussion_r1549813667


##
core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala:
##
@@ -389,7 +389,12 @@ class NodeToControllerRequestThread(
   debug("Controller isn't cached, looking for local metadata changes")
   controllerInformation.node match {
 case Some(controllerNode) =>
-  info(s"Recorded new controller, from now on will use node 
$controllerNode")
+  val controllerType = if (controllerInformation.isZkController) {
+"ZK"
+  } else {
+"KRaft"
+  }
+  info(s"Recorded new $controllerType controller, from now on will use 
node $controllerNode")

Review Comment:
   Unrelated change, but helped when debugging the integration test.






Re: [PR] KAFKA-16463 Delete metadata log on ZK broker startup [kafka]

2024-04-03 Thread via GitHub


mumrah commented on code in PR #15648:
URL: https://github.com/apache/kafka/pull/15648#discussion_r1549786977


##
core/src/main/scala/kafka/server/KafkaServer.scala:
##
@@ -420,6 +420,12 @@ class KafkaServer(
 isZkBroker = true,
 logManager.directoryIdsSet)
 
+  // For ZK brokers in migration mode, always delete the metadata 
partition on startup.
+  KafkaRaftManager.maybeDeleteMetadataLogDir(config) match {
+case Some(err) => logger.error("Could not delete local metadata 
log dir. This is non-fatal, so continuing with startup.", err)

Review Comment:
   My rationale here was that the deletion is not strictly required, but rather 
an optimization for the revert-to-ZK case. 
   
   I assume RaftManager would also fail if there was some underlying I/O 
problem, but failing here is probably okay.






Re: [PR] KAFKA-16463 Delete metadata log on ZK broker startup [kafka]

2024-04-03 Thread via GitHub


soarez commented on code in PR #15648:
URL: https://github.com/apache/kafka/pull/15648#discussion_r1549319336


##
core/src/main/scala/kafka/server/KafkaServer.scala:
##
@@ -420,6 +420,12 @@ class KafkaServer(
 isZkBroker = true,
 logManager.directoryIdsSet)
 
+  // For ZK brokers in migration mode, always delete the metadata 
partition on startup.
+  KafkaRaftManager.maybeDeleteMetadataLogDir(config) match {
+case Some(err) => logger.error("Could not delete local metadata 
log dir. This is non-fatal, so continuing with startup.", err)

Review Comment:
   Should this really be non-fatal? What's the thinking behind this decision?
   If there is an I/O failure on the metadata log dir, the broker should not 
continue.






Re: [PR] KAFKA-16463 Delete metadata log on ZK broker startup [kafka]

2024-04-02 Thread via GitHub


cmccabe commented on code in PR #15648:
URL: https://github.com/apache/kafka/pull/15648#discussion_r1548528194


##
core/src/main/scala/kafka/raft/RaftManager.scala:
##
@@ -69,6 +69,36 @@ object KafkaRaftManager {
 
 lock
   }
+
+  /**
+   * Obtain the file lock and delete the metadata log directory completely.
+   *
+   * This is only used by ZK brokers that are in pre-migration or hybrid mode 
of the ZK to KRaft migration.
+   * The rationale for deleting the metadata log in these cases is that it is 
safe to do on brokers and it
+   * it makes recovery from a failed migration much easier. See KAFKA-16463.
+   *
+   * @param config  The broker config
+   * @return An error wrapped as an Option, if an error occurred. None 
otherwise
+   */
+  def maybeDeleteMetadataLogDir(config: KafkaConfig): Option[Throwable] = {
+// These constraints are enforced in KafkaServer, but repeating them here 
to guard against future callers
+if (config.processRoles.nonEmpty) {
+  Some(new RuntimeException("Not deleting metadata log dir since this node 
is in KRaft mode."))
+} else if (!config.migrationEnabled) {
+  Some(new RuntimeException("Not deleting metadata log dir since 
migrations are not enabled."))
+} else {
+  val metadataDir = new File(config.metadataLogDir)
+  val deletionLock = KafkaRaftManager.lockDataDir(metadataDir)
+  try {
+Utils.delete(metadataDir)

Review Comment:
   Yes, we need to make sure not to delete the whole metadata directory.






Re: [PR] KAFKA-16463 Delete metadata log on ZK broker startup [kafka]

2024-04-02 Thread via GitHub


mumrah commented on code in PR #15648:
URL: https://github.com/apache/kafka/pull/15648#discussion_r1548499611


##
core/src/main/scala/kafka/raft/RaftManager.scala:
##
@@ -69,6 +69,36 @@ object KafkaRaftManager {
 
 lock
   }
+
+  /**
+   * Obtain the file lock and delete the metadata log directory completely.
+   *
+   * This is only used by ZK brokers that are in pre-migration or hybrid mode 
of the ZK to KRaft migration.
+   * The rationale for deleting the metadata log in these cases is that it is 
safe to do on brokers and it
+   * it makes recovery from a failed migration much easier. See KAFKA-16463.
+   *
+   * @param config  The broker config
+   * @return An error wrapped as an Option, if an error occurred. None 
otherwise
+   */
+  def maybeDeleteMetadataLogDir(config: KafkaConfig): Option[Throwable] = {
+// These constraints are enforced in KafkaServer, but repeating them here 
to guard against future callers
+if (config.processRoles.nonEmpty) {
+  Some(new RuntimeException("Not deleting metadata log dir since this node 
is in KRaft mode."))
+} else if (!config.migrationEnabled) {
+  Some(new RuntimeException("Not deleting metadata log dir since 
migrations are not enabled."))
+} else {
+  val metadataDir = new File(config.metadataLogDir)
+  val deletionLock = KafkaRaftManager.lockDataDir(metadataDir)
+  try {
+Utils.delete(metadataDir)

Review Comment:
   Thanks @jsancio, I'll fix this and add some additional test cases






Re: [PR] KAFKA-16463 Delete metadata log on ZK broker startup [kafka]

2024-04-02 Thread via GitHub


jsancio commented on code in PR #15648:
URL: https://github.com/apache/kafka/pull/15648#discussion_r1548472966


##
core/src/main/scala/kafka/raft/RaftManager.scala:
##
@@ -69,6 +69,36 @@ object KafkaRaftManager {
 
 lock
   }
+
+  /**
+   * Obtain the file lock and delete the metadata log directory completely.
+   *
+   * This is only used by ZK brokers that are in pre-migration or hybrid mode 
of the ZK to KRaft migration.
+   * The rationale for deleting the metadata log in these cases is that it is 
safe to do on brokers and it
+   * it makes recovery from a failed migration much easier. See KAFKA-16463.
+   *
+   * @param config  The broker config
+   * @return An error wrapped as an Option, if an error occurred. None 
otherwise
+   */
+  def maybeDeleteMetadataLogDir(config: KafkaConfig): Option[Throwable] = {
+// These constraints are enforced in KafkaServer, but repeating them here 
to guard against future callers
+if (config.processRoles.nonEmpty) {
+  Some(new RuntimeException("Not deleting metadata log dir since this node 
is in KRaft mode."))
+} else if (!config.migrationEnabled) {
+  Some(new RuntimeException("Not deleting metadata log dir since 
migrations are not enabled."))
+} else {
+  val metadataDir = new File(config.metadataLogDir)
+  val deletionLock = KafkaRaftManager.lockDataDir(metadataDir)
+  try {
+Utils.delete(metadataDir)

Review Comment:
   This deletes the entire metadata log directory, not just the 
`__cluster_metadata-0` topic partition inside it. In some 
configurations `metadata.log.dir` equals `log.dir(s)`. In those 
configurations this will delete all of the topic partitions in the log directory.
   
   If the tests pass, this means that we are missing a test that checks this 
doesn't happen.
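
   A minimal sketch of the scoped deletion and the missing check, using only `java.nio.file`. `ScopedMetadataDelete`, `deleteRecursively`, and `deleteMetadataPartition` are hypothetical names and not Kafka APIs; the sketch assumes the metadata partition directory is named `__cluster_metadata-0` as above, and it omits the file locking that the real code performs.

```scala
import java.io.IOException
import java.nio.file.{FileVisitResult, Files, Path, SimpleFileVisitor}
import java.nio.file.attribute.BasicFileAttributes

object ScopedMetadataDelete {
  // Directory name of the metadata partition, as quoted in this thread.
  val MetadataPartitionDirName = "__cluster_metadata-0"

  // Hypothetical helper: delete `root` and everything beneath it.
  def deleteRecursively(root: Path): Unit = {
    if (Files.exists(root)) {
      Files.walkFileTree(root, new SimpleFileVisitor[Path] {
        override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
          Files.delete(file)
          FileVisitResult.CONTINUE
        }
        override def postVisitDirectory(dir: Path, exc: IOException): FileVisitResult = {
          if (exc != null) throw exc
          // Directories are removed only after their contents are gone.
          Files.delete(dir)
          FileVisitResult.CONTINUE
        }
      })
    }
  }

  // Delete only the metadata partition directory, never its parent,
  // so sibling topic partitions in a shared log dir are left alone.
  def deleteMetadataPartition(metadataLogDir: Path): Unit =
    deleteRecursively(metadataLogDir.resolve(MetadataPartitionDirName))
}
```

   A test for the shared-directory case could create a temp dir holding both `__cluster_metadata-0` and an ordinary partition directory (say `foo-0`, a made-up name), call `deleteMetadataPartition`, and assert that only the metadata partition is gone.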


