hachikuji commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r670046323



##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -377,6 +397,12 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
                 log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch);
                 lastSeenLeaderEpochs.put(tp, newEpoch);
                 return Optional.of(partitionMetadata);
+            // If both topic IDs were valid and the topic ID changed, update the metadata

Review comment:
       nit: move this comment into the `if`
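   For example (just a sketch of what that would look like):
   ```java
            } else if (!topicId.equals(Uuid.ZERO_UUID) && oldTopicId != null && !topicId.equals(oldTopicId)) {
                // If both topic IDs were valid and the topic ID changed, update the metadata
   ```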

##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -377,6 +397,12 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
                 log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch);
                 lastSeenLeaderEpochs.put(tp, newEpoch);
                 return Optional.of(partitionMetadata);
+            // If both topic IDs were valid and the topic ID changed, update the metadata
+            } else if (!topicId.equals(Uuid.ZERO_UUID) && oldTopicId != null && !topicId.equals(oldTopicId)) {

Review comment:
       Hmm, shouldn't this check come before the epoch check? Admittedly, it's 
unlikely that a recreated topic would have a higher epoch, but we may as well 
handle that case.
   
   By the way, it's a little inconsistent that this check uses both null and 
`Uuid.ZERO_UUID` to represent a missing value. Maybe we can use null 
consistently?
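   For illustration, the reordered branches might look something like this (just a sketch reusing the variables from the diff, with null used consistently for a missing ID):
   ```java
            // Check for a changed topic ID first, so a recreated topic is detected
            // even when the new metadata happens to carry a higher epoch.
            if (topicId != null && oldTopicId != null && !topicId.equals(oldTopicId)) {
                // topic was recreated: accept the new metadata and reset the last seen epoch
            } else if (currentEpoch == null || newEpoch >= currentEpoch) {
                // same topic: only accept the update when the epoch does not regress
            }
   ```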

##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -216,6 +217,14 @@ public synchronized boolean updateRequested() {
         }
     }
 
+    public synchronized Uuid topicId(String topicName) {

Review comment:
       Can you document that this returns null if the topicId does not exist or 
is not known? Similarly for `topicName`.
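   For example, something along these lines:
   ```java
       /**
        * @return the topic ID for the given topic name, or null if the ID does not exist
        *         or is not known to this client
        */
       public synchronized Uuid topicId(String topicName) {
   ```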

##########
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##########
@@ -874,18 +921,69 @@ protected boolean retainTopic(String topic, boolean isInternal, long nowMs) {
         assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("keepValidTopic", "newValidTopic")));
         assertEquals(cluster.partitionsForTopic("keepValidTopic").size(), 2);
         assertEquals(cluster.partitionsForTopic("newValidTopic").size(), 4);
+        assertTrue(cluster.topicIds().containsAll(topicIds.values()));
 
         // Perform another metadata update, but this time all topic metadata should be cleared.
         retainTopics.set(Collections.emptySet());
 
-        metadataResponse = RequestTestUtils.metadataUpdateWith(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 300);
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 300, topicIds);
         metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
+        topicIds.forEach((topicName, topicId) -> assertEquals(metadata.topicId(topicName), null));

Review comment:
       nit: `assertNull`
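   i.e.
   ```java
           topicIds.forEach((topicName, topicId) -> assertNull(metadata.topicId(topicName)));
   ```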

##########
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##########
@@ -874,18 +921,69 @@ protected boolean retainTopic(String topic, boolean isInternal, long nowMs) {
         assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("keepValidTopic", "newValidTopic")));
         assertEquals(cluster.partitionsForTopic("keepValidTopic").size(), 2);
         assertEquals(cluster.partitionsForTopic("newValidTopic").size(), 4);
+        assertTrue(cluster.topicIds().containsAll(topicIds.values()));
 
         // Perform another metadata update, but this time all topic metadata should be cleared.
         retainTopics.set(Collections.emptySet());
 
-        metadataResponse = RequestTestUtils.metadataUpdateWith(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 300);
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 300, topicIds);
         metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
+        topicIds.forEach((topicName, topicId) -> assertEquals(metadata.topicId(topicName), null));
 
         cluster = metadata.fetch();
         assertEquals(cluster.clusterResource().clusterId(), newClusterId);
         assertEquals(cluster.nodes().size(), newNodes);
         assertEquals(cluster.invalidTopics(), Collections.emptySet());
         assertEquals(cluster.unauthorizedTopics(), Collections.emptySet());
         assertEquals(cluster.topics(), Collections.emptySet());
+        assertTrue(cluster.topicIds().isEmpty());
+    }
+
+    @Test
+    public void testMetadataMergeOnIdDowngrade() {
+        Time time = new MockTime();
+        Map<String, Uuid> topicIds = new HashMap<>();
+
+        final AtomicReference<Set<String>> retainTopics = new AtomicReference<>(new HashSet<>());
+        metadata = new Metadata(refreshBackoffMs, metadataExpireMs, new LogContext(), new ClusterResourceListeners()) {
+            @Override
+            protected boolean retainTopic(String topic, boolean isInternal, long nowMs) {
+                return retainTopics.get().contains(topic);
+            }
+        };
+
+        // Initialize a metadata instance with two topics. Both will be retained.
+        String clusterId = "clusterId";
+        int nodes = 2;
+        Map<String, Integer> topicPartitionCounts = new HashMap<>();
+        topicPartitionCounts.put("validTopic1", 2);
+        topicPartitionCounts.put("validTopic2", 3);
+
+        retainTopics.set(new HashSet<>(Arrays.asList(

Review comment:
       nit: you can use `Utils.mkSet` (a few more of these)
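   i.e. (assuming `org.apache.kafka.common.utils.Utils` is imported):
   ```java
           retainTopics.set(Utils.mkSet("validTopic1", "validTopic2"));
   ```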

##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -377,6 +397,12 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
                 log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch);
                 lastSeenLeaderEpochs.put(tp, newEpoch);
                 return Optional.of(partitionMetadata);
+            // If both topic IDs were valid and the topic ID changed, update the metadata
+            } else if (!topicId.equals(Uuid.ZERO_UUID) && oldTopicId != null && !topicId.equals(oldTopicId)) {
+                log.debug("Topic ID for partition {} changed from {} to {}, so this topic must have been recreated. " +
+                                "Using the newly updated metadata.", tp, oldTopicId, topicId);

Review comment:
       Instead of "Using the newly updated metadata," maybe we can say this:
   > Resetting the last seen epoch to {}.
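   i.e. something like this (a sketch; `newEpoch` is assumed to be in scope here):
   ```java
                log.debug("Topic ID for partition {} changed from {} to {}, so this topic must have been recreated. " +
                        "Resetting the last seen epoch to {}.", tp, oldTopicId, topicId, newEpoch);
   ```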

##########
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##########
@@ -371,6 +372,43 @@ public void testUpdateLastEpoch() {
         assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12));
     }
 
+    @Test
+    public void testEpochUpdateOnChangedTopicIds() {
+        TopicPartition tp = new TopicPartition("topic-1", 0);
+        Map<String, Uuid> topicIds = Collections.singletonMap("topic-1", Uuid.randomUuid());
+
+        MetadataResponse metadataResponse = emptyMetadataResponse();
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
+
+        // Start with a topic with no topic ID
+        metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
+        assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 10));
+
+        // We should treat an added topic ID as though it is the same topic. Handle only when epoch increases.
+        // Don't update to an older one
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 1, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 2L);
+        assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 10));

Review comment:
       nit: this seems more concise
   ```java
           assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
   ```

##########
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##########
@@ -371,6 +372,43 @@ public void testUpdateLastEpoch() {
         assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12));
     }
 
+    @Test
+    public void testEpochUpdateOnChangedTopicIds() {
+        TopicPartition tp = new TopicPartition("topic-1", 0);
+        Map<String, Uuid> topicIds = Collections.singletonMap("topic-1", Uuid.randomUuid());
+
+        MetadataResponse metadataResponse = emptyMetadataResponse();
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
+
+        // Start with a topic with no topic ID
+        metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
+        assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 10));
+
+        // We should treat an added topic ID as though it is the same topic. Handle only when epoch increases.
+        // Don't update to an older one
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 1, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 2L);
+        assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 10));
+
+        // Don't cause update if it's the same one
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 3L);
+        assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 10));
+
+        // Update if we see newer epoch
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 12, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 4L);
+        assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12));
+
+        // We should also update if we see a new topicId even if the epoch is lower

Review comment:
       We may as well cover the case when the topicId is changed _and_ the 
epoch is higher.
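   For example, an extra step along these lines (just a sketch reusing this test's helpers; the epoch value is arbitrary):
   ```java
           // Update if the topic ID changed and the epoch is higher
           Map<String, Uuid> newTopicIds = Collections.singletonMap("topic-1", Uuid.randomUuid());
           metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 20, newTopicIds);
           metadata.updateWithCurrentRequestVersion(metadataResponse, false, 5L);
           assertEquals(Optional.of(20), metadata.lastSeenLeaderEpoch(tp));
   ```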

##########
File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
##########
@@ -130,13 +150,36 @@ MetadataCache mergeWith(String newClusterId,
                             Set<String> addInvalidTopics,
                             Set<String> addInternalTopics,
                             Node newController,
+                            Map<String, Uuid> topicIds,
                             BiPredicate<String, Boolean> retainTopic) {
 
         Predicate<String> shouldRetainTopic = topic -> retainTopic.test(topic, internalTopics.contains(topic));
 
         Map<TopicPartition, PartitionMetadata> newMetadataByPartition = new HashMap<>(addPartitions.size());
+        Map<String, Uuid> newTopicIds = new HashMap<>(topicIds.size());
+
+        // We want the most recent topic ID. We start with the previous ID stored for retained topics and then
+        // update with newest information in the MetadataResponse.

Review comment:
       nit: "update with the newest information from the MetadataResponse."

##########
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##########
@@ -863,8 +906,12 @@ protected boolean retainTopic(String topic, boolean isInternal, long nowMs) {
             "keepValidTopic",
             "newValidTopic")));
 
-        metadataResponse = RequestTestUtils.metadataUpdateWith(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 200);
+        topicIds.put("newValidTopic", Uuid.randomUuid());
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 200, topicIds);
         metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
+        topicIds.remove("oldValidTopic");
+        retainTopics.get().forEach(topic -> assertEquals(metadata.topicId(topic), topicIds.get(topic)));
+        assertEquals(metadata.topicId("oldValidTopic"), null);

Review comment:
       nit: use `assertNull`

##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -316,20 +325,29 @@ private MetadataCache handleMetadataResponse(MetadataResponse metadataResponse,
         Set<String> invalidTopics = new HashSet<>();
 
         List<MetadataResponse.PartitionMetadata> partitions = new ArrayList<>();
+        Map<String, Uuid> topicIds = new HashMap<>();
         for (MetadataResponse.TopicMetadata metadata : metadataResponse.topicMetadata()) {
-            topics.add(metadata.topic());
+            String topicName = metadata.topic();
+            Uuid topicId = metadata.topicId();
+            topics.add(topicName);
+            // We only update if the current metadata since we can only compare when both topics have valid IDs
+            Uuid oldTopicId = null;
+            if (!topicId.equals(Uuid.ZERO_UUID)) {

Review comment:
       nit: I think we're guaranteed that `topicId` is not null (in spite of 
the inconsistent `equals`), but it's still a little nicer to write this check 
as `!Uuid.ZERO_UUID.equals(topicId)`

##########
File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
##########
@@ -69,13 +73,20 @@ private MetadataCache(String clusterId,
                           Set<String> invalidTopics,
                           Set<String> internalTopics,
                           Node controller,
+                          Map<String, Uuid> topicIds,
                           Cluster clusterInstance) {
         this.clusterId = clusterId;
         this.nodes = nodes;
         this.unauthorizedTopics = unauthorizedTopics;
         this.invalidTopics = invalidTopics;
         this.internalTopics = internalTopics;
         this.controller = controller;
+        this.topicIds = topicIds;
+
+        this.topicNames = new HashMap<>(topicIds.size());

Review comment:
       As far as I can tell, there are no uses of this collection 
(`Metadata.topicName` is not referenced). Can we get rid of it?

##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -316,20 +325,29 @@ private MetadataCache 
handleMetadataResponse(MetadataResponse metadataResponse,
         Set<String> invalidTopics = new HashSet<>();
 
         List<MetadataResponse.PartitionMetadata> partitions = new 
ArrayList<>();
+        Map<String, Uuid> topicIds = new HashMap<>();
         for (MetadataResponse.TopicMetadata metadata : 
metadataResponse.topicMetadata()) {
-            topics.add(metadata.topic());
+            String topicName = metadata.topic();
+            Uuid topicId = metadata.topicId();
+            topics.add(topicName);
+            // We only update if the current metadata since we can only 
compare when both topics have valid IDs

Review comment:
       nit: rephrase?

##########
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##########
@@ -845,6 +887,7 @@ protected boolean retainTopic(String topic, boolean isInternal, long nowMs) {
         assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("oldValidTopic", "keepValidTopic")));
         assertEquals(cluster.partitionsForTopic("oldValidTopic").size(), 2);
         assertEquals(cluster.partitionsForTopic("keepValidTopic").size(), 3);
+        assertTrue(cluster.topicIds().containsAll(topicIds.values()));

Review comment:
       Do we want to make this assertion stronger? Or is `topicIds` a subset?
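   For example, if they should match exactly (a sketch, assuming `Cluster.topicIds()` returns all known IDs):
   ```java
           assertEquals(new HashSet<>(topicIds.values()), new HashSet<>(cluster.topicIds()));
   ```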

##########
File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
##########
@@ -130,13 +150,36 @@ MetadataCache mergeWith(String newClusterId,
                             Set<String> addInvalidTopics,
                             Set<String> addInternalTopics,
                             Node newController,
+                            Map<String, Uuid> topicIds,
                             BiPredicate<String, Boolean> retainTopic) {
 
         Predicate<String> shouldRetainTopic = topic -> retainTopic.test(topic, internalTopics.contains(topic));
 
         Map<TopicPartition, PartitionMetadata> newMetadataByPartition = new HashMap<>(addPartitions.size());
+        Map<String, Uuid> newTopicIds = new HashMap<>(topicIds.size());
+
+        // We want the most recent topic ID. We start with the previous ID stored for retained topics and then
+        // update with newest information in the MetadataResponse.
+        // If the newest MetadataResponse:
+        //    - contains a new topic with no ID, add no IDs to newTopicIds

Review comment:
       nit: some of these cases do not seem worth mentioning. I think we're 
just saying that we always take the latest state, removing existing topicIds if 
necessary.

##########
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##########
@@ -874,18 +921,69 @@ protected boolean retainTopic(String topic, boolean isInternal, long nowMs) {
         assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("keepValidTopic", "newValidTopic")));
         assertEquals(cluster.partitionsForTopic("keepValidTopic").size(), 2);
         assertEquals(cluster.partitionsForTopic("newValidTopic").size(), 4);
+        assertTrue(cluster.topicIds().containsAll(topicIds.values()));
 
         // Perform another metadata update, but this time all topic metadata should be cleared.
         retainTopics.set(Collections.emptySet());
 
-        metadataResponse = RequestTestUtils.metadataUpdateWith(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 300);
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 300, topicIds);
         metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
+        topicIds.forEach((topicName, topicId) -> assertEquals(metadata.topicId(topicName), null));
 
         cluster = metadata.fetch();
         assertEquals(cluster.clusterResource().clusterId(), newClusterId);
         assertEquals(cluster.nodes().size(), newNodes);
         assertEquals(cluster.invalidTopics(), Collections.emptySet());
         assertEquals(cluster.unauthorizedTopics(), Collections.emptySet());
         assertEquals(cluster.topics(), Collections.emptySet());
+        assertTrue(cluster.topicIds().isEmpty());
+    }
+
+    @Test
+    public void testMetadataMergeOnIdDowngrade() {
+        Time time = new MockTime();
+        Map<String, Uuid> topicIds = new HashMap<>();
+
+        final AtomicReference<Set<String>> retainTopics = new AtomicReference<>(new HashSet<>());
+        metadata = new Metadata(refreshBackoffMs, metadataExpireMs, new LogContext(), new ClusterResourceListeners()) {
+            @Override
+            protected boolean retainTopic(String topic, boolean isInternal, long nowMs) {
+                return retainTopics.get().contains(topic);
+            }
+        };
+
+        // Initialize a metadata instance with two topics. Both will be retained.
+        String clusterId = "clusterId";
+        int nodes = 2;
+        Map<String, Integer> topicPartitionCounts = new HashMap<>();
+        topicPartitionCounts.put("validTopic1", 2);
+        topicPartitionCounts.put("validTopic2", 3);
+
+        retainTopics.set(new HashSet<>(Arrays.asList(
+                "validTopic1",
+                "validTopic2")));
+
+        topicIds.put("validTopic1", Uuid.randomUuid());
+        topicIds.put("validTopic2", Uuid.randomUuid());
+        MetadataResponse metadataResponse =
+                RequestTestUtils.metadataUpdateWithIds(clusterId, nodes, Collections.emptyMap(), topicPartitionCounts, _tp -> 100, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
+        retainTopics.get().forEach(topic -> assertEquals(metadata.topicId(topic), topicIds.get(topic)));
+
+        // Try removing the topic ID from keepValidTopic (simulating receiving a request from a controller with an older IBP)
+        topicIds.remove("validTopic1");
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds(clusterId, nodes, Collections.emptyMap(), topicPartitionCounts, _tp -> 200, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
+        retainTopics.get().forEach(topic -> assertEquals(metadata.topicId(topic), topicIds.get(topic)));
+
+        Cluster cluster = metadata.fetch();
+        // We still have the topic, but it just doesn't have an ID.
+        assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("validTopic1", "validTopic2")));

Review comment:
       nit: the first argument should be the expected one
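   i.e.
   ```java
           assertEquals(new HashSet<>(Arrays.asList("validTopic1", "validTopic2")), cluster.topics());
   ```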

##########
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##########
@@ -874,18 +921,69 @@ protected boolean retainTopic(String topic, boolean isInternal, long nowMs) {
         assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("keepValidTopic", "newValidTopic")));
         assertEquals(cluster.partitionsForTopic("keepValidTopic").size(), 2);
         assertEquals(cluster.partitionsForTopic("newValidTopic").size(), 4);
+        assertTrue(cluster.topicIds().containsAll(topicIds.values()));
 
         // Perform another metadata update, but this time all topic metadata should be cleared.
         retainTopics.set(Collections.emptySet());
 
-        metadataResponse = RequestTestUtils.metadataUpdateWith(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 300);
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 300, topicIds);
         metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
+        topicIds.forEach((topicName, topicId) -> assertEquals(metadata.topicId(topicName), null));
 
         cluster = metadata.fetch();
         assertEquals(cluster.clusterResource().clusterId(), newClusterId);
         assertEquals(cluster.nodes().size(), newNodes);
         assertEquals(cluster.invalidTopics(), Collections.emptySet());
         assertEquals(cluster.unauthorizedTopics(), Collections.emptySet());
         assertEquals(cluster.topics(), Collections.emptySet());
+        assertTrue(cluster.topicIds().isEmpty());
+    }
+
+    @Test
+    public void testMetadataMergeOnIdDowngrade() {
+        Time time = new MockTime();
+        Map<String, Uuid> topicIds = new HashMap<>();
+
+        final AtomicReference<Set<String>> retainTopics = new AtomicReference<>(new HashSet<>());
+        metadata = new Metadata(refreshBackoffMs, metadataExpireMs, new LogContext(), new ClusterResourceListeners()) {
+            @Override
+            protected boolean retainTopic(String topic, boolean isInternal, long nowMs) {
+                return retainTopics.get().contains(topic);
+            }
+        };
+
+        // Initialize a metadata instance with two topics. Both will be retained.
+        String clusterId = "clusterId";
+        int nodes = 2;
+        Map<String, Integer> topicPartitionCounts = new HashMap<>();
+        topicPartitionCounts.put("validTopic1", 2);
+        topicPartitionCounts.put("validTopic2", 3);
+
+        retainTopics.set(new HashSet<>(Arrays.asList(
+                "validTopic1",
+                "validTopic2")));
+
+        topicIds.put("validTopic1", Uuid.randomUuid());
+        topicIds.put("validTopic2", Uuid.randomUuid());
+        MetadataResponse metadataResponse =
+                RequestTestUtils.metadataUpdateWithIds(clusterId, nodes, Collections.emptyMap(), topicPartitionCounts, _tp -> 100, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
+        retainTopics.get().forEach(topic -> assertEquals(metadata.topicId(topic), topicIds.get(topic)));
+
+        // Try removing the topic ID from keepValidTopic (simulating receiving a request from a controller with an older IBP)
+        topicIds.remove("validTopic1");
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds(clusterId, nodes, Collections.emptyMap(), topicPartitionCounts, _tp -> 200, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
+        retainTopics.get().forEach(topic -> assertEquals(metadata.topicId(topic), topicIds.get(topic)));
+
+        Cluster cluster = metadata.fetch();
+        // We still have the topic, but it just doesn't have an ID.
+        assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("validTopic1", "validTopic2")));
+        assertEquals(cluster.partitionsForTopic("validTopic1").size(), 2);
+        assertTrue(cluster.topicIds().containsAll(topicIds.values()));

Review comment:
       nit: I think we can make this assertion stronger
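   For example, only `validTopic2`'s ID should remain after the downgrade (a sketch):
   ```java
           assertEquals(Collections.singleton(topicIds.get("validTopic2")), new HashSet<>(cluster.topicIds()));
   ```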

##########
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##########
@@ -874,18 +921,69 @@ protected boolean retainTopic(String topic, boolean isInternal, long nowMs) {
         assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("keepValidTopic", "newValidTopic")));
         assertEquals(cluster.partitionsForTopic("keepValidTopic").size(), 2);
         assertEquals(cluster.partitionsForTopic("newValidTopic").size(), 4);
+        assertTrue(cluster.topicIds().containsAll(topicIds.values()));
 
         // Perform another metadata update, but this time all topic metadata should be cleared.
         retainTopics.set(Collections.emptySet());
 
-        metadataResponse = RequestTestUtils.metadataUpdateWith(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 300);
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 300, topicIds);
         metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
+        topicIds.forEach((topicName, topicId) -> assertEquals(metadata.topicId(topicName), null));
 
         cluster = metadata.fetch();
         assertEquals(cluster.clusterResource().clusterId(), newClusterId);
         assertEquals(cluster.nodes().size(), newNodes);
         assertEquals(cluster.invalidTopics(), Collections.emptySet());
         assertEquals(cluster.unauthorizedTopics(), Collections.emptySet());
         assertEquals(cluster.topics(), Collections.emptySet());
+        assertTrue(cluster.topicIds().isEmpty());
+    }
+
+    @Test
+    public void testMetadataMergeOnIdDowngrade() {
+        Time time = new MockTime();
+        Map<String, Uuid> topicIds = new HashMap<>();
+
+        final AtomicReference<Set<String>> retainTopics = new AtomicReference<>(new HashSet<>());
+        metadata = new Metadata(refreshBackoffMs, metadataExpireMs, new LogContext(), new ClusterResourceListeners()) {
+            @Override
+            protected boolean retainTopic(String topic, boolean isInternal, long nowMs) {
+                return retainTopics.get().contains(topic);
+            }
+        };
+
+        // Initialize a metadata instance with two topics. Both will be retained.
+        String clusterId = "clusterId";
+        int nodes = 2;
+        Map<String, Integer> topicPartitionCounts = new HashMap<>();
+        topicPartitionCounts.put("validTopic1", 2);
+        topicPartitionCounts.put("validTopic2", 3);
+
+        retainTopics.set(new HashSet<>(Arrays.asList(
+                "validTopic1",
+                "validTopic2")));
+
+        topicIds.put("validTopic1", Uuid.randomUuid());
+        topicIds.put("validTopic2", Uuid.randomUuid());
+        MetadataResponse metadataResponse =
+                RequestTestUtils.metadataUpdateWithIds(clusterId, nodes, Collections.emptyMap(), topicPartitionCounts, _tp -> 100, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
+        retainTopics.get().forEach(topic -> assertEquals(metadata.topicId(topic), topicIds.get(topic)));
+
+        // Try removing the topic ID from keepValidTopic (simulating receiving a request from a controller with an older IBP)
+        topicIds.remove("validTopic1");
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds(clusterId, nodes, Collections.emptyMap(), topicPartitionCounts, _tp -> 200, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
+        retainTopics.get().forEach(topic -> assertEquals(metadata.topicId(topic), topicIds.get(topic)));
+
+        Cluster cluster = metadata.fetch();
+        // We still have the topic, but it just doesn't have an ID.
+        assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("validTopic1", "validTopic2")));
+        assertEquals(cluster.partitionsForTopic("validTopic1").size(), 2);
+        assertTrue(cluster.topicIds().containsAll(topicIds.values()));
+        assertEquals(Uuid.ZERO_UUID, cluster.topicId("validTopic1"));
+
     }
+
Review comment:
       nit: extra newline (and above before the end brace)



