msn-tldr commented on code in PR #14564:
URL: https://github.com/apache/kafka/pull/14564#discussion_r1370368421


##########
clients/src/main/java/org/apache/kafka/clients/MetadataCache.java:
##########
@@ -150,7 +150,7 @@ MetadataCache mergeWith(String newClusterId,
         // We want the most recent topic ID. We start with the previous ID stored for retained topics and then
         // update with newest information from the MetadataResponse. We always take the latest state, removing existing
         // topic IDs if the latest state contains the topic name but not a topic ID.
-        Map<String, Uuid> newTopicIds = topicIds.entrySet().stream()
+        Map<String, Uuid> newTopicIds = this.topicIds.entrySet().stream()

Review Comment:
   This was introduced here:
   
   https://github.com/apache/kafka/pull/11004/files#diff-5e4d9713b6e5a386e5cac7e81215160948859243a71b439c3a990876e56b3ec5
   
   The net effect of the bug was that, in the merged cache, the IDs of retained topics (from the pre-existing metadata) were lost in the newly built cache.
   As the comment explains, the intention of the code is to build a merged map of topic IDs. This is done by initialising the merged map with the IDs of retained topics (`this.topicIds`), and then updating it with the newest information from the MetadataResponse (the `topicIds` argument). But the code used the `topicIds` argument even when seeding the retained topic IDs.
   
   > We start with the previous ID stored for retained topics and then update with newest information from the MetadataResponse.
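   
   To make the intended flow concrete, here is a minimal sketch of the merge (the `shouldRetainTopic` predicate and the use of `Uuid.ZERO_UUID` as the "no ID" marker are illustrative, not the exact `mergeWith` code):
   
   ```
   import java.util.HashMap;
   import java.util.Map;
   import java.util.function.Predicate;
   import org.apache.kafka.common.Uuid;
   
   // Illustrative only: how the merged topic-id map is meant to be built.
   static Map<String, Uuid> mergeTopicIds(Map<String, Uuid> previousIds,   // this.topicIds (from the existing cache)
                                          Map<String, Uuid> responseIds,   // topicIds (newest MetadataResponse)
                                          Predicate<String> shouldRetainTopic) {
       Map<String, Uuid> merged = new HashMap<>();
       // 1. Seed with the previous IDs of retained topics.
       previousIds.forEach((topic, id) -> {
           if (shouldRetainTopic.test(topic))
               merged.put(topic, id);
       });
       // 2. The newest response always wins; a topic reported without an ID loses its old ID.
       responseIds.forEach((topic, id) -> {
           if (id == null || Uuid.ZERO_UUID.equals(id))
               merged.remove(topic);
           else
               merged.put(topic, id);
       });
       return merged;
   }
   ```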
   
   



##########
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##########
@@ -350,6 +353,92 @@ public synchronized void update(int requestVersion, MetadataResponse response, b
         log.debug("Updated cluster metadata updateVersion {} to {}", this.updateVersion, this.cache);
     }
 
+    /**
+     * Update the metadata by merging existing metadata with the input leader information and nodes. This is called whenever
+     * partial updates to metadata are returned in a response from broker(ex - ProduceResponse & FetchResponse).
+     * Note that the updates via Metadata RPC are handled separately in ({@link #update}).
+     * Both partitionLeader and leaderNodes override the existing metadata. Non-overlapping metadata is kept as it is.
+     * @param partitionLeaders map of new leadership information for partitions.
+     * @param leaderNodes a list of nodes for leaders in the above map.
+     * @return a set of partitions, for which leaders were updated.
+     */
+    public synchronized Set<TopicPartition> updatePartially(Map<TopicPartition, LeaderIdAndEpoch> partitionLeaders, List<Node> leaderNodes) {
+        Map<Integer, Node> newNodes = leaderNodes.stream().collect(Collectors.toMap(Node::id, node -> node));
+        // Insert non-overlapping nodes from existing-nodes into new-nodes.
+        this.cache.cluster().nodes().stream().forEach(node -> newNodes.putIfAbsent(node.id(), node));
+
+        // Create partition-metadata for all updated partitions. Exclude updates for partitions -
+        // 1. for which the corresponding partition has newer leader in existing metadata.
+        // 2. for which corresponding leader's node is missing in the new-nodes.
+        // 3. for which the existing metadata doesn't know about the partition.
+        List<PartitionMetadata> updatePartitionMetadata = new ArrayList<>();
+        Set<TopicPartition> updatedPartitions = new HashSet<>();
+        for (Entry partitionLeader: partitionLeaders.entrySet()) {
+            TopicPartition partition = (TopicPartition) partitionLeader.getKey();
+            Metadata.LeaderAndEpoch currentLeader = currentLeader(partition);
+            Metadata.LeaderIdAndEpoch newLeader = (LeaderIdAndEpoch) partitionLeader.getValue();
+            if (!newLeader.epoch.isPresent() || !newLeader.leaderId.isPresent()) {
+                log.trace("For {}, incoming leader information is incomplete {}", partition, newLeader);

Review Comment:
   You are right. I see everywhere else logs at `DEBUG` level, so I have changed the logging level.



##########
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##########
@@ -350,6 +353,92 @@ public synchronized void update(int requestVersion, MetadataResponse response, b
         log.debug("Updated cluster metadata updateVersion {} to {}", this.updateVersion, this.cache);
     }
 
+    /**
+     * Update the metadata by merging existing metadata with the input leader information and nodes. This is called whenever
+     * partial updates to metadata are returned in a response from broker(ex - ProduceResponse & FetchResponse).
+     * Note that the updates via Metadata RPC are handled separately in ({@link #update}).
+     * Both partitionLeader and leaderNodes override the existing metadata. Non-overlapping metadata is kept as it is.
+     * @param partitionLeaders map of new leadership information for partitions.
+     * @param leaderNodes a list of nodes for leaders in the above map.
+     * @return a set of partitions, for which leaders were updated.
+     */
+    public synchronized Set<TopicPartition> updatePartially(Map<TopicPartition, LeaderIdAndEpoch> partitionLeaders, List<Node> leaderNodes) {
+        Map<Integer, Node> newNodes = leaderNodes.stream().collect(Collectors.toMap(Node::id, node -> node));
+        // Insert non-overlapping nodes from existing-nodes into new-nodes.
+        this.cache.cluster().nodes().stream().forEach(node -> newNodes.putIfAbsent(node.id(), node));
+
+        // Create partition-metadata for all updated partitions. Exclude updates for partitions -
+        // 1. for which the corresponding partition has newer leader in existing metadata.
+        // 2. for which corresponding leader's node is missing in the new-nodes.
+        // 3. for which the existing metadata doesn't know about the partition.
+        List<PartitionMetadata> updatePartitionMetadata = new ArrayList<>();
+        Set<TopicPartition> updatedPartitions = new HashSet<>();
+        for (Entry partitionLeader: partitionLeaders.entrySet()) {
+            TopicPartition partition = (TopicPartition) partitionLeader.getKey();
+            Metadata.LeaderAndEpoch currentLeader = currentLeader(partition);
+            Metadata.LeaderIdAndEpoch newLeader = (LeaderIdAndEpoch) partitionLeader.getValue();
+            if (!newLeader.epoch.isPresent() || !newLeader.leaderId.isPresent()) {
+                log.trace("For {}, incoming leader information is incomplete {}", partition, newLeader);
+                continue;
+            }
+            if (currentLeader.epoch.isPresent() && newLeader.epoch.get() <= currentLeader.epoch.get()) {
+                log.trace("For {}, incoming leader({}) is not-newer than the one in the existing metadata {}, so ignoring.", partition, newLeader, currentLeader);
+                continue;
+            }
+            if (!newNodes.containsKey(newLeader.leaderId.get())) {
+                log.trace("For {}, incoming leader({}), the corresponding node information for node-id {} is missing, so ignoring.", partition, newLeader, newLeader.leaderId.get());
+                continue;
+            }
+            if (!this.cache.partitionMetadata(partition).isPresent()) {
+                log.trace("For {}, incoming leader({}), no longer has cached metadata so ignoring.", partition, newLeader);
+                continue;
+            }

Review Comment:
   > if it makes sense to throw these into a separate method shouldUpdatePartitionLeader(. . .)?
   
   Right now the conditions for skipping an update are fairly contained, so I will skip adding the method for now.
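   
   For reference, such a helper inside `Metadata` would look roughly like this (purely a sketch of the existing checks, not something added in this PR):
   
   ```
   // Hypothetical helper: returns true only when the incoming leader info is complete,
   // strictly newer than the cached epoch, resolvable to a known node, and the partition
   // still has cached metadata.
   private boolean shouldUpdatePartitionLeader(TopicPartition partition,
                                               LeaderIdAndEpoch newLeader,
                                               Map<Integer, Node> newNodes) {
       if (!newLeader.epoch.isPresent() || !newLeader.leaderId.isPresent())
           return false;
       Metadata.LeaderAndEpoch currentLeader = currentLeader(partition);
       if (currentLeader.epoch.isPresent() && newLeader.epoch.get() <= currentLeader.epoch.get())
           return false;
       if (!newNodes.containsKey(newLeader.leaderId.get()))
           return false;
       return this.cache.partitionMetadata(partition).isPresent();
   }
   ```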



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##########
@@ -171,6 +179,15 @@ protected void handleFetchResponse(final Node fetchTarget,
                 log.debug("Fetch {} at offset {} for partition {} returned 
fetch data {}",
                         fetchConfig.isolationLevel, fetchOffset, partition, 
partitionData);
 
+                Errors partitionError = 
Errors.forCode(partitionData.errorCode());
+                if (requestVersion >= 16 && (partitionError == 
Errors.NOT_LEADER_OR_FOLLOWER || partitionError == Errors.FENCED_LEADER_EPOCH)) 
{

Review Comment:
   It wasn't needed. Since the new leader arrives through tagged fields, for versions < 16 those fields are initialised with their default values, which are simply ignored.
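   
   Roughly why the version gate is unnecessary (the field and accessor names below are my assumption of the KIP-951 tagged fields, with -1 as the default):
   
   ```
   // Brokers on fetch versions < 16 never set the tagged field, so it stays at its
   // defaults (-1) and no partial metadata update is attempted.
   FetchResponseData.LeaderIdAndEpoch currentLeader = partitionData.currentLeader();
   boolean hasNewLeader = currentLeader.leaderId() >= 0 && currentLeader.leaderEpoch() >= 0;
   if (hasNewLeader) {
       // collect (partition -> new leader) for a partial metadata update
   }
   ```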



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -685,6 +705,13 @@ private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionRespons
                             "to request metadata update now", batch.topicPartition,
                             error.exception(response.errorMessage).toString());
                 }
+                if (error == Errors.NOT_LEADER_OR_FOLLOWER || error == Errors.FENCED_LEADER_EPOCH) {

Review Comment:
   Either is OK, just different styles. But I get your point: sticking to one style makes sense, so I'm going with the current one.



##########
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##########
@@ -350,6 +353,92 @@ public synchronized void update(int requestVersion, MetadataResponse response, b
         log.debug("Updated cluster metadata updateVersion {} to {}", this.updateVersion, this.cache);
     }
 
+    /**
+     * Update the metadata by merging existing metadata with the input leader information and nodes. This is called whenever
+     * partial updates to metadata are returned in a response from broker(ex - ProduceResponse & FetchResponse).
+     * Note that the updates via Metadata RPC are handled separately in ({@link #update}).
+     * Both partitionLeader and leaderNodes override the existing metadata. Non-overlapping metadata is kept as it is.
+     * @param partitionLeaders map of new leadership information for partitions.
+     * @param leaderNodes a list of nodes for leaders in the above map.
+     * @return a set of partitions, for which leaders were updated.
+     */
+    public synchronized Set<TopicPartition> updatePartially(Map<TopicPartition, LeaderIdAndEpoch> partitionLeaders, List<Node> leaderNodes) {
+        Map<Integer, Node> newNodes = leaderNodes.stream().collect(Collectors.toMap(Node::id, node -> node));
+        // Insert non-overlapping nodes from existing-nodes into new-nodes.
+        this.cache.cluster().nodes().stream().forEach(node -> newNodes.putIfAbsent(node.id(), node));
+
+        // Create partition-metadata for all updated partitions. Exclude updates for partitions -
+        // 1. for which the corresponding partition has newer leader in existing metadata.
+        // 2. for which corresponding leader's node is missing in the new-nodes.
+        // 3. for which the existing metadata doesn't know about the partition.
+        List<PartitionMetadata> updatePartitionMetadata = new ArrayList<>();
+        Set<TopicPartition> updatedPartitions = new HashSet<>();
+        for (Entry partitionLeader: partitionLeaders.entrySet()) {
+            TopicPartition partition = (TopicPartition) partitionLeader.getKey();
+            Metadata.LeaderAndEpoch currentLeader = currentLeader(partition);
+            Metadata.LeaderIdAndEpoch newLeader = (LeaderIdAndEpoch) partitionLeader.getValue();
+            if (!newLeader.epoch.isPresent() || !newLeader.leaderId.isPresent()) {
+                log.trace("For {}, incoming leader information is incomplete {}", partition, newLeader);
+                continue;
+            }
+            if (currentLeader.epoch.isPresent() && newLeader.epoch.get() <= currentLeader.epoch.get()) {
+                log.trace("For {}, incoming leader({}) is not-newer than the one in the existing metadata {}, so ignoring.", partition, newLeader, currentLeader);
+                continue;
+            }
+            if (!newNodes.containsKey(newLeader.leaderId.get())) {
+                log.trace("For {}, incoming leader({}), the corresponding node information for node-id {} is missing, so ignoring.", partition, newLeader, newLeader.leaderId.get());
+                continue;
+            }
+            if (!this.cache.partitionMetadata(partition).isPresent()) {
+                log.trace("For {}, incoming leader({}), no longer has cached metadata so ignoring.", partition, newLeader);
+                continue;
+            }
+
+            MetadataResponse.PartitionMetadata existingMetadata = this.cache.partitionMetadata(partition).get();

Review Comment:
   Good call-out, but no, because the check at line 392 makes sure the `Optional` `isPresent()`:
   ```
   if (!this.cache.partitionMetadata(partition).isPresent()) {
       log.trace("For {}, incoming leader({}), no longer has cached metadata so ignoring.", partition, newLeader);
       continue;
   }
   ```
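   
   (Side note: an equivalent way to avoid calling `partitionMetadata(partition)` twice and the explicit `get()` would be to hold the `Optional` in a local. Just a style sketch, not what the PR does.)
   ```
   Optional<MetadataResponse.PartitionMetadata> cached = this.cache.partitionMetadata(partition);
   if (!cached.isPresent()) {
       log.trace("For {}, incoming leader({}), no longer has cached metadata so ignoring.", partition, newLeader);
       continue;
   }
   MetadataResponse.PartitionMetadata existingMetadata = cached.get();
   ```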



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
