hachikuji commented on code in PR #11991: URL: https://github.com/apache/kafka/pull/11991#discussion_r842248811
########## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ########## @@ -184,16 +184,22 @@ private void startSequencesAtBeginning(TopicPartition topicPartition, ProducerId // responses which are due to the retention period elapsing, and those which are due to actual lost data. private long lastAckedOffset; + private final Comparator<ProducerBatch> producerBatchComparator = (b1, b2) -> { Review Comment: nit: could this be static? It doesn't have any state. ########## clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java: ########## @@ -674,6 +674,61 @@ public void testBatchCompletedAfterProducerReset() { assertNull(transactionManager.nextBatchBySequence(tp0)); } + @Test + public void testDuplicateSequenceAfterProducerReset() throws Exception { + initializeTransactionManager(Optional.empty()); + initializeIdempotentProducerId(producerId, epoch); + + Metrics metrics = new Metrics(time); + + RecordAccumulator accumulator = new RecordAccumulator(logContext, 16 * 1024, CompressionType.NONE, 0, 0L, + 15000, metrics, "", time, apiVersions, transactionManager, + new BufferPool(1024 * 1024, 16 * 1024, metrics, time, "")); + + Sender sender = new Sender(logContext, this.client, this.metadata, accumulator, false, + MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(metrics), this.time, 10000, + 0, transactionManager, apiVersions); + + assertEquals(0, transactionManager.sequenceNumber(tp0).intValue()); + + Future<RecordMetadata> responseFuture1 = accumulator.append(tp0, time.milliseconds(), "1".getBytes(), "1".getBytes(), Record.EMPTY_HEADERS, + null, MAX_BLOCK_TIMEOUT, false, time.milliseconds()).future; + sender.runOnce(); + assertEquals(1, transactionManager.sequenceNumber(tp0).intValue()); + + time.sleep(10000); // request time out + sender.runOnce(); + assertEquals(0, client.inFlightRequestCount()); + assertTrue(transactionManager.hasInflightBatches(tp0)); + assertEquals(1, 
transactionManager.sequenceNumber(tp0).intValue()); + sender.runOnce(); // retry + assertEquals(1, client.inFlightRequestCount()); + assertTrue(transactionManager.hasInflightBatches(tp0)); + assertEquals(1, transactionManager.sequenceNumber(tp0).intValue()); + + time.sleep(5000); // delivery time out + sender.runOnce(); // expired in accumulator + assertFalse(transactionManager.hasInFlightRequest()); + assertEquals(1, client.inFlightRequestCount()); // not reaching request timeout, so still in flight + + sender.runOnce(); // bump the epoch + assertEquals(epoch + 1, transactionManager.producerIdAndEpoch().epoch); + assertEquals(0, transactionManager.sequenceNumber(tp0).intValue()); + + Future<RecordMetadata> responseFuture2 = accumulator.append(tp0, time.milliseconds(), "2".getBytes(), "2".getBytes(), Record.EMPTY_HEADERS, + null, MAX_BLOCK_TIMEOUT, false, time.milliseconds()).future; + sender.runOnce(); + sender.runOnce(); + assertEquals(0, transactionManager.firstInFlightSequence(tp0)); + assertEquals(1, transactionManager.sequenceNumber(tp0).intValue()); + + time.sleep(5000); // request time out again + sender.runOnce(); + assertTrue(transactionManager.hasInflightBatches(tp0)); // the latter batch failed and retried + assertTrue(responseFuture1.isDone()); Review Comment: nit: could we move this after line 710? It makes the test a little easier to understand if we see when the first send fails. Also, maybe we could use `TestUtils.assertFutureThrows(responseFuture1, TimeoutException.class)` to make the timeout expectation explicit? 
########## clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java: ########## @@ -674,6 +674,61 @@ public void testBatchCompletedAfterProducerReset() { assertNull(transactionManager.nextBatchBySequence(tp0)); } + @Test + public void testDuplicateSequenceAfterProducerReset() throws Exception { + initializeTransactionManager(Optional.empty()); + initializeIdempotentProducerId(producerId, epoch); + + Metrics metrics = new Metrics(time); + + RecordAccumulator accumulator = new RecordAccumulator(logContext, 16 * 1024, CompressionType.NONE, 0, 0L, + 15000, metrics, "", time, apiVersions, transactionManager, + new BufferPool(1024 * 1024, 16 * 1024, metrics, time, "")); + + Sender sender = new Sender(logContext, this.client, this.metadata, accumulator, false, + MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(metrics), this.time, 10000, + 0, transactionManager, apiVersions); + + assertEquals(0, transactionManager.sequenceNumber(tp0).intValue()); + + Future<RecordMetadata> responseFuture1 = accumulator.append(tp0, time.milliseconds(), "1".getBytes(), "1".getBytes(), Record.EMPTY_HEADERS, + null, MAX_BLOCK_TIMEOUT, false, time.milliseconds()).future; + sender.runOnce(); + assertEquals(1, transactionManager.sequenceNumber(tp0).intValue()); + + time.sleep(10000); // request time out Review Comment: nit: could introduce a variable `requestTimeoutMs` and drop the comment ########## clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java: ########## @@ -674,6 +674,61 @@ public void testBatchCompletedAfterProducerReset() { assertNull(transactionManager.nextBatchBySequence(tp0)); } + @Test + public void testDuplicateSequenceAfterProducerReset() throws Exception { + initializeTransactionManager(Optional.empty()); + initializeIdempotentProducerId(producerId, epoch); + + Metrics metrics = new Metrics(time); + + RecordAccumulator accumulator = new RecordAccumulator(logContext, 16 * 1024, 
CompressionType.NONE, 0, 0L, + 15000, metrics, "", time, apiVersions, transactionManager, + new BufferPool(1024 * 1024, 16 * 1024, metrics, time, "")); + + Sender sender = new Sender(logContext, this.client, this.metadata, accumulator, false, + MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(metrics), this.time, 10000, + 0, transactionManager, apiVersions); + + assertEquals(0, transactionManager.sequenceNumber(tp0).intValue()); + + Future<RecordMetadata> responseFuture1 = accumulator.append(tp0, time.milliseconds(), "1".getBytes(), "1".getBytes(), Record.EMPTY_HEADERS, + null, MAX_BLOCK_TIMEOUT, false, time.milliseconds()).future; + sender.runOnce(); + assertEquals(1, transactionManager.sequenceNumber(tp0).intValue()); + + time.sleep(10000); // request time out + sender.runOnce(); + assertEquals(0, client.inFlightRequestCount()); + assertTrue(transactionManager.hasInflightBatches(tp0)); + assertEquals(1, transactionManager.sequenceNumber(tp0).intValue()); + sender.runOnce(); // retry + assertEquals(1, client.inFlightRequestCount()); + assertTrue(transactionManager.hasInflightBatches(tp0)); + assertEquals(1, transactionManager.sequenceNumber(tp0).intValue()); + + time.sleep(5000); // delivery time out + sender.runOnce(); // expired in accumulator + assertFalse(transactionManager.hasInFlightRequest()); Review Comment: The behavior here puzzled me a little when I was trying to understand the test. Would a comment like this help? ``` // The retry request will remain inflight until the request timeout // is reached even though the delivery timeout has been reached and // the future has completed exceptionally. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org