wcarlson5 commented on code in PR #14564:
URL: https://github.com/apache/kafka/pull/14564#discussion_r1378153127


##########
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##########
@@ -350,6 +353,91 @@ public synchronized void update(int requestVersion, MetadataResponse response, b
         log.debug("Updated cluster metadata updateVersion {} to {}", this.updateVersion, this.cache);
     }
 
+    /**
+     * Updates the partition-leadership info in the metadata. The update is done by merging existing metadata with the input leader information and nodes.
+     * This is called whenever partition-leadership updates are returned in a response from a broker (e.g. ProduceResponse & FetchResponse).
+     * Note that updates via the Metadata RPC are handled separately in {@link #update}.
+     * Both partitionLeaders and leaderNodes override the existing metadata; non-overlapping metadata is kept as is.
+     * @param partitionLeaders map of new leadership information for partitions.
+     * @param leaderNodes a list of nodes for the leaders in the above map.
+     * @return the set of partitions for which leaders were updated.
+     */
+    public synchronized Set<TopicPartition> updatePartitionLeadership(Map<TopicPartition, LeaderIdAndEpoch> partitionLeaders, List<Node> leaderNodes) {
+        Map<Integer, Node> newNodes = leaderNodes.stream().collect(Collectors.toMap(Node::id, node -> node));
+        // Insert non-overlapping nodes from the existing nodes into the new nodes.
+        this.cache.cluster().nodes().stream().forEach(node -> newNodes.putIfAbsent(node.id(), node));
+
+        // Create partition-metadata for all updated partitions. Exclude updates for partitions -
+        // 1. for which the existing metadata has a newer leader.
+        // 2. for which the corresponding leader's node is missing from the new nodes.
+        // 3. which the existing metadata doesn't know about.
+        List<PartitionMetadata> updatePartitionMetadata = new ArrayList<>();
+        for (Entry<TopicPartition, Metadata.LeaderIdAndEpoch> partitionLeader: partitionLeaders.entrySet()) {
+            TopicPartition partition = partitionLeader.getKey();
+            Metadata.LeaderAndEpoch currentLeader = currentLeader(partition);
+            Metadata.LeaderIdAndEpoch newLeader = partitionLeader.getValue();
+            if (!newLeader.epoch.isPresent() || !newLeader.leaderId.isPresent()) {
+                log.debug("For {}, incoming leader information is incomplete {}", partition, newLeader);

Review Comment:
   Should we only log these if debug is enabled? Or should we move them to INFO?
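
   For reference, a minimal sketch of the guard being discussed, assuming the SLF4J `Logger` API that the clients module already uses (the class and method names here are hypothetical):

   ```java
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;

   // Hypothetical standalone example of the debug-guard pattern under discussion.
   public class GuardedLoggingExample {
       private static final Logger log = LoggerFactory.getLogger(GuardedLoggingExample.class);

       void onIncompleteLeaderInfo(Object partition, Object newLeader) {
           // The guard skips the call (and its varargs allocation) entirely when DEBUG is off;
           // note that SLF4J's {} placeholders already defer the string formatting either way.
           if (log.isDebugEnabled()) {
               log.debug("For {}, incoming leader information is incomplete {}", partition, newLeader);
           }
       }
   }
   ```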



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -807,12 +807,18 @@ public boolean hasUndrained() {
     private boolean shouldBackoff(boolean hasLeaderChanged, final ProducerBatch batch, final long waitedTimeMs) {
         boolean shouldWaitMore = batch.attempts() > 0 && waitedTimeMs < retryBackoff.backoff(batch.attempts() - 1);
         boolean shouldBackoff = !hasLeaderChanged && shouldWaitMore;
-        if (shouldBackoff) {
-            log.trace(
-                "For {}, will backoff", batch);
-        } else {
-            log.trace(
-                "For {}, will not backoff, shouldWaitMore {}, hasLeaderChanged {}", batch, shouldWaitMore, hasLeaderChanged);
+        if (log.isTraceEnabled()) {

Review Comment:
   Love seeing logging improvements! 🙏 
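
   For context, a self-contained sketch of the decision this hunk guards: retry only after waiting out an exponentially growing delay, unless the partition leader changed, in which case send immediately. The `expBackoffMs` helper below is a stand-in for the client's own `retryBackoff.backoff(...)` utility, and the constants are illustrative:

   ```java
   public class BackoffSketch {
       static final long INITIAL_BACKOFF_MS = 100;
       static final long MAX_BACKOFF_MS = 1000;

       // Hypothetical exponential backoff: 100ms, 200ms, 400ms, ... capped at 1000ms.
       static long expBackoffMs(int priorAttempts) {
           return (long) Math.min(INITIAL_BACKOFF_MS * Math.pow(2, priorAttempts), MAX_BACKOFF_MS);
       }

       // Mirrors the shape of shouldBackoff() above, with primitive inputs.
       static boolean shouldBackoff(boolean hasLeaderChanged, int attempts, long waitedTimeMs) {
           boolean shouldWaitMore = attempts > 0 && waitedTimeMs < expBackoffMs(attempts - 1);
           return !hasLeaderChanged && shouldWaitMore;
       }

       public static void main(String[] args) {
           // Second attempt, waited 50ms of a 100ms backoff: keep backing off.
           System.out.println(shouldBackoff(false, 1, 50));  // true
           // Same timing, but the leader moved: send immediately instead.
           System.out.println(shouldBackoff(true, 1, 50));   // false
       }
   }
   ```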



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##########
@@ -198,6 +215,20 @@ protected void handleFetchSuccess(final Node fetchTarget,
                 fetchBuffer.add(completedFetch);
             }
 
+            if (!partitionsWithUpdatedLeaderInfo.isEmpty()) {

Review Comment:
   I'm a bit confused about why we collect `partitionsWithUpdatedLeaderInfo` above, when it looks like all we do with it is validate those partitions against the subscriptions later. Is there any other use for having it outside the loop?
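
   For readers following along, a hypothetical sketch of the collect-then-apply shape being questioned (all names below are illustrative, not the PR's actual code): updates are gathered while iterating the per-partition responses, then applied in one batched call after the loop, so the metadata merge and subscription revalidation happen once rather than once per partition.

   ```java
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   public class CollectThenApplySketch {
       record LeaderInfo(int leaderId, int leaderEpoch) {}

       static void handleResponses(List<Map.Entry<String, LeaderInfo>> perPartitionResults) {
           Map<String, LeaderInfo> updatedLeaders = new HashMap<>();
           for (Map.Entry<String, LeaderInfo> result : perPartitionResults) {
               // Inside the loop, only record the update...
               updatedLeaders.put(result.getKey(), result.getValue());
           }
           // ...and apply the whole batch once afterwards.
           if (!updatedLeaders.isEmpty()) {
               applyLeaderUpdates(updatedLeaders);
           }
       }

       // Stand-in for merging into metadata and revalidating the affected subscriptions.
       static void applyLeaderUpdates(Map<String, LeaderInfo> updates) {
           updates.forEach((tp, leader) -> System.out.println(tp + " -> " + leader));
       }

       public static void main(String[] args) {
           handleResponses(List.of(Map.entry("topic-0", new LeaderInfo(3, 7))));
       }
   }
   ```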



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
