hachikuji commented on a change in pull request #11004:
URL: https://github.com/apache/kafka/pull/11004#discussion_r749668458



##########
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##########
@@ -372,6 +379,49 @@ public void testUpdateLastEpoch() {
         assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12));
     }
 
+    @Test
+    public void testEpochUpdateOnChangedTopicIds() {
+        TopicPartition tp = new TopicPartition("topic-1", 0);
+        Map<String, Uuid> topicIds = Collections.singletonMap("topic-1", Uuid.randomUuid());
+
+        MetadataResponse metadataResponse = emptyMetadataResponse();
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
+
+        // Start with a topic with no topic ID
+        metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
+        assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+
+        // We should treat an added topic ID as though it is the same topic. Handle only when epoch increases.
+        // Don't update to an older one
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 1, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 2L);
+        assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+
+        // Don't cause update if it's the same one
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 3L);
+        assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+
+        // Update if we see newer epoch
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 12, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 4L);
+        assertEquals(Optional.of(12), metadata.lastSeenLeaderEpoch(tp));
+
+        // We should also update if we see a new topicId even if the epoch is lower
+        Map<String, Uuid> newTopicIds = Collections.singletonMap("topic-1", Uuid.randomUuid());
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 3, newTopicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 5L);
+        assertEquals(Optional.of(3), metadata.lastSeenLeaderEpoch(tp));
+
+        // Finally, update when the topic ID is new and the epoch is higher
+        Map<String, Uuid> newTopicIds2 = Collections.singletonMap("topic-1", Uuid.randomUuid());
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 20, newTopicIds2);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 6L);
+        assertEquals(Optional.of(20), metadata.lastSeenLeaderEpoch(tp));
+

Review comment:
       nit: unneeded newline

##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -375,17 +382,25 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
 
     /**
     * Compute the latest partition metadata to cache given ordering by leader epochs (if both
-     * available and reliable).
+     * available and reliable) and whether the topic ID changed.
      */
     private Optional<MetadataResponse.PartitionMetadata> updateLatestMetadata(
             MetadataResponse.PartitionMetadata partitionMetadata,
-            boolean hasReliableLeaderEpoch) {
+            boolean hasReliableLeaderEpoch,
+            Uuid topicId,
+            Uuid oldTopicId) {
         TopicPartition tp = partitionMetadata.topicPartition;
         if (hasReliableLeaderEpoch && partitionMetadata.leaderEpoch.isPresent()) {
             int newEpoch = partitionMetadata.leaderEpoch.get();
-            // If the received leader epoch is at least the same as the previous one, update the metadata
             Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
-            if (currentEpoch == null || newEpoch >= currentEpoch) {
+            if (topicId != null && oldTopicId != null && !topicId.equals(oldTopicId)) {
+                // If both topic IDs were valid and the topic ID changed, update the metadata
+                log.debug("Topic ID for partition {} changed from {} to {}, so this topic must have been recreated. " +

Review comment:
       Can we change this level to INFO?
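
A minimal sketch of what the suggested change might look like (the continuation of the original message string is elided in the diff above, so only the visible portion is shown; `tp`, `oldTopicId`, and `topicId` come from the surrounding method):

```java
// Hypothetical sketch of the suggestion: log at INFO instead of DEBUG so that a
// detected topic re-creation is visible with default client log settings.
log.info("Topic ID for partition {} changed from {} to {}, so this topic must have been recreated.",
        tp, oldTopicId, topicId);
```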

##########
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##########
@@ -77,6 +79,11 @@ private static MetadataResponse emptyMetadataResponse() {
                 Collections.emptyList());
     }
 
+    private <T> void assertEqualCollections(Collection<T> expected, Collection<T> actual) {

Review comment:
       nit: this name seems misleading since order could be important for 
arbitrary collections. Since we only have a couple uses, maybe we can get rid 
of it and use `assertEquals(new HashSet(A), new HashSet(B))` for example.
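
A minimal sketch of that suggestion, assuming JUnit 5 assertions as used elsewhere in `MetadataTest` (the test name and topic values below are illustrative, not taken from the PR):

```java
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import org.junit.jupiter.api.Test;

@Test
public void testTopicsComparedWithoutOrder() {
    List<String> expected = Arrays.asList("topic-1", "topic-2");
    List<String> actual = Arrays.asList("topic-2", "topic-1");
    // Wrapping both sides in a HashSet makes the order-insensitive comparison explicit
    // at the call site, so the assertEqualCollections helper is no longer needed.
    assertEquals(new HashSet<>(expected), new HashSet<>(actual));
}
```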

##########
File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
##########
@@ -158,13 +148,14 @@ MetadataCache mergeWith(String newClusterId,
         Map<TopicPartition, PartitionMetadata> newMetadataByPartition = new HashMap<>(addPartitions.size());
         Map<String, Uuid> newTopicIds = new HashMap<>(topicIds.size());
 
-        // We want the most recent topic ID. We add the old one here for retained topics and then update with newest information in the MetadataResponse
-        // we add if a new topic ID is added or remove if the request did not support topic IDs for this topic.
-        for (Map.Entry<String, Uuid> entry : this.topicIds.entrySet()) {
-            if (shouldRetainTopic.test(entry.getKey())) {
-                newTopicIds.put(entry.getKey(), entry.getValue());
+        // We want the most recent topic ID. We start with the previous ID stored for retained topics and then
+        // update with newest information from the MetadataResponse. We always take the latest state, removing existing
+        // topic IDs if the latest state contains the topic name but not a topic ID.
+        this.topicIds.forEach((topicName, topicId) -> {

Review comment:
       nit: simpler or not?
   ```java
           Map<String, Uuid> newTopicIds = topicIds.entrySet().stream()
               .filter(entry -> shouldRetainTopic.test(entry.getKey()))
               .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
   ```
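
For the second half of the merge described in the comment above, a hedged sketch of the overlay step, using illustrative names (`retainedTopicIds`, `responseTopicNames`, `responseTopicIds`) rather than the PR's actual variables:

```java
// Start from the IDs retained for surviving topics, then take the latest state from the
// response: add or replace IDs the response provides, and drop the old ID when the
// response names a topic without an ID (e.g. the request version predates topic IDs).
Map<String, Uuid> mergedTopicIds = new HashMap<>(retainedTopicIds);
for (String topicName : responseTopicNames) {
    Uuid topicId = responseTopicIds.get(topicName);
    if (topicId == null || Uuid.ZERO_UUID.equals(topicId))
        mergedTopicIds.remove(topicName);
    else
        mergedTopicIds.put(topicName, topicId);
}
```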

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -1206,7 +1205,8 @@ private void validatePositionsOnMetadataChange() {
                     fetchable.put(node, builder);
                 }
 
-                builder.add(partition, topicIds.getOrDefault(partition.topic(), Uuid.ZERO_UUID), new FetchRequest.PartitionData(position.offset,
+                Uuid topicId = metadata.topicId(partition.topic());

Review comment:
       It could be updated in a separate thread. I cannot see how that would be 
a problem though. We do have synchronization in `Metadata`.
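
A sketch of the thread-safety point being made, under the assumption that `Metadata#topicId` is a synchronized accessor like `Metadata`'s other getters (the method body below is illustrative, not a quote of the Kafka source):

```java
// Because the accessor is synchronized on the Metadata instance, a concurrent metadata
// update from the background thread cannot interleave with this read, so the fetch
// request builder always observes a consistent topic ID (or ZERO_UUID if unknown).
public synchronized Uuid topicId(String topicName) {
    Uuid topicId = cache.topicIds().get(topicName);  // assumed cache lookup, for illustration
    return topicId != null ? topicId : Uuid.ZERO_UUID;
}
```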



