hachikuji commented on code in PR #11991:
URL: https://github.com/apache/kafka/pull/11991#discussion_r842248811


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -184,16 +184,22 @@ private void startSequencesAtBeginning(TopicPartition 
topicPartition, ProducerId
         // responses which are due to the retention period elapsing, and those 
which are due to actual lost data.
         private long lastAckedOffset;
 
+        private final Comparator<ProducerBatch> producerBatchComparator = (b1, 
b2) -> {

Review Comment:
   nit: could this comparator be `static`? It doesn't depend on any instance state.



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##########
@@ -674,6 +674,61 @@ public void testBatchCompletedAfterProducerReset() {
         assertNull(transactionManager.nextBatchBySequence(tp0));
     }
 
+    @Test
+    public void testDuplicateSequenceAfterProducerReset() throws Exception {
+        initializeTransactionManager(Optional.empty());
+        initializeIdempotentProducerId(producerId, epoch);
+
+        Metrics metrics = new Metrics(time);
+
+        RecordAccumulator accumulator = new RecordAccumulator(logContext, 16 * 
1024, CompressionType.NONE, 0, 0L,
+                15000, metrics, "", time, apiVersions, transactionManager,
+                new BufferPool(1024 * 1024, 16 * 1024, metrics, time, ""));
+
+        Sender sender = new Sender(logContext, this.client, this.metadata, 
accumulator, false,
+                MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new 
SenderMetricsRegistry(metrics), this.time, 10000,
+                0, transactionManager, apiVersions);
+
+        assertEquals(0, transactionManager.sequenceNumber(tp0).intValue());
+
+        Future<RecordMetadata> responseFuture1 = accumulator.append(tp0, 
time.milliseconds(), "1".getBytes(), "1".getBytes(), Record.EMPTY_HEADERS,
+                null, MAX_BLOCK_TIMEOUT, false, time.milliseconds()).future;
+        sender.runOnce();
+        assertEquals(1, transactionManager.sequenceNumber(tp0).intValue());
+
+        time.sleep(10000); // request time out
+        sender.runOnce();
+        assertEquals(0, client.inFlightRequestCount());
+        assertTrue(transactionManager.hasInflightBatches(tp0));
+        assertEquals(1, transactionManager.sequenceNumber(tp0).intValue());
+        sender.runOnce(); // retry
+        assertEquals(1, client.inFlightRequestCount());
+        assertTrue(transactionManager.hasInflightBatches(tp0));
+        assertEquals(1, transactionManager.sequenceNumber(tp0).intValue());
+
+        time.sleep(5000); // delivery time out
+        sender.runOnce(); // expired in accumulator
+        assertFalse(transactionManager.hasInFlightRequest());
+        assertEquals(1, client.inFlightRequestCount()); // not reaching 
request timeout, so still in flight
+
+        sender.runOnce(); // bump the epoch
+        assertEquals(epoch + 1, transactionManager.producerIdAndEpoch().epoch);
+        assertEquals(0, transactionManager.sequenceNumber(tp0).intValue());
+
+        Future<RecordMetadata> responseFuture2 = accumulator.append(tp0, 
time.milliseconds(), "2".getBytes(), "2".getBytes(), Record.EMPTY_HEADERS,
+                null, MAX_BLOCK_TIMEOUT, false, time.milliseconds()).future;
+        sender.runOnce();
+        sender.runOnce();
+        assertEquals(0, transactionManager.firstInFlightSequence(tp0));
+        assertEquals(1, transactionManager.sequenceNumber(tp0).intValue());
+
+        time.sleep(5000); // request time out again
+        sender.runOnce();
+        assertTrue(transactionManager.hasInflightBatches(tp0)); // the latter 
batch failed and retried
+        assertTrue(responseFuture1.isDone());

Review Comment:
   nit: could we move this after line 710? It makes the test a little easier to 
understand if we see when the first send fails. Also, maybe we could use 
`TestUtils.assertFutureThrows(responseFuture1, TimeoutException.class)` to make 
the timeout expectation explicit?
   



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##########
@@ -674,6 +674,61 @@ public void testBatchCompletedAfterProducerReset() {
         assertNull(transactionManager.nextBatchBySequence(tp0));
     }
 
+    @Test
+    public void testDuplicateSequenceAfterProducerReset() throws Exception {
+        initializeTransactionManager(Optional.empty());
+        initializeIdempotentProducerId(producerId, epoch);
+
+        Metrics metrics = new Metrics(time);
+
+        RecordAccumulator accumulator = new RecordAccumulator(logContext, 16 * 
1024, CompressionType.NONE, 0, 0L,
+                15000, metrics, "", time, apiVersions, transactionManager,
+                new BufferPool(1024 * 1024, 16 * 1024, metrics, time, ""));
+
+        Sender sender = new Sender(logContext, this.client, this.metadata, 
accumulator, false,
+                MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new 
SenderMetricsRegistry(metrics), this.time, 10000,
+                0, transactionManager, apiVersions);
+
+        assertEquals(0, transactionManager.sequenceNumber(tp0).intValue());
+
+        Future<RecordMetadata> responseFuture1 = accumulator.append(tp0, 
time.milliseconds(), "1".getBytes(), "1".getBytes(), Record.EMPTY_HEADERS,
+                null, MAX_BLOCK_TIMEOUT, false, time.milliseconds()).future;
+        sender.runOnce();
+        assertEquals(1, transactionManager.sequenceNumber(tp0).intValue());
+
+        time.sleep(10000); // request time out

Review Comment:
   nit: we could introduce a `requestTimeoutMs` variable and drop the comment.



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##########
@@ -674,6 +674,61 @@ public void testBatchCompletedAfterProducerReset() {
         assertNull(transactionManager.nextBatchBySequence(tp0));
     }
 
+    @Test
+    public void testDuplicateSequenceAfterProducerReset() throws Exception {
+        initializeTransactionManager(Optional.empty());
+        initializeIdempotentProducerId(producerId, epoch);
+
+        Metrics metrics = new Metrics(time);
+
+        RecordAccumulator accumulator = new RecordAccumulator(logContext, 16 * 
1024, CompressionType.NONE, 0, 0L,
+                15000, metrics, "", time, apiVersions, transactionManager,
+                new BufferPool(1024 * 1024, 16 * 1024, metrics, time, ""));
+
+        Sender sender = new Sender(logContext, this.client, this.metadata, 
accumulator, false,
+                MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new 
SenderMetricsRegistry(metrics), this.time, 10000,
+                0, transactionManager, apiVersions);
+
+        assertEquals(0, transactionManager.sequenceNumber(tp0).intValue());
+
+        Future<RecordMetadata> responseFuture1 = accumulator.append(tp0, 
time.milliseconds(), "1".getBytes(), "1".getBytes(), Record.EMPTY_HEADERS,
+                null, MAX_BLOCK_TIMEOUT, false, time.milliseconds()).future;
+        sender.runOnce();
+        assertEquals(1, transactionManager.sequenceNumber(tp0).intValue());
+
+        time.sleep(10000); // request time out
+        sender.runOnce();
+        assertEquals(0, client.inFlightRequestCount());
+        assertTrue(transactionManager.hasInflightBatches(tp0));
+        assertEquals(1, transactionManager.sequenceNumber(tp0).intValue());
+        sender.runOnce(); // retry
+        assertEquals(1, client.inFlightRequestCount());
+        assertTrue(transactionManager.hasInflightBatches(tp0));
+        assertEquals(1, transactionManager.sequenceNumber(tp0).intValue());
+
+        time.sleep(5000); // delivery time out
+        sender.runOnce(); // expired in accumulator
+        assertFalse(transactionManager.hasInFlightRequest());

Review Comment:
   The behavior here puzzled me a little when I was trying to understand the 
test. Would a comment like this help?
   ```
   // The retry request will remain inflight until the request timeout
   // is reached even though the delivery timeout has been reached and
   // the future has completed exceptionally.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to