jolshan commented on code in PR #14378: URL: https://github.com/apache/kafka/pull/14378#discussion_r1324792282
########## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ########## @@ -2569,6 +2571,70 @@ class ReplicaManagerTest { assertEquals((Errors.NONE, node0), replicaManager.getTransactionCoordinator(txnCoordinatorPartition0)) assertEquals((Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode), replicaManager.getTransactionCoordinator(txnCoordinatorPartition1)) + + // Test we convert the error correctly when trying to append and coordinator is not available + val tp0 = new TopicPartition(topic, 0) + val producerId = 24L + val producerEpoch = 0.toShort + val sequence = 0 + replicaManager.becomeLeaderOrFollower(1, + makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), LeaderAndIsr(1, List(0, 1))), + (_, _) => ()) + val transactionalRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence, + new SimpleRecord("message".getBytes)) + val result = appendRecords(replicaManager, tp0, transactionalRecords, transactionalId = transactionalId, transactionStatePartition = Some(txnCoordinatorPartition1)) + val expectedError = s"Unable to verify the partition has been added to the transaction. Underlying error:${Errors.COORDINATOR_NOT_AVAILABLE.toString}" + assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error) + assertEquals(expectedError, result.assertFired.errorMessage) + } finally { + replicaManager.shutdown(checkpointHW = false) + } + } + + @Test + def testVerificationErrorConversions(): Unit = { + val tp0 = new TopicPartition(topic, 0) + val producerId = 24L + val producerEpoch = 0.toShort + val sequence = 0 + val node = new Node(0, "host1", 0) + val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager]) + + val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0), node) + try { + replicaManager.becomeLeaderOrFollower(1, + makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), LeaderAndIsr(1, List(0, 1))), + (_, _) => ()) + + val transactionalRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence, + new SimpleRecord("message".getBytes)) + + val transactionToAdd = new AddPartitionsToTxnTransaction() + .setTransactionalId(transactionalId) + .setProducerId(producerId) + .setProducerEpoch(producerEpoch) + .setVerifyOnly(true) + .setTopics(new AddPartitionsToTxnTopicCollection( + Seq(new AddPartitionsToTxnTopic().setName(tp0.topic).setPartitions(Collections.singletonList(tp0.partition))).iterator.asJava + )) + + // Start verification and return the coordinator related errors. + var invocations = 1 + def verifyError(error: Errors): Unit = { + val expectedMessage = s"Unable to verify the partition has been added to the transaction. Underlying error:${error.toString}" + val result = appendRecords(replicaManager, tp0, transactionalRecords, transactionalId = transactionalId, transactionStatePartition = Some(0)) + val appendCallback = ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback]) + verify(addPartitionsToTxnManager, times(invocations)).addTxnData(ArgumentMatchers.eq(node), ArgumentMatchers.eq(transactionToAdd), appendCallback.capture()) + + // Confirm we did not write to the log and instead returned the converted error with the correct error message. + val callback: AddPartitionsToTxnManager.AppendCallback = appendCallback.getValue() + callback(Map(tp0 -> error).toMap) + assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error) + assertEquals(expectedMessage, result.assertFired.errorMessage) + invocations = invocations + 1 + } + + Set(Errors.NOT_COORDINATOR, Errors.CONCURRENT_TRANSACTIONS, Errors.COORDINATOR_LOAD_IN_PROGRESS).foreach(verifyError(_)) Review Comment: Added. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org