jolshan commented on code in PR #14402:
URL: https://github.com/apache/kafka/pull/14402#discussion_r1330524344


##########
core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala:
##########
@@ -157,29 +216,114 @@ class AddPartitionsToTxnManagerTest {
     // The request for node1 should not be added because one request is 
already inflight.
     assertEquals(1, requestsAndHandlers2.size)
     requestsAndHandlers2.foreach { requestAndHandler =>
-      verifyRequest(node2, transactionalId3, producerId3, requestAndHandler)
+      verifyRequest(node2, transactionalId3, producerId3, requestAndHandler, 
verifyOnly = false)
     }
 
     // Complete the request for node1 so the new one can go through.
     requestsAndHandlers.filter(_.destination == 
node1).head.handler.onComplete(authenticationErrorResponse)
     val requestsAndHandlers3 = 
addPartitionsToTxnManager.generateRequests().asScala
     assertEquals(1, requestsAndHandlers3.size)
     requestsAndHandlers3.foreach { requestAndHandler =>
-      verifyRequest(node1, transactionalId2, producerId2, requestAndHandler)
+      verifyRequest(node1, transactionalId2, producerId2, requestAndHandler, 
verifyOnly = true)
+    }
+  }
+
+  @Test
+  def testTransactionCoordinatorResolution(): Unit = {
+    when(partitionFor.apply(transactionalId1)).thenReturn(0)
+
+    def checkError(): Unit = {
+      val errors = mutable.Map[TopicPartition, Errors]()
+
+      addPartitionsToTxnManager.addTxnData(
+        transactionalId1,
+        producerId1,
+        producerEpoch = 0,
+        verifyOnly = true,
+        topicPartitions,
+        setErrors(errors)
+      )
+
+      assertEquals(topicPartitions.map(tp => tp -> 
Errors.COORDINATOR_NOT_AVAILABLE).toMap, errors)
     }
+
+    // The transaction state topic does not exist.
+    
when(metadataCache.getTopicMetadata(Set(Topic.TRANSACTION_STATE_TOPIC_NAME), 
config.interBrokerListenerName))
+      .thenReturn(Seq())
+    checkError()
+
+    // The metadata of the transaction state topic returns an error.
+    
when(metadataCache.getTopicMetadata(Set(Topic.TRANSACTION_STATE_TOPIC_NAME), 
config.interBrokerListenerName))
+      .thenReturn(Seq(
+        new MetadataResponseData.MetadataResponseTopic()
+          .setName(Topic.TRANSACTION_STATE_TOPIC_NAME)
+          .setErrorCode(Errors.BROKER_NOT_AVAILABLE.code)
+      ))
+    checkError()
+
+    // The partition does not exist.
+    
when(metadataCache.getTopicMetadata(Set(Topic.TRANSACTION_STATE_TOPIC_NAME), 
config.interBrokerListenerName))
+      .thenReturn(Seq(
+        new MetadataResponseData.MetadataResponseTopic()
+          .setName(Topic.TRANSACTION_STATE_TOPIC_NAME)
+      ))
+    checkError()
+
+    // The partition has not leader.

Review Comment:
   nit: The partition has no leader



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to