msn-tldr commented on code in PR #14564:
URL: https://github.com/apache/kafka/pull/14564#discussion_r1370368421
########## clients/src/main/java/org/apache/kafka/clients/MetadataCache.java: ##########
@@ -150,7 +150,7 @@ MetadataCache mergeWith(String newClusterId,
         // We want the most recent topic ID. We start with the previous ID stored for retained topics and then
         // update with newest information from the MetadataResponse. We always take the latest state, removing existing
         // topic IDs if the latest state contains the topic name but not a topic ID.
-        Map<String, Uuid> newTopicIds = topicIds.entrySet().stream()
+        Map<String, Uuid> newTopicIds = this.topicIds.entrySet().stream()

Review Comment:
   This was introduced here: https://github.com/apache/kafka/pull/11004/files#diff-5e4d9713b6e5a386e5cac7e81215160948859243a71b439c3a990876e56b3ec5

   The net effect of the bug was that, in the merged cache built by `mergeWith`, the IDs of retained topics (from the pre-existing metadata) were lost. As the comment explains, the intention of the code is to build the merged map of topic IDs: it is initialised with the IDs of the retained topics (`this.topicIds`) and then updated with the newest information from the MetadataResponse (the `topicIds` argument). But the code used the `topicIds` argument even to seed the retained topic IDs.

   > We start with the previous ID stored for retained topics and then update with newest information from the MetadataResponse.
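For illustration, a minimal, self-contained sketch of the merge order the comment describes: seed with the IDs already cached for retained topics, then let the IDs from the latest MetadataResponse win. The method name `mergeTopicIds` and the `shouldRetainTopic` predicate are hypothetical stand-ins, not the actual `MetadataCache` internals:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

import org.apache.kafka.common.Uuid;

public class TopicIdMergeSketch {
    /**
     * Seed the merged map with the IDs already cached for retained topics
     * (this.topicIds in mergeWith), then override them with the IDs coming
     * from the latest MetadataResponse (the topicIds argument).
     */
    static Map<String, Uuid> mergeTopicIds(Map<String, Uuid> cachedTopicIds,
                                           Map<String, Uuid> responseTopicIds,
                                           Predicate<String> shouldRetainTopic) {
        Map<String, Uuid> merged = new HashMap<>();
        cachedTopicIds.forEach((topic, id) -> {
            if (shouldRetainTopic.test(topic))
                merged.put(topic, id); // retained topics keep their previously known ID
        });
        merged.putAll(responseTopicIds); // newest information from the response wins
        return merged;
    }
}
```

With the pre-fix code, the first step read from the `topicIds` argument instead of the cached map, so retained topics that were absent from the response lost their IDs.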
########## clients/src/main/java/org/apache/kafka/clients/Metadata.java: ##########
@@ -350,6 +353,92 @@ public synchronized void update(int requestVersion, MetadataResponse response, b
         log.debug("Updated cluster metadata updateVersion {} to {}", this.updateVersion, this.cache);
     }
 
+    /**
+     * Update the metadata by merging existing metadata with the input leader information and nodes. This is called whenever
+     * partial updates to metadata are returned in a response from broker(ex - ProduceResponse & FetchResponse).
+     * Note that the updates via Metadata RPC are handled separately in ({@link #update}).
+     * Both partitionLeader and leaderNodes override the existing metadata. Non-overlapping metadata is kept as it is.
+     * @param partitionLeaders map of new leadership information for partitions.
+     * @param leaderNodes a list of nodes for leaders in the above map.
+     * @return a set of partitions, for which leaders were updated.
+     */
+    public synchronized Set<TopicPartition> updatePartially(Map<TopicPartition, LeaderIdAndEpoch> partitionLeaders, List<Node> leaderNodes) {
+        Map<Integer, Node> newNodes = leaderNodes.stream().collect(Collectors.toMap(Node::id, node -> node));
+        // Insert non-overlapping nodes from existing-nodes into new-nodes.
+        this.cache.cluster().nodes().stream().forEach(node -> newNodes.putIfAbsent(node.id(), node));
+
+        // Create partition-metadata for all updated partitions. Exclude updates for partitions -
+        // 1. for which the corresponding partition has newer leader in existing metadata.
+        // 2. for which corresponding leader's node is missing in the new-nodes.
+        // 3. for which the existing metadata doesn't know about the partition.
+        List<PartitionMetadata> updatePartitionMetadata = new ArrayList<>();
+        Set<TopicPartition> updatedPartitions = new HashSet<>();
+        for (Entry partitionLeader: partitionLeaders.entrySet()) {
+            TopicPartition partition = (TopicPartition) partitionLeader.getKey();
+            Metadata.LeaderAndEpoch currentLeader = currentLeader(partition);
+            Metadata.LeaderIdAndEpoch newLeader = (LeaderIdAndEpoch) partitionLeader.getValue();
+            if (!newLeader.epoch.isPresent() || !newLeader.leaderId.isPresent()) {
+                log.trace("For {}, incoming leader information is incomplete {}", partition, newLeader);

Review Comment:
   You are right, I see everywhere else is at `DEBUG` level, so I have changed the logging level.

########## clients/src/main/java/org/apache/kafka/clients/Metadata.java: ##########
@@ -350,6 +353,92 @@ public synchronized void update(int requestVersion, MetadataResponse response, b
         log.debug("Updated cluster metadata updateVersion {} to {}", this.updateVersion, this.cache);
     }
 
+    /**
+     * Update the metadata by merging existing metadata with the input leader information and nodes. This is called whenever
+     * partial updates to metadata are returned in a response from broker(ex - ProduceResponse & FetchResponse).
+     * Note that the updates via Metadata RPC are handled separately in ({@link #update}).
+     * Both partitionLeader and leaderNodes override the existing metadata. Non-overlapping metadata is kept as it is.
+     * @param partitionLeaders map of new leadership information for partitions.
+     * @param leaderNodes a list of nodes for leaders in the above map.
+     * @return a set of partitions, for which leaders were updated.
+     */
+    public synchronized Set<TopicPartition> updatePartially(Map<TopicPartition, LeaderIdAndEpoch> partitionLeaders, List<Node> leaderNodes) {
+        Map<Integer, Node> newNodes = leaderNodes.stream().collect(Collectors.toMap(Node::id, node -> node));
+        // Insert non-overlapping nodes from existing-nodes into new-nodes.
+        this.cache.cluster().nodes().stream().forEach(node -> newNodes.putIfAbsent(node.id(), node));
+
+        // Create partition-metadata for all updated partitions. Exclude updates for partitions -
+        // 1. for which the corresponding partition has newer leader in existing metadata.
+        // 2. for which corresponding leader's node is missing in the new-nodes.
+        // 3. for which the existing metadata doesn't know about the partition.
+        List<PartitionMetadata> updatePartitionMetadata = new ArrayList<>();
+        Set<TopicPartition> updatedPartitions = new HashSet<>();
+        for (Entry partitionLeader: partitionLeaders.entrySet()) {
+            TopicPartition partition = (TopicPartition) partitionLeader.getKey();
+            Metadata.LeaderAndEpoch currentLeader = currentLeader(partition);
+            Metadata.LeaderIdAndEpoch newLeader = (LeaderIdAndEpoch) partitionLeader.getValue();
+            if (!newLeader.epoch.isPresent() || !newLeader.leaderId.isPresent()) {
+                log.trace("For {}, incoming leader information is incomplete {}", partition, newLeader);
+                continue;
+            }
+            if (currentLeader.epoch.isPresent() && newLeader.epoch.get() <= currentLeader.epoch.get()) {
+                log.trace("For {}, incoming leader({}) is not-newer than the one in the existing metadata {}, so ignoring.", partition, newLeader, currentLeader);
+                continue;
+            }
+            if (!newNodes.containsKey(newLeader.leaderId.get())) {
+                log.trace("For {}, incoming leader({}), the corresponding node information for node-id {} is missing, so ignoring.", partition, newLeader, newLeader.leaderId.get());
+                continue;
+            }
+            if (!this.cache.partitionMetadata(partition).isPresent()) {
+                log.trace("For {}, incoming leader({}), no longer has cached metadata so ignoring.", partition, newLeader);
+                continue;
+            }

Review Comment:
   > if it makes sense to throw these into a separate method shouldUpdatePartitionLeader(. . .)?

   Right now the conditions to not update are fairly contained, so I will skip adding the method for now.
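For reference, a rough sketch of what such a `shouldUpdatePartitionLeader(. . .)` helper could look like if it were extracted. It is built only from the four conditions in the quoted diff, would live inside `Metadata` next to `updatePartially` (reusing `currentLeader` and `this.cache` from there), and is not part of the PR; logging is omitted for brevity:

```java
// Hypothetical helper: returns true only if the incoming leader information
// should replace what the metadata cache currently holds for the partition.
private boolean shouldUpdatePartitionLeader(TopicPartition partition,
                                            Metadata.LeaderIdAndEpoch newLeader,
                                            Map<Integer, Node> newNodes) {
    // 1. Incoming leader information must be complete.
    if (!newLeader.epoch.isPresent() || !newLeader.leaderId.isPresent())
        return false;
    // 2. Incoming epoch must be strictly newer than the cached one.
    Metadata.LeaderAndEpoch currentLeader = currentLeader(partition);
    if (currentLeader.epoch.isPresent() && newLeader.epoch.get() <= currentLeader.epoch.get())
        return false;
    // 3. Node information for the new leader must be available.
    if (!newNodes.containsKey(newLeader.leaderId.get()))
        return false;
    // 4. The partition must still be present in the cached metadata.
    return this.cache.partitionMetadata(partition).isPresent();
}
```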
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java: ##########
@@ -171,6 +179,15 @@ protected void handleFetchResponse(final Node fetchTarget,
             log.debug("Fetch {} at offset {} for partition {} returned fetch data {}", fetchConfig.isolationLevel, fetchOffset, partition, partitionData);
 
+            Errors partitionError = Errors.forCode(partitionData.errorCode());
+            if (requestVersion >= 16 && (partitionError == Errors.NOT_LEADER_OR_FOLLOWER || partitionError == Errors.FENCED_LEADER_EPOCH)) {

Review Comment:
   It wasn't needed. Since the new leader comes through tagged fields, for versions < 16 the tagged fields would be initialised with their default values, which would then be ignored.

########## clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java: ##########
@@ -685,6 +705,13 @@ private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionRespons
                     "to request metadata update now", batch.topicPartition, error.exception(response.errorMessage).toString());
             }
 
+            if (error == Errors.NOT_LEADER_OR_FOLLOWER || error == Errors.FENCED_LEADER_EPOCH) {

Review Comment:
   Either is OK, just different styles. But I get your point, sticking to one style makes sense, so I am going with the current one.

########## clients/src/main/java/org/apache/kafka/clients/Metadata.java: ##########
@@ -350,6 +353,92 @@ public synchronized void update(int requestVersion, MetadataResponse response, b
         log.debug("Updated cluster metadata updateVersion {} to {}", this.updateVersion, this.cache);
     }
 
+    /**
+     * Update the metadata by merging existing metadata with the input leader information and nodes. This is called whenever
+     * partial updates to metadata are returned in a response from broker(ex - ProduceResponse & FetchResponse).
+     * Note that the updates via Metadata RPC are handled separately in ({@link #update}).
+     * Both partitionLeader and leaderNodes override the existing metadata. Non-overlapping metadata is kept as it is.
+     * @param partitionLeaders map of new leadership information for partitions.
+     * @param leaderNodes a list of nodes for leaders in the above map.
+     * @return a set of partitions, for which leaders were updated.
+     */
+    public synchronized Set<TopicPartition> updatePartially(Map<TopicPartition, LeaderIdAndEpoch> partitionLeaders, List<Node> leaderNodes) {
+        Map<Integer, Node> newNodes = leaderNodes.stream().collect(Collectors.toMap(Node::id, node -> node));
+        // Insert non-overlapping nodes from existing-nodes into new-nodes.
+        this.cache.cluster().nodes().stream().forEach(node -> newNodes.putIfAbsent(node.id(), node));
+
+        // Create partition-metadata for all updated partitions. Exclude updates for partitions -
+        // 1. for which the corresponding partition has newer leader in existing metadata.
+        // 2. for which corresponding leader's node is missing in the new-nodes.
+        // 3. for which the existing metadata doesn't know about the partition.
+        List<PartitionMetadata> updatePartitionMetadata = new ArrayList<>();
+        Set<TopicPartition> updatedPartitions = new HashSet<>();
+        for (Entry partitionLeader: partitionLeaders.entrySet()) {
+            TopicPartition partition = (TopicPartition) partitionLeader.getKey();
+            Metadata.LeaderAndEpoch currentLeader = currentLeader(partition);
+            Metadata.LeaderIdAndEpoch newLeader = (LeaderIdAndEpoch) partitionLeader.getValue();
+            if (!newLeader.epoch.isPresent() || !newLeader.leaderId.isPresent()) {
+                log.trace("For {}, incoming leader information is incomplete {}", partition, newLeader);
+                continue;
+            }
+            if (currentLeader.epoch.isPresent() && newLeader.epoch.get() <= currentLeader.epoch.get()) {
+                log.trace("For {}, incoming leader({}) is not-newer than the one in the existing metadata {}, so ignoring.", partition, newLeader, currentLeader);
+                continue;
+            }
+            if (!newNodes.containsKey(newLeader.leaderId.get())) {
+                log.trace("For {}, incoming leader({}), the corresponding node information for node-id {} is missing, so ignoring.", partition, newLeader, newLeader.leaderId.get());
+                continue;
+            }
+            if (!this.cache.partitionMetadata(partition).isPresent()) {
+                log.trace("For {}, incoming leader({}), no longer has cached metadata so ignoring.", partition, newLeader);
+                continue;
+            }
+
+            MetadataResponse.PartitionMetadata existingMetadata = this.cache.partitionMetadata(partition).get();

Review Comment:
   Good call-out, but no, because the check at line 392 makes sure `isPresent()` is true before `get()` is called:
   ```
   if (!this.cache.partitionMetadata(partition).isPresent()) {
       log.trace("For {}, incoming leader({}), no longer has cached metadata so ignoring.", partition, newLeader);
       continue;
   }
   ```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org