kirktrue commented on code in PR #14384:
URL: https://github.com/apache/kafka/pull/14384#discussion_r1326170737


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##########
@@ -94,9 +100,40 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon
         this.isSplitBatch = isSplitBatch;
         float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(),
                                                                                 recordsBuilder.compressionType());
+        this.currentLeaderEpoch = PartitionInfo.UNKNOWN_LEADER_EPOCH;
+        this.leaderChangedAttempts = -1;
         recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation);
     }
 
+    /*
+     * Returns whether the leader epoch has changed since the last attempt.
+     * @param latestLeaderEpoch The latest leader epoch.
+     * @return true if the leader has changed, otherwise false.
+     */
+    boolean hasLeaderChanged(int latestLeaderEpoch) {
+        boolean leaderChanged = false;
+        // Checking for leader change makes sense only from 1st retry onwards(attempt >=1).
+        log.trace("For {}, attempting to change leader, currentLeaderEpoch:{}, leaderChangedAttempts:{}, latestLeaderEpoch: {}, current Attempt: {}",

Review Comment:
   Can I be extremely nit-picky about the capitalization of `Attempt` in the 
log line? Any reason it can't just be `attempt`?



##########
clients/src/main/java/org/apache/kafka/common/PartitionInfo.java:
##########
@@ -20,26 +20,35 @@
  * This is used to describe per-partition state in the MetadataResponse.
  */
 public class PartitionInfo {
+    public static final int UNKNOWN_LEADER_EPOCH = -1;
     private final String topic;
     private final int partition;
     private final Node leader;
+    private final int leaderEpoch;
     private final Node[] replicas;
     private final Node[] inSyncReplicas;
     private final Node[] offlineReplicas;
 
     public PartitionInfo(String topic, int partition, Node leader, Node[] replicas, Node[] inSyncReplicas) {
-        this(topic, partition, leader, replicas, inSyncReplicas, new Node[0]);
+        this(topic, partition, leader, UNKNOWN_LEADER_EPOCH, replicas, inSyncReplicas, new Node[0]);
+    }
+
+    public PartitionInfo(String topic, int partition, Node leader, Node[] replicas, Node[] inSyncReplicas,
+        Node[] offlineReplicas) {
+        this(topic, partition, leader, UNKNOWN_LEADER_EPOCH, replicas, inSyncReplicas, offlineReplicas);
     }
 
     public PartitionInfo(String topic,
                          int partition,
                          Node leader,
+                         int leaderEpoch,

Review Comment:
   This is where we might get into trouble since we're breaking a constructor 
by adding a parameter.
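
A minimal sketch of one way to add the epoch without breaking existing callers: keep the old signature as an overload that delegates with the sentinel value. `PartitionInfoSketch` and its `String`-typed node fields are hypothetical stand-ins used only to keep the example self-contained; this is not the real `PartitionInfo`.

```java
// Hypothetical stand-in for PartitionInfo; nodes are plain strings for brevity.
final class PartitionInfoSketch {
    static final int UNKNOWN_LEADER_EPOCH = -1;

    private final String topic;
    private final int partition;
    private final String leader;
    private final int leaderEpoch;
    private final String[] replicas;

    // Pre-existing signature, kept so that current call sites still compile.
    PartitionInfoSketch(String topic, int partition, String leader, String[] replicas) {
        this(topic, partition, leader, UNKNOWN_LEADER_EPOCH, replicas);
    }

    // New signature that additionally carries the leader epoch.
    PartitionInfoSketch(String topic, int partition, String leader, int leaderEpoch, String[] replicas) {
        this.topic = topic;
        this.partition = partition;
        this.leader = leader;
        this.leaderEpoch = leaderEpoch;
        this.replicas = replicas.clone();
    }

    int leaderEpoch() {
        return leaderEpoch;
    }
}
```

Callers of the old constructor see `UNKNOWN_LEADER_EPOCH`; only callers that know the epoch need the new overload.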



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -96,6 +98,8 @@ public class RecordAccumulatorTest {
     private final long maxBlockTimeMs = 1000;
     private final LogContext logContext = new LogContext();
 
+    private final Logger log = logContext.logger(RecordAccumulatorTest.class);

Review Comment:
   I am not personally against logging in tests, but the log output should be 
turned down so that it isn't emitted by default.



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##########
@@ -94,9 +100,40 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon
         this.isSplitBatch = isSplitBatch;
         float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(),
                                                                                 recordsBuilder.compressionType());
+        this.currentLeaderEpoch = PartitionInfo.UNKNOWN_LEADER_EPOCH;
+        this.leaderChangedAttempts = -1;
         recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation);
     }
 
+    /*
+     * Returns whether the leader epoch has changed since the last attempt.
+     * @param latestLeaderEpoch The latest leader epoch.
+     * @return true if the leader has changed, otherwise false.
+     */
+    boolean hasLeaderChanged(int latestLeaderEpoch) {
+        boolean leaderChanged = false;
+        // Checking for leader change makes sense only from 1st retry onwards(attempt >=1).
+        log.trace("For {}, attempting to change leader, currentLeaderEpoch:{}, leaderChangedAttempts:{}, latestLeaderEpoch: {}, current Attempt: {}",

Review Comment:
   Also super nit-picky: spaces after colons in message.
   
   Sorry, I can't help myself 😛 



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##########
@@ -94,9 +100,40 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon
         this.isSplitBatch = isSplitBatch;
         float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(),
                                                                                 recordsBuilder.compressionType());
+        this.currentLeaderEpoch = PartitionInfo.UNKNOWN_LEADER_EPOCH;
+        this.leaderChangedAttempts = -1;
         recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation);
     }
 
+    /*
+     * Returns whether the leader epoch has changed since the last attempt.
+     * @param latestLeaderEpoch The latest leader epoch.
+     * @return true if the leader has changed, otherwise false.
+     */
+    boolean hasLeaderChanged(int latestLeaderEpoch) {

Review Comment:
   ```suggestion
       boolean maybeUpdateLeaderEpoch(int latestLeaderEpoch) {
   ```
   
   This implies that we're not just determining whether the leader has changed; 
we're also updating internal state.
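
A small sketch of why the name matters, assuming a simplified holder class (`EpochHolder` is hypothetical and not part of the PR). A `has...` method reads as a pure query that can be called repeatedly, while a `maybeUpdate...` method signals that a call may change what the next call returns.

```java
// Hypothetical holder used only to contrast a pure query with a mutating check.
final class EpochHolder {
    private int currentLeaderEpoch = -1;

    // Pure query: repeated calls with the same argument give the same answer.
    boolean hasLeaderChanged(int latestLeaderEpoch) {
        return currentLeaderEpoch != latestLeaderEpoch;
    }

    // Mutating check: records the new epoch, so a second call with the
    // same argument returns false.
    boolean maybeUpdateLeaderEpoch(int latestLeaderEpoch) {
        if (currentLeaderEpoch == latestLeaderEpoch) {
            return false;
        }
        currentLeaderEpoch = latestLeaderEpoch;
        return true;
    }
}
```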



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -859,8 +863,9 @@ private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, i
                 continue;
 
             Deque<ProducerBatch> deque = getDeque(tp);
-            if (deque == null)
+            if (deque == null) {

Review Comment:
   Unless there's a valid reason to, we should omit formatting changes.



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##########
@@ -94,9 +100,40 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon
         this.isSplitBatch = isSplitBatch;
         float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(),
                                                                                 recordsBuilder.compressionType());
+        this.currentLeaderEpoch = PartitionInfo.UNKNOWN_LEADER_EPOCH;
+        this.leaderChangedAttempts = -1;

Review Comment:
   Seems as though 0 would do, yes.



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##########
@@ -94,9 +100,40 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon
         this.isSplitBatch = isSplitBatch;
         float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(),
                                                                                 recordsBuilder.compressionType());
+        this.currentLeaderEpoch = PartitionInfo.UNKNOWN_LEADER_EPOCH;
+        this.leaderChangedAttempts = -1;
         recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation);
     }
 
+    /*
+     * Returns whether the leader epoch has changed since the last attempt.
+     * @param latestLeaderEpoch The latest leader epoch.
+     * @return true if the leader has changed, otherwise false.
+     */
+    boolean hasLeaderChanged(int latestLeaderEpoch) {
+        boolean leaderChanged = false;
+        // Checking for leader change makes sense only from 1st retry onwards(attempt >=1).
+        log.trace("For {}, attempting to change leader, currentLeaderEpoch:{}, leaderChangedAttempts:{}, latestLeaderEpoch: {}, current Attempt: {}",
+            this, currentLeaderEpoch, leaderChangedAttempts, latestLeaderEpoch, attempts());
+        if (attempts() >= 1) {
+            // If the leader's epoch has changed, this counts as a leader change
+            if (currentLeaderEpoch != latestLeaderEpoch) {
+                leaderChangedAttempts = attempts();
+                leaderChanged = true;
+            } else {
+                // Otherwise, it's only a leader change until the first attempt is made with this leader

Review Comment:
   I'm sorry, I don't quite get this logic.
   
   If there is at least one attempt and the above epoch comparison is false, 
why do we consider the leader as having changed?
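
One possible reading of the quoted branch, traced in a standalone sketch. `BatchSketch` and `reenqueue()` are hypothetical stand-ins, and the line that stores the new epoch is an assumption: the quoted hunk does not show where `currentLeaderEpoch` is assigned. Under that assumption, the `else` branch keeps reporting a change only while the attempt count still equals the count recorded when the change was first seen.

```java
// Hypothetical stand-in for the batch; mirrors the control flow of the quoted hunk.
final class BatchSketch {
    private int attempts = 0;               // bumped when the batch is re-enqueued for retry
    private int currentLeaderEpoch = -1;    // same value as UNKNOWN_LEADER_EPOCH in the diff
    private int leaderChangedAttempts = -1;

    void reenqueue() {
        attempts++;
    }

    boolean hasLeaderChanged(int latestLeaderEpoch) {
        boolean leaderChanged = false;
        if (attempts >= 1) {
            if (currentLeaderEpoch != latestLeaderEpoch) {
                leaderChangedAttempts = attempts;
                // ASSUMPTION: the epoch is stored here; the quoted hunk does not show this.
                currentLeaderEpoch = latestLeaderEpoch;
                leaderChanged = true;
            } else {
                // Same epoch as last time: still "changed" only if no new
                // attempt has happened since the change was recorded.
                leaderChanged = attempts == leaderChangedAttempts;
            }
        }
        return leaderChanged;
    }
}
```

In this sketch, two consecutive checks at the same attempt count both return true, and the result flips to false once `reenqueue()` is called again with an unchanged epoch.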



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##########
@@ -94,9 +100,40 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon
         this.isSplitBatch = isSplitBatch;
         float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(),
                                                                                 recordsBuilder.compressionType());
+        this.currentLeaderEpoch = PartitionInfo.UNKNOWN_LEADER_EPOCH;
+        this.leaderChangedAttempts = -1;
         recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation);
     }
 
+    /*
+     * Returns whether the leader epoch has changed since the last attempt.
+     * @param latestLeaderEpoch The latest leader epoch.
+     * @return true if the leader has changed, otherwise false.
+     */
+    boolean hasLeaderChanged(int latestLeaderEpoch) {
+        boolean leaderChanged = false;
+        // Checking for leader change makes sense only from 1st retry onwards(attempt >=1).
+        log.trace("For {}, attempting to change leader, currentLeaderEpoch:{}, leaderChangedAttempts:{}, latestLeaderEpoch: {}, current Attempt: {}",

Review Comment:
   The value shouldn't change between those calls, right?



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##########
@@ -94,9 +100,40 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon
         this.isSplitBatch = isSplitBatch;
         float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(),
                                                                                 recordsBuilder.compressionType());
+        this.currentLeaderEpoch = PartitionInfo.UNKNOWN_LEADER_EPOCH;
+        this.leaderChangedAttempts = -1;
         recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation);
     }
 
+    /*
+     * Returns whether the leader epoch has changed since the last attempt.
+     * @param latestLeaderEpoch The latest leader epoch.
+     * @return true if the leader has changed, otherwise false.
+     */
+    boolean hasLeaderChanged(int latestLeaderEpoch) {
+        boolean leaderChanged = false;
+        // Checking for leader change makes sense only from 1st retry onwards(attempt >=1).
+        log.trace("For {}, attempting to change leader, currentLeaderEpoch:{}, leaderChangedAttempts:{}, latestLeaderEpoch: {}, current Attempt: {}",
+            this, currentLeaderEpoch, leaderChangedAttempts, latestLeaderEpoch, attempts());
+        if (attempts() >= 1) {
+            // If the leader's epoch has changed, this counts as a leader change
+            if (currentLeaderEpoch != latestLeaderEpoch) {
+                leaderChangedAttempts = attempts();
+                leaderChanged = true;
+            } else {
+                // Otherwise, it's only a leader change until the first attempt is made with this leader
+                leaderChanged = attempts.get() == leaderChangedAttempts;
+            }
+        }
+        if (leaderChanged) {
+            log.debug("For {}, leaderChanged, currentLeaderEpoch:{}, leaderChangedAttempts:{}",

Review Comment:
   ```suggestion
               log.debug("For {}, a leader change was detected, currentLeaderEpoch: {}, leaderChangedAttempts: {}",
   ```



##########
clients/src/main/java/org/apache/kafka/common/PartitionInfo.java:
##########
@@ -60,12 +69,19 @@ public int partition() {
     }
 
     /**
-     * The node id of the node currently acting as a leader for this partition or null if there is no leader
+     * The node currently acting as a leader for this partition or null if there is no leader
      */
     public Node leader() {
         return leader;
     }
 
+    /**
+     * The epoch of the partition's leader.

Review Comment:
   `Optional` here, right?
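
A hedged sketch of what an `Optional`-returning accessor could look like, assuming the field keeps the `-1` sentinel internally. `LeaderEpochView` is a hypothetical class used only for illustration; it is not the accessor in the PR.

```java
import java.util.Optional;

// Hypothetical wrapper showing how a sentinel-backed field could be exposed as Optional.
final class LeaderEpochView {
    static final int UNKNOWN_LEADER_EPOCH = -1;

    private final int leaderEpoch;

    LeaderEpochView(int leaderEpoch) {
        this.leaderEpoch = leaderEpoch;
    }

    // Empty when the epoch is unknown, so callers cannot mistake -1 for a real epoch.
    Optional<Integer> leaderEpoch() {
        return leaderEpoch == UNKNOWN_LEADER_EPOCH
            ? Optional.empty()
            : Optional.of(leaderEpoch);
    }
}
```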



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##########
@@ -94,9 +100,40 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon
         this.isSplitBatch = isSplitBatch;
         float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(),
                                                                                 recordsBuilder.compressionType());
+        this.currentLeaderEpoch = PartitionInfo.UNKNOWN_LEADER_EPOCH;
+        this.leaderChangedAttempts = -1;
         recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation);
     }
 
+    /*
+     * Returns whether the leader epoch has changed since the last attempt.
+     * @param latestLeaderEpoch The latest leader epoch.
+     * @return true if the leader has changed, otherwise false.
+     */
+    boolean hasLeaderChanged(int latestLeaderEpoch) {
+        boolean leaderChanged = false;
+        // Checking for leader change makes sense only from 1st retry onwards(attempt >=1).
+        log.trace("For {}, attempting to change leader, currentLeaderEpoch:{}, leaderChangedAttempts:{}, latestLeaderEpoch: {}, current Attempt: {}",
+            this, currentLeaderEpoch, leaderChangedAttempts, latestLeaderEpoch, attempts());
+        if (attempts() >= 1) {
+            // If the leader's epoch has changed, this counts as a leader change
+            if (currentLeaderEpoch != latestLeaderEpoch) {
+                leaderChangedAttempts = attempts();
+                leaderChanged = true;
+            } else {
+                // Otherwise, it's only a leader change until the first attempt is made with this leader
+                leaderChanged = attempts.get() == leaderChangedAttempts;
+            }
+        }
+        if (leaderChanged) {
+            log.debug("For {}, leaderChanged, currentLeaderEpoch:{}, leaderChangedAttempts:{}",

Review Comment:
   Agreed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
