msn-tldr commented on code in PR #14384:
URL: https://github.com/apache/kafka/pull/14384#discussion_r1345590803


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##########
@@ -94,9 +100,40 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon
         this.isSplitBatch = isSplitBatch;
         float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(),
                                                                                 recordsBuilder.compressionType());
+        this.currentLeaderEpoch = PartitionInfo.UNKNOWN_LEADER_EPOCH;
+        this.leaderChangedAttempts = -1;
         recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation);
     }
 
+    /*
+     * Returns whether the leader epoch has changed since the last attempt.
+     * @param latestLeaderEpoch The latest leader epoch.
+     * @return true if the leader has changed, otherwise false.
+     */
+    boolean hasLeaderChanged(int latestLeaderEpoch) {
+        boolean leaderChanged = false;
+        // Checking for leader change makes sense only from 1st retry onwards(attempt >=1).
+        log.trace("For {}, attempting to change leader, currentLeaderEpoch:{}, leaderChangedAttempts:{}, latestLeaderEpoch: {}, current Attempt: {}",
+            this, currentLeaderEpoch, leaderChangedAttempts, latestLeaderEpoch, attempts());
+        if (attempts() >= 1) {
+            // If the leader's epoch has changed, this counts as a leader change
+            if (currentLeaderEpoch != latestLeaderEpoch) {
+                leaderChangedAttempts = attempts();
+                leaderChanged = true;
+            } else {
+                // Otherwise, it's only a leader change until the first attempt is made with this leader

Review Comment:
   > I think I can see how the flow of the code would end up calling that 
multiple times.
   
   It gets called multiple times per send loop: once via sendProducerData -> 
ready -> partitionReady, and then again via sendProducerData -> drain -> 
drainBatchesForOneNode.
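
A minimal sketch of why the repeated calls matter, assuming a simplified stand-in for `ProducerBatch`. The `Batch` class, `reenqueued()`, and the constant value are hypothetical. The `else` branch is cut off in the hunk above, so its body here is an assumption, as is recording the new epoch inside the `if` branch. The point is only that the check has to return the same answer for the `ready` call and the `drain` call within one attempt.

```java
public class LeaderChangeSketch {
    // Placeholder value; stands in for PartitionInfo.UNKNOWN_LEADER_EPOCH.
    static final int UNKNOWN_LEADER_EPOCH = -1;

    // Hypothetical, stripped-down model of the batch state touched by the check.
    static class Batch {
        private int currentLeaderEpoch = UNKNOWN_LEADER_EPOCH;
        private int leaderChangedAttempts = -1;
        private int attempts = 0;

        // Models the batch being put back in the queue for a retry.
        void reenqueued() {
            attempts++;
        }

        boolean hasLeaderChanged(int latestLeaderEpoch) {
            boolean leaderChanged = false;
            // Only meaningful from the first retry onwards.
            if (attempts >= 1) {
                if (currentLeaderEpoch != latestLeaderEpoch) {
                    // Assumption: the new epoch is remembered here.
                    currentLeaderEpoch = latestLeaderEpoch;
                    leaderChangedAttempts = attempts;
                    leaderChanged = true;
                } else {
                    // Assumption: still a "change" until an attempt is made with this leader.
                    leaderChanged = attempts == leaderChangedAttempts;
                }
            }
            return leaderChanged;
        }
    }

    public static void main(String[] args) {
        Batch batch = new Batch();
        batch.reenqueued(); // first retry

        // Same send loop: one call on the ready path, one on the drain path.
        boolean fromReady = batch.hasLeaderChanged(5);
        boolean fromDrain = batch.hasLeaderChanged(5);
        System.out.println(fromReady + " " + fromDrain);

        // After an attempt against the same leader, it no longer counts as a change.
        batch.reenqueued();
        System.out.println(batch.hasLeaderChanged(5));
    }
}
```

Under these assumptions both calls in the first loop agree, and the answer flips only after another attempt has been made with the same leader.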



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

