msn-tldr commented on code in PR #14384:
URL: https://github.com/apache/kafka/pull/14384#discussion_r1345789833


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##########
@@ -94,9 +100,45 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon
         this.isSplitBatch = isSplitBatch;
         float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(),
                                                                                 recordsBuilder.compressionType());
+        this.currentLeaderEpoch = Optional.empty();
+        this.attemptsWhenLeaderLastChanged = 0;
         recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation);
     }
 
+    /*
+     * This is called to update the leader-epoch to which this batch is going to be produced in the ongoing attempt.
+     * @param latestLeaderEpoch The latest leader epoch.
+     * @return true if the leader has changed, otherwise false.
+     */
+    boolean maybeUpdateLeaderEpoch(Optional<Integer> latestLeaderEpoch) {

Review Comment:
   So the method does two things:
   1. It updates the leader if a newer leader is supplied.
   2. It also returns whether the leader has changed for the ongoing retry of the batch.
   
   Keeping both in one method is a bit confusing, as the naming shows 😄
   
   So I have split it into two methods; to me the code is more readable now.
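
A minimal sketch of what such a split could look like, assuming one method that only updates state and one that only answers the query. The class `LeaderEpochTracker`, the helper `recordAttempt()` and the name `hasLeaderChangedForTheOngoingRetry()` are hypothetical, and treating "newer" as a strictly greater epoch is an assumption, not something taken from the PR.

```java
import java.util.Optional;

// Hypothetical stand-in for the leader-tracking state of a batch; not the PR's code.
final class LeaderEpochTracker {
    private Optional<Integer> currentLeaderEpoch = Optional.empty();
    private int attemptsWhenLeaderLastChanged = 0;
    private int attempts = 0;

    // Stands in for whatever increments the attempt count when a batch is re-enqueued.
    void recordAttempt() {
        attempts++;
    }

    // Concern 1: adopt the supplied epoch if it is newer (assumed: strictly greater).
    void maybeUpdateLeaderEpoch(Optional<Integer> latestLeaderEpoch) {
        if (!latestLeaderEpoch.isPresent())
            return;
        boolean newer = !currentLeaderEpoch.isPresent()
            || currentLeaderEpoch.get() < latestLeaderEpoch.get();
        if (newer) {
            currentLeaderEpoch = latestLeaderEpoch;
            attemptsWhenLeaderLastChanged = attempts;
        }
    }

    // Concern 2: side-effect-free query, true only on a retry during which the leader changed.
    boolean hasLeaderChangedForTheOngoingRetry() {
        boolean isRetry = attempts >= 1;
        return isRetry && attempts == attemptsWhenLeaderLastChanged;
    }

    Optional<Integer> currentLeaderEpoch() {
        return currentLeaderEpoch;
    }
}
```

With this shape the caller updates first and queries second, so neither method name has to describe both behaviours.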



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -3146,6 +3149,97 @@ public void testInvalidTxnStateIsAnAbortableError() throws Exception {
 
         txnManager.beginTransaction();
     }
+    @Test
+    public void testProducerBatchRetriesWhenPartitionLeaderChanges() throws Exception {
+        Metrics m = new Metrics();
+        SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+        try {
+            // SETUP
+            String metricGrpName = "producer-metrics-test-stats-1";
+            long totalSize = 1024 * 1024;
+            BufferPool pool = new BufferPool(totalSize, batchSize, metrics, time,
+                metricGrpName);
+            long retryBackoffMaxMs = 100L;

Review Comment:
   Fair point. The `SenderTest` already mocks `time` using `MockTime`, which is advanced manually. Is that what you had in mind?
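
To illustrate why a manually advanced clock keeps backoff tests fast and deterministic, here is a minimal sketch. `ManualClock` and `BackoffCheck` are hypothetical stand-ins written for this example; they are not Kafka's `MockTime` or any class from the PR.

```java
// Hypothetical test clock: time moves only when the test says so.
final class ManualClock {
    private long nowMs;

    ManualClock(long startMs) {
        this.nowMs = startMs;
    }

    long milliseconds() {
        return nowMs;
    }

    // Advances the clock without blocking the test thread.
    void sleep(long ms) {
        nowMs += ms;
    }
}

// Hypothetical helper: has the retry backoff elapsed since the last attempt?
final class BackoffCheck {
    private BackoffCheck() { }

    static boolean backoffElapsed(long lastAttemptMs, long backoffMs, long nowMs) {
        return nowMs - lastAttemptMs >= backoffMs;
    }
}
```

A test can then step the clock to just before and just after the backoff boundary and assert on both sides, with no real waiting.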



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -1398,16 +1416,253 @@ public void testBuiltInPartitionerFractionalBatches() throws Exception {
             time.sleep(10);
 
             // We should have one batch ready.
-            Set<Node> nodes = accum.ready(cluster, time.milliseconds()).readyNodes;
+            Set<Node> nodes = accum.ready(metadataMock, time.milliseconds()).readyNodes;
             assertEquals(1, nodes.size(), "Should have 1 leader ready");
-            List<ProducerBatch> batches = accum.drain(cluster, nodes, Integer.MAX_VALUE, 0).entrySet().iterator().next().getValue();
+            List<ProducerBatch> batches = accum.drain(metadataMock, nodes, Integer.MAX_VALUE, 0).entrySet().iterator().next().getValue();
             assertEquals(1, batches.size(), "Should have 1 batch ready");
             int actualBatchSize = batches.get(0).records().sizeInBytes();
             assertTrue(actualBatchSize > batchSize / 2, "Batch must be greater than half batch.size");
             assertTrue(actualBatchSize < batchSize, "Batch must be less than batch.size");
         }
     }
 
+    /**
+     * For a batch being retried, this validates ready() and drain() whether a batch should skip-backoff(retries-immediately), or backoff, based on -
+     * 1. how long it has waited between retry attempts.
+     * 2. change in leader hosting the partition.
+     */
+    @Test
+    public void testReadyAndDrainWhenABatchIsBeingRetried() throws InterruptedException {

Review Comment:
   So there are two cases:
   1. The batch should back off: then `ready` and `drain` would not have node1 and the batch, respectively, in their output.
   ```
   assertFalse(result.readyNodes.contains(node1), "Node1 is not ready");
   assertTrue(batches.containsKey(node1.id()) && batches.get(node1.id()).isEmpty(),
                   "No batches ready to be drained on Node1");
   ```
   2. The batch should not back off: then `ready` and `drain` would have node1 and the batch, respectively, in their output.
   ```
   assertTrue(result.readyNodes.contains(node1), "Node1 is ready");
   assertTrue(batches.containsKey(node1.id()) && batches.get(node1.id()).size() == 1, "Node1 has 1 batch ready & drained");
   ```
   
   The other assertions sanity-check auxiliary conditions.
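
The four retry scenarios this test walks through reduce to a single rule: a retried batch backs off only when it has waited less than the backoff and the leader has not changed. A minimal sketch of that rule follows; `RetryBackoffRule.shouldBackoff` is a hypothetical helper, and using one fixed backoff value rather than the accumulator's actual backoff computation is a simplifying assumption.

```java
// Hypothetical helper; the name and signature are illustrative, not from the PR.
final class RetryBackoffRule {
    private RetryBackoffRule() { }

    /**
     * @param waitedMs       time since the previous attempt of the batch
     * @param retryBackoffMs backoff to wait between retries (assumed fixed here)
     * @param leaderChanged  whether the partition leader changed for this retry
     * @return true if the batch should still be held back
     */
    static boolean shouldBackoff(long waitedMs, long retryBackoffMs, boolean leaderChanged) {
        // A leader change skips the backoff; otherwise wait out the full backoff.
        return waitedMs < retryBackoffMs && !leaderChanged;
    }
}
```

Only the "short wait, same leader" combination yields `true`, which matches the one scenario in the test where node1 is not ready and nothing is drained.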



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -1398,16 +1416,253 @@ public void testBuiltInPartitionerFractionalBatches() throws Exception {
             time.sleep(10);
 
             // We should have one batch ready.
-            Set<Node> nodes = accum.ready(cluster, time.milliseconds()).readyNodes;
+            Set<Node> nodes = accum.ready(metadataMock, time.milliseconds()).readyNodes;
             assertEquals(1, nodes.size(), "Should have 1 leader ready");
-            List<ProducerBatch> batches = accum.drain(cluster, nodes, Integer.MAX_VALUE, 0).entrySet().iterator().next().getValue();
+            List<ProducerBatch> batches = accum.drain(metadataMock, nodes, Integer.MAX_VALUE, 0).entrySet().iterator().next().getValue();
             assertEquals(1, batches.size(), "Should have 1 batch ready");
             int actualBatchSize = batches.get(0).records().sizeInBytes();
             assertTrue(actualBatchSize > batchSize / 2, "Batch must be greater than half batch.size");
             assertTrue(actualBatchSize < batchSize, "Batch must be less than batch.size");
         }
     }
 
+    /**
+     * For a batch being retried, this validates ready() and drain() whether a batch should skip-backoff(retries-immediately), or backoff, based on -
+     * 1. how long it has waited between retry attempts.
+     * 2. change in leader hosting the partition.
+     */
+    @Test
+    public void testReadyAndDrainWhenABatchIsBeingRetried() throws InterruptedException {
+        int part1LeaderEpoch = 100;
+        // Create cluster metadata, partition1 being hosted by node1.
+        part1 = new PartitionInfo(topic, partition1, node1, null, null, null);
+        cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1),
+            Collections.emptySet(), Collections.emptySet());
+        final int finalEpoch = part1LeaderEpoch;
+        metadataMock = setupMetadata(cluster, tp -> finalEpoch);
+
+        int batchSize = 10;
+        int lingerMs = 10;
+        int retryBackoffMs = 100;
+        int retryBackoffMaxMs = 1000;
+        int deliveryTimeoutMs = Integer.MAX_VALUE;
+        long totalSize = 10 * 1024;
+        String metricGrpName = "producer-metrics";
+        final RecordAccumulator accum = new RecordAccumulator(logContext, batchSize,
+            CompressionType.NONE, lingerMs, retryBackoffMs, retryBackoffMaxMs,
+            deliveryTimeoutMs, metrics, metricGrpName, time, new ApiVersions(), null,
+            new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));
+
+        // Create 1 batch(batchA) to be produced to partition1.
+        long now = time.milliseconds();
+        accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, now, cluster);
+
+        // 1st attempt to produce batchA, it should be ready & drained to be produced.
+        {
+            log.info("Running 1st attempt to produce batchA, it should be ready & drained to be produced.");
+            now += lingerMs + 1;
+            RecordAccumulator.ReadyCheckResult result = accum.ready(metadataMock, now);
+            assertTrue(result.readyNodes.contains(node1), "Node1 is ready");
+
+            Map<Integer, List<ProducerBatch>> batches = accum.drain(metadataMock,
+                result.readyNodes, 999999 /* maxSize */, now);
+            assertTrue(batches.containsKey(node1.id()) && batches.get(node1.id()).size() == 1, "Node1 has 1 batch ready & drained");
+            ProducerBatch batch = batches.get(node1.id()).get(0);
+            assertEquals(Optional.of(part1LeaderEpoch), batch.currentLeaderEpoch());
+            assertEquals(0, batch.attemptsWhenLeaderLastChanged());
+            // Re-enqueue batch for subsequent retries & test-cases
+            accum.reenqueue(batch, now);
+        }
+
+        // In this retry of batchA, wait-time between retries is less than configured and no leader change, so should backoff.
+        {
+            log.info("In this retry of batchA, wait-time between retries is less than configured and no leader change, so should backoff.");
+            now += 1;
+            RecordAccumulator.ReadyCheckResult result = accum.ready(metadataMock, now);
+            assertFalse(result.readyNodes.contains(node1), "Node1 is not ready");
+
+            // Try to drain from node1, it should return no batches.
+            Map<Integer, List<ProducerBatch>> batches = accum.drain(metadataMock,
+                new HashSet<>(Arrays.asList(node1)), 999999 /* maxSize */, now);
+            assertTrue(batches.containsKey(node1.id()) && batches.get(node1.id()).isEmpty(),
+                "No batches ready to be drained on Node1");
+        }
+
+        // In this retry of batchA, wait-time between retries is less than configured and leader has changed, so should not backoff.
+        {
+            log.info("In this retry of batchA, wait-time between retries is less than configured and leader has changed, so should not backoff.");
+            now += 1;
+            part1LeaderEpoch++;
+            // Create cluster metadata, with new leader epoch.
+            part1 = new PartitionInfo(topic, partition1, node1, null, null, null);
+            cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1),
+                Collections.emptySet(), Collections.emptySet());
+            final int finalPart1LeaderEpoch = part1LeaderEpoch;
+            metadataMock = setupMetadata(cluster, tp -> finalPart1LeaderEpoch);
+            RecordAccumulator.ReadyCheckResult result = accum.ready(metadataMock, now);
+            assertTrue(result.readyNodes.contains(node1), "Node1 is ready");
+
+            Map<Integer, List<ProducerBatch>> batches = accum.drain(metadataMock,
+                result.readyNodes, 999999 /* maxSize */, now);
+            assertTrue(batches.containsKey(node1.id()) && batches.get(node1.id()).size() == 1, "Node1 has 1 batch ready & drained");
+            ProducerBatch batch = batches.get(node1.id()).get(0);
+            assertEquals(Optional.of(part1LeaderEpoch), batch.currentLeaderEpoch());
+            assertEquals(1, batch.attemptsWhenLeaderLastChanged());
+
+            // Re-enqueue batch for subsequent retries/test-cases.
+            accum.reenqueue(batch, now);
+        }
+
+        // In this retry of batchA, wait-time between retries is more than configured and no leader change, so should not backoff.
+        {
+            log.info("In this retry of batchA, wait-time between retries is more than configured and no leader change, so should not backoff");
+            now += 2 * retryBackoffMaxMs;
+            // Create cluster metadata, with new leader epoch.
+            part1 = new PartitionInfo(topic, partition1, node1, null, null, null);
+            cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1),
+                Collections.emptySet(), Collections.emptySet());
+            final int finalPart1LeaderEpoch = part1LeaderEpoch;
+            metadataMock = setupMetadata(cluster, tp -> finalPart1LeaderEpoch);
+            RecordAccumulator.ReadyCheckResult result = accum.ready(metadataMock, now);
+            assertTrue(result.readyNodes.contains(node1), "Node1 is ready");
+
+            Map<Integer, List<ProducerBatch>> batches = accum.drain(metadataMock,
+                result.readyNodes, 999999 /* maxSize */, now);
+            assertTrue(batches.containsKey(node1.id()) && batches.get(node1.id()).size() == 1, "Node1 has 1 batch ready & drained");
+            ProducerBatch batch = batches.get(node1.id()).get(0);
+            assertEquals(Optional.of(part1LeaderEpoch), batch.currentLeaderEpoch());
+            assertEquals(1, batch.attemptsWhenLeaderLastChanged());
+
+            // Re-enqueue batch for subsequent retries/test-cases.
+            accum.reenqueue(batch, now);
+        }
+
+        // In this retry of batchA, wait-time between retries is more than configured and leader has changed, so should not backoff.
+        {
+            log.info("In this retry of batchA, wait-time between retries is more than configured and leader has changed, so should not backoff.");
+            now += 2 * retryBackoffMaxMs;
+            part1LeaderEpoch++;
+            // Create cluster metadata, with new leader epoch.
+            part1 = new PartitionInfo(topic, partition1, node1, null, null, null);
+            cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1),
+                Collections.emptySet(), Collections.emptySet());
+            final int finalPart1LeaderEpoch = part1LeaderEpoch;
+            metadataMock = setupMetadata(cluster, tp -> finalPart1LeaderEpoch);
+            RecordAccumulator.ReadyCheckResult result = accum.ready(metadataMock, now);
+            assertTrue(result.readyNodes.contains(node1), "Node1 is ready");
+
+            Map<Integer, List<ProducerBatch>> batches = accum.drain(metadataMock,
+                result.readyNodes, 999999 /* maxSize */, now);
+            assertTrue(batches.containsKey(node1.id()) && batches.get(node1.id()).size() == 1, "Node1 has 1 batch ready & drained");
+            ProducerBatch batch = batches.get(node1.id()).get(0);
+            assertEquals(Optional.of(part1LeaderEpoch), batch.currentLeaderEpoch());
+            assertEquals(3, batch.attemptsWhenLeaderLastChanged());
+
+            // Re-enqueue batch for subsequent retries/test-cases.
+            accum.reenqueue(batch, now);
+        }
+    }
+
+    @Test
+    public void testDrainWithANodeThatDoesntHostAnyPartitions() {

Review Comment:
   `RecordAccumulator.drain` now uses `Metadata` instead of `Cluster`. `Metadata` can change, whereas `Cluster` is immutable. Hence I added tests around metadata changing (which weren't possible earlier):
   1. A node (broker) hosts partitions, but the partitions mapped to it change underneath.
   2. A node (broker) no longer hosts any partitions.
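
The difference between an immutable snapshot and a mutable holder, and why the latter opens up these two scenarios, can be sketched as follows. `LeaderSnapshot` and `MutableMetadata` are hypothetical classes written for this illustration; they only mimic the roles of `Cluster` and `Metadata` and do not reproduce their APIs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical immutable view of partition leadership (plays the role of a cluster snapshot).
final class LeaderSnapshot {
    private final Map<Integer, Integer> leaderByPartition;

    LeaderSnapshot(Map<Integer, Integer> leaderByPartition) {
        // Defensive, sorted copy so the snapshot can never change after construction.
        this.leaderByPartition = Collections.unmodifiableMap(new TreeMap<>(leaderByPartition));
    }

    // Partitions whose leader is the given node, in ascending partition order.
    List<Integer> partitionsLedBy(int nodeId) {
        List<Integer> result = new ArrayList<>();
        for (Map.Entry<Integer, Integer> e : leaderByPartition.entrySet()) {
            if (e.getValue() == nodeId)
                result.add(e.getKey());
        }
        return result;
    }
}

// Hypothetical mutable holder (plays the role of metadata): the snapshot behind it can be swapped.
final class MutableMetadata {
    private volatile LeaderSnapshot current;

    MutableMetadata(LeaderSnapshot initial) {
        this.current = initial;
    }

    void update(LeaderSnapshot latest) {
        this.current = latest;
    }

    LeaderSnapshot fetch() {
        return current;
    }
}
```

Code that re-reads `fetch()` on every call can see a node lose some or all of its partitions between calls, which a single immutable snapshot passed in once could never show.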



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##########
@@ -94,9 +100,45 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon
         this.isSplitBatch = isSplitBatch;
         float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(),
                                                                                 recordsBuilder.compressionType());
+        this.currentLeaderEpoch = Optional.empty();
+        this.attemptsWhenLeaderLastChanged = 0;
         recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation);
     }
 
+    /*
+     * This is called to update the leader-epoch to which this batch is going to be produced in the ongoing attempt.
+     * @param latestLeaderEpoch The latest leader epoch.
+     * @return true if the leader has changed, otherwise false.
+     */
+    boolean maybeUpdateLeaderEpoch(Optional<Integer> latestLeaderEpoch) {
+        if (!latestLeaderEpoch.isPresent())
+            return false;
+
+        boolean leaderChanged = false;
+        int attempts = attempts();
+        log.trace("For {}, attempting to change leader, currentLeaderEpoch: {}, attemptsWhenLeaderLastChanged:{}, latestLeaderEpoch: {}, current attempt: {}",
+            this, currentLeaderEpoch.isPresent() ? currentLeaderEpoch.get() : "un-initialized", attemptsWhenLeaderLastChanged, latestLeaderEpoch.get(), attempts);
+        boolean isRetry = attempts >= 1;
+        // Checking for leader change makes sense only from 1st retry onwards(i.e attempt >=1).
+        if (isRetry) {
+            // If the leader's epoch has changed, this counts as a leader change
+            if (!currentLeaderEpoch.equals(latestLeaderEpoch)) {
+                attemptsWhenLeaderLastChanged = attempts;
+                leaderChanged = true;
+            } else {
+                // Otherwise, it's only a leader change until the first attempt is made with this leader
+                leaderChanged = attempts == attemptsWhenLeaderLastChanged;
+            }
+        }
+        if (leaderChanged) {
+            log.debug("For {}, leader has changed, oldEpoch: {}, newEpoch: {}",

Review Comment:
   fixed


