kirktrue commented on code in PR #13591: URL: https://github.com/apache/kafka/pull/13591#discussion_r1185525137
########## clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java: ########## @@ -3405,6 +3406,54 @@ MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(new Metrics(t assertEquals(1, transactionManager.sequenceNumber(tp1).intValue()); } + @Test + public void testMakeIllegalTransitionFatal() { + doInitTransactions(); + assertTrue(transactionManager.isTransactional()); + + // Step 1: create a transaction. + transactionManager.beginTransaction(); + assertTrue(transactionManager.hasOngoingTransaction()); + + // Step 2: abort a transaction (wait for it to complete) and then verify that the transaction manager is + // left in the READY state. + TransactionalRequestResult abortResult = transactionManager.beginAbort(TransactionManager.InvalidStateDetectionStrategy.FOREGROUND); + runUntil(abortResult::isCompleted); + abortResult.await(); + assertTrue(abortResult.isSuccessful()); + assertFalse(transactionManager.hasOngoingTransaction()); + assertTrue(transactionManager.isReady()); + + // Step 3: create a batch and simulate the Sender handling a failed batch, which would *attempt* to put + // the transaction manager in the ABORTABLE_ERROR state. However, that is an illegal state transition, so + // verify that it failed and caused the transaction manager to be put in an unrecoverable FATAL_ERROR state. + ProducerBatch batch = batchWithValue(tp0, "test"); + assertThrowsFatalStateException("handleFailedBatch", () -> transactionManager.handleFailedBatch(batch, new NetworkException("Disconnected from node 4"), false)); + assertTrue(transactionManager.hasFatalError()); + + // Step 4: validate that the transactions can't be started, committed + assertThrowsFatalStateException("beginTransaction", () -> transactionManager.beginTransaction()); + assertThrowsFatalStateException("beginAbort", () -> transactionManager.beginAbort(TransactionManager.InvalidStateDetectionStrategy.FOREGROUND)); + assertThrowsFatalStateException("beginCommit", () -> transactionManager.beginCommit()); + assertThrowsFatalStateException("maybeAddPartition", () -> transactionManager.maybeAddPartition(tp0)); + assertThrowsFatalStateException("initializeTransactions", () -> transactionManager.initializeTransactions()); + assertThrowsFatalStateException("sendOffsetsToTransaction", () -> transactionManager.sendOffsetsToTransaction(Collections.emptyMap(), new ConsumerGroupMetadata("fake-group-id"))); + } + + private void assertThrowsFatalStateException(String methodName, Runnable operation) { + try { + operation.run(); + } catch (KafkaException t) { Review Comment: I added a check for `IllegalStateException` in `maybeFailWithError`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org