wcarlson5 commented on code in PR #14564:
URL: https://github.com/apache/kafka/pull/14564#discussion_r1378153127
########## clients/src/main/java/org/apache/kafka/clients/Metadata.java: ##########
@@ -350,6 +353,91 @@ public synchronized void update(int requestVersion, MetadataResponse response, b
         log.debug("Updated cluster metadata updateVersion {} to {}", this.updateVersion, this.cache);
     }
 
+    /**
+     * Updates the partition-leadership info in the metadata. The update is done by merging the existing metadata with the input leader information and nodes.
+     * This is called whenever partition-leadership updates are returned in a response from a broker (e.g. ProduceResponse and FetchResponse).
+     * Note that updates via the Metadata RPC are handled separately in {@link #update}.
+     * Both partitionLeaders and leaderNodes override the existing metadata; non-overlapping metadata is kept as-is.
+     * @param partitionLeaders map of new leadership information for partitions.
+     * @param leaderNodes a list of nodes for the leaders in the above map.
+     * @return the set of partitions whose leaders were updated.
+     */
+    public synchronized Set<TopicPartition> updatePartitionLeadership(Map<TopicPartition, LeaderIdAndEpoch> partitionLeaders, List<Node> leaderNodes) {
+        Map<Integer, Node> newNodes = leaderNodes.stream().collect(Collectors.toMap(Node::id, node -> node));
+        // Insert non-overlapping nodes from the existing nodes into the new nodes.
+        this.cache.cluster().nodes().stream().forEach(node -> newNodes.putIfAbsent(node.id(), node));
+
+        // Create partition metadata for all updated partitions. Exclude updates for partitions:
+        // 1. that have a newer leader in the existing metadata;
+        // 2. whose leader's node is missing from the new nodes;
+        // 3. that the existing metadata doesn't know about.
+        List<PartitionMetadata> updatePartitionMetadata = new ArrayList<>();
+        for (Entry<TopicPartition, Metadata.LeaderIdAndEpoch> partitionLeader : partitionLeaders.entrySet()) {
+            TopicPartition partition = partitionLeader.getKey();
+            Metadata.LeaderAndEpoch currentLeader = currentLeader(partition);
+            Metadata.LeaderIdAndEpoch newLeader = partitionLeader.getValue();
+            if (!newLeader.epoch.isPresent() || !newLeader.leaderId.isPresent()) {
+                log.debug("For {}, incoming leader information is incomplete {}", partition, newLeader);

Review Comment:
   Should we only log these if debug is enabled? Or should we move them to INFO?

########## clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java: ##########
@@ -807,12 +807,18 @@ public boolean hasUndrained() {
     private boolean shouldBackoff(boolean hasLeaderChanged, final ProducerBatch batch, final long waitedTimeMs) {
         boolean shouldWaitMore = batch.attempts() > 0 && waitedTimeMs < retryBackoff.backoff(batch.attempts() - 1);
         boolean shouldBackoff = !hasLeaderChanged && shouldWaitMore;
-        if (shouldBackoff) {
-            log.trace(
-                "For {}, will backoff", batch);
-        } else {
-            log.trace(
-                "For {}, will not backoff, shouldWaitMore {}, hasLeaderChanged {}", batch, shouldWaitMore, hasLeaderChanged);
+        if (log.isTraceEnabled()) {

Review Comment:
   Love seeing logging improvements!
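[Editor's note] The guarded-trace pattern adopted in the hunk above can be illustrated in isolation. The sketch below is standalone and dependency-free: `Logger`, `expensiveSummary`, and the counters are hypothetical stand-ins, not SLF4J or Kafka code. The point it demonstrates is that even with parameterized messages, the arguments are still evaluated at the call site, so wrapping the call in an `isTraceEnabled()` check skips that work when tracing is off.

```java
public class GuardedLogging {
    static int expensiveCalls = 0;

    // Stand-in for a costly log argument (think batch.toString()).
    static String expensiveSummary() {
        expensiveCalls++;
        return "summary";
    }

    // Minimal stand-in for an SLF4J-style logger interface.
    interface Logger {
        boolean isTraceEnabled();
        void trace(String msg, Object... args);
    }

    public static void main(String[] args) {
        Logger log = new Logger() {
            public boolean isTraceEnabled() { return false; } // trace disabled
            public void trace(String msg, Object... a) { /* message dropped */ }
        };

        // Unguarded: the argument is computed even though trace is off.
        log.trace("For {}, will backoff", expensiveSummary());
        System.out.println(expensiveCalls); // 1

        // Guarded (the pattern in the PR): nothing is computed when trace is off.
        if (log.isTraceEnabled()) {
            log.trace("For {}, will backoff", expensiveSummary());
        }
        System.out.println(expensiveCalls); // still 1
    }
}
```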
🙏

########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java: ##########
@@ -198,6 +215,20 @@ protected void handleFetchSuccess(final Node fetchTarget,
             fetchBuffer.add(completedFetch);
         }
 
+        if (!partitionsWithUpdatedLeaderInfo.isEmpty()) {

Review Comment:
   I'm a bit confused why we collect the `partitionsWithUpdatedLeaderInfo` above when it looks like all we do with them is validate them against the subscriptions later. Is there any other use for having it outside the loop?

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
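[Editor's note] For readers following the `updatePartitionLeadership` hunk earlier in the thread, its node-merge rule (new leader nodes override existing nodes with the same id; non-overlapping existing nodes are kept) can be sketched standalone. `SimpleNode` and `merge` below are hypothetical stand-ins for Kafka's `Node` and the in-method logic, not the actual client code:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class LeaderNodeMerge {
    // Stand-in for org.apache.kafka.common.Node.
    record SimpleNode(int id, String host) { }

    static Map<Integer, SimpleNode> merge(List<SimpleNode> leaderNodes,
                                          List<SimpleNode> existingNodes) {
        // New leader nodes take precedence: they seed the map first.
        Map<Integer, SimpleNode> merged = leaderNodes.stream()
                .collect(Collectors.toMap(SimpleNode::id, node -> node));
        // Existing nodes fill in only the ids not already present.
        existingNodes.forEach(node -> merged.putIfAbsent(node.id(), node));
        return merged;
    }

    public static void main(String[] args) {
        List<SimpleNode> fresh = List.of(new SimpleNode(1, "new-broker-1"));
        List<SimpleNode> cached = List.of(new SimpleNode(1, "old-broker-1"),
                                          new SimpleNode(2, "old-broker-2"));
        Map<Integer, SimpleNode> merged = merge(fresh, cached);
        System.out.println(merged.get(1).host()); // new-broker-1 (overridden)
        System.out.println(merged.get(2).host()); // old-broker-2 (kept as-is)
    }
}
```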