hachikuji commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r668324802



##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -377,6 +397,12 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
                 log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch);
                 lastSeenLeaderEpochs.put(tp, newEpoch);
                 return Optional.of(partitionMetadata);
+            // If the topic ID changed, update the metadata
+            } else if (changedTopicId) {
+                log.debug("Topic ID changed, so this topic must have been recreated. " +
+                        "Removing last seen epoch {} for the old partition {} and adding epoch {} from new metadata", currentEpoch, tp, newEpoch);
+                lastSeenLeaderEpochs.put(tp, newEpoch);
+                return Optional.of(partitionMetadata);

Review comment:
       Leaving this comment here for lack of an alternative location. This 
patch takes a good first step in improving consumer behavior for the topic 
recreation case. At least we are able to detect and discard the old epoch 
state. In fact, it does a little more than that since, combined with the fetch 
validation logic, we are likely to detect that the old fetch position is no 
longer valid. Most likely this case would get raised to the user as a 
`LogTruncationException`, which might not be ideal, but at least is 
justifiable. However, it doesn't quite close the door on reuse of the fetch 
position since it may remain valid on the recreated topic. For the full 
solution, we probably need to track topicId in SubscriptionState as well so 
that we can force an offset reset whenever the topicId changes. I think it 
makes sense to do this in https://issues.apache.org/jira/browse/KAFKA-12975. 
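
       For illustration only, here is a minimal sketch of what topicId-aware position tracking might look like; the names (`TopicIdAwarePositions`, `maybeResetOnRecreation`) are hypothetical and this is not the actual `SubscriptionState` API:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;

// Hypothetical sketch: remember the topic ID alongside each fetch position and
// drop the position when the ID changes, i.e. when the topic was recreated.
public class TopicIdAwarePositions {
    private final Map<TopicPartition, Uuid> topicIdByPartition = new HashMap<>();
    private final Map<TopicPartition, Long> positionByPartition = new HashMap<>();

    // Record a fetch position for the partition (hypothetical helper).
    public void position(TopicPartition tp, long offset) {
        positionByPartition.put(tp, offset);
    }

    // Called when new metadata arrives; returns true if the position was reset.
    public boolean maybeResetOnRecreation(TopicPartition tp, Uuid newTopicId) {
        Uuid previousId = topicIdByPartition.put(tp, newTopicId);
        if (previousId != null && !previousId.equals(newTopicId)) {
            // The old fetch position belongs to a deleted topic; force an
            // offset reset rather than reusing the stale position.
            positionByPartition.remove(tp);
            return true;
        }
        return false;
    }
}
```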

##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -391,10 +393,15 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
             int newEpoch = partitionMetadata.leaderEpoch.get();
             // If the received leader epoch is at least the same as the previous one, update the metadata
             Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
-            if (currentEpoch == null || newEpoch >= currentEpoch || changedTopicId) {
+            if (currentEpoch == null || newEpoch >= currentEpoch) {
                 log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch);
                 lastSeenLeaderEpochs.put(tp, newEpoch);
                 return Optional.of(partitionMetadata);
+            } else if (changedTopicId) {
+                log.debug("Topic ID changed, so this topic must have been recreated. " +

Review comment:
       nit: seems like it would be more useful for the log message to indicate 
the topic ids that changed instead of the unrelated epochs.
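
       For example, the message might read something like the following (a sketch only; `oldTopicId` and `newTopicId` are assumed locals available at that point, which the patch does not currently define):

```java
log.debug("Topic ID for partition {} changed from {} to {}, so this topic must have been recreated. " +
        "Setting last seen epoch to {} from new metadata", tp, oldTopicId, newTopicId, newEpoch);
```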

##########
File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
##########
@@ -130,13 +150,30 @@ MetadataCache mergeWith(String newClusterId,
                             Set<String> addInvalidTopics,
                             Set<String> addInternalTopics,
                             Node newController,
+                            Map<String, Uuid> topicIds,
                             BiPredicate<String, Boolean> retainTopic) {
 
         Predicate<String> shouldRetainTopic = topic -> retainTopic.test(topic, internalTopics.contains(topic));
 
         Map<TopicPartition, PartitionMetadata> newMetadataByPartition = new HashMap<>(addPartitions.size());
+        Map<String, Uuid> newTopicIds = new HashMap<>(topicIds.size());
+
+        // We want the most recent topic ID. We start with the old ID for each retained topic and then update it with the newest information from the MetadataResponse:
+        // we add the new topic ID if one is present, or remove the entry if the response did not supply a topic ID for this topic.
+        for (Map.Entry<String, Uuid> entry : this.topicIds.entrySet()) {
+            if (shouldRetainTopic.test(entry.getKey())) {
+                newTopicIds.put(entry.getKey(), entry.getValue());
+            }
+        }
+
         for (PartitionMetadata partition : addPartitions) {
             newMetadataByPartition.put(partition.topicPartition, partition);
+            Uuid id = topicIds.get(partition.topic());
+            if (id != null)
+                newTopicIds.put(partition.topic(), id);
+            else
+                // Remove if the latest metadata does not have a topic ID

Review comment:
       Does this still make sense in the context of 3.0, which does not have 
topicId fetch logic?
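
       To make the merge rule under discussion concrete, here is a self-contained sketch using plain maps and hypothetical names (`mergeTopicIds` and its parameters are illustrative, not the actual `MetadataCache` code):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

import org.apache.kafka.common.Uuid;

class TopicIdMergeSketch {

    // Keep the old ID for retained topics, prefer IDs from the new response,
    // and drop the ID of any updated topic the new response carries no ID for.
    static Map<String, Uuid> mergeTopicIds(Map<String, Uuid> oldTopicIds,
                                           Map<String, Uuid> newTopicIds,
                                           Set<String> updatedTopics,
                                           Predicate<String> shouldRetainTopic) {
        Map<String, Uuid> merged = new HashMap<>();
        // Start from the retained subset of the old IDs.
        for (Map.Entry<String, Uuid> entry : oldTopicIds.entrySet()) {
            if (shouldRetainTopic.test(entry.getKey()))
                merged.put(entry.getKey(), entry.getValue());
        }
        // Newest information wins: add the new ID, or remove the stale one
        // when the new metadata has no ID for an updated topic.
        for (String topic : updatedTopics) {
            Uuid id = newTopicIds.get(topic);
            if (id != null)
                merged.put(topic, id);
            else
                merged.remove(topic); // e.g. the response came from a broker without topic ID support
        }
        return merged;
    }
}
```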

##########
File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
##########
@@ -93,6 +104,13 @@ private MetadataCache(String clusterId,
         return Optional.ofNullable(metadataByPartition.get(topicPartition));
     }
 
+    Uuid topicId(String topicName) {
+        return topicIds.get(topicName);
+    }
+    String topicName(Uuid topicId) {

Review comment:
       nit: add newline between methods



