jolshan commented on code in PR #15486: URL: https://github.com/apache/kafka/pull/15486#discussion_r1528996611
########## clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java: ########## @@ -3523,6 +3524,128 @@ public void testForegroundInvalidStateTransitionIsRecoverable() { assertFalse(transactionManager.hasOngoingTransaction()); } + @Test + public void testAbortableTransactionExceptionInInitProducerId() { + TransactionalRequestResult initPidResult = transactionManager.initializeTransactions(); + prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId); + runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null); + assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION)); + + prepareInitPidResponse(Errors.ABORTABLE_TRANSACTION, false, producerId, RecordBatch.NO_PRODUCER_EPOCH); + runUntil(transactionManager::hasError); + assertTrue(initPidResult.isCompleted()); + assertFalse(initPidResult.isSuccessful()); + assertThrows(AbortableTransactionException.class, initPidResult::await); + assertAbortableError(AbortableTransactionException.class); + } + + @Test + public void testAbortableTransactionExceptionInAddPartitions() { + final TopicPartition tp = new TopicPartition("foo", 0); + + doInitTransactions(); + + transactionManager.beginTransaction(); + transactionManager.maybeAddPartition(tp); + + prepareAddPartitionsToTxn(tp, Errors.ABORTABLE_TRANSACTION); + runUntil(transactionManager::hasError); + assertTrue(transactionManager.lastError() instanceof AbortableTransactionException); + + assertAbortableError(AbortableTransactionException.class); + } + + @Test + public void testAbortableTransactionExceptionInFindCoordinator() { + doInitTransactions(); + + transactionManager.beginTransaction(); + TransactionalRequestResult sendOffsetsResult = transactionManager.sendOffsetsToTransaction( + singletonMap(new TopicPartition("foo", 0), new OffsetAndMetadata(39L)), new ConsumerGroupMetadata(consumerGroupId)); + + 
prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, producerId, epoch); + runUntil(() -> !transactionManager.hasPartitionsToAdd()); + + prepareFindCoordinatorResponse(Errors.ABORTABLE_TRANSACTION, false, CoordinatorType.GROUP, consumerGroupId); + runUntil(transactionManager::hasError); + assertTrue(transactionManager.lastError() instanceof AbortableTransactionException); + + runUntil(sendOffsetsResult::isCompleted); + assertFalse(sendOffsetsResult.isSuccessful()); + assertTrue(sendOffsetsResult.error() instanceof AbortableTransactionException); + + assertAbortableError(AbortableTransactionException.class); + } + + @Test + public void testAbortableTransactionExceptionInEndTxn() throws InterruptedException { + doInitTransactions(); + + transactionManager.beginTransaction(); + transactionManager.maybeAddPartition(tp0); + TransactionalRequestResult commitResult = transactionManager.beginCommit(); + + Future<RecordMetadata> responseFuture = appendToAccumulator(tp0); + + assertFalse(responseFuture.isDone()); + prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId); + prepareProduceResponse(Errors.NONE, producerId, epoch); + prepareEndTxnResponse(Errors.ABORTABLE_TRANSACTION, TransactionResult.COMMIT, producerId, epoch); + + runUntil(commitResult::isCompleted); + runUntil(responseFuture::isDone); + + assertThrows(KafkaException.class, commitResult::await); + assertFalse(commitResult.isSuccessful()); + assertTrue(commitResult.isAcked()); + + assertAbortableError(AbortableTransactionException.class); + // make sure the exception was thrown directly from the follow-up calls. Review Comment: nit: was this comment meant to be before further checks? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org