jolshan commented on code in PR #12901: URL: https://github.com/apache/kafka/pull/12901#discussion_r1066456951
########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -1331,59 +1331,299 @@ class KafkaApisTest { } @Test - def shouldReplaceCoordinatorNotAvailableWithLoadInProcessInTxnOffsetCommitWithOlderClient(): Unit = { + def testHandleTxnOffsetCommitRequest(): Unit = { + addTopicToMetadataCache("foo", numPartitions = 1) + + val txnOffsetCommitRequest = new TxnOffsetCommitRequestData() + .setGroupId("group") + .setMemberId("member") + .setGenerationId(10) + .setProducerId(20) + .setProducerEpoch(30) + .setGroupInstanceId("instance-id") + .setTransactionalId("transactional-id") + .setTopics(List( + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic() + .setName("foo") + .setPartitions(List( + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(10)).asJava)).asJava) + + val requestChannelRequest = buildRequest(new TxnOffsetCommitRequest.Builder(txnOffsetCommitRequest).build()) + + val future = new CompletableFuture[TxnOffsetCommitResponseData]() + when(newGroupCoordinator.commitTransactionalOffsets( + requestChannelRequest.context, + txnOffsetCommitRequest, + RequestLocal.NoCaching.bufferSupplier + )).thenReturn(future) + + createKafkaApis().handle( + requestChannelRequest, + RequestLocal.NoCaching + ) + + // This is the response returned by the group coordinator. 
+ val txnOffsetCommitResponse = new TxnOffsetCommitResponseData() + .setTopics(List( + new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic() + .setName("foo") + .setPartitions(List( + new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.NONE.code)).asJava)).asJava) + + future.complete(txnOffsetCommitResponse) + val response = verifyNoThrottling[TxnOffsetCommitResponse](requestChannelRequest) + assertEquals(txnOffsetCommitResponse, response.data) + } + + @Test + def testHandleTxnOffsetCommitRequestFutureFailed(): Unit = { + addTopicToMetadataCache("foo", numPartitions = 1) + + val txnOffsetCommitRequest = new TxnOffsetCommitRequestData() + .setGroupId("group") + .setMemberId("member") + .setTopics(List( + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic() + .setName("foo") + .setPartitions(List( + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(10)).asJava)).asJava) + + val requestChannelRequest = buildRequest(new TxnOffsetCommitRequest.Builder(txnOffsetCommitRequest).build()) + + val future = new CompletableFuture[TxnOffsetCommitResponseData]() + when(newGroupCoordinator.commitTransactionalOffsets( + requestChannelRequest.context, + txnOffsetCommitRequest, + RequestLocal.NoCaching.bufferSupplier + )).thenReturn(future) + + createKafkaApis().handle( + requestChannelRequest, + RequestLocal.NoCaching + ) + + val expectedTxnOffsetCommitResponse = new TxnOffsetCommitResponseData() + .setTopics(List( + new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic() + .setName("foo") + .setPartitions(List( + new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.NOT_COORDINATOR.code)).asJava)).asJava) + + future.completeExceptionally(Errors.NOT_COORDINATOR.exception) + val response = verifyNoThrottling[TxnOffsetCommitResponse](requestChannelRequest) + 
assertEquals(expectedTxnOffsetCommitResponse, response.data) + } + + @Test + def testHandleTxnOffsetCommitRequestTopicsAndPartitionsValidation(): Unit = {
Review Comment: This test is super thorough! Thanks for adding
--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org