hachikuji commented on a change in pull request #10880:
URL: https://github.com/apache/kafka/pull/10880#discussion_r654624934



##########
File path: 
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
##########
@@ -2670,6 +2671,139 @@ public void testTransactionalRequestsSentOnShutdown() {
         }
     }
 
+    @Test
+    public void testRecordsFlushedImmediatelyOnTransactionCompletion() throws 
Exception {
+        try (Metrics m = new Metrics()) {
+            int lingerMs = 50;
+            SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+
+            TransactionManager txnManager = new TransactionManager(logContext, 
"txnId", 6000, 100, apiVersions);
+            setupWithTransactionState(txnManager, lingerMs);
+
+            Sender sender = new Sender(logContext, client, metadata, 
this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
+                1, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, 
txnManager, apiVersions);
+
+            // Begin a transaction and successfully add one partition to it.
+            ProducerIdAndEpoch producerIdAndEpoch = new 
ProducerIdAndEpoch(123456L, (short) 0);
+            doInitTransactions(txnManager, producerIdAndEpoch);
+            txnManager.beginTransaction();
+            addPartitionToTxn(sender, txnManager, tp0);
+
+            // Send a couple records and assert that they are not sent 
immediately (due to linger).
+            appendToAccumulator(tp0);
+            appendToAccumulator(tp0);
+            sender.runOnce();
+            assertFalse(client.hasInFlightRequests());
+
+            // Now begin the commit and assert that the Produce request is 
sent immediately
+            // without waiting for the linger.
+            txnManager.beginCommit();
+            runUntil(sender, client::hasInFlightRequests);
+
+            // Respond to the produce request and wait for the EndTxn request 
to be sent.
+            respondToProduce(tp0, Errors.NONE, 1L);
+            runUntil(sender, txnManager::hasInFlightRequest);
+
+            // Respond to the expected EndTxn request.
+            respondToEndTxn(Errors.NONE);
+            runUntil(sender, txnManager::isReady);
+
+            // Finally, we want to assert that the linger time is still 
effective
+            // when the new transaction begins.
+            txnManager.beginTransaction();
+            addPartitionToTxn(sender, txnManager, tp0);
+
+            appendToAccumulator(tp0);
+            appendToAccumulator(tp0);
+            time.sleep(lingerMs - 1);
+            sender.runOnce();
+            assertFalse(client.hasInFlightRequests());
+            assertTrue(accumulator.hasUndrained());
+
+            time.sleep(1);
+            runUntil(sender, client::hasInFlightRequests);
+            assertFalse(accumulator.hasUndrained());
+        }
+    }
+
+    @Test
+    public void testAwaitPendingRecordsBeforeCommittingTransaction() throws 
Exception {
+        try (Metrics m = new Metrics()) {
+            SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+
+            TransactionManager txnManager = new TransactionManager(logContext, 
"txnId", 6000, 100, apiVersions);
+            setupWithTransactionState(txnManager);
+
+            Sender sender = new Sender(logContext, client, metadata, 
this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
+                1, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, 
txnManager, apiVersions);
+
+            // Begin a transaction and successfully add one partition to it.
+            ProducerIdAndEpoch producerIdAndEpoch = new 
ProducerIdAndEpoch(123456L, (short) 0);
+            doInitTransactions(txnManager, producerIdAndEpoch);
+            txnManager.beginTransaction();
+            addPartitionToTxn(sender, txnManager, tp0);
+
+            // Send one Produce request.
+            appendToAccumulator(tp0);
+            runUntil(sender, () -> client.requests().size() == 1);
+            assertFalse(accumulator.hasUndrained());
+            assertTrue(client.hasInFlightRequests());
+            assertTrue(txnManager.hasInflightBatches(tp0));
+
+            // Enqueue another record and then commit the transaction. We 
expect the unsent record to
+            // get sent before the transaction can be completed.
+            appendToAccumulator(tp0);
+            txnManager.beginCommit();
+            runUntil(sender, () -> client.requests().size() == 2);
+
+            assertTrue(txnManager.isCompleting());
+            assertFalse(txnManager.hasInFlightRequest());
+            assertTrue(txnManager.hasInflightBatches(tp0));
+
+            // Now respond to the pending Produce requests.
+            respondToProduce(tp0, Errors.NONE, 0L);
+            respondToProduce(tp0, Errors.NONE, 1L);
+            runUntil(sender, txnManager::hasInFlightRequest);
+
+            // Finally, respond to the expected EndTxn request.
+            respondToEndTxn(Errors.NONE);
+            runUntil(sender, txnManager::isReady);
+        }
+    }
+
+    private void addPartitionToTxn(Sender sender, TransactionManager 
txnManager, TopicPartition tp) {
+        txnManager.maybeAddPartitionToTransaction(tp);
+        client.prepareResponse(new AddPartitionsToTxnResponse(0, 
Collections.singletonMap(tp, Errors.NONE)));
+        runUntil(sender, () -> txnManager.isPartitionAdded(tp));
+        assertFalse(txnManager.hasInFlightRequest());
+    }
+
+    private void respondToProduce(TopicPartition tp, Errors error, long 
offset) {
+        client.respond(
+            request -> request instanceof ProduceRequest,
+            produceResponse(tp, offset, error, 0)
+        );
+
+    }
+
+    private void respondToEndTxn(Errors error) {
+        client.respond(
+            request -> request instanceof EndTxnRequest,
+            new EndTxnResponse(new EndTxnResponseData()
+                .setErrorCode(error.code())
+                .setThrottleTimeMs(0))
+        );
+    }
+
+    private void runUntil(Sender sender, Supplier<Boolean> condition) {

Review comment:
       Would it be reasonable to add a helper to `Sender` for testing?

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
##########
@@ -2670,6 +2671,139 @@ public void testTransactionalRequestsSentOnShutdown() {
         }
     }
 
+    @Test
+    public void testRecordsFlushedImmediatelyOnTransactionCompletion() throws 
Exception {
+        try (Metrics m = new Metrics()) {
+            int lingerMs = 50;
+            SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+
+            TransactionManager txnManager = new TransactionManager(logContext, 
"txnId", 6000, 100, apiVersions);
+            setupWithTransactionState(txnManager, lingerMs);
+
+            Sender sender = new Sender(logContext, client, metadata, 
this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
+                1, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, 
txnManager, apiVersions);
+
+            // Begin a transaction and successfully add one partition to it.
+            ProducerIdAndEpoch producerIdAndEpoch = new 
ProducerIdAndEpoch(123456L, (short) 0);
+            doInitTransactions(txnManager, producerIdAndEpoch);
+            txnManager.beginTransaction();
+            addPartitionToTxn(sender, txnManager, tp0);
+
+            // Send a couple records and assert that they are not sent 
immediately (due to linger).
+            appendToAccumulator(tp0);
+            appendToAccumulator(tp0);
+            sender.runOnce();
+            assertFalse(client.hasInFlightRequests());
+
+            // Now begin the commit and assert that the Produce request is 
sent immediately
+            // without waiting for the linger.
+            txnManager.beginCommit();
+            runUntil(sender, client::hasInFlightRequests);
+
+            // Respond to the produce request and wait for the EndTxn request 
to be sent.
+            respondToProduce(tp0, Errors.NONE, 1L);
+            runUntil(sender, txnManager::hasInFlightRequest);
+
+            // Respond to the expected EndTxn request.
+            respondToEndTxn(Errors.NONE);
+            runUntil(sender, txnManager::isReady);
+
+            // Finally, we want to assert that the linger time is still 
effective
+            // when the new transaction begins.
+            txnManager.beginTransaction();
+            addPartitionToTxn(sender, txnManager, tp0);
+
+            appendToAccumulator(tp0);
+            appendToAccumulator(tp0);
+            time.sleep(lingerMs - 1);
+            sender.runOnce();
+            assertFalse(client.hasInFlightRequests());
+            assertTrue(accumulator.hasUndrained());
+
+            time.sleep(1);
+            runUntil(sender, client::hasInFlightRequests);
+            assertFalse(accumulator.hasUndrained());
+        }
+    }
+
+    @Test
+    public void testAwaitPendingRecordsBeforeCommittingTransaction() throws 
Exception {
+        try (Metrics m = new Metrics()) {
+            SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+
+            TransactionManager txnManager = new TransactionManager(logContext, 
"txnId", 6000, 100, apiVersions);
+            setupWithTransactionState(txnManager);
+
+            Sender sender = new Sender(logContext, client, metadata, 
this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
+                1, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, 
txnManager, apiVersions);
+
+            // Begin a transaction and successfully add one partition to it.
+            ProducerIdAndEpoch producerIdAndEpoch = new 
ProducerIdAndEpoch(123456L, (short) 0);
+            doInitTransactions(txnManager, producerIdAndEpoch);
+            txnManager.beginTransaction();
+            addPartitionToTxn(sender, txnManager, tp0);
+
+            // Send one Produce request.
+            appendToAccumulator(tp0);
+            runUntil(sender, () -> client.requests().size() == 1);
+            assertFalse(accumulator.hasUndrained());
+            assertTrue(client.hasInFlightRequests());
+            assertTrue(txnManager.hasInflightBatches(tp0));
+
+            // Enqueue another record and then commit the transaction. We 
expect the unsent record to
+            // get sent before the transaction can be completed.
+            appendToAccumulator(tp0);
+            txnManager.beginCommit();
+            runUntil(sender, () -> client.requests().size() == 2);
+
+            assertTrue(txnManager.isCompleting());
+            assertFalse(txnManager.hasInFlightRequest());
+            assertTrue(txnManager.hasInflightBatches(tp0));
+
+            // Now respond to the pending Produce requests.
+            respondToProduce(tp0, Errors.NONE, 0L);
+            respondToProduce(tp0, Errors.NONE, 1L);
+            runUntil(sender, txnManager::hasInFlightRequest);
+
+            // Finally, respond to the expected EndTxn request.
+            respondToEndTxn(Errors.NONE);
+            runUntil(sender, txnManager::isReady);
+        }
+    }
+
+    private void addPartitionToTxn(Sender sender, TransactionManager 
txnManager, TopicPartition tp) {
+        txnManager.maybeAddPartitionToTransaction(tp);
+        client.prepareResponse(new AddPartitionsToTxnResponse(0, 
Collections.singletonMap(tp, Errors.NONE)));
+        runUntil(sender, () -> txnManager.isPartitionAdded(tp));
+        assertFalse(txnManager.hasInFlightRequest());
+    }
+
+    private void respondToProduce(TopicPartition tp, Errors error, long 
offset) {
+        client.respond(
+            request -> request instanceof ProduceRequest,
+            produceResponse(tp, offset, error, 0)
+        );
+
+    }
+
+    private void respondToEndTxn(Errors error) {
+        client.respond(
+            request -> request instanceof EndTxnRequest,
+            new EndTxnResponse(new EndTxnResponseData()
+                .setErrorCode(error.code())
+                .setThrottleTimeMs(0))
+        );
+    }
+
+    private void runUntil(Sender sender, Supplier<Boolean> condition) {

Review comment:
       Would it be reasonable to add a `runUntil` helper to `Sender` for 
testing?

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
##########
@@ -2670,6 +2671,139 @@ public void testTransactionalRequestsSentOnShutdown() {
         }
     }
 
+    @Test
+    public void testRecordsFlushedImmediatelyOnTransactionCompletion() throws 
Exception {
+        try (Metrics m = new Metrics()) {
+            int lingerMs = 50;
+            SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+
+            TransactionManager txnManager = new TransactionManager(logContext, 
"txnId", 6000, 100, apiVersions);
+            setupWithTransactionState(txnManager, lingerMs);
+
+            Sender sender = new Sender(logContext, client, metadata, 
this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
+                1, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, 
txnManager, apiVersions);
+
+            // Begin a transaction and successfully add one partition to it.
+            ProducerIdAndEpoch producerIdAndEpoch = new 
ProducerIdAndEpoch(123456L, (short) 0);
+            doInitTransactions(txnManager, producerIdAndEpoch);
+            txnManager.beginTransaction();
+            addPartitionToTxn(sender, txnManager, tp0);
+
+            // Send a couple records and assert that they are not sent 
immediately (due to linger).
+            appendToAccumulator(tp0);
+            appendToAccumulator(tp0);
+            sender.runOnce();
+            assertFalse(client.hasInFlightRequests());
+
+            // Now begin the commit and assert that the Produce request is 
sent immediately
+            // without waiting for the linger.
+            txnManager.beginCommit();
+            runUntil(sender, client::hasInFlightRequests);
+
+            // Respond to the produce request and wait for the EndTxn request 
to be sent.
+            respondToProduce(tp0, Errors.NONE, 1L);
+            runUntil(sender, txnManager::hasInFlightRequest);
+
+            // Respond to the expected EndTxn request.
+            respondToEndTxn(Errors.NONE);
+            runUntil(sender, txnManager::isReady);
+
+            // Finally, we want to assert that the linger time is still 
effective
+            // when the new transaction begins.
+            txnManager.beginTransaction();
+            addPartitionToTxn(sender, txnManager, tp0);
+
+            appendToAccumulator(tp0);
+            appendToAccumulator(tp0);
+            time.sleep(lingerMs - 1);
+            sender.runOnce();
+            assertFalse(client.hasInFlightRequests());
+            assertTrue(accumulator.hasUndrained());
+
+            time.sleep(1);
+            runUntil(sender, client::hasInFlightRequests);
+            assertFalse(accumulator.hasUndrained());
+        }
+    }
+
+    @Test
+    public void testAwaitPendingRecordsBeforeCommittingTransaction() throws 
Exception {
+        try (Metrics m = new Metrics()) {
+            SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+
+            TransactionManager txnManager = new TransactionManager(logContext, 
"txnId", 6000, 100, apiVersions);
+            setupWithTransactionState(txnManager);
+
+            Sender sender = new Sender(logContext, client, metadata, 
this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
+                1, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, 
txnManager, apiVersions);
+
+            // Begin a transaction and successfully add one partition to it.
+            ProducerIdAndEpoch producerIdAndEpoch = new 
ProducerIdAndEpoch(123456L, (short) 0);
+            doInitTransactions(txnManager, producerIdAndEpoch);
+            txnManager.beginTransaction();
+            addPartitionToTxn(sender, txnManager, tp0);
+
+            // Send one Produce request.
+            appendToAccumulator(tp0);
+            runUntil(sender, () -> client.requests().size() == 1);
+            assertFalse(accumulator.hasUndrained());
+            assertTrue(client.hasInFlightRequests());
+            assertTrue(txnManager.hasInflightBatches(tp0));
+
+            // Enqueue another record and then commit the transaction. We 
expect the unsent record to
+            // get sent before the transaction can be completed.
+            appendToAccumulator(tp0);
+            txnManager.beginCommit();
+            runUntil(sender, () -> client.requests().size() == 2);
+
+            assertTrue(txnManager.isCompleting());
+            assertFalse(txnManager.hasInFlightRequest());
+            assertTrue(txnManager.hasInflightBatches(tp0));
+
+            // Now respond to the pending Produce requests.
+            respondToProduce(tp0, Errors.NONE, 0L);
+            respondToProduce(tp0, Errors.NONE, 1L);
+            runUntil(sender, txnManager::hasInFlightRequest);
+
+            // Finally, respond to the expected EndTxn request.
+            respondToEndTxn(Errors.NONE);
+            runUntil(sender, txnManager::isReady);
+        }
+    }
+
+    private void addPartitionToTxn(Sender sender, TransactionManager 
txnManager, TopicPartition tp) {
+        txnManager.maybeAddPartitionToTransaction(tp);
+        client.prepareResponse(new AddPartitionsToTxnResponse(0, 
Collections.singletonMap(tp, Errors.NONE)));
+        runUntil(sender, () -> txnManager.isPartitionAdded(tp));
+        assertFalse(txnManager.hasInFlightRequest());
+    }
+
+    private void respondToProduce(TopicPartition tp, Errors error, long 
offset) {
+        client.respond(
+            request -> request instanceof ProduceRequest,
+            produceResponse(tp, offset, error, 0)
+        );
+
+    }
+
+    private void respondToEndTxn(Errors error) {
+        client.respond(
+            request -> request instanceof EndTxnRequest,
+            new EndTxnResponse(new EndTxnResponseData()
+                .setErrorCode(error.code())
+                .setThrottleTimeMs(0))
+        );
+    }
+
+    private void runUntil(Sender sender, Supplier<Boolean> condition) {

Review comment:
       I decided to pull this out into a test utility instead. Hopefully that is OK.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to