AndrewJSchofield commented on code in PR #14384:
URL: https://github.com/apache/kafka/pull/14384#discussion_r1324641156


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -3146,6 +3149,97 @@ public void testInvalidTxnStateIsAnAbortableError() 
throws Exception {
 
         txnManager.beginTransaction();
     }
+    @Test
+    public void testProducerBatchRetriesWhenPartitionLeaderChanges() throws 
Exception {
+        Metrics m = new Metrics();
+        SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+        try {
+            // SETUP
+            String metricGrpName = "producer-metrics-test-stats-1";
+            long totalSize = 1024 * 1024;
+            BufferPool pool = new BufferPool(totalSize, batchSize, metrics, 
time,
+                metricGrpName);
+            long retryBackoffMaxMs = 100L;
+            // lingerMs is 0 to send batch as soon as any records are 
available on it.
+            this.accumulator = new RecordAccumulator(logContext, batchSize,
+                CompressionType.NONE, 0, 10L, retryBackoffMaxMs,
+                DELIVERY_TIMEOUT_MS, metrics, metricGrpName, time, 
apiVersions, null, pool);
+            Sender sender = new Sender(logContext, client, metadata, 
this.accumulator, false,
+                MAX_REQUEST_SIZE, ACKS_ALL,
+                10, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, 
null,
+                apiVersions);
+            // Update metadata with leader-epochs.
+            int tp0LeaderEpoch = 100;
+            int epoch = tp0LeaderEpoch;
+            this.client.updateMetadata(
+                RequestTestUtils.metadataUpdateWith(1, 
Collections.singletonMap("test", 2),
+                    tp -> {
+                        if (tp0.equals(tp)) {
+                            return epoch;
+                        }  else if (tp1.equals(tp)) {
+                            return 0;
+                        } else {
+                            throw new RuntimeException("unexpected tp " + tp);
+                        }
+                    }));
+
+            // Produce batch, it returns with a retry-able error like 
NOT_LEADER_OR_FOLLOWER, scheduled for retry.
+            // This triggers a metadata-request, that discovers a new-leader 
for tp0.
+            Future<RecordMetadata> futureIsProduced = appendToAccumulator(tp0, 
0L, "key", "value");
+            sender.runOnce(); // connect
+            sender.runOnce(); // send produce request
+            assertEquals(1, client.inFlightRequestCount(),
+                "We should have a single produce request in flight.");
+            assertEquals(1, sender.inFlightBatches(tp0).size());
+            assertTrue(client.hasInFlightRequests());
+            client.respond(produceResponse(tp0, -1, 
Errors.NOT_LEADER_OR_FOLLOWER, 0));
+            sender.runOnce(); // receive produce response, batch scheduled for 
retry
+            assertTrue(!futureIsProduced.isDone(), "Produce request is yet not 
done.");
+
+            // TEST that as new-leader(with epochA) is discovered, the batch 
is retried immediately i.e. skips any backoff period.
+            // Update leader epoch for tp0
+            log.info("Test that to a new-leader, batch is retried 
immediately.");
+            int newEpoch = ++tp0LeaderEpoch;
+            this.client.updateMetadata(
+                RequestTestUtils.metadataUpdateWith(1, 
Collections.singletonMap("test", 2),
+                    tp -> {
+                        if (tp0.equals(tp)) {
+                            return newEpoch;
+                        } else if (tp1.equals(tp)) {
+                            return 0;
+                        } else {
+                            throw new RuntimeException("unexpected tp " + tp);
+                        }
+                    }));
+            sender.runOnce(); // send produce request, immediately.
+            assertEquals(1, sender.inFlightBatches(tp0).size());
+            assertTrue(client.hasInFlightRequests());
+            client.respond(produceResponse(tp0, -1, 
Errors.NOT_LEADER_OR_FOLLOWER, 0));

Review Comment:
   Shouldn't there be a test that exercises the new `CurrentLeader` field in v9 of 
the ProduceResponse, given that it is part of KIP-951, which this PR 
implements?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to