junrao commented on code in PR #16456: URL: https://github.com/apache/kafka/pull/16456#discussion_r1698763204
########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -4406,6 +4414,2341 @@ class KafkaApisTest extends Logging { assertEquals("broker2", node.host) } + @Test + def testHandleShareFetchRequestSuccessWithoutAcknowledgements(): Unit = { + val topicName = "foo" + val topicId = Uuid.randomUuid() + val partitionIndex = 0 + metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) + addTopicToMetadataCache(topicName, 1, topicId = topicId) + val memberId : Uuid = Uuid.ZERO_UUID + + val shareSessionEpoch = 0 + + val records = memoryRecords(10, 0) + + when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), any())).thenReturn( + CompletableFuture.completedFuture(Map[TopicIdPartition, ShareFetchResponseData.PartitionData]( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) -> + new ShareFetchResponseData.PartitionData() + .setErrorCode(Errors.NONE.code) + .setAcknowledgeErrorCode(Errors.NONE.code) + .setRecords(records) + .setAcquiredRecords(new util.ArrayList(List( + new ShareFetchResponseData.AcquiredRecords() + .setFirstOffset(0) + .setLastOffset(9) + .setDeliveryCount(1) + ).asJava)) + ).asJava) + ) + + when(sharePartitionManager.newContext(any(), any(), any(), any())).thenReturn( + new ShareSessionContext(new ShareFetchMetadata(memberId, shareSessionEpoch), Map( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) -> + new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes) + ).asJava) + ) + + when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs( + any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0) + + val shareFetchRequestData = new ShareFetchRequestData(). + setGroupId("group"). + setMemberId(memberId.toString). + setShareSessionEpoch(shareSessionEpoch). + setTopics(List(new ShareFetchRequestData.FetchTopic(). + setTopicId(topicId). + setPartitions(List( + new ShareFetchRequestData.FetchPartition() + .setPartitionIndex(partitionIndex) + .setPartitionMaxBytes(partitionMaxBytes)).asJava)).asJava) + + val shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion) + val request = buildRequest(shareFetchRequest) + kafkaApis = createKafkaApis( + overrideProperties = Map( + GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", + ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", + ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), + raftSupport = true) + kafkaApis.handleShareFetchRequest(request) + val response = verifyNoThrottling[ShareFetchResponse](request) + val responseData = response.data() + val topicResponses = responseData.responses() + + assertEquals(Errors.NONE.code, responseData.errorCode) + assertEquals(1, topicResponses.size()) + assertEquals(topicId, topicResponses.get(0).topicId) + assertEquals(1, topicResponses.get(0).partitions.size()) + assertEquals(partitionIndex, topicResponses.get(0).partitions.get(0).partitionIndex) + assertEquals(Errors.NONE.code, topicResponses.get(0).partitions.get(0).errorCode) + assertEquals(records, topicResponses.get(0).partitions.get(0).records) + assertArrayEquals(expectedAcquiredRecords(0, 9, 1).toArray(), topicResponses.get(0).partitions.get(0).acquiredRecords.toArray()) + } + + @Test + def testHandleShareFetchRequestInvalidRequestOnInitialEpoch() : Unit = { + val topicName = "foo" + val topicId = Uuid.randomUuid() + metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) + addTopicToMetadataCache(topicName, 1, topicId = topicId) + val memberId : Uuid = Uuid.ZERO_UUID + + val groupId = "group" + val partitionIndex = 0 + + val records = memoryRecords(10, 0) + + when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), any())).thenReturn( + CompletableFuture.completedFuture(Map[TopicIdPartition, ShareFetchResponseData.PartitionData]( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) -> + new ShareFetchResponseData.PartitionData() + .setErrorCode(Errors.NONE.code) + .setAcknowledgeErrorCode(Errors.NONE.code) + .setRecords(records) + .setAcquiredRecords(new util.ArrayList(List( + new ShareFetchResponseData.AcquiredRecords() + .setFirstOffset(0) + .setLastOffset(9) + .setDeliveryCount(1) + ).asJava)) + ).asJava) + ) + + val cachedSharePartitions = new ImplicitLinkedHashCollection[CachedSharePartition] + cachedSharePartitions.mustAdd(new CachedSharePartition( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)), new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes), false)) + + when(sharePartitionManager.newContext(any(), any(), any(), any())).thenReturn( + new ShareSessionContext(new ShareFetchMetadata(memberId, 0), Map( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) -> + new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes) + ).asJava) + ).thenReturn(new ShareSessionContext(new ShareFetchMetadata(memberId, 0), new ShareSession( + new ShareSessionKey(groupId, memberId), cachedSharePartitions, 0L, 0L, 1 + ))) + + when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs( + any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0) + + var shareFetchRequestData = new ShareFetchRequestData(). + setGroupId(groupId). + setMemberId(memberId.toString). + setShareSessionEpoch(0). + setTopics(List(new ShareFetchRequestData.FetchTopic(). + setTopicId(topicId). + setPartitions(List( + new ShareFetchRequestData.FetchPartition() + .setPartitionIndex(partitionIndex) + .setPartitionMaxBytes(partitionMaxBytes) + setAcknowledgementBatches(List( + new AcknowledgementBatch() + .setFirstOffset(0) + .setLastOffset(9) + .setAcknowledgeTypes(Collections.singletonList(1.toByte)) + ).asJava) + ).asJava) + ).asJava) + + var shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion) + var request = buildRequest(shareFetchRequest) + kafkaApis = createKafkaApis( + overrideProperties = Map( + GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", + ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", + ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), + raftSupport = true) + kafkaApis.handleShareFetchRequest(request) + var response = verifyNoThrottling[ShareFetchResponse](request) + var responseData = response.data() + + assertEquals(Errors.INVALID_REQUEST.code, responseData.errorCode) + + // Testing whether the subsequent request with the incremented share session epoch works or not. + shareFetchRequestData = new ShareFetchRequestData(). + setGroupId(groupId). + setMemberId(memberId.toString). + setShareSessionEpoch(1). + setTopics(List(new ShareFetchRequestData.FetchTopic(). + setTopicId(topicId). + setPartitions(List( + new ShareFetchRequestData.FetchPartition() + .setPartitionIndex(0) + .setPartitionMaxBytes(partitionMaxBytes) + ).asJava) + ).asJava) + + shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion) + request = buildRequest(shareFetchRequest) + kafkaApis.handleShareFetchRequest(request) + response = verifyNoThrottling[ShareFetchResponse](request) + responseData = response.data() + val topicResponses = responseData.responses() + + assertEquals(Errors.NONE.code, responseData.errorCode) + assertEquals(1, topicResponses.size()) + assertEquals(topicId, topicResponses.get(0).topicId) + assertEquals(1, topicResponses.get(0).partitions.size()) + assertEquals(partitionIndex, topicResponses.get(0).partitions.get(0).partitionIndex) + assertEquals(Errors.NONE.code, topicResponses.get(0).partitions.get(0).errorCode) + assertEquals(records, topicResponses.get(0).partitions.get(0).records) + assertArrayEquals(expectedAcquiredRecords(0, 9, 1).toArray(), topicResponses.get(0).partitions.get(0).acquiredRecords.toArray()) + } + + @Test + def testHandleShareFetchRequestInvalidRequestOnFinalEpoch() : Unit = { + val topicName = "foo" + val topicId = Uuid.randomUuid() + val partitionIndex = 0 + metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) + addTopicToMetadataCache(topicName, 1, topicId = topicId) + val memberId : Uuid = Uuid.ZERO_UUID + + val groupId = "group" + + val records = memoryRecords(10, 0) + + when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), any())).thenReturn( + CompletableFuture.completedFuture(Map[TopicIdPartition, ShareFetchResponseData.PartitionData]( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) -> + new ShareFetchResponseData.PartitionData() + .setErrorCode(Errors.NONE.code) + .setAcknowledgeErrorCode(Errors.NONE.code) + .setRecords(records) + .setAcquiredRecords(new util.ArrayList(List( + new ShareFetchResponseData.AcquiredRecords() + .setFirstOffset(0) + .setLastOffset(9) + .setDeliveryCount(1) + ).asJava)) + ).asJava) + ) + + when(sharePartitionManager.newContext(any(), any(), any(), any())).thenReturn( + new ShareSessionContext(new ShareFetchMetadata(memberId, 0), Map( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) -> + new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes) + ).asJava) + ).thenThrow(Errors.INVALID_REQUEST.exception) + + when(sharePartitionManager.releaseAcquiredRecords(any(), any())).thenReturn( + CompletableFuture.completedFuture(Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData]( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) -> + new ShareAcknowledgeResponseData.PartitionData() + .setPartitionIndex(partitionIndex) + .setErrorCode(Errors.NONE.code) + ).asJava) + ) + + when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs( + any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0) + + var shareFetchRequestData = new ShareFetchRequestData(). + setGroupId(groupId). + setMemberId(memberId.toString). + setShareSessionEpoch(0). + setTopics(List(new ShareFetchRequestData.FetchTopic(). + setTopicId(topicId). + setPartitions(List( + new ShareFetchRequestData.FetchPartition() + .setPartitionIndex(0) + .setPartitionMaxBytes(partitionMaxBytes)).asJava)).asJava) + + var shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion) + var request = buildRequest(shareFetchRequest) + kafkaApis = createKafkaApis( + overrideProperties = Map( + GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", + ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", + ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), + raftSupport = true) + kafkaApis.handleShareFetchRequest(request) + var response = verifyNoThrottling[ShareFetchResponse](request) + var responseData = response.data() + val topicResponses = responseData.responses() + + assertEquals(Errors.NONE.code, responseData.errorCode) + assertEquals(1, topicResponses.size()) + assertEquals(topicId, topicResponses.get(0).topicId) + assertEquals(1, topicResponses.get(0).partitions.size()) + assertEquals(partitionIndex, topicResponses.get(0).partitions.get(0).partitionIndex) + assertEquals(Errors.NONE.code, topicResponses.get(0).partitions.get(0).errorCode) + assertEquals(records, topicResponses.get(0).partitions.get(0).records) + assertArrayEquals(expectedAcquiredRecords(0, 9, 1).toArray(), topicResponses.get(0).partitions.get(0).acquiredRecords.toArray()) + + shareFetchRequestData = new ShareFetchRequestData(). + setGroupId(groupId). + setMemberId(memberId.toString). + setShareSessionEpoch(-1). + setTopics(List(new ShareFetchRequestData.FetchTopic(). + setTopicId(topicId). + setPartitions(List( + new ShareFetchRequestData.FetchPartition() + .setPartitionIndex(0) + .setPartitionMaxBytes(partitionMaxBytes) + .setAcknowledgementBatches(List( + new AcknowledgementBatch() + .setFirstOffset(0) + .setLastOffset(9) + .setAcknowledgeTypes(Collections.singletonList(1.toByte)) + ).asJava) + ).asJava) + ).asJava) Review Comment: Thanks for the explanation. Make sense. Could we add a comment about this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org