jsancio commented on code in PR #19762: URL: https://github.com/apache/kafka/pull/19762#discussion_r2190622591
##########
metadata/src/main/java/org/apache/kafka/image/TopicsImage.java:
##########
@@ -119,4 +127,40 @@ public Map<Uuid, String> topicIdToNameView() {
     public String toString() {
         return new TopicsImageByNameNode(this).stringify();
     }
+
+    /**
+     * Returns true if the given topic partition should not be on the current broker according to the metadata image.
+     *
+     * @param newTopicsImage The new topics image after broker has been reloaded
+     * @param brokerId The ID of the current broker.
+     * @param topicId The topic ID
+     * @param partitionId The partition ID
+     * @param log The log
+     * @return true if the topic partition should not exist on the broker, false otherwise.
+     */
+    public static boolean isStrayReplica(TopicsImage newTopicsImage, int brokerId, Optional<Uuid> topicId, int partitionId, String log) {

Review Comment:
   Why make this static if the first argument is `TopicsImage`? This looks a lot like an object method where the `this` reference is the `TopicsImage`.

##########
metadata/src/main/java/org/apache/kafka/image/TopicsImage.java:
##########
@@ -119,4 +127,40 @@ public Map<Uuid, String> topicIdToNameView() {
     public String toString() {
         return new TopicsImageByNameNode(this).stringify();
     }
+
+    /**
+     * Returns true if the given topic partition should not be on the current broker according to the metadata image.
+     *
+     * @param newTopicsImage The new topics image after broker has been reloaded
+     * @param brokerId The ID of the current broker.
+     * @param topicId The topic ID
+     * @param partitionId The partition ID
+     * @param log The log
+     * @return true if the topic partition should not exist on the broker, false otherwise.
+     */
+    public static boolean isStrayReplica(TopicsImage newTopicsImage, int brokerId, Optional<Uuid> topicId, int partitionId, String log) {
+        if (topicId.isEmpty()) {
+            // Missing topic ID could result from storage failure or unclean shutdown after topic creation but before flushing
+            // data to the `partition.metadata` file. And before appending data to the log, the `partition.metadata` is always
+            // flushed to disk. So if the topic ID is missing, it mostly means no data was appended, and we can treat this as
+            // a stray log.
+            LOG.info("The topicId does not exist in {}, treat it as a stray log.", log);
+            return true;
+        }
+
+        PartitionRegistration partition = newTopicsImage.getPartition(topicId.get(), partitionId);
+        if (partition == null) {
+            LOG.info("Found stray log dir {}: the topicId {} does not exist in the metadata image.", log, topicId);
+            return true;
+        } else {
+            List<Integer> replicas = Arrays.stream(partition.replicas).boxed().toList();
+            if (!replicas.contains(brokerId)) {
+                LOG.info("Found stray log dir {}: the current replica assignment {} does not contain the local brokerId {}.",
+                    log, replicas.stream().map(String::valueOf).collect(Collectors.joining(", ", "[", "]")), brokerId);
+                return true;
+            } else {
+                return false;
+            }
+        }
+    }

Review Comment:
   To me this should be functionality of the log manager. Maybe the TopicsImage method should just return all of the topic ids that don't exist in the given image and broker. This would allow you to remove that added static logger.

##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftLog.java:
##########

Review Comment:
   I am thinking that we should make this internal by moving the implementation to `o.a.k.r.internals.KafkaRaftLog.java`. The same goes for the accompanying test suite file.

##########
metadata/src/main/java/org/apache/kafka/image/TopicsImage.java:
##########
@@ -119,4 +127,40 @@ public Map<Uuid, String> topicIdToNameView() {
     public String toString() {
         return new TopicsImageByNameNode(this).stringify();
     }
+
+    /**
+     * Returns true if the given topic partition should not be on the current broker according to the metadata image.
+     *
+     * @param newTopicsImage The new topics image after broker has been reloaded
+     * @param brokerId The ID of the current broker.
+     * @param topicId The topic ID
+     * @param partitionId The partition ID
+     * @param log The log
+     * @return true if the topic partition should not exist on the broker, false otherwise.
+     */
+    public static boolean isStrayReplica(TopicsImage newTopicsImage, int brokerId, Optional<Uuid> topicId, int partitionId, String log) {
+        if (topicId.isEmpty()) {
+            // Missing topic ID could result from storage failure or unclean shutdown after topic creation but before flushing
+            // data to the `partition.metadata` file. And before appending data to the log, the `partition.metadata` is always
+            // flushed to disk. So if the topic ID is missing, it mostly means no data was appended, and we can treat this as
+            // a stray log.
+            LOG.info("The topicId does not exist in {}, treat it as a stray log.", log);
+            return true;
+        }

Review Comment:
   How about letting the log manager make this decision? In Kafka 4.0 all topics must have a topic id.

##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala:
##########
@@ -340,7 +339,7 @@ class BrokerMetadataPublisher(
       // recovery-from-unclean-shutdown if required.
       logManager.startup(
         metadataCache.getAllTopics().asScala,
-        isStray = log => JLogManager.isStrayKraftReplica(brokerId, newImage.topics(), log)
+        isStray = log => TopicsImage.isStrayReplica(newImage.topics(), brokerId, log.topicId(), log.topicPartition().partition(), log.toString)

Review Comment:
   Minor, but to me stray partitions are in the log manager, not in the topics image. Meaning the log manager has partition entries that are not in the latest topics image. In some sense the log manager understands the topics image and makes sure that they match. The topics image doesn't know anything about "stray partitions" or the log manager. If you still want to move the functionality to TopicsImage, maybe make it a method (not static) with `Stream<TopicIdPartition> deletedPartitionsForReplica(int brokerId, Stream<TopicIdPartition>)`.
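
A minimal sketch of what that non-static shape might look like. The types here (`StrayPartitionSketch`, `TopicsImageSketch`, and the nested `TopicIdPartition` record) are simplified stand-ins invented for illustration, not the real Kafka classes, and the image is modeled as a plain map from topic ID to partition ID to replica broker IDs:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Stream;

// Hypothetical stand-ins for illustration only; these are NOT Kafka's TopicsImage or TopicIdPartition.
final class StrayPartitionSketch {
    public record TopicIdPartition(UUID topicId, int partition) {}

    // Simplified image: topic ID -> partition ID -> replica broker IDs.
    public record TopicsImageSketch(Map<UUID, Map<Integer, int[]>> replicasByTopic) {
        // Returns the candidates that this image does not assign to brokerId, either
        // because the partition is unknown to the image or the broker is not a replica.
        public Stream<TopicIdPartition> deletedPartitionsForReplica(int brokerId,
                                                                   Stream<TopicIdPartition> candidates) {
            return candidates.filter(tp -> {
                int[] replicas = replicasByTopic
                    .getOrDefault(tp.topicId(), Map.of())
                    .get(tp.partition());
                return replicas == null || Arrays.stream(replicas).noneMatch(id -> id == brokerId);
            });
        }
    }

    public static void main(String[] args) {
        UUID known = UUID.randomUUID();
        UUID unknown = UUID.randomUUID();
        TopicsImageSketch image = new TopicsImageSketch(
            Map.of(known, Map.of(0, new int[] {1, 2}, 1, new int[] {2, 3})));

        // Partitions the (hypothetical) log manager currently hosts on broker 1.
        List<TopicIdPartition> local = List.of(
            new TopicIdPartition(known, 0),
            new TopicIdPartition(known, 1),
            new TopicIdPartition(unknown, 0));

        // The caller decides what to log and what to do with the result.
        image.deletedPartitionsForReplica(1, local.stream())
            .forEach(tp -> System.out.println("stray: " + tp));
    }
}
```

With this shape the image only answers which partitions it no longer assigns to the broker; the caller (the log manager in this discussion) would own the "stray" terminology and the logging, so no logger is needed in the image class. How a log with a missing topic ID is handled would also stay with the caller.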

##########
checkstyle/import-control.xml:
##########
@@ -486,6 +486,7 @@
     <allow class="org.apache.kafka.common.compress.Compression" exact-match="true" />
     <allow pkg="org.apache.kafka.common.config" />
     <allow pkg="org.apache.kafka.common.feature" />
+    <allow pkg="org.apache.kafka.common.internals" />

Review Comment:
   Note that to me this means that the common module is not organized correctly if the raft module needs types in the "internals" namespace. Same comment applies to the change below which also includes `o.a.k.s.internals.log`.