AndrewJSchofield commented on code in PR #14384: URL: https://github.com/apache/kafka/pull/14384#discussion_r1324641156
########## clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java: ########## @@ -3146,6 +3149,97 @@ public void testInvalidTxnStateIsAnAbortableError() throws Exception { txnManager.beginTransaction(); } + @Test + public void testProducerBatchRetriesWhenPartitionLeaderChanges() throws Exception { + Metrics m = new Metrics(); + SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m); + try { + // SETUP + String metricGrpName = "producer-metrics-test-stats-1"; + long totalSize = 1024 * 1024; + BufferPool pool = new BufferPool(totalSize, batchSize, metrics, time, + metricGrpName); + long retryBackoffMaxMs = 100L; + // lingerMs is 0 to send batch as soon as any records are available on it. + this.accumulator = new RecordAccumulator(logContext, batchSize, + CompressionType.NONE, 0, 10L, retryBackoffMaxMs, + DELIVERY_TIMEOUT_MS, metrics, metricGrpName, time, apiVersions, null, pool); + Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, + MAX_REQUEST_SIZE, ACKS_ALL, + 10, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, null, + apiVersions); + // Update metadata with leader-epochs. + int tp0LeaderEpoch = 100; + int epoch = tp0LeaderEpoch; + this.client.updateMetadata( + RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2), + tp -> { + if (tp0.equals(tp)) { + return epoch; + } else if (tp1.equals(tp)) { + return 0; + } else { + throw new RuntimeException("unexpected tp " + tp); + } + })); + + // Produce batch, it returns with a retry-able error like NOT_LEADER_OR_FOLLOWER, scheduled for retry. + // This triggers a metadata-request, that discovers a new-leader for tp0. 
+ Future<RecordMetadata> futureIsProduced = appendToAccumulator(tp0, 0L, "key", "value"); + sender.runOnce(); // connect + sender.runOnce(); // send produce request + assertEquals(1, client.inFlightRequestCount(), + "We should have a single produce request in flight."); + assertEquals(1, sender.inFlightBatches(tp0).size()); + assertTrue(client.hasInFlightRequests()); + client.respond(produceResponse(tp0, -1, Errors.NOT_LEADER_OR_FOLLOWER, 0)); + sender.runOnce(); // receive produce response, batch scheduled for retry + assertTrue(!futureIsProduced.isDone(), "Produce request is yet not done."); + + // TEST that as new-leader(with epochA) is discovered, the batch is retried immediately i.e. skips any backoff period. + // Update leader epoch for tp0 + log.info("Test that to a new-leader, batch is retried immediately."); + int newEpoch = ++tp0LeaderEpoch; + this.client.updateMetadata( + RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2), + tp -> { + if (tp0.equals(tp)) { + return newEpoch; + } else if (tp1.equals(tp)) { + return 0; + } else { + throw new RuntimeException("unexpected tp " + tp); + } + })); + sender.runOnce(); // send produce request, immediately. + assertEquals(1, sender.inFlightBatches(tp0).size()); + assertTrue(client.hasInFlightRequests()); + client.respond(produceResponse(tp0, -1, Errors.NOT_LEADER_OR_FOLLOWER, 0)); Review Comment: Shouldn't there be a test that exercises the new `CurrentLeader` field in v9 of the ProduceResponse, given that this field is part of KIP-951, which this PR implements? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org