hachikuji commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r665599081



##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -316,8 +325,16 @@ private MetadataCache handleMetadataResponse(MetadataResponse metadataResponse,
         Set<String> invalidTopics = new HashSet<>();
 
         List<MetadataResponse.PartitionMetadata> partitions = new ArrayList<>();
+        Map<String, Uuid> topicIds = new HashMap<>();
         for (MetadataResponse.TopicMetadata metadata : metadataResponse.topicMetadata()) {
             topics.add(metadata.topic());
+            boolean changedTopicId = false;
+            if (!metadata.topicId().equals(Uuid.ZERO_UUID)) {
+                topicIds.put(metadata.topic(), metadata.topicId());

Review comment:
       nit: maybe we can pull out a variable for `metadata.topic()` since there 
are 10 or so uses
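   For illustration, the extraction might look roughly like this (the variable name is just a suggestion; the rest mirrors the hunk above):
   ```java
   String topicName = metadata.topic();   // pulled-out variable, name is illustrative
   topics.add(topicName);
   boolean changedTopicId = false;
   if (!metadata.topicId().equals(Uuid.ZERO_UUID)) {
       topicIds.put(topicName, metadata.topicId());
       // ... rest of the block, using topicName in place of metadata.topic()
   }
   ```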

##########
File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
##########
@@ -130,13 +150,30 @@ MetadataCache mergeWith(String newClusterId,
                             Set<String> addInvalidTopics,
                             Set<String> addInternalTopics,
                             Node newController,
+                            Map<String, Uuid> topicIds,
                             BiPredicate<String, Boolean> retainTopic) {
 
         Predicate<String> shouldRetainTopic = topic -> retainTopic.test(topic, internalTopics.contains(topic));
 
         Map<TopicPartition, PartitionMetadata> newMetadataByPartition = new HashMap<>(addPartitions.size());
+        Map<String, Uuid> newTopicIds = new HashMap<>(topicIds.size());
+
+        // We want the most recent topic ID. We add the old one here for retained topics and then update with newest information in the MetadataResponse
+        // we add if a new topic ID is added or remove if the request did not support topic IDs for this topic.
+        for (Map.Entry<String, Uuid> entry : this.topicIds.entrySet()) {
+            if (shouldRetainTopic.test(entry.getKey())) {
+                newTopicIds.put(entry.getKey(), entry.getValue());
+            }
+        }
+
         for (PartitionMetadata partition : addPartitions) {
             newMetadataByPartition.put(partition.topicPartition, partition);
+            Uuid id = topicIds.get(partition.topic());
+            if (id != null)
+                newTopicIds.put(partition.topic(), id);
+            else
+                // Remove if the latest metadata does not have a topic ID
Review comment:
       What is the rationale to discard topicId information? Is this to deal 
with downgrades?

##########
File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
##########
@@ -130,13 +150,30 @@ MetadataCache mergeWith(String newClusterId,
                             Set<String> addInvalidTopics,
                             Set<String> addInternalTopics,
                             Node newController,
+                            Map<String, Uuid> topicIds,
                             BiPredicate<String, Boolean> retainTopic) {
 
         Predicate<String> shouldRetainTopic = topic -> retainTopic.test(topic, internalTopics.contains(topic));
 
         Map<TopicPartition, PartitionMetadata> newMetadataByPartition = new HashMap<>(addPartitions.size());
+        Map<String, Uuid> newTopicIds = new HashMap<>(topicIds.size());
+
+        // We want the most recent topic ID. We add the old one here for retained topics and then update with newest information in the MetadataResponse
+        // we add if a new topic ID is added or remove if the request did not support topic IDs for this topic.
+        for (Map.Entry<String, Uuid> entry : this.topicIds.entrySet()) {

Review comment:
       nit: map iterations are a little more readable with a `forEach`
   ```java
   this.topicIds.forEach((topicName, topicId) -> {
     ...
   });
   ```
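   Applied to the loop in this hunk, that would read roughly as follows (a sketch, reusing the names from the diff):
   ```java
   this.topicIds.forEach((topicName, topicId) -> {
       if (shouldRetainTopic.test(topicName)) {
           newTopicIds.put(topicName, topicId);
       }
   });
   ```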

##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -367,13 +384,14 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
      */
     private Optional<MetadataResponse.PartitionMetadata> updateLatestMetadata(
             MetadataResponse.PartitionMetadata partitionMetadata,
-            boolean hasReliableLeaderEpoch) {
+            boolean hasReliableLeaderEpoch,
+            boolean changedTopicId) {
         TopicPartition tp = partitionMetadata.topicPartition;
         if (hasReliableLeaderEpoch && partitionMetadata.leaderEpoch.isPresent()) {
             int newEpoch = partitionMetadata.leaderEpoch.get();
             // If the received leader epoch is at least the same as the previous one, update the metadata
             Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
-            if (currentEpoch == null || newEpoch >= currentEpoch) {
+            if (currentEpoch == null || newEpoch >= currentEpoch || changedTopicId) {

Review comment:
       The log message below does not make much sense if the topicId has 
changed. Maybe this should be a separate case?
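   For example, something along these lines could keep the existing message for the epoch case only (just a sketch; the elided bodies are the existing update and logging logic):
   ```java
   if (currentEpoch == null || newEpoch >= currentEpoch) {
       // epoch moved forward (or was unknown): existing update and log message still apply
       ...
   } else if (changedTopicId) {
       // topic was recreated under a new ID: accept the metadata, with its own log message
       ...
   }
   ```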

##########
File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
##########
@@ -93,6 +104,14 @@ private MetadataCache(String clusterId,
         return Optional.ofNullable(metadataByPartition.get(topicPartition));
     }
 
+    Map<String, Uuid> topicIds() {

Review comment:
       Do we need to expose the map or could we just have lookup methods:
   ```java
   Uuid topicId(String topicName);
   String topicName(Uuid topicId);
   ```
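   A minimal sketch of how these could be backed by the existing map (the reverse `topicNames` map here is an assumption, not something in the current diff):
   ```java
   Uuid topicId(String topicName) {
       return topicIds.get(topicName);
   }

   String topicName(Uuid topicId) {
       return topicNames.get(topicId); // assumes a reverse Map<Uuid, String> maintained alongside topicIds
   }
   ```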

##########
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##########
@@ -874,18 +920,34 @@ protected boolean retainTopic(String topic, boolean isInternal, long nowMs) {
         assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("keepValidTopic", "newValidTopic")));
         assertEquals(cluster.partitionsForTopic("keepValidTopic").size(), 2);
         assertEquals(cluster.partitionsForTopic("newValidTopic").size(), 4);
+        assertTrue(cluster.topicIds().containsAll(topicIds.values()));
+
+        // Try removing the topic ID from keepValidTopic (simulating receiving a request from a controller with an older IBP)

Review comment:
       This test case is a bit much to take in. Is it possible to do a separate 
test case?
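   For instance, a skeleton like the following could isolate the older-IBP path (the test name and structure are assumptions):
   ```java
   @Test
   public void testTopicIdClearedWhenResponseOmitsTopicIds() {
       // 1. update metadata with a response that includes a topic ID for "keepValidTopic"
       // 2. apply a second response without topic IDs (as from a controller on an older IBP)
       // 3. assert that cluster.topicIds() no longer contains the old ID
   }
   ```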




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org