mumrah commented on code in PR #15648: URL: https://github.com/apache/kafka/pull/15648#discussion_r1549817561
########## core/src/main/scala/kafka/raft/RaftManager.scala: ########## @@ -69,6 +69,36 @@ object KafkaRaftManager { lock } + + /** + * Obtain the file lock and delete the metadata log directory completely. + * + * This is only used by ZK brokers that are in pre-migration or hybrid mode of the ZK to KRaft migration. + * The rationale for deleting the metadata log in these cases is that it is safe to do on brokers and it + * it makes recovery from a failed migration much easier. See KAFKA-16463. + * + * @param config The broker config + * @return An error wrapped as an Option, if an error occurred. None otherwise + */ + def maybeDeleteMetadataLogDir(config: KafkaConfig): Option[Throwable] = { + // These constraints are enforced in KafkaServer, but repeating them here to guard against future callers + if (config.processRoles.nonEmpty) { + Some(new RuntimeException("Not deleting metadata log dir since this node is in KRaft mode.")) + } else if (!config.migrationEnabled) { + Some(new RuntimeException("Not deleting metadata log dir since migrations are not enabled.")) + } else { + val metadataDir = new File(config.metadataLogDir) + val deletionLock = KafkaRaftManager.lockDataDir(metadataDir) + try { + Utils.delete(metadataDir) Review Comment: Ok, code has been updated to just delete the `__cluster_metadata-0` directory. I got confused by our naming 😅 metadataLogDir is actually the directory in which the metadata log (which is a directory) exists :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org