[ https://issues.apache.org/jira/browse/KAFKA-16814?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Luke Chen updated KAFKA-16814: ------------------------------ Description: When starting up kafka logManager, we'll check stray replicas to avoid some corner cases. But this check might cause broker unable to startup if `partition.metadata` is missing because when startup kafka, we load log from file, and the topicId of the log is coming from `partition.metadata` file. So, if `partition.metadata` is missing, the topicId will be None, and the `LogManager#isStrayKraftReplica` will fail with no topicID error. The `partition.metadata` missing could be some storage failure, or another possible path is unclean shutdown after topic is created in the replica, but before data is flushed into `partition.metadata` file. This is possible because we do the flush in async way [here|https://github.com/apache/kafka/blob/5552f5c26df4eb07b2d6ee218e4a29e4ca790d5c/core/src/main/scala/kafka/log/UnifiedLog.scala#L229]. {code:java} ERROR Encountered fatal fault: Error starting LogManager (org.apache.kafka.server.fault.ProcessTerminatingFaultHandler) java.lang.RuntimeException: The log dir Log(dir=/tmp/kraft-broker-logs/quickstart-events-0, topic=quickstart-events, partition=0, highWatermark=0, lastStableOffset=0, logStartOffset=0, logEndOffset=0) does not have a topic ID, which is not allowed when running in KRaft mode. at kafka.log.LogManager$.$anonfun$isStrayKraftReplica$1(LogManager.scala:1609) at scala.Option.getOrElse(Option.scala:201) at kafka.log.LogManager$.isStrayKraftReplica(LogManager.scala:1608) at kafka.server.metadata.BrokerMetadataPublisher.$anonfun$initializeManagers$1(BrokerMetadataPublisher.scala:294) at kafka.server.metadata.BrokerMetadataPublisher.$anonfun$initializeManagers$1$adapted(BrokerMetadataPublisher.scala:294) at kafka.log.LogManager.loadLog(LogManager.scala:359) at kafka.log.LogManager.$anonfun$loadLogs$15(LogManager.scala:493) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:577) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) at java.base/java.lang.Thread.run(Thread.java:1623) {code} Because if we don't do the isStrayKraftReplica check, the topicID and the `partition.metadata` will get recovered after getting topic partition update and becoming leader or follower later. I'm proposing we skip the `isStrayKraftReplica` check if topicID is None, instead of throwing exception to terminate the kafka. `isStrayKraftReplica` check is just for a corner case only, it should be fine IMO. === update === Checked KAFKA-14616 and KAFKA-15605, our purpose of finding strayReplicas and delete them is because the replica should be deleted, but left in the log dir. So, if we have a replica that doesn't have topicID (due to was: When starting up kafka logManager, we'll check stray replicas to avoid some corner cases. But this check might cause broker unable to startup if `partition.metadata` is missing because when startup kafka, we load log from file, and the topicId of the log is coming from `partition.metadata` file. So, if `partition.metadata` is missing, the topicId will be None, and the `LogManager#isStrayKraftReplica` will fail with no topicID error. The `partition.metadata` missing could be some storage failure, or another possible path is unclean shutdown after topic is created in the replica, but before data is flushed into `partition.metadata` file. This is possible because we do the flush in async way [here|https://github.com/apache/kafka/blob/5552f5c26df4eb07b2d6ee218e4a29e4ca790d5c/core/src/main/scala/kafka/log/UnifiedLog.scala#L229]. {code:java} ERROR Encountered fatal fault: Error starting LogManager (org.apache.kafka.server.fault.ProcessTerminatingFaultHandler) java.lang.RuntimeException: The log dir Log(dir=/tmp/kraft-broker-logs/quickstart-events-0, topic=quickstart-events, partition=0, highWatermark=0, lastStableOffset=0, logStartOffset=0, logEndOffset=0) does not have a topic ID, which is not allowed when running in KRaft mode. at kafka.log.LogManager$.$anonfun$isStrayKraftReplica$1(LogManager.scala:1609) at scala.Option.getOrElse(Option.scala:201) at kafka.log.LogManager$.isStrayKraftReplica(LogManager.scala:1608) at kafka.server.metadata.BrokerMetadataPublisher.$anonfun$initializeManagers$1(BrokerMetadataPublisher.scala:294) at kafka.server.metadata.BrokerMetadataPublisher.$anonfun$initializeManagers$1$adapted(BrokerMetadataPublisher.scala:294) at kafka.log.LogManager.loadLog(LogManager.scala:359) at kafka.log.LogManager.$anonfun$loadLogs$15(LogManager.scala:493) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:577) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) at java.base/java.lang.Thread.run(Thread.java:1623) {code} Because if we don't do the isStrayKraftReplica check, the topicID and the `partition.metadata` will get recovered after getting topic partition update and becoming leader or follower later. I'm proposing we skip the `isStrayKraftReplica` check if topicID is None, instead of throwing exception to terminate the kafka. `isStrayKraftReplica` check is just for a corner case only, it should be fine IMO. > KRaft broker cannot startup when `partition.metadata` is missing > ---------------------------------------------------------------- > > Key: KAFKA-16814 > URL: https://issues.apache.org/jira/browse/KAFKA-16814 > Project: Kafka > Issue Type: Bug > Affects Versions: 3.7.0 > Reporter: Luke Chen > Priority: Major > > When starting up kafka logManager, we'll check stray replicas to avoid some > corner cases. But this check might cause broker unable to startup if > `partition.metadata` is missing because when startup kafka, we load log from > file, and the topicId of the log is coming from `partition.metadata` file. > So, if `partition.metadata` is missing, the topicId will be None, and the > `LogManager#isStrayKraftReplica` will fail with no topicID error. > The `partition.metadata` missing could be some storage failure, or another > possible path is unclean shutdown after topic is created in the replica, but > before data is flushed into `partition.metadata` file. This is possible > because we do the flush in async way > [here|https://github.com/apache/kafka/blob/5552f5c26df4eb07b2d6ee218e4a29e4ca790d5c/core/src/main/scala/kafka/log/UnifiedLog.scala#L229]. > > > {code:java} > ERROR Encountered fatal fault: Error starting LogManager > (org.apache.kafka.server.fault.ProcessTerminatingFaultHandler) > java.lang.RuntimeException: The log dir > Log(dir=/tmp/kraft-broker-logs/quickstart-events-0, topic=quickstart-events, > partition=0, highWatermark=0, lastStableOffset=0, logStartOffset=0, > logEndOffset=0) does not have a topic ID, which is not allowed when running > in KRaft mode. > at > kafka.log.LogManager$.$anonfun$isStrayKraftReplica$1(LogManager.scala:1609) > at scala.Option.getOrElse(Option.scala:201) > at kafka.log.LogManager$.isStrayKraftReplica(LogManager.scala:1608) > at > kafka.server.metadata.BrokerMetadataPublisher.$anonfun$initializeManagers$1(BrokerMetadataPublisher.scala:294) > at > kafka.server.metadata.BrokerMetadataPublisher.$anonfun$initializeManagers$1$adapted(BrokerMetadataPublisher.scala:294) > at kafka.log.LogManager.loadLog(LogManager.scala:359) > at kafka.log.LogManager.$anonfun$loadLogs$15(LogManager.scala:493) > at > java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:577) > at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317) > at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) > at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) > at java.base/java.lang.Thread.run(Thread.java:1623) {code} > > Because if we don't do the isStrayKraftReplica check, the topicID and the > `partition.metadata` will get recovered after getting topic partition update > and becoming leader or follower later. I'm proposing we skip the > `isStrayKraftReplica` check if topicID is None, instead of throwing exception > to terminate the kafka. `isStrayKraftReplica` check is just for a corner case > only, it should be fine IMO. > > > === update === > Checked KAFKA-14616 and KAFKA-15605, our purpose of finding strayReplicas and > delete them is because the replica should be deleted, but left in the log > dir. So, if we have a replica that doesn't have topicID (due to > > -- This message was sent by Atlassian Jira (v8.20.10#820010)