jolshan commented on code in PR #12901:
URL: https://github.com/apache/kafka/pull/12901#discussion_r1066456951


##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -1331,59 +1331,299 @@ class KafkaApisTest {
   }
 
   @Test
-  def shouldReplaceCoordinatorNotAvailableWithLoadInProcessInTxnOffsetCommitWithOlderClient(): Unit = {
+  def testHandleTxnOffsetCommitRequest(): Unit = {
+    addTopicToMetadataCache("foo", numPartitions = 1)
+
+    val txnOffsetCommitRequest = new TxnOffsetCommitRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setGenerationId(10)
+      .setProducerId(20)
+      .setProducerEpoch(30)
+      .setGroupInstanceId("instance-id")
+      .setTransactionalId("transactional-id")
+      .setTopics(List(
+        new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+          .setName("foo")
+          .setPartitions(List(
+            new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+              .setPartitionIndex(0)
+              .setCommittedOffset(10)).asJava)).asJava)
+
+    val requestChannelRequest = buildRequest(new TxnOffsetCommitRequest.Builder(txnOffsetCommitRequest).build())
+
+    val future = new CompletableFuture[TxnOffsetCommitResponseData]()
+    when(newGroupCoordinator.commitTransactionalOffsets(
+      requestChannelRequest.context,
+      txnOffsetCommitRequest,
+      RequestLocal.NoCaching.bufferSupplier
+    )).thenReturn(future)
+
+    createKafkaApis().handle(
+      requestChannelRequest,
+      RequestLocal.NoCaching
+    )
+
+    // This is the response returned by the group coordinator.
+    val txnOffsetCommitResponse = new TxnOffsetCommitResponseData()
+      .setTopics(List(
+        new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
+          .setName("foo")
+          .setPartitions(List(
+            new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
+              .setPartitionIndex(0)
+              .setErrorCode(Errors.NONE.code)).asJava)).asJava)
+
+    future.complete(txnOffsetCommitResponse)
+    val response = verifyNoThrottling[TxnOffsetCommitResponse](requestChannelRequest)
+    assertEquals(txnOffsetCommitResponse, response.data)
+  }
+
+  @Test
+  def testHandleTxnOffsetCommitRequestFutureFailed(): Unit = {
+    addTopicToMetadataCache("foo", numPartitions = 1)
+
+    val txnOffsetCommitRequest = new TxnOffsetCommitRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setTopics(List(
+        new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+          .setName("foo")
+          .setPartitions(List(
+            new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+              .setPartitionIndex(0)
+              .setCommittedOffset(10)).asJava)).asJava)
+
+    val requestChannelRequest = buildRequest(new TxnOffsetCommitRequest.Builder(txnOffsetCommitRequest).build())
+
+    val future = new CompletableFuture[TxnOffsetCommitResponseData]()
+    when(newGroupCoordinator.commitTransactionalOffsets(
+      requestChannelRequest.context,
+      txnOffsetCommitRequest,
+      RequestLocal.NoCaching.bufferSupplier
+    )).thenReturn(future)
+
+    createKafkaApis().handle(
+      requestChannelRequest,
+      RequestLocal.NoCaching
+    )
+
+    val expectedTxnOffsetCommitResponse = new TxnOffsetCommitResponseData()
+      .setTopics(List(
+        new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
+          .setName("foo")
+          .setPartitions(List(
+            new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
+              .setPartitionIndex(0)
+              .setErrorCode(Errors.NOT_COORDINATOR.code)).asJava)).asJava)
+
+    future.completeExceptionally(Errors.NOT_COORDINATOR.exception)
+    val response = verifyNoThrottling[TxnOffsetCommitResponse](requestChannelRequest)
+    assertEquals(expectedTxnOffsetCommitResponse, response.data)
+  }
+
+  @Test
+  def testHandleTxnOffsetCommitRequestTopicsAndPartitionsValidation(): Unit = {

Review Comment:
   This test is super thorough! Thanks for adding it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to