jsancio commented on code in PR #19762:
URL: https://github.com/apache/kafka/pull/19762#discussion_r2190622591


##########
metadata/src/main/java/org/apache/kafka/image/TopicsImage.java:
##########
@@ -119,4 +127,40 @@ public Map<Uuid, String> topicIdToNameView() {
     public String toString() {
         return new TopicsImageByNameNode(this).stringify();
     }
+
+    /**
+     * Returns true if the given topic partition should not be on the current broker according to the metadata image.
+     *
+     * @param newTopicsImage The new topics image after broker has been reloaded
+     * @param brokerId       The ID of the current broker.
+     * @param topicId        The topic ID
+     * @param partitionId    The partition ID
+     * @param log            The log
+     * @return true if the topic partition should not exist on the broker, false otherwise.
+     */
+    public static boolean isStrayReplica(TopicsImage newTopicsImage, int brokerId, Optional<Uuid> topicId, int partitionId, String log) {

Review Comment:
   Why make this static if the first argument is `TopicsImage`? This looks a 
lot like an object method where the `this` reference is the `TopicsImage`.



##########
metadata/src/main/java/org/apache/kafka/image/TopicsImage.java:
##########
@@ -119,4 +127,40 @@ public Map<Uuid, String> topicIdToNameView() {
     public String toString() {
         return new TopicsImageByNameNode(this).stringify();
     }
+
+    /**
+     * Returns true if the given topic partition should not be on the current broker according to the metadata image.
+     *
+     * @param newTopicsImage The new topics image after broker has been reloaded
+     * @param brokerId       The ID of the current broker.
+     * @param topicId        The topic ID
+     * @param partitionId    The partition ID
+     * @param log            The log
+     * @return true if the topic partition should not exist on the broker, false otherwise.
+     */
+    public static boolean isStrayReplica(TopicsImage newTopicsImage, int brokerId, Optional<Uuid> topicId, int partitionId, String log) {
+        if (topicId.isEmpty()) {
+            // Missing topic ID could result from storage failure or unclean shutdown after topic creation but before flushing
+            // data to the `partition.metadata` file. And before appending data to the log, the `partition.metadata` is always
+            // flushed to disk. So if the topic ID is missing, it mostly means no data was appended, and we can treat this as
+            // a stray log.
+            LOG.info("The topicId does not exist in {}, treat it as a stray log.", log);
+            return true;
+        }
+
+        PartitionRegistration partition = newTopicsImage.getPartition(topicId.get(), partitionId);
+        if (partition == null) {
+            LOG.info("Found stray log dir {}: the topicId {} does not exist in the metadata image.", log, topicId);
+            return true;
+        } else {
+            List<Integer> replicas = Arrays.stream(partition.replicas).boxed().toList();
+            if (!replicas.contains(brokerId)) {
+                LOG.info("Found stray log dir {}: the current replica assignment {} does not contain the local brokerId {}.",
+                        log, replicas.stream().map(String::valueOf).collect(Collectors.joining(", ", "[", "]")), brokerId);
+                return true;
+            } else {
+                return false;
+            }
+        }
+    }

Review Comment:
   To me this should be functionality of the log manager. Maybe the
`TopicsImage` method should just return all of the topic IDs that don't exist
in the image for the given broker.
   
   This would allow you to remove the static logger that was added.



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftLog.java:
##########


Review Comment:
   I am thinking that we should make this internal by moving the implementation 
to `o.a.k.r.internals.KafkaRaftLog.java`. The same goes for the accompanying 
test suite file.



##########
metadata/src/main/java/org/apache/kafka/image/TopicsImage.java:
##########
@@ -119,4 +127,40 @@ public Map<Uuid, String> topicIdToNameView() {
     public String toString() {
         return new TopicsImageByNameNode(this).stringify();
     }
+
+    /**
+     * Returns true if the given topic partition should not be on the current broker according to the metadata image.
+     *
+     * @param newTopicsImage The new topics image after broker has been reloaded
+     * @param brokerId       The ID of the current broker.
+     * @param topicId        The topic ID
+     * @param partitionId    The partition ID
+     * @param log            The log
+     * @return true if the topic partition should not exist on the broker, false otherwise.
+     */
+    public static boolean isStrayReplica(TopicsImage newTopicsImage, int brokerId, Optional<Uuid> topicId, int partitionId, String log) {
+        if (topicId.isEmpty()) {
+            // Missing topic ID could result from storage failure or unclean shutdown after topic creation but before flushing
+            // data to the `partition.metadata` file. And before appending data to the log, the `partition.metadata` is always
+            // flushed to disk. So if the topic ID is missing, it mostly means no data was appended, and we can treat this as
+            // a stray log.
+            LOG.info("The topicId does not exist in {}, treat it as a stray log.", log);
+            return true;
+        }

Review Comment:
   How about letting the log manager make this decision? In Kafka 4.0 all 
topics must have a topic id.
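
A minimal sketch of what keeping that decision in the log manager could look like. `StrayLogCheckSketch` and its `assignedInImage` predicate are hypothetical names used only for illustration, not Kafka's actual `LogManager` API, and `java.util.UUID` stands in for Kafka's `Uuid`. The missing-topic-ID policy lives on the log manager side, and the image is only asked whether a known partition is assigned to the broker.

```java
import java.util.Optional;
import java.util.UUID;
import java.util.function.BiPredicate;

// Hypothetical log-manager-side helper (illustration only, not Kafka's API).
final class StrayLogCheckSketch {
    // Answers "does the metadata image assign (topicId, partition) to this broker?"
    private final BiPredicate<UUID, Integer> assignedInImage;

    StrayLogCheckSketch(BiPredicate<UUID, Integer> assignedInImage) {
        this.assignedInImage = assignedInImage;
    }

    boolean isStray(Optional<UUID> topicId, int partitionId) {
        if (topicId.isEmpty()) {
            // Policy owned by the log manager: a log without a topic ID
            // cannot be matched against the image, so treat it as stray.
            return true;
        }
        // Otherwise defer to the image for the assignment lookup.
        return !assignedInImage.test(topicId.get(), partitionId);
    }
}
```

With this split, the image side never needs to reason about missing topic IDs or to log about stray directories.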



##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala:
##########
@@ -340,7 +339,7 @@ class BrokerMetadataPublisher(
       // recovery-from-unclean-shutdown if required.
       logManager.startup(
         metadataCache.getAllTopics().asScala,
-        isStray = log => JLogManager.isStrayKraftReplica(brokerId, newImage.topics(), log)
+        isStray = log => TopicsImage.isStrayReplica(newImage.topics(), brokerId, log.topicId(), log.topicPartition().partition(), log.toString)

Review Comment:
   Minor, but to me stray partitions are in the log manager, not in the topics 
image. Meaning the log manager has partition entries that are not in the latest 
topics image.
   
   In some sense the log manager understands the topics image and makes sure 
that they match. The topics image doesn't know anything about "stray 
partitions" or the log manager.
   
   If you still want to move the functionality to `TopicsImage`, maybe make it 
a method (not static) with the signature
`Stream<TopicIdPartition> deletedPartitionsForReplica(int brokerId, Stream<TopicIdPartition>)`.
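
For illustration, a rough sketch of that instance-method shape, under stated assumptions: `SketchTopicIdPartition` and `SketchTopicsImage` are hypothetical, simplified stand-ins (a plain map from topic ID to partition to replica IDs), not the real Kafka classes, and `java.util.UUID` stands in for Kafka's `Uuid`. The image only reports which of the supplied partitions it no longer assigns to the broker; naming them "stray" and logging stays with the caller.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Stream;

// Hypothetical stand-in for Kafka's TopicIdPartition, reduced to the fields used here.
record SketchTopicIdPartition(UUID topicId, int partition) { }

// Hypothetical, simplified image: topic ID -> partition ID -> replica broker IDs.
final class SketchTopicsImage {
    private final Map<UUID, Map<Integer, int[]>> replicasByTopic;

    SketchTopicsImage(Map<UUID, Map<Integer, int[]>> replicasByTopic) {
        this.replicasByTopic = replicasByTopic;
    }

    // Returns the subset of `local` that the image does not assign to `brokerId`:
    // either the partition is unknown to the image, or the broker is not a replica.
    Stream<SketchTopicIdPartition> deletedPartitionsForReplica(
            int brokerId, Stream<SketchTopicIdPartition> local) {
        return local.filter(tp -> {
            int[] replicas = replicasByTopic
                .getOrDefault(tp.topicId(), Map.of())
                .get(tp.partition());
            return replicas == null
                || Arrays.stream(replicas).noneMatch(id -> id == brokerId);
        });
    }
}
```

A log manager could pass in the partitions it currently holds on disk and treat whatever comes back as stray, using its own logger.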



##########
checkstyle/import-control.xml:
##########
@@ -486,6 +486,7 @@
     <allow class="org.apache.kafka.common.compress.Compression" exact-match="true" />
     <allow pkg="org.apache.kafka.common.config" />
     <allow pkg="org.apache.kafka.common.feature" />
+    <allow pkg="org.apache.kafka.common.internals" />

Review Comment:
   Note that to me this means that the common module is not organized correctly 
if the raft module needs types in the "internals" namespace. Same comment 
applies to the change below which also includes `o.a.k.s.internals.log`.


