wcarlson5 commented on code in PR #14384:
URL: https://github.com/apache/kafka/pull/14384#discussion_r1344314253


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##########
@@ -94,9 +100,45 @@ public ProducerBatch(TopicPartition tp, 
MemoryRecordsBuilder recordsBuilder, lon
         this.isSplitBatch = isSplitBatch;
         float compressionRatioEstimation = 
CompressionRatioEstimator.estimation(topicPartition.topic(),
                                                                                
 recordsBuilder.compressionType());
+        this.currentLeaderEpoch = Optional.empty();
+        this.attemptsWhenLeaderLastChanged = 0;
         
recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation);
     }
 
+    /*
+     * This is called to update the leader-epoch to which this batch is going 
to be produced in the ongoing attempt.
+     * @param latestLeaderEpoch The latest leader epoch.
+     * @return true if the leader has changed, otherwise false.
+     */
+    boolean maybeUpdateLeaderEpoch(Optional<Integer> latestLeaderEpoch) {

Review Comment:
   It seems to me that this doesn't actually update the leader epoch, but 
rather checks whether the leader has changed since the last retry. Can we 
give it a clearer name?
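   
   To illustrate what a clearer split could look like, here is a minimal
   sketch, not the PR's implementation. `BatchLeaderTracker`,
   `updateLeaderEpoch`, `hasLeaderChangedForOngoingRetry` and `recordRetry`
   are hypothetical names, and the epoch comparison is an assumption about
   the intended behavior. The idea is that the mutator returns nothing and
   the boolean comes from a separately named query:
   
   ```java
   import java.util.Optional;

   // Hypothetical stand-in for the leader-tracking state of a batch.
   // Names are illustrative only; this is not the PR's implementation.
   final class BatchLeaderTracker {
       private Optional<Integer> currentLeaderEpoch = Optional.empty();
       private int attempts = 0;
       private int attemptsWhenLeaderLastChanged = 0;

       // Command: remember a newer leader epoch and the attempt it was seen on.
       void updateLeaderEpoch(Optional<Integer> latestLeaderEpoch) {
           boolean isNewer = latestLeaderEpoch.isPresent()
               && (!currentLeaderEpoch.isPresent()
                   || currentLeaderEpoch.get() < latestLeaderEpoch.get());
           if (isNewer) {
               currentLeaderEpoch = latestLeaderEpoch;
               attemptsWhenLeaderLastChanged = attempts;
           }
       }

       // Query: true only on a retry during which a newer epoch was seen.
       boolean hasLeaderChangedForOngoingRetry() {
           return attempts >= 1 && attempts == attemptsWhenLeaderLastChanged;
       }

       // Called when the batch is re-enqueued for another attempt.
       void recordRetry() {
           attempts++;
       }

       Optional<Integer> currentLeaderEpoch() {
           return currentLeaderEpoch;
       }
   }
   ```
   
   With a split like this, a call site would read as an update followed by a
   question, so a variable such as `hasLeaderChanged` would be assigned from
   a method whose name says the same thing.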



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -156,6 +157,8 @@ public class SenderTest {
     private SenderMetricsRegistry senderMetricsRegistry = null;
     private final LogContext logContext = new LogContext();
 
+    private final Logger log = logContext.logger(SenderTest.class);

Review Comment:
   Generally, adding logging in tests isn't great. I would second removing it. I 
don't think the two lines added are worth the change. 
   
   If you want information about why a batch is or isn't being processed, you 
should add it as logs in the actual code. They can be at trace level if 
necessary.



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -3146,6 +3149,97 @@ public void testInvalidTxnStateIsAnAbortableError() 
throws Exception {
 
         txnManager.beginTransaction();
     }
+    @Test
+    public void testProducerBatchRetriesWhenPartitionLeaderChanges() throws 
Exception {
+        Metrics m = new Metrics();
+        SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+        try {
+            // SETUP
+            String metricGrpName = "producer-metrics-test-stats-1";
+            long totalSize = 1024 * 1024;
+            BufferPool pool = new BufferPool(totalSize, batchSize, metrics, 
time,
+                metricGrpName);
+            long retryBackoffMaxMs = 100L;

Review Comment:
   How difficult would it be to mock the backoff using a ticker? Relying on 
timing in tests too often makes them flaky. That seems like a low-probability 
risk here, but we should keep it in mind.
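   
   As a rough sketch of what I mean by a ticker, assuming the backoff
   decision can read time from an injected clock: `ManualTicker` and
   `RetryBackoff` below are hypothetical names for illustration, not
   classes in this PR or in Kafka.
   
   ```java
   import java.util.function.LongSupplier;

   // Hypothetical hand-advanced clock; time moves only when the test says so.
   final class ManualTicker implements LongSupplier {
       private long nowMs;

       ManualTicker(long startMs) {
           this.nowMs = startMs;
       }

       @Override
       public long getAsLong() {
           return nowMs;
       }

       void advance(long deltaMs) {
           nowMs += deltaMs;
       }
   }

   // Hypothetical backoff check that reads time from an injected clock.
   final class RetryBackoff {
       private final LongSupplier clock;
       private final long backoffMs;
       private long lastAttemptMs;

       RetryBackoff(LongSupplier clock, long backoffMs) {
           this.clock = clock;
           this.backoffMs = backoffMs;
           this.lastAttemptMs = clock.getAsLong();
       }

       // True while less than backoffMs has elapsed since the last attempt.
       boolean shouldBackoff() {
           return clock.getAsLong() - lastAttemptMs < backoffMs;
       }

       void recordAttempt() {
           lastAttemptMs = clock.getAsLong();
       }
   }
   ```
   
   A test would then call `advance(...)` to step across the backoff boundary
   deterministically instead of waiting on the wall clock. The `time`
   fixture these tests already use (`time.sleep(10)`, `time.milliseconds()`)
   looks like it could serve the same purpose.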



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -1398,16 +1416,253 @@ public void testBuiltInPartitionerFractionalBatches() 
throws Exception {
             time.sleep(10);
 
             // We should have one batch ready.
-            Set<Node> nodes = accum.ready(cluster, 
time.milliseconds()).readyNodes;
+            Set<Node> nodes = accum.ready(metadataMock, 
time.milliseconds()).readyNodes;
             assertEquals(1, nodes.size(), "Should have 1 leader ready");
-            List<ProducerBatch> batches = accum.drain(cluster, nodes, 
Integer.MAX_VALUE, 0).entrySet().iterator().next().getValue();
+            List<ProducerBatch> batches = accum.drain(metadataMock, nodes, 
Integer.MAX_VALUE, 0).entrySet().iterator().next().getValue();
             assertEquals(1, batches.size(), "Should have 1 batch ready");
             int actualBatchSize = batches.get(0).records().sizeInBytes();
             assertTrue(actualBatchSize > batchSize / 2, "Batch must be greater 
than half batch.size");
             assertTrue(actualBatchSize < batchSize, "Batch must be less than 
batch.size");
         }
     }
 
+    /**
+     * For a batch being retried, this validates ready() and drain() whether a 
batch should skip-backoff(retries-immediately), or backoff, based on -
+     * 1. how long it has waited between retry attempts.
+     * 2. change in leader hosting the partition.
+     */
+    @Test
+    public void testReadyAndDrainWhenABatchIsBeingRetried() throws 
InterruptedException {

Review Comment:
   I think this is just my unfamiliarity talking, but how are you checking if 
it is backing off? Is it just relying on these lines? 
   
   ```       
   assertEquals(Optional.of(part1LeaderEpoch), batch.currentLeaderEpoch());
   assertEquals(0, batch.attemptsWhenLeaderLastChanged());
   ```



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -712,13 +714,14 @@ private long partitionReady(Cluster cluster, long nowMs, 
String topic,
                 }
 
                 waitedTimeMs = batch.waitedTimeMs(nowMs);
-                backingOff = shouldBackoff(batch, waitedTimeMs);
+                boolean hasLeaderChanged = 
batch.maybeUpdateLeaderEpoch(leaderAndEpoch.epoch);

Review Comment:
   See, that method name and this variable name don't really make sense together.



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -1398,16 +1416,253 @@ public void testBuiltInPartitionerFractionalBatches() 
throws Exception {
             time.sleep(10);
 
             // We should have one batch ready.
-            Set<Node> nodes = accum.ready(cluster, 
time.milliseconds()).readyNodes;
+            Set<Node> nodes = accum.ready(metadataMock, 
time.milliseconds()).readyNodes;
             assertEquals(1, nodes.size(), "Should have 1 leader ready");
-            List<ProducerBatch> batches = accum.drain(cluster, nodes, 
Integer.MAX_VALUE, 0).entrySet().iterator().next().getValue();
+            List<ProducerBatch> batches = accum.drain(metadataMock, nodes, 
Integer.MAX_VALUE, 0).entrySet().iterator().next().getValue();
             assertEquals(1, batches.size(), "Should have 1 batch ready");
             int actualBatchSize = batches.get(0).records().sizeInBytes();
             assertTrue(actualBatchSize > batchSize / 2, "Batch must be greater 
than half batch.size");
             assertTrue(actualBatchSize < batchSize, "Batch must be less than 
batch.size");
         }
     }
 
+    /**
+     * For a batch being retried, this validates ready() and drain() whether a 
batch should skip-backoff(retries-immediately), or backoff, based on -
+     * 1. how long it has waited between retry attempts.
+     * 2. change in leader hosting the partition.
+     */
+    @Test
+    public void testReadyAndDrainWhenABatchIsBeingRetried() throws 
InterruptedException {
+        int part1LeaderEpoch = 100;
+        // Create cluster metadata, partition1 being hosted by node1.
+        part1 = new PartitionInfo(topic, partition1, node1, null, null, null);
+        cluster = new Cluster(null, Arrays.asList(node1, node2), 
Arrays.asList(part1),
+            Collections.emptySet(), Collections.emptySet());
+        final int finalEpoch = part1LeaderEpoch;
+        metadataMock = setupMetadata(cluster, tp -> finalEpoch);
+
+        int batchSize = 10;
+        int lingerMs = 10;
+        int retryBackoffMs = 100;
+        int retryBackoffMaxMs = 1000;
+        int deliveryTimeoutMs = Integer.MAX_VALUE;
+        long totalSize = 10 * 1024;
+        String metricGrpName = "producer-metrics";
+        final RecordAccumulator accum = new RecordAccumulator(logContext, 
batchSize,
+            CompressionType.NONE, lingerMs, retryBackoffMs, retryBackoffMaxMs,
+            deliveryTimeoutMs, metrics, metricGrpName, time, new 
ApiVersions(), null,
+            new BufferPool(totalSize, batchSize, metrics, time, 
metricGrpName));
+
+        // Create 1 batch(batchA) to be produced to partition1.
+        long now = time.milliseconds();
+        accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, 
null, maxBlockTimeMs, false, now, cluster);
+
+        // 1st attempt to produce batchA, it should be ready & drained to be 
produced.
+        {
+            log.info("Running 1st attempt to produce batchA, it should be 
ready & drained to be produced.");
+            now += lingerMs + 1;
+            RecordAccumulator.ReadyCheckResult result = 
accum.ready(metadataMock, now);
+            assertTrue(result.readyNodes.contains(node1), "Node1 is ready");
+
+            Map<Integer, List<ProducerBatch>> batches = 
accum.drain(metadataMock,
+                result.readyNodes, 999999 /* maxSize */, now);
+            assertTrue(batches.containsKey(node1.id()) && 
batches.get(node1.id()).size() == 1, "Node1 has 1 batch ready & drained");
+            ProducerBatch batch = batches.get(node1.id()).get(0);
+            assertEquals(Optional.of(part1LeaderEpoch), 
batch.currentLeaderEpoch());
+            assertEquals(0, batch.attemptsWhenLeaderLastChanged());
+            // Re-enqueue batch for subsequent retries & test-cases
+            accum.reenqueue(batch, now);
+        }
+
+        // In this retry of batchA, wait-time between retries is less than 
configured and no leader change, so should backoff.
+        {
+            log.info("In this retry of batchA, wait-time between retries is 
less than configured and no leader change, so should backoff.");
+            now += 1;
+            RecordAccumulator.ReadyCheckResult result = 
accum.ready(metadataMock, now);
+            assertFalse(result.readyNodes.contains(node1), "Node1 is not 
ready");
+
+            // Try to drain from node1, it should return no batches.
+            Map<Integer, List<ProducerBatch>> batches = 
accum.drain(metadataMock,
+                new HashSet<>(Arrays.asList(node1)), 999999 /* maxSize */, 
now);
+            assertTrue(batches.containsKey(node1.id()) && 
batches.get(node1.id()).isEmpty(),
+                "No batches ready to be drained on Node1");
+        }
+
+        // In this retry of batchA, wait-time between retries is less than 
configured and leader has changed, so should not backoff.
+        {
+            log.info("In this retry of batchA, wait-time between retries is 
less than configured and leader has changed, so should not backoff.");
+            now += 1;
+            part1LeaderEpoch++;
+            // Create cluster metadata, with new leader epoch.
+            part1 = new PartitionInfo(topic, partition1, node1, null, null, 
null);
+            cluster = new Cluster(null, Arrays.asList(node1, node2), 
Arrays.asList(part1),
+                Collections.emptySet(), Collections.emptySet());
+            final int finalPart1LeaderEpoch = part1LeaderEpoch;
+            metadataMock = setupMetadata(cluster, tp -> finalPart1LeaderEpoch);
+            RecordAccumulator.ReadyCheckResult result = 
accum.ready(metadataMock, now);
+            assertTrue(result.readyNodes.contains(node1), "Node1 is ready");
+
+            Map<Integer, List<ProducerBatch>> batches = 
accum.drain(metadataMock,
+                result.readyNodes, 999999 /* maxSize */, now);
+            assertTrue(batches.containsKey(node1.id()) && 
batches.get(node1.id()).size() == 1, "Node1 has 1 batch ready & drained");
+            ProducerBatch batch = batches.get(node1.id()).get(0);
+            assertEquals(Optional.of(part1LeaderEpoch), 
batch.currentLeaderEpoch());
+            assertEquals(1, batch.attemptsWhenLeaderLastChanged());
+
+            // Re-enqueue batch for subsequent retries/test-cases.
+            accum.reenqueue(batch, now);
+        }
+
+        // In this retry of batchA, wait-time between retries is more than 
configured and no leader change, so should not backoff.
+        {
+            log.info("In this retry of batchA, wait-time between retries is 
more than configured and no leader change, so should not backoff");
+            now += 2 * retryBackoffMaxMs;
+            // Create cluster metadata, with new leader epoch.
+            part1 = new PartitionInfo(topic, partition1, node1, null, null, 
null);
+            cluster = new Cluster(null, Arrays.asList(node1, node2), 
Arrays.asList(part1),
+                Collections.emptySet(), Collections.emptySet());
+            final int finalPart1LeaderEpoch = part1LeaderEpoch;
+            metadataMock = setupMetadata(cluster, tp -> finalPart1LeaderEpoch);
+            RecordAccumulator.ReadyCheckResult result = 
accum.ready(metadataMock, now);
+            assertTrue(result.readyNodes.contains(node1), "Node1 is ready");
+
+            Map<Integer, List<ProducerBatch>> batches = 
accum.drain(metadataMock,
+                result.readyNodes, 999999 /* maxSize */, now);
+            assertTrue(batches.containsKey(node1.id()) && 
batches.get(node1.id()).size() == 1, "Node1 has 1 batch ready & drained");
+            ProducerBatch batch = batches.get(node1.id()).get(0);
+            assertEquals(Optional.of(part1LeaderEpoch), 
batch.currentLeaderEpoch());
+            assertEquals(1, batch.attemptsWhenLeaderLastChanged());
+
+            // Re-enqueue batch for subsequent retries/test-cases.
+            accum.reenqueue(batch, now);
+        }
+
+        // In this retry of batchA, wait-time between retries is more than 
configured and leader has changed, so should not backoff.
+        {
+            log.info("In this retry of batchA, wait-time between retries is 
more than configured and leader has changed, so should not backoff.");
+            now += 2 * retryBackoffMaxMs;
+            part1LeaderEpoch++;
+            // Create cluster metadata, with new leader epoch.
+            part1 = new PartitionInfo(topic, partition1, node1, null, null, 
null);
+            cluster = new Cluster(null, Arrays.asList(node1, node2), 
Arrays.asList(part1),
+                Collections.emptySet(), Collections.emptySet());
+            final int finalPart1LeaderEpoch = part1LeaderEpoch;
+            metadataMock = setupMetadata(cluster, tp -> finalPart1LeaderEpoch);
+            RecordAccumulator.ReadyCheckResult result = 
accum.ready(metadataMock, now);
+            assertTrue(result.readyNodes.contains(node1), "Node1 is ready");
+
+            Map<Integer, List<ProducerBatch>> batches = 
accum.drain(metadataMock,
+                result.readyNodes, 999999 /* maxSize */, now);
+            assertTrue(batches.containsKey(node1.id()) && 
batches.get(node1.id()).size() == 1, "Node1 has 1 batch ready & drained");
+            ProducerBatch batch = batches.get(node1.id()).get(0);
+            assertEquals(Optional.of(part1LeaderEpoch), 
batch.currentLeaderEpoch());
+            assertEquals(3, batch.attemptsWhenLeaderLastChanged());
+
+            // Re-enqueue batch for subsequent retries/test-cases.
+            accum.reenqueue(batch, now);
+        }
+    }
+
+    @Test
+    public void testDrainWithANodeThatDoesntHostAnyPartitions() {

Review Comment:
   Is this just added to increase coverage? That's fine, but I don't see how 
it's related to the rest of the PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
