kirktrue commented on code in PR #14564:
URL: https://github.com/apache/kafka/pull/14564#discussion_r1364448430


##########
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##########
@@ -350,6 +353,92 @@ public synchronized void update(int requestVersion, MetadataResponse response, b
         log.debug("Updated cluster metadata updateVersion {} to {}", this.updateVersion, this.cache);
     }
 
+    /**
+     * Update the metadata by merging existing metadata with the input leader information and nodes. This is called whenever
+     * partial updates to metadata are returned in a response from broker(ex - ProduceResponse & FetchResponse).
+     * Note that the updates via Metadata RPC are handled separately in ({@link #update}).
+     * Both partitionLeader and leaderNodes override the existing metadata. Non-overlapping metadata is kept as it is.
+     * @param partitionLeaders map of new leadership information for partitions.
+     * @param leaderNodes a list of nodes for leaders in the above map.
+     * @return a set of partitions, for which leaders were updated.
+     */
+    public synchronized Set<TopicPartition> updatePartially(Map<TopicPartition, LeaderIdAndEpoch> partitionLeaders, List<Node> leaderNodes) {
+        Map<Integer, Node> newNodes = leaderNodes.stream().collect(Collectors.toMap(Node::id, node -> node));
+        // Insert non-overlapping nodes from existing-nodes into new-nodes.
+        this.cache.cluster().nodes().stream().forEach(node -> newNodes.putIfAbsent(node.id(), node));
+
+        // Create partition-metadata for all updated partitions. Exclude updates for partitions -
+        // 1. for which the corresponding partition has newer leader in existing metadata.
+        // 2. for which corresponding leader's node is missing in the new-nodes.
+        // 3. for which the existing metadata doesn't know about the partition.
+        List<PartitionMetadata> updatePartitionMetadata = new ArrayList<>();
+        Set<TopicPartition> updatedPartitions = new HashSet<>();
+        for (Entry partitionLeader: partitionLeaders.entrySet()) {
+            TopicPartition partition = (TopicPartition) partitionLeader.getKey();
+            Metadata.LeaderAndEpoch currentLeader = currentLeader(partition);
+            Metadata.LeaderIdAndEpoch newLeader = (LeaderIdAndEpoch) partitionLeader.getValue();

Review Comment:
   Ditto on the use of unnecessary casting.
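   Something like this would drop both casts (just a sketch, reusing the existing names):
   
   ```java
   // Typing the entry removes the need for the explicit casts.
   for (Map.Entry<TopicPartition, Metadata.LeaderIdAndEpoch> partitionLeader : partitionLeaders.entrySet()) {
       TopicPartition partition = partitionLeader.getKey();
       Metadata.LeaderIdAndEpoch newLeader = partitionLeader.getValue();
       // ... rest of the loop body unchanged
   }
   ```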



##########
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##########
@@ -350,6 +353,92 @@ public synchronized void update(int requestVersion, MetadataResponse response, b
         log.debug("Updated cluster metadata updateVersion {} to {}", this.updateVersion, this.cache);
     }
 
+    /**
+     * Update the metadata by merging existing metadata with the input leader information and nodes. This is called whenever
+     * partial updates to metadata are returned in a response from broker(ex - ProduceResponse & FetchResponse).
+     * Note that the updates via Metadata RPC are handled separately in ({@link #update}).
+     * Both partitionLeader and leaderNodes override the existing metadata. Non-overlapping metadata is kept as it is.
+     * @param partitionLeaders map of new leadership information for partitions.
+     * @param leaderNodes a list of nodes for leaders in the above map.
+     * @return a set of partitions, for which leaders were updated.
+     */
+    public synchronized Set<TopicPartition> updatePartially(Map<TopicPartition, LeaderIdAndEpoch> partitionLeaders, List<Node> leaderNodes) {
+        Map<Integer, Node> newNodes = leaderNodes.stream().collect(Collectors.toMap(Node::id, node -> node));
+        // Insert non-overlapping nodes from existing-nodes into new-nodes.
+        this.cache.cluster().nodes().stream().forEach(node -> newNodes.putIfAbsent(node.id(), node));
+
+        // Create partition-metadata for all updated partitions. Exclude updates for partitions -
+        // 1. for which the corresponding partition has newer leader in existing metadata.
+        // 2. for which corresponding leader's node is missing in the new-nodes.
+        // 3. for which the existing metadata doesn't know about the partition.
+        List<PartitionMetadata> updatePartitionMetadata = new ArrayList<>();
+        Set<TopicPartition> updatedPartitions = new HashSet<>();
+        for (Entry partitionLeader: partitionLeaders.entrySet()) {
+            TopicPartition partition = (TopicPartition) partitionLeader.getKey();
+            Metadata.LeaderAndEpoch currentLeader = currentLeader(partition);
+            Metadata.LeaderIdAndEpoch newLeader = (LeaderIdAndEpoch) partitionLeader.getValue();
+            if (!newLeader.epoch.isPresent() || !newLeader.leaderId.isPresent()) {
+                log.trace("For {}, incoming leader information is incomplete {}", partition, newLeader);

Review Comment:
   Off topic, but I'm not sure I see the utility in using `TRACE`-level 
debugging. I just don't think it'll ever be enabled in production.
   
   Please correct me if there's a use case where it could be helpful.



##########
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##########
@@ -350,6 +353,92 @@ public synchronized void update(int requestVersion, MetadataResponse response, b
         log.debug("Updated cluster metadata updateVersion {} to {}", this.updateVersion, this.cache);
     }
 
+    /**
+     * Update the metadata by merging existing metadata with the input leader information and nodes. This is called whenever
+     * partial updates to metadata are returned in a response from broker(ex - ProduceResponse & FetchResponse).
+     * Note that the updates via Metadata RPC are handled separately in ({@link #update}).
+     * Both partitionLeader and leaderNodes override the existing metadata. Non-overlapping metadata is kept as it is.
+     * @param partitionLeaders map of new leadership information for partitions.
+     * @param leaderNodes a list of nodes for leaders in the above map.
+     * @return a set of partitions, for which leaders were updated.
+     */
+    public synchronized Set<TopicPartition> updatePartially(Map<TopicPartition, LeaderIdAndEpoch> partitionLeaders, List<Node> leaderNodes) {
+        Map<Integer, Node> newNodes = leaderNodes.stream().collect(Collectors.toMap(Node::id, node -> node));
+        // Insert non-overlapping nodes from existing-nodes into new-nodes.
+        this.cache.cluster().nodes().stream().forEach(node -> newNodes.putIfAbsent(node.id(), node));
+
+        // Create partition-metadata for all updated partitions. Exclude updates for partitions -
+        // 1. for which the corresponding partition has newer leader in existing metadata.
+        // 2. for which corresponding leader's node is missing in the new-nodes.
+        // 3. for which the existing metadata doesn't know about the partition.
+        List<PartitionMetadata> updatePartitionMetadata = new ArrayList<>();
+        Set<TopicPartition> updatedPartitions = new HashSet<>();
+        for (Entry partitionLeader: partitionLeaders.entrySet()) {
+            TopicPartition partition = (TopicPartition) partitionLeader.getKey();
+            Metadata.LeaderAndEpoch currentLeader = currentLeader(partition);
+            Metadata.LeaderIdAndEpoch newLeader = (LeaderIdAndEpoch) partitionLeader.getValue();
+            if (!newLeader.epoch.isPresent() || !newLeader.leaderId.isPresent()) {
+                log.trace("For {}, incoming leader information is incomplete {}", partition, newLeader);
+                continue;
+            }
+            if (currentLeader.epoch.isPresent() && newLeader.epoch.get() <= currentLeader.epoch.get()) {
+                log.trace("For {}, incoming leader({}) is not-newer than the one in the existing metadata {}, so ignoring.", partition, newLeader, currentLeader);
+                continue;
+            }
+            if (!newNodes.containsKey(newLeader.leaderId.get())) {
+                log.trace("For {}, incoming leader({}), the corresponding node information for node-id {} is missing, so ignoring.", partition, newLeader, newLeader.leaderId.get());
+                continue;
+            }
+            if (!this.cache.partitionMetadata(partition).isPresent()) {
+                log.trace("For {}, incoming leader({}), no longer has cached metadata so ignoring.", partition, newLeader);
+                continue;
+            }
+
+            MetadataResponse.PartitionMetadata existingMetadata = this.cache.partitionMetadata(partition).get();
+            MetadataResponse.PartitionMetadata updatedMetadata = new MetadataResponse.PartitionMetadata(
+                existingMetadata.error,
+                partition,
+                newLeader.leaderId,
+                newLeader.epoch,
+                existingMetadata.replicaIds,
+                existingMetadata.inSyncReplicaIds,
+                existingMetadata.offlineReplicaIds
+            );
+            updatePartitionMetadata.add(updatedMetadata);
+
+            lastSeenLeaderEpochs.put(partition, newLeader.epoch.get());
+
+            updatedPartitions.add(partition);
+        }
+
+        if (updatedPartitions.isEmpty()) {
+            log.debug("No relevant metadata updates.");
+            return updatedPartitions;
+        }
+
+        // Get topic-ids for filtered partitions from existing topic-ids.
+        Map<String, Uuid> existingTopicIds = this.cache.topicIds();
+        Map<String, Uuid> filteredTopicIds = updatePartitionMetadata.stream()
+            .filter(e -> existingTopicIds.containsKey(e.topic()))
+            .collect(Collectors.toMap(e -> e.topic(), e -> existingTopicIds.get(e.topic())));
+
+        Set<String> updatedTopics = updatePartitionMetadata.stream().map(MetadataResponse.PartitionMetadata::topic).collect(Collectors.toSet());
+
+        if (log.isTraceEnabled()) {
+            updatePartitionMetadata.forEach(
+                partMetadata -> log.trace("For {} updating to leader-id {}, leader-epoch {}.", partMetadata, partMetadata.leaderId.get(), partMetadata.leaderEpoch.get())
+            );
+        }
+
+        this.cache = cache.mergeWith(cache.clusterResource().clusterId(), newNodes,
+                    updatePartitionMetadata, Collections.emptySet(), Collections.emptySet(),
+                    Collections.emptySet(), cache.cluster().controller(), filteredTopicIds,
+                    (topic, isInternal) -> !updatedTopics.contains(topic));
+        clusterResourceListeners.onUpdate(cache.clusterResource());
+
+        return updatedPartitions;

Review Comment:
   Can we just pull the set of partitions out like so:
   
   ```suggestion
           return updatePartitionMetadata.stream()
               .map(MetadataResponse.PartitionMetadata::topicPartition)
               .collect(Collectors.toSet());
   ```



##########
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##########
@@ -350,6 +353,92 @@ public synchronized void update(int requestVersion, MetadataResponse response, b
         log.debug("Updated cluster metadata updateVersion {} to {}", this.updateVersion, this.cache);
     }
 
+    /**
+     * Update the metadata by merging existing metadata with the input leader information and nodes. This is called whenever
+     * partial updates to metadata are returned in a response from broker(ex - ProduceResponse & FetchResponse).
+     * Note that the updates via Metadata RPC are handled separately in ({@link #update}).
+     * Both partitionLeader and leaderNodes override the existing metadata. Non-overlapping metadata is kept as it is.
+     * @param partitionLeaders map of new leadership information for partitions.
+     * @param leaderNodes a list of nodes for leaders in the above map.
+     * @return a set of partitions, for which leaders were updated.
+     */
+    public synchronized Set<TopicPartition> updatePartially(Map<TopicPartition, LeaderIdAndEpoch> partitionLeaders, List<Node> leaderNodes) {
+        Map<Integer, Node> newNodes = leaderNodes.stream().collect(Collectors.toMap(Node::id, node -> node));
+        // Insert non-overlapping nodes from existing-nodes into new-nodes.
+        this.cache.cluster().nodes().stream().forEach(node -> newNodes.putIfAbsent(node.id(), node));
+
+        // Create partition-metadata for all updated partitions. Exclude updates for partitions -
+        // 1. for which the corresponding partition has newer leader in existing metadata.
+        // 2. for which corresponding leader's node is missing in the new-nodes.
+        // 3. for which the existing metadata doesn't know about the partition.
+        List<PartitionMetadata> updatePartitionMetadata = new ArrayList<>();
+        Set<TopicPartition> updatedPartitions = new HashSet<>();
+        for (Entry partitionLeader: partitionLeaders.entrySet()) {
+            TopicPartition partition = (TopicPartition) partitionLeader.getKey();
+            Metadata.LeaderAndEpoch currentLeader = currentLeader(partition);
+            Metadata.LeaderIdAndEpoch newLeader = (LeaderIdAndEpoch) partitionLeader.getValue();
+            if (!newLeader.epoch.isPresent() || !newLeader.leaderId.isPresent()) {
+                log.trace("For {}, incoming leader information is incomplete {}", partition, newLeader);
+                continue;
+            }
+            if (currentLeader.epoch.isPresent() && newLeader.epoch.get() <= currentLeader.epoch.get()) {
+                log.trace("For {}, incoming leader({}) is not-newer than the one in the existing metadata {}, so ignoring.", partition, newLeader, currentLeader);
+                continue;
+            }
+            if (!newNodes.containsKey(newLeader.leaderId.get())) {
+                log.trace("For {}, incoming leader({}), the corresponding node information for node-id {} is missing, so ignoring.", partition, newLeader, newLeader.leaderId.get());
+                continue;
+            }
+            if (!this.cache.partitionMetadata(partition).isPresent()) {
+                log.trace("For {}, incoming leader({}), no longer has cached metadata so ignoring.", partition, newLeader);
+                continue;
+            }
+
+            MetadataResponse.PartitionMetadata existingMetadata = this.cache.partitionMetadata(partition).get();
+            MetadataResponse.PartitionMetadata updatedMetadata = new MetadataResponse.PartitionMetadata(
+                existingMetadata.error,
+                partition,
+                newLeader.leaderId,
+                newLeader.epoch,
+                existingMetadata.replicaIds,
+                existingMetadata.inSyncReplicaIds,
+                existingMetadata.offlineReplicaIds
+            );
+            updatePartitionMetadata.add(updatedMetadata);
+
+            lastSeenLeaderEpochs.put(partition, newLeader.epoch.get());
+
+            updatedPartitions.add(partition);
+        }
+
+        if (updatedPartitions.isEmpty()) {
+            log.debug("No relevant metadata updates.");
+            return updatedPartitions;
+        }
+
+        // Get topic-ids for filtered partitions from existing topic-ids.
+        Map<String, Uuid> existingTopicIds = this.cache.topicIds();
+        Map<String, Uuid> filteredTopicIds = updatePartitionMetadata.stream()
+            .filter(e -> existingTopicIds.containsKey(e.topic()))
+            .collect(Collectors.toMap(e -> e.topic(), e -> existingTopicIds.get(e.topic())));
+
+        Set<String> updatedTopics = updatePartitionMetadata.stream().map(MetadataResponse.PartitionMetadata::topic).collect(Collectors.toSet());
+
+        if (log.isTraceEnabled()) {
+            updatePartitionMetadata.forEach(
+                partMetadata -> log.trace("For {} updating to leader-id {}, leader-epoch {}.", partMetadata, partMetadata.leaderId.get(), partMetadata.leaderEpoch.get())
+            );
+        }
+
+        this.cache = cache.mergeWith(cache.clusterResource().clusterId(), newNodes,
+                    updatePartitionMetadata, Collections.emptySet(), Collections.emptySet(),
+                    Collections.emptySet(), cache.cluster().controller(), filteredTopicIds,
+                    (topic, isInternal) -> !updatedTopics.contains(topic));

Review Comment:
   This method call is very important, but it is quite hard to read. Can we 
prettify it a bit? Perhaps...
   
   ```suggestion
           this.cache = cache.mergeWith(
                       cache.clusterResource().clusterId(),
                       newNodes,
                       updatePartitionMetadata,
                       Collections.emptySet(),
                       Collections.emptySet(),
                       Collections.emptySet(),
                       cache.cluster().controller(),
                       filteredTopicIds,
                       (topic, isInternal) -> !updatedTopics.contains(topic)
           );
   ```



##########
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##########
@@ -350,6 +353,92 @@ public synchronized void update(int requestVersion, MetadataResponse response, b
         log.debug("Updated cluster metadata updateVersion {} to {}", this.updateVersion, this.cache);
     }
 
+    /**
+     * Update the metadata by merging existing metadata with the input leader information and nodes. This is called whenever
+     * partial updates to metadata are returned in a response from broker(ex - ProduceResponse & FetchResponse).
+     * Note that the updates via Metadata RPC are handled separately in ({@link #update}).
+     * Both partitionLeader and leaderNodes override the existing metadata. Non-overlapping metadata is kept as it is.
+     * @param partitionLeaders map of new leadership information for partitions.
+     * @param leaderNodes a list of nodes for leaders in the above map.
+     * @return a set of partitions, for which leaders were updated.
+     */
+    public synchronized Set<TopicPartition> updatePartially(Map<TopicPartition, LeaderIdAndEpoch> partitionLeaders, List<Node> leaderNodes) {
+        Map<Integer, Node> newNodes = leaderNodes.stream().collect(Collectors.toMap(Node::id, node -> node));
+        // Insert non-overlapping nodes from existing-nodes into new-nodes.
+        this.cache.cluster().nodes().stream().forEach(node -> newNodes.putIfAbsent(node.id(), node));
+
+        // Create partition-metadata for all updated partitions. Exclude updates for partitions -
+        // 1. for which the corresponding partition has newer leader in existing metadata.
+        // 2. for which corresponding leader's node is missing in the new-nodes.
+        // 3. for which the existing metadata doesn't know about the partition.
+        List<PartitionMetadata> updatePartitionMetadata = new ArrayList<>();
+        Set<TopicPartition> updatedPartitions = new HashSet<>();
+        for (Entry partitionLeader: partitionLeaders.entrySet()) {
+            TopicPartition partition = (TopicPartition) partitionLeader.getKey();
+            Metadata.LeaderAndEpoch currentLeader = currentLeader(partition);
+            Metadata.LeaderIdAndEpoch newLeader = (LeaderIdAndEpoch) partitionLeader.getValue();
+            if (!newLeader.epoch.isPresent() || !newLeader.leaderId.isPresent()) {
+                log.trace("For {}, incoming leader information is incomplete {}", partition, newLeader);
+                continue;
+            }
+            if (currentLeader.epoch.isPresent() && newLeader.epoch.get() <= currentLeader.epoch.get()) {
+                log.trace("For {}, incoming leader({}) is not-newer than the one in the existing metadata {}, so ignoring.", partition, newLeader, currentLeader);
+                continue;
+            }
+            if (!newNodes.containsKey(newLeader.leaderId.get())) {
+                log.trace("For {}, incoming leader({}), the corresponding node information for node-id {} is missing, so ignoring.", partition, newLeader, newLeader.leaderId.get());
+                continue;
+            }
+            if (!this.cache.partitionMetadata(partition).isPresent()) {
+                log.trace("For {}, incoming leader({}), no longer has cached metadata so ignoring.", partition, newLeader);
+                continue;
+            }

Review Comment:
   It's a shame that use of `Optional` is so noisy, what with its 
`isPresent()`s and `get()`s everywhere 😦 
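   One small way to thin it out (purely a sketch; `isComplete()` is a hypothetical helper, not something in this PR) would be to push the presence checks into `LeaderIdAndEpoch`:
   
   ```java
   // Hypothetical convenience method on LeaderIdAndEpoch.
   public boolean isComplete() {
       return leaderId.isPresent() && epoch.isPresent();
   }
   ```
   
   The first guard in the loop would then read `if (!newLeader.isComplete()) { ... continue; }`, though the remaining `get()`s would still be there.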



##########
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##########
@@ -350,6 +353,92 @@ public synchronized void update(int requestVersion, MetadataResponse response, b
         log.debug("Updated cluster metadata updateVersion {} to {}", this.updateVersion, this.cache);
     }
 
+    /**
+     * Update the metadata by merging existing metadata with the input leader information and nodes. This is called whenever
+     * partial updates to metadata are returned in a response from broker(ex - ProduceResponse & FetchResponse).
+     * Note that the updates via Metadata RPC are handled separately in ({@link #update}).
+     * Both partitionLeader and leaderNodes override the existing metadata. Non-overlapping metadata is kept as it is.
+     * @param partitionLeaders map of new leadership information for partitions.
+     * @param leaderNodes a list of nodes for leaders in the above map.
+     * @return a set of partitions, for which leaders were updated.
+     */
+    public synchronized Set<TopicPartition> updatePartially(Map<TopicPartition, LeaderIdAndEpoch> partitionLeaders, List<Node> leaderNodes) {
+        Map<Integer, Node> newNodes = leaderNodes.stream().collect(Collectors.toMap(Node::id, node -> node));
+        // Insert non-overlapping nodes from existing-nodes into new-nodes.
+        this.cache.cluster().nodes().stream().forEach(node -> newNodes.putIfAbsent(node.id(), node));
+
+        // Create partition-metadata for all updated partitions. Exclude updates for partitions -
+        // 1. for which the corresponding partition has newer leader in existing metadata.
+        // 2. for which corresponding leader's node is missing in the new-nodes.
+        // 3. for which the existing metadata doesn't know about the partition.
+        List<PartitionMetadata> updatePartitionMetadata = new ArrayList<>();
+        Set<TopicPartition> updatedPartitions = new HashSet<>();
+        for (Entry partitionLeader: partitionLeaders.entrySet()) {
+            TopicPartition partition = (TopicPartition) partitionLeader.getKey();
+            Metadata.LeaderAndEpoch currentLeader = currentLeader(partition);
+            Metadata.LeaderIdAndEpoch newLeader = (LeaderIdAndEpoch) partitionLeader.getValue();
+            if (!newLeader.epoch.isPresent() || !newLeader.leaderId.isPresent()) {
+                log.trace("For {}, incoming leader information is incomplete {}", partition, newLeader);
+                continue;
+            }
+            if (currentLeader.epoch.isPresent() && newLeader.epoch.get() <= currentLeader.epoch.get()) {
+                log.trace("For {}, incoming leader({}) is not-newer than the one in the existing metadata {}, so ignoring.", partition, newLeader, currentLeader);
+                continue;
+            }
+            if (!newNodes.containsKey(newLeader.leaderId.get())) {
+                log.trace("For {}, incoming leader({}), the corresponding node information for node-id {} is missing, so ignoring.", partition, newLeader, newLeader.leaderId.get());
+                continue;
+            }
+            if (!this.cache.partitionMetadata(partition).isPresent()) {
+                log.trace("For {}, incoming leader({}), no longer has cached metadata so ignoring.", partition, newLeader);
+                continue;
+            }
+
+            MetadataResponse.PartitionMetadata existingMetadata = this.cache.partitionMetadata(partition).get();
+            MetadataResponse.PartitionMetadata updatedMetadata = new MetadataResponse.PartitionMetadata(
+                existingMetadata.error,
+                partition,
+                newLeader.leaderId,
+                newLeader.epoch,
+                existingMetadata.replicaIds,
+                existingMetadata.inSyncReplicaIds,
+                existingMetadata.offlineReplicaIds
+            );
+            updatePartitionMetadata.add(updatedMetadata);
+
+            lastSeenLeaderEpochs.put(partition, newLeader.epoch.get());
+
+            updatedPartitions.add(partition);
+        }
+
+        if (updatedPartitions.isEmpty()) {

Review Comment:
   `updatedPartitions` is updated alongside `updatePartitionMetadata`, but I'm 
wondering if we can use the size of `updatePartitionMetadata` for this short-circuit check?
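   Since the two collections grow in lockstep inside the loop, the early return could key off the list instead, e.g.:
   
   ```java
   // Sketch: short-circuit on the metadata list rather than the partition set.
   if (updatePartitionMetadata.isEmpty()) {
       log.debug("No relevant metadata updates.");
       return updatedPartitions;
   }
   ```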



##########
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##########
@@ -350,6 +353,92 @@ public synchronized void update(int requestVersion, MetadataResponse response, b
         log.debug("Updated cluster metadata updateVersion {} to {}", this.updateVersion, this.cache);
     }
 
+    /**
+     * Update the metadata by merging existing metadata with the input leader information and nodes. This is called whenever
+     * partial updates to metadata are returned in a response from broker(ex - ProduceResponse & FetchResponse).
+     * Note that the updates via Metadata RPC are handled separately in ({@link #update}).
+     * Both partitionLeader and leaderNodes override the existing metadata. Non-overlapping metadata is kept as it is.
+     * @param partitionLeaders map of new leadership information for partitions.
+     * @param leaderNodes a list of nodes for leaders in the above map.
+     * @return a set of partitions, for which leaders were updated.
+     */
+    public synchronized Set<TopicPartition> updatePartially(Map<TopicPartition, LeaderIdAndEpoch> partitionLeaders, List<Node> leaderNodes) {
+        Map<Integer, Node> newNodes = leaderNodes.stream().collect(Collectors.toMap(Node::id, node -> node));
+        // Insert non-overlapping nodes from existing-nodes into new-nodes.
+        this.cache.cluster().nodes().stream().forEach(node -> newNodes.putIfAbsent(node.id(), node));
+
+        // Create partition-metadata for all updated partitions. Exclude updates for partitions -
+        // 1. for which the corresponding partition has newer leader in existing metadata.
+        // 2. for which corresponding leader's node is missing in the new-nodes.
+        // 3. for which the existing metadata doesn't know about the partition.
+        List<PartitionMetadata> updatePartitionMetadata = new ArrayList<>();
+        Set<TopicPartition> updatedPartitions = new HashSet<>();
+        for (Entry partitionLeader: partitionLeaders.entrySet()) {
+            TopicPartition partition = (TopicPartition) partitionLeader.getKey();
+            Metadata.LeaderAndEpoch currentLeader = currentLeader(partition);
+            Metadata.LeaderIdAndEpoch newLeader = (LeaderIdAndEpoch) partitionLeader.getValue();
+            if (!newLeader.epoch.isPresent() || !newLeader.leaderId.isPresent()) {
+                log.trace("For {}, incoming leader information is incomplete {}", partition, newLeader);
+                continue;
+            }
+            if (currentLeader.epoch.isPresent() && newLeader.epoch.get() <= currentLeader.epoch.get()) {
+                log.trace("For {}, incoming leader({}) is not-newer than the one in the existing metadata {}, so ignoring.", partition, newLeader, currentLeader);
+                continue;
+            }
+            if (!newNodes.containsKey(newLeader.leaderId.get())) {
+                log.trace("For {}, incoming leader({}), the corresponding node information for node-id {} is missing, so ignoring.", partition, newLeader, newLeader.leaderId.get());
+                continue;
+            }
+            if (!this.cache.partitionMetadata(partition).isPresent()) {
+                log.trace("For {}, incoming leader({}), no longer has cached metadata so ignoring.", partition, newLeader);
+                continue;
+            }

Review Comment:
   All of these checks are to handle cases where the incoming leader 
information is not usable, right? I'm wondering—and this is a stylistic 
choice—if it makes sense to throw these into a separate method 
`shouldUpdatePartitionLeader(...)`? Might make it more easily accessible for 
a dedicated unit test, too.
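   Roughly what I have in mind, as a sketch only (the name and signature are made up, and the per-reason trace logging would need to move into the helper or be dropped):
   
   ```java
   // Hypothetical extraction of the guard clauses above.
   private boolean shouldUpdatePartitionLeader(TopicPartition partition,
                                               Metadata.LeaderIdAndEpoch newLeader,
                                               Map<Integer, Node> newNodes) {
       if (!newLeader.epoch.isPresent() || !newLeader.leaderId.isPresent()) {
           return false;
       }
       Metadata.LeaderAndEpoch currentLeader = currentLeader(partition);
       if (currentLeader.epoch.isPresent() && newLeader.epoch.get() <= currentLeader.epoch.get()) {
           return false;
       }
       return newNodes.containsKey(newLeader.leaderId.get())
           && this.cache.partitionMetadata(partition).isPresent();
   }
   ```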



##########
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##########
@@ -706,4 +795,22 @@ public String toString() {
                     '}';
         }
     }
+
+    public static class LeaderIdAndEpoch {
+        public final Optional<Integer> leaderId;
+        public final Optional<Integer> epoch;
+
+        public LeaderIdAndEpoch(Optional<Integer> leaderId, Optional<Integer> epoch) {
+            this.leaderId = Objects.requireNonNull(leaderId);
+            this.epoch = Objects.requireNonNull(epoch);
+        }
+
+        @Override
+        public String toString() {
+            return "LeaderIdAndEpoch{" +
+                "leaderId=" + leaderId.map(Number::toString).orElse("absent") +
+                ", epoch=" + epoch.map(Number::toString).orElse("absent") +

Review Comment:
   Interesting. I've not seen that usage of `Optional.map()` before. Cool!



##########
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##########
@@ -350,6 +353,92 @@ public synchronized void update(int requestVersion, MetadataResponse response, b
         log.debug("Updated cluster metadata updateVersion {} to {}", this.updateVersion, this.cache);
     }
 
+    /**
+     * Update the metadata by merging existing metadata with the input leader information and nodes. This is called whenever
+     * partial updates to metadata are returned in a response from broker(ex - ProduceResponse & FetchResponse).
+     * Note that the updates via Metadata RPC are handled separately in ({@link #update}).
+     * Both partitionLeader and leaderNodes override the existing metadata. Non-overlapping metadata is kept as it is.
+     * @param partitionLeaders map of new leadership information for partitions.
+     * @param leaderNodes a list of nodes for leaders in the above map.
+     * @return a set of partitions, for which leaders were updated.
+     */
+    public synchronized Set<TopicPartition> updatePartially(Map<TopicPartition, LeaderIdAndEpoch> partitionLeaders, List<Node> leaderNodes) {
+        Map<Integer, Node> newNodes = leaderNodes.stream().collect(Collectors.toMap(Node::id, node -> node));
+        // Insert non-overlapping nodes from existing-nodes into new-nodes.
+        this.cache.cluster().nodes().stream().forEach(node -> newNodes.putIfAbsent(node.id(), node));
+
+        // Create partition-metadata for all updated partitions. Exclude updates for partitions -
+        // 1. for which the corresponding partition has newer leader in existing metadata.
+        // 2. for which corresponding leader's node is missing in the new-nodes.
+        // 3. for which the existing metadata doesn't know about the partition.
+        List<PartitionMetadata> updatePartitionMetadata = new ArrayList<>();
+        Set<TopicPartition> updatedPartitions = new HashSet<>();
+        for (Entry partitionLeader: partitionLeaders.entrySet()) {
+            TopicPartition partition = (TopicPartition) partitionLeader.getKey();
+            Metadata.LeaderAndEpoch currentLeader = currentLeader(partition);
+            Metadata.LeaderIdAndEpoch newLeader = (LeaderIdAndEpoch) partitionLeader.getValue();
+            if (!newLeader.epoch.isPresent() || !newLeader.leaderId.isPresent()) {
+                log.trace("For {}, incoming leader information is incomplete {}", partition, newLeader);
+                continue;
+            }
+            if (currentLeader.epoch.isPresent() && newLeader.epoch.get() <= currentLeader.epoch.get()) {
+                log.trace("For {}, incoming leader({}) is not-newer than the one in the existing metadata {}, so ignoring.", partition, newLeader, currentLeader);
+                continue;
+            }
+            if (!newNodes.containsKey(newLeader.leaderId.get())) {
+                log.trace("For {}, incoming leader({}), the corresponding node information for node-id {} is missing, so ignoring.", partition, newLeader, newLeader.leaderId.get());
+                continue;
+            }
+            if (!this.cache.partitionMetadata(partition).isPresent()) {
+                log.trace("For {}, incoming leader({}), no longer has cached metadata so ignoring.", partition, newLeader);
+                continue;
+            }
+
+            MetadataResponse.PartitionMetadata existingMetadata = this.cache.partitionMetadata(partition).get();

Review Comment:
   Is there any way that `existingMetadata` could be `null` at this point? I 
assume not, due to the use of the `Optional` wrapper here.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##########
@@ -183,6 +200,24 @@ protected void handleFetchResponse(final Node fetchTarget,
                 fetchBuffer.add(completedFetch);
             }
 
+            if (requestVersion >= 16 && !partitionsWithUpdatedLeaderInfo.isEmpty()) {
+                List<Node> leaderNodes = response.data().nodeEndpoints().stream()
+                    .map(e -> new Node(e.nodeId(), e.host(), e.port(), e.rack()))
+                    .filter(e -> !e.equals(Node.noNode()))
+                    .collect(
+                    Collectors.toList());

Review Comment:
   Minor nit:
   
   ```suggestion
                    List<Node> leaderNodes = response.data().nodeEndpoints().stream()
                        .map(e -> new Node(e.nodeId(), e.host(), e.port(), e.rack()))
                       .filter(e -> !e.equals(Node.noNode()))
                       .collect(Collectors.toList());
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##########
@@ -171,6 +179,15 @@ protected void handleFetchResponse(final Node fetchTarget,
                 log.debug("Fetch {} at offset {} for partition {} returned 
fetch data {}",
                         fetchConfig.isolationLevel, fetchOffset, partition, 
partitionData);
 
+                Errors partitionError = 
Errors.forCode(partitionData.errorCode());
+                if (requestVersion >= 16 && (partitionError == 
Errors.NOT_LEADER_OR_FOLLOWER || partitionError == Errors.FENCED_LEADER_EPOCH)) 
{

Review Comment:
   Can we add a constant, with some comments, for the magic value of `16`?
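   For example (constant name and placement are only illustrative):
   
   ```java
   // The request version the PR currently hard-codes; responses at or above this
   // version can carry the new leader/node-endpoint information.
   private static final short FETCH_VERSION_WITH_NEW_LEADER_INFO = 16;
   
   // ...
   if (requestVersion >= FETCH_VERSION_WITH_NEW_LEADER_INFO
           && (partitionError == Errors.NOT_LEADER_OR_FOLLOWER || partitionError == Errors.FENCED_LEADER_EPOCH)) {
       // existing handling
   }
   ```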



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -614,13 +618,28 @@ private void handleProduceResponse(ClientResponse response, Map<TopicPartition,
                                 .collect(Collectors.toList()),
                             p.errorMessage());
                     ProducerBatch batch = batches.get(tp);
-                    completeBatch(batch, partResp, correlationId, now);
+                    completeBatch(batch, partResp, correlationId, now, partitionsWithUpdatedLeaderInfo);
+
+                    final short requestVersion = response.requestHeader().apiVersion();
+                    if (requestVersion >= 16 && !partitionsWithUpdatedLeaderInfo.isEmpty()) {
+                        List<Node> leaderNodes = produceResponse.data().nodeEndpoints().stream()
+                            .map(e -> new Node(e.nodeId(), e.host(), e.port(), e.rack()))
+                            .filter(e -> !e.equals(Node.noNode()))
+                            .collect(
+                                Collectors.toList());
+                        Set<TopicPartition> updatedPartitions = metadata.updatePartially(partitionsWithUpdatedLeaderInfo, leaderNodes);
+                        if (log.isTraceEnabled()) {
+                            updatedPartitions.forEach(
+                                part -> log.trace("For {} leader was updated.", part)
+                            );
+                        }
+                    }

Review Comment:
   Is this worth the effort to generalize and share among `Sender` and 
`AbstractFetch`?
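   If so, the shared piece could be something like this (sketch only; the helper name and location are made up, and each caller would still map its own `NodeEndpoint` type to `Node` first):
   
   ```java
   // Hypothetical shared utility for Sender and AbstractFetch.
   static List<Node> nonEmptyLeaderNodes(Stream<Node> endpoints) {
       return endpoints
           .filter(node -> !node.equals(Node.noNode()))
           .collect(Collectors.toList());
   }
   ```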



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -633,9 +652,10 @@ private void handleProduceResponse(ClientResponse response, Map<TopicPartition,
      * @param response The produce response
      * @param correlationId The correlation id for the request
      * @param now The current POSIX timestamp in milliseconds
+     * @param partitionsWithUpdatedLeaderInfo This will be populated with partitions that have updated leader info.
      */
     private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
-                               long now) {
+                               long now, Map<TopicPartition, Metadata.LeaderIdAndEpoch> partitionsWithUpdatedLeaderInfo) {

Review Comment:
   Can we wrap this `Map` in an `Optional` or explicitly allow for `null` since 
four out of the five callers aren't interested in the updates to the map?
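   Sketching the `Optional` flavour (signature only; the names inside the comment are placeholders):
   
   ```java
   private void completeBatch(ProducerBatch batch,
                              ProduceResponse.PartitionResponse response,
                              long correlationId,
                              long now,
                              Optional<Map<TopicPartition, Metadata.LeaderIdAndEpoch>> partitionsWithUpdatedLeaderInfo) {
       // ...
       // Callers that don't care pass Optional.empty(), and the update becomes:
       // partitionsWithUpdatedLeaderInfo.ifPresent(m -> m.put(tp, newLeader));
   }
   ```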




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

