kirktrue commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1185525137


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##########
@@ -3405,6 +3406,54 @@ MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new 
SenderMetricsRegistry(new Metrics(t
         assertEquals(1, transactionManager.sequenceNumber(tp1).intValue());
     }
 
+    @Test
+    public void testMakeIllegalTransitionFatal() {
+        doInitTransactions();
+        assertTrue(transactionManager.isTransactional());
+
+        // Step 1: create a transaction.
+        transactionManager.beginTransaction();
+        assertTrue(transactionManager.hasOngoingTransaction());
+
+        // Step 2: abort a transaction (wait for it to complete) and then 
verify that the transaction manager is
+        // left in the READY state.
+        TransactionalRequestResult abortResult = 
transactionManager.beginAbort(TransactionManager.InvalidStateDetectionStrategy.FOREGROUND);
+        runUntil(abortResult::isCompleted);
+        abortResult.await();
+        assertTrue(abortResult.isSuccessful());
+        assertFalse(transactionManager.hasOngoingTransaction());
+        assertTrue(transactionManager.isReady());
+
+        // Step 3: create a batch and simulate the Sender handling a failed 
batch, which would *attempt* to put
+        // the transaction manager in the ABORTABLE_ERROR state. However, that 
is an illegal state transition, so
+        // verify that it failed and caused the transaction manager to be put 
in an unrecoverable FATAL_ERROR state.
+        ProducerBatch batch = batchWithValue(tp0, "test");
+        assertThrowsFatalStateException("handleFailedBatch", () -> 
transactionManager.handleFailedBatch(batch, new NetworkException("Disconnected 
from node 4"), false));
+        assertTrue(transactionManager.hasFatalError());
+
+        // Step 4: validate that the transactions can't be started, committed
+        assertThrowsFatalStateException("beginTransaction", () -> 
transactionManager.beginTransaction());
+        assertThrowsFatalStateException("beginAbort", () -> 
transactionManager.beginAbort(TransactionManager.InvalidStateDetectionStrategy.FOREGROUND));
+        assertThrowsFatalStateException("beginCommit", () -> 
transactionManager.beginCommit());
+        assertThrowsFatalStateException("maybeAddPartition", () -> 
transactionManager.maybeAddPartition(tp0));
+        assertThrowsFatalStateException("initializeTransactions", () -> 
transactionManager.initializeTransactions());
+        assertThrowsFatalStateException("sendOffsetsToTransaction", () -> 
transactionManager.sendOffsetsToTransaction(Collections.emptyMap(), new 
ConsumerGroupMetadata("fake-group-id")));
+    }
+
+    private void assertThrowsFatalStateException(String methodName, Runnable 
operation) {
+        try {
+            operation.run();
+        } catch (KafkaException t) {

Review Comment:
   I added a check for `IllegalStateException` in `maybeFailWithError`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to