jolshan commented on code in PR #14402: URL: https://github.com/apache/kafka/pull/14402#discussion_r1330523078
########## core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala: ########## @@ -68,87 +71,143 @@ class AddPartitionsToTxnManagerTest { private val versionMismatchResponse = clientResponse(null, mismatchException = new UnsupportedVersionException("")) private val disconnectedResponse = clientResponse(null, disconnected = true) + private val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:2181")) + @BeforeEach def setup(): Unit = { addPartitionsToTxnManager = new AddPartitionsToTxnManager( - KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:2181")), + config, networkClient, - time) + metadataCache, + partitionFor, + time + ) } @AfterEach def teardown(): Unit = { addPartitionsToTxnManager.shutdown() } - def setErrors(errors: mutable.Map[TopicPartition, Errors])(callbackErrors: Map[TopicPartition, Errors]): Unit = { - callbackErrors.foreach { - case (tp, error) => errors.put(tp, error) - } + private def setErrors(errors: mutable.Map[TopicPartition, Errors])(callbackErrors: Map[TopicPartition, Errors]): Unit = { + callbackErrors.forKeyValue(errors.put) } @Test def testAddTxnData(): Unit = { + when(partitionFor.apply(transactionalId1)).thenReturn(0) + when(partitionFor.apply(transactionalId2)).thenReturn(1) + when(partitionFor.apply(transactionalId3)).thenReturn(0) + when(metadataCache.getTopicMetadata(Set(Topic.TRANSACTION_STATE_TOPIC_NAME), config.interBrokerListenerName)) + .thenReturn(Seq( + new MetadataResponseData.MetadataResponseTopic() + .setName(Topic.TRANSACTION_STATE_TOPIC_NAME) + .setPartitions(List( + new MetadataResponseData.MetadataResponsePartition() + .setPartitionIndex(0) + .setLeaderId(0), + new MetadataResponseData.MetadataResponsePartition() + .setPartitionIndex(1) + .setLeaderId(1) + ).asJava) + )) + when(metadataCache.getAliveBrokerNode(0, config.interBrokerListenerName)) + .thenReturn(Some(node0)) + when(metadataCache.getAliveBrokerNode(1, config.interBrokerListenerName)) + .thenReturn(Some(node1)) + val transaction1Errors = mutable.Map[TopicPartition, Errors]() val transaction2Errors = mutable.Map[TopicPartition, Errors]() val transaction3Errors = mutable.Map[TopicPartition, Errors]() - addPartitionsToTxnManager.addTxnData(node0, transactionData(transactionalId1, producerId1), setErrors(transaction1Errors)) - addPartitionsToTxnManager.addTxnData(node1, transactionData(transactionalId2, producerId2), setErrors(transaction2Errors)) - addPartitionsToTxnManager.addTxnData(node0, transactionData(transactionalId3, producerId3), setErrors(transaction3Errors)) + addPartitionsToTxnManager.addTxnData(transactionalId1, producerId1, producerEpoch = 0, verifyOnly = true, topicPartitions, setErrors(transaction1Errors)) + addPartitionsToTxnManager.addTxnData(transactionalId2, producerId2, producerEpoch = 0, verifyOnly = true, topicPartitions, setErrors(transaction2Errors)) + addPartitionsToTxnManager.addTxnData(transactionalId3, producerId3, producerEpoch = 0, verifyOnly = true, topicPartitions, setErrors(transaction3Errors)) // We will try to add transaction1 3 more times (retries). One will have the same epoch, one will have a newer epoch, and one will have an older epoch than the new one we just added. val transaction1RetryWithSameEpochErrors = mutable.Map[TopicPartition, Errors]() val transaction1RetryWithNewerEpochErrors = mutable.Map[TopicPartition, Errors]() val transaction1RetryWithOldEpochErrors = mutable.Map[TopicPartition, Errors]() // Trying to add more transactional data for the same transactional ID, producer ID, and epoch should simply replace the old data and send a retriable response. - addPartitionsToTxnManager.addTxnData(node0, transactionData(transactionalId1, producerId1), setErrors(transaction1RetryWithSameEpochErrors)) + addPartitionsToTxnManager.addTxnData(transactionalId1, producerId1, producerEpoch = 0, verifyOnly = true, topicPartitions, setErrors(transaction1RetryWithSameEpochErrors)) val expectedNetworkErrors = topicPartitions.map(_ -> Errors.NETWORK_EXCEPTION).toMap assertEquals(expectedNetworkErrors, transaction1Errors) // Trying to add more transactional data for the same transactional ID and producer ID, but new epoch should replace the old data and send an error response for it. - addPartitionsToTxnManager.addTxnData(node0, transactionData(transactionalId1, producerId1, producerEpoch = 1), setErrors(transaction1RetryWithNewerEpochErrors)) + addPartitionsToTxnManager.addTxnData(transactionalId1, producerId1, producerEpoch = 1, verifyOnly = true, topicPartitions, setErrors(transaction1RetryWithNewerEpochErrors)) val expectedEpochErrors = topicPartitions.map(_ -> Errors.INVALID_PRODUCER_EPOCH).toMap assertEquals(expectedEpochErrors, transaction1RetryWithSameEpochErrors) // Trying to add more transactional data for the same transactional ID and producer ID, but an older epoch should immediately return with error and keep the old data queued to send. - addPartitionsToTxnManager.addTxnData(node0, transactionData(transactionalId1, producerId1, producerEpoch = 0), setErrors(transaction1RetryWithOldEpochErrors)) + addPartitionsToTxnManager.addTxnData(transactionalId1, producerId1, producerEpoch = 0, verifyOnly = true, topicPartitions, setErrors(transaction1RetryWithOldEpochErrors)) assertEquals(expectedEpochErrors, transaction1RetryWithOldEpochErrors) val requestsAndHandlers = addPartitionsToTxnManager.generateRequests().asScala requestsAndHandlers.foreach { requestAndHandler => if (requestAndHandler.destination == node0) { assertEquals(time.milliseconds(), requestAndHandler.creationTimeMs) - assertEquals(AddPartitionsToTxnRequest.Builder.forBroker( - new AddPartitionsToTxnTransactionCollection(Seq(transactionData(transactionalId3, producerId3), transactionData(transactionalId1, producerId1, producerEpoch = 1)).iterator.asJava)).data, - requestAndHandler.request.asInstanceOf[AddPartitionsToTxnRequest.Builder].data) // insertion order + assertEquals( + AddPartitionsToTxnRequest.Builder.forBroker( + new AddPartitionsToTxnTransactionCollection(Seq( + transactionData(transactionalId3, producerId3, verifyOnly = true), + transactionData(transactionalId1, producerId1, producerEpoch = 1, verifyOnly = true) + ).iterator.asJava) + ).data, + requestAndHandler.request.asInstanceOf[AddPartitionsToTxnRequest.Builder].data // insertion order + ) } else { - verifyRequest(node1, transactionalId2, producerId2, requestAndHandler) + verifyRequest(node1, transactionalId2, producerId2, requestAndHandler, verifyOnly = true) } } } @Test def testGenerateRequests(): Unit = { + when(partitionFor.apply(transactionalId1)).thenReturn(0) + when(partitionFor.apply(transactionalId2)).thenReturn(1) + when(partitionFor.apply(transactionalId3)).thenReturn(2) + when(metadataCache.getTopicMetadata(Set(Topic.TRANSACTION_STATE_TOPIC_NAME), config.interBrokerListenerName)) + .thenReturn(Seq( + new MetadataResponseData.MetadataResponseTopic() + .setName(Topic.TRANSACTION_STATE_TOPIC_NAME) + .setPartitions(List( + new MetadataResponseData.MetadataResponsePartition() + .setPartitionIndex(0) + .setLeaderId(0), + new MetadataResponseData.MetadataResponsePartition() + .setPartitionIndex(1) + .setLeaderId(1), + new MetadataResponseData.MetadataResponsePartition() + .setPartitionIndex(2) + .setLeaderId(2) + ).asJava) + )) + when(metadataCache.getAliveBrokerNode(0, config.interBrokerListenerName)) + .thenReturn(Some(node0)) + when(metadataCache.getAliveBrokerNode(1, config.interBrokerListenerName)) + .thenReturn(Some(node1)) + when(metadataCache.getAliveBrokerNode(2, config.interBrokerListenerName)) + .thenReturn(Some(node2)) + val transactionErrors = mutable.Map[TopicPartition, Errors]() - addPartitionsToTxnManager.addTxnData(node0, transactionData(transactionalId1, producerId1), setErrors(transactionErrors)) - addPartitionsToTxnManager.addTxnData(node1, transactionData(transactionalId2, producerId2), setErrors(transactionErrors)) + addPartitionsToTxnManager.addTxnData(transactionalId1, producerId1, producerEpoch = 0, verifyOnly = true, topicPartitions, setErrors(transactionErrors)) Review Comment: See my previous comment about verifyOnly = false. I suppose this is helpful for setting up for part 2 though :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org