chirag-wadhwa5 commented on code in PR #16456: URL: https://github.com/apache/kafka/pull/16456#discussion_r1699487912
########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -4406,6 +4414,2341 @@ class KafkaApisTest extends Logging { assertEquals("broker2", node.host) } + @Test + def testHandleShareFetchRequestSuccessWithoutAcknowledgements(): Unit = { + val topicName = "foo" + val topicId = Uuid.randomUuid() + val partitionIndex = 0 + metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) + addTopicToMetadataCache(topicName, 1, topicId = topicId) + val memberId : Uuid = Uuid.ZERO_UUID + + val shareSessionEpoch = 0 + + val records = memoryRecords(10, 0) + + when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), any())).thenReturn( + CompletableFuture.completedFuture(Map[TopicIdPartition, ShareFetchResponseData.PartitionData]( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) -> + new ShareFetchResponseData.PartitionData() + .setErrorCode(Errors.NONE.code) + .setAcknowledgeErrorCode(Errors.NONE.code) + .setRecords(records) + .setAcquiredRecords(new util.ArrayList(List( + new ShareFetchResponseData.AcquiredRecords() + .setFirstOffset(0) + .setLastOffset(9) + .setDeliveryCount(1) + ).asJava)) + ).asJava) + ) + + when(sharePartitionManager.newContext(any(), any(), any(), any())).thenReturn( + new ShareSessionContext(new ShareFetchMetadata(memberId, shareSessionEpoch), Map( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) -> + new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes) + ).asJava) + ) + + when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs( + any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0) + + val shareFetchRequestData = new ShareFetchRequestData(). + setGroupId("group"). + setMemberId(memberId.toString). + setShareSessionEpoch(shareSessionEpoch). + setTopics(List(new ShareFetchRequestData.FetchTopic(). + setTopicId(topicId). 
+ setPartitions(List( + new ShareFetchRequestData.FetchPartition() + .setPartitionIndex(partitionIndex) + .setPartitionMaxBytes(partitionMaxBytes)).asJava)).asJava) + + val shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion) + val request = buildRequest(shareFetchRequest) + kafkaApis = createKafkaApis( + overrideProperties = Map( + GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", + ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", + ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), + raftSupport = true) + kafkaApis.handleShareFetchRequest(request) + val response = verifyNoThrottling[ShareFetchResponse](request) + val responseData = response.data() + val topicResponses = responseData.responses() + + assertEquals(Errors.NONE.code, responseData.errorCode) + assertEquals(1, topicResponses.size()) + assertEquals(topicId, topicResponses.get(0).topicId) + assertEquals(1, topicResponses.get(0).partitions.size()) + assertEquals(partitionIndex, topicResponses.get(0).partitions.get(0).partitionIndex) + assertEquals(Errors.NONE.code, topicResponses.get(0).partitions.get(0).errorCode) + assertEquals(records, topicResponses.get(0).partitions.get(0).records) + assertArrayEquals(expectedAcquiredRecords(0, 9, 1).toArray(), topicResponses.get(0).partitions.get(0).acquiredRecords.toArray()) + } + + @Test + def testHandleShareFetchRequestInvalidRequestOnInitialEpoch() : Unit = { + val topicName = "foo" + val topicId = Uuid.randomUuid() + metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) + addTopicToMetadataCache(topicName, 1, topicId = topicId) + val memberId : Uuid = Uuid.ZERO_UUID + + val groupId = "group" + val partitionIndex = 0 + + val records = memoryRecords(10, 0) + + when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), any())).thenReturn( + CompletableFuture.completedFuture(Map[TopicIdPartition, 
ShareFetchResponseData.PartitionData]( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) -> + new ShareFetchResponseData.PartitionData() + .setErrorCode(Errors.NONE.code) + .setAcknowledgeErrorCode(Errors.NONE.code) + .setRecords(records) + .setAcquiredRecords(new util.ArrayList(List( + new ShareFetchResponseData.AcquiredRecords() + .setFirstOffset(0) + .setLastOffset(9) + .setDeliveryCount(1) + ).asJava)) + ).asJava) + ) + + val cachedSharePartitions = new ImplicitLinkedHashCollection[CachedSharePartition] + cachedSharePartitions.mustAdd(new CachedSharePartition( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)), new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes), false)) + + when(sharePartitionManager.newContext(any(), any(), any(), any())).thenReturn( + new ShareSessionContext(new ShareFetchMetadata(memberId, 0), Map( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) -> + new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes) + ).asJava) + ).thenReturn(new ShareSessionContext(new ShareFetchMetadata(memberId, 0), new ShareSession( + new ShareSessionKey(groupId, memberId), cachedSharePartitions, 0L, 0L, 1 + ))) + + when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs( + any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0) + + var shareFetchRequestData = new ShareFetchRequestData(). + setGroupId(groupId). + setMemberId(memberId.toString). + setShareSessionEpoch(0). + setTopics(List(new ShareFetchRequestData.FetchTopic(). + setTopicId(topicId). 
+ setPartitions(List( + new ShareFetchRequestData.FetchPartition() + .setPartitionIndex(partitionIndex) + .setPartitionMaxBytes(partitionMaxBytes) + setAcknowledgementBatches(List( + new AcknowledgementBatch() + .setFirstOffset(0) + .setLastOffset(9) + .setAcknowledgeTypes(Collections.singletonList(1.toByte)) + ).asJava) + ).asJava) + ).asJava) + + var shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion) + var request = buildRequest(shareFetchRequest) + kafkaApis = createKafkaApis( + overrideProperties = Map( + GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", + ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", + ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), + raftSupport = true) + kafkaApis.handleShareFetchRequest(request) + var response = verifyNoThrottling[ShareFetchResponse](request) + var responseData = response.data() + + assertEquals(Errors.INVALID_REQUEST.code, responseData.errorCode) + + // Testing whether the subsequent request with the incremented share session epoch works or not. + shareFetchRequestData = new ShareFetchRequestData(). + setGroupId(groupId). + setMemberId(memberId.toString). + setShareSessionEpoch(1). + setTopics(List(new ShareFetchRequestData.FetchTopic(). + setTopicId(topicId). 
+ setPartitions(List( + new ShareFetchRequestData.FetchPartition() + .setPartitionIndex(0) + .setPartitionMaxBytes(partitionMaxBytes) + ).asJava) + ).asJava) + + shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion) + request = buildRequest(shareFetchRequest) + kafkaApis.handleShareFetchRequest(request) + response = verifyNoThrottling[ShareFetchResponse](request) + responseData = response.data() + val topicResponses = responseData.responses() + + assertEquals(Errors.NONE.code, responseData.errorCode) + assertEquals(1, topicResponses.size()) + assertEquals(topicId, topicResponses.get(0).topicId) + assertEquals(1, topicResponses.get(0).partitions.size()) + assertEquals(partitionIndex, topicResponses.get(0).partitions.get(0).partitionIndex) + assertEquals(Errors.NONE.code, topicResponses.get(0).partitions.get(0).errorCode) + assertEquals(records, topicResponses.get(0).partitions.get(0).records) + assertArrayEquals(expectedAcquiredRecords(0, 9, 1).toArray(), topicResponses.get(0).partitions.get(0).acquiredRecords.toArray()) + } + + @Test + def testHandleShareFetchRequestInvalidRequestOnFinalEpoch() : Unit = { + val topicName = "foo" + val topicId = Uuid.randomUuid() + val partitionIndex = 0 + metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) + addTopicToMetadataCache(topicName, 1, topicId = topicId) + val memberId : Uuid = Uuid.ZERO_UUID + + val groupId = "group" + + val records = memoryRecords(10, 0) + + when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), any())).thenReturn( + CompletableFuture.completedFuture(Map[TopicIdPartition, ShareFetchResponseData.PartitionData]( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) -> + new ShareFetchResponseData.PartitionData() + .setErrorCode(Errors.NONE.code) + .setAcknowledgeErrorCode(Errors.NONE.code) + .setRecords(records) + .setAcquiredRecords(new util.ArrayList(List( + new 
ShareFetchResponseData.AcquiredRecords() + .setFirstOffset(0) + .setLastOffset(9) + .setDeliveryCount(1) + ).asJava)) + ).asJava) + ) + + when(sharePartitionManager.newContext(any(), any(), any(), any())).thenReturn( + new ShareSessionContext(new ShareFetchMetadata(memberId, 0), Map( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) -> + new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes) + ).asJava) + ).thenThrow(Errors.INVALID_REQUEST.exception) + + when(sharePartitionManager.releaseAcquiredRecords(any(), any())).thenReturn( + CompletableFuture.completedFuture(Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData]( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) -> + new ShareAcknowledgeResponseData.PartitionData() + .setPartitionIndex(partitionIndex) + .setErrorCode(Errors.NONE.code) + ).asJava) + ) + + when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs( + any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0) + + var shareFetchRequestData = new ShareFetchRequestData(). + setGroupId(groupId). + setMemberId(memberId.toString). + setShareSessionEpoch(0). + setTopics(List(new ShareFetchRequestData.FetchTopic(). + setTopicId(topicId). 
+ setPartitions(List( + new ShareFetchRequestData.FetchPartition() + .setPartitionIndex(0) + .setPartitionMaxBytes(partitionMaxBytes)).asJava)).asJava) + + var shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion) + var request = buildRequest(shareFetchRequest) + kafkaApis = createKafkaApis( + overrideProperties = Map( + GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", + ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", + ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), + raftSupport = true) + kafkaApis.handleShareFetchRequest(request) + var response = verifyNoThrottling[ShareFetchResponse](request) + var responseData = response.data() + val topicResponses = responseData.responses() + + assertEquals(Errors.NONE.code, responseData.errorCode) + assertEquals(1, topicResponses.size()) + assertEquals(topicId, topicResponses.get(0).topicId) + assertEquals(1, topicResponses.get(0).partitions.size()) + assertEquals(partitionIndex, topicResponses.get(0).partitions.get(0).partitionIndex) + assertEquals(Errors.NONE.code, topicResponses.get(0).partitions.get(0).errorCode) + assertEquals(records, topicResponses.get(0).partitions.get(0).records) + assertArrayEquals(expectedAcquiredRecords(0, 9, 1).toArray(), topicResponses.get(0).partitions.get(0).acquiredRecords.toArray()) + + shareFetchRequestData = new ShareFetchRequestData(). + setGroupId(groupId). + setMemberId(memberId.toString). + setShareSessionEpoch(-1). + setTopics(List(new ShareFetchRequestData.FetchTopic(). + setTopicId(topicId). 
+ setPartitions(List( + new ShareFetchRequestData.FetchPartition() + .setPartitionIndex(0) + .setPartitionMaxBytes(partitionMaxBytes) + .setAcknowledgementBatches(List( + new AcknowledgementBatch() + .setFirstOffset(0) + .setLastOffset(9) + .setAcknowledgeTypes(Collections.singletonList(1.toByte)) + ).asJava) + ).asJava) + ).asJava) + + shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion) + request = buildRequest(shareFetchRequest) + kafkaApis.handleShareFetchRequest(request) + response = verifyNoThrottling[ShareFetchResponse](request) + responseData = response.data() + + assertEquals(Errors.INVALID_REQUEST.code, responseData.errorCode) + } + + @Test + def testHandleShareFetchRequestInvalidRequestTopicNotPresent() : Unit = { + val topicName1 = "foo1" + val topicId1 = Uuid.randomUuid() + val topicId2 = Uuid.randomUuid() + // topic 2 is not added to the metadata cache + metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) + addTopicToMetadataCache(topicName1, 1, topicId = topicId1) + val memberId : Uuid = Uuid.ZERO_UUID + + val groupId = "group" + + when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs( + any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0) + + val shareFetchRequestData = new ShareFetchRequestData(). + setGroupId(groupId). + setMemberId(memberId.toString). + setShareSessionEpoch(0). + setTopics(List( + new ShareFetchRequestData.FetchTopic(). + setTopicId(topicId1). + setPartitions(List( + new ShareFetchRequestData.FetchPartition() + .setPartitionIndex(0) + .setPartitionMaxBytes(partitionMaxBytes) + ).asJava), + new ShareFetchRequestData.FetchTopic(). + setTopicId(topicId2). 
+ setPartitions(List( + new ShareFetchRequestData.FetchPartition() + .setPartitionIndex(0) + .setPartitionMaxBytes(partitionMaxBytes) + ).asJava) + ).asJava) + + val shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion) + val request = buildRequest(shareFetchRequest) + kafkaApis = createKafkaApis( + overrideProperties = Map( + GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", + ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", + ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), + raftSupport = true) + kafkaApis.handleShareFetchRequest(request) + val response = verifyNoThrottling[ShareFetchResponse](request) + val responseData = response.data() + + assertEquals(Errors.INVALID_REQUEST.code, responseData.errorCode) + } + + @Test + def testHandleShareFetchRequestInvalidRequestTopicPartitionNotPresent() : Unit = { + val topicName = "foo" + val topicId = Uuid.randomUuid() + metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) + // Adding only 1 partition to the metadata cache + addTopicToMetadataCache(topicName, 1, topicId = topicId) + val memberId : Uuid = Uuid.ZERO_UUID + + val groupId = "group" + + when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs( + any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0) + + val shareFetchRequestData = new ShareFetchRequestData(). + setGroupId(groupId). + setMemberId(memberId.toString). + setShareSessionEpoch(0). + setTopics(List( + new ShareFetchRequestData.FetchTopic(). + setTopicId(topicId). 
+ setPartitions(List( + new ShareFetchRequestData.FetchPartition() + .setPartitionIndex(0) + .setPartitionMaxBytes(partitionMaxBytes), + new ShareFetchRequestData.FetchPartition() + .setPartitionIndex(1) // according to metadata cache, this topic has only 1 partition, but request is trying to fetch from 2 partitions + .setPartitionMaxBytes(partitionMaxBytes) + ).asJava) + ).asJava) + + val shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion) + val request = buildRequest(shareFetchRequest) + kafkaApis = createKafkaApis( + overrideProperties = Map( + GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", + ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", + ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), + raftSupport = true) + kafkaApis.handleShareFetchRequest(request) + val response = verifyNoThrottling[ShareFetchResponse](request) + val responseData = response.data() + + assertEquals(Errors.INVALID_REQUEST.code, responseData.errorCode) + } + + @Test + def testHandleShareFetchRequestFetchThrowsException() : Unit = { + val topicName = "foo" + val topicId = Uuid.randomUuid() + val partitionIndex = 0 + metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) + addTopicToMetadataCache(topicName, 1, topicId = topicId) + val memberId : Uuid = Uuid.ZERO_UUID + + when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), any())).thenReturn( + FutureUtils.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception()) + ) + + when(sharePartitionManager.newContext(any(), any(), any(), any())).thenReturn( + new ShareSessionContext(new ShareFetchMetadata(memberId, 0), Map( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) -> + new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes) + ).asJava) + ) + + when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs( + any[RequestChannel.Request](), anyDouble, 
anyLong)).thenReturn(0) + + val shareFetchRequestData = new ShareFetchRequestData(). + setGroupId("group"). + setMemberId(memberId.toString). + setShareSessionEpoch(0). + setTopics(List(new ShareFetchRequestData.FetchTopic(). + setTopicId(topicId). + setPartitions(List( + new ShareFetchRequestData.FetchPartition() + .setPartitionIndex(0) + .setPartitionMaxBytes(partitionMaxBytes)).asJava)).asJava) + + val shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion) + val request = buildRequest(shareFetchRequest) + kafkaApis = createKafkaApis( + overrideProperties = Map( + GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", + ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", + ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), + raftSupport = true) + kafkaApis.handleShareFetchRequest(request) + val response = verifyNoThrottling[ShareFetchResponse](request) + val responseData = response.data() + + assertEquals(Errors.UNKNOWN_SERVER_ERROR.code, responseData.errorCode) + } + + @Test + def testHandleShareFetchRequestAcknowledgeThrowsException() : Unit = { + val topicName = "foo" + val topicId = Uuid.randomUuid() + val partitionIndex = 0 + metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) + addTopicToMetadataCache(topicName, 1, topicId = topicId) + val memberId : Uuid = Uuid.ZERO_UUID + + val groupId = "group" + + val records = memoryRecords(10, 0) + + when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), any())).thenReturn( + CompletableFuture.completedFuture(Map[TopicIdPartition, ShareFetchResponseData.PartitionData]( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) -> + new ShareFetchResponseData.PartitionData() + .setErrorCode(Errors.NONE.code) + .setRecords(records) + .setAcquiredRecords(new util.ArrayList(List( + new ShareFetchResponseData.AcquiredRecords() + .setFirstOffset(0) + 
.setLastOffset(9) + .setDeliveryCount(1) + ).asJava)) + ).asJava) + ) + + when(sharePartitionManager.acknowledge(any(), any(), any())).thenReturn( + FutureUtils.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception()) + ) + + val cachedSharePartitions = new ImplicitLinkedHashCollection[CachedSharePartition] + cachedSharePartitions.mustAdd(new CachedSharePartition( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)), new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes), false)) + + when(sharePartitionManager.newContext(any(), any(), any(), any())) + .thenReturn(new ShareSessionContext(new ShareFetchMetadata(memberId, 0), new ShareSession( + new ShareSessionKey(groupId, memberId), cachedSharePartitions, 0L, 0L, 1)) + ) + + when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs( + any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0) + + val shareFetchRequestData = new ShareFetchRequestData(). + setGroupId(groupId). + setMemberId(memberId.toString). + setShareSessionEpoch(1). + setTopics(List(new ShareFetchRequestData.FetchTopic(). + setTopicId(topicId). 
+ setPartitions(List( + new ShareFetchRequestData.FetchPartition() + .setPartitionIndex(0) + .setPartitionMaxBytes(partitionMaxBytes) + .setAcknowledgementBatches(List( + new AcknowledgementBatch() + .setFirstOffset(0) + .setLastOffset(9) + .setAcknowledgeTypes(Collections.singletonList(1.toByte)) + ).asJava) + ).asJava) + ).asJava) + + val shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion) + val request = buildRequest(shareFetchRequest) + kafkaApis = createKafkaApis( + overrideProperties = Map( + GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", + ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", + ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), + raftSupport = true) + kafkaApis.handleShareFetchRequest(request) + val response = verifyNoThrottling[ShareFetchResponse](request) + val responseData = response.data() + + assertEquals(Errors.UNKNOWN_SERVER_ERROR.code, responseData.errorCode) + } + + @Test + def testHandleShareFetchRequestFetchAndAcknowledgeThrowsException() : Unit = { + val topicName = "foo" + val topicId = Uuid.randomUuid() + val partitionIndex = 0 + metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) + addTopicToMetadataCache(topicName, 1, topicId = topicId) + val memberId : Uuid = Uuid.ZERO_UUID + + val groupId = "group" + + when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), any())).thenReturn( + FutureUtils.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception()) + ) + + when(sharePartitionManager.acknowledge(any(), any(), any())).thenReturn( + CompletableFuture.supplyAsync(() => { + throw Errors.UNKNOWN_SERVER_ERROR.exception() + }) + ) + + val cachedSharePartitions = new ImplicitLinkedHashCollection[CachedSharePartition] + cachedSharePartitions.mustAdd(new CachedSharePartition( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)), new 
ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes), false)) + + when(sharePartitionManager.newContext(any(), any(), any(), any())) + .thenReturn(new ShareSessionContext(new ShareFetchMetadata(memberId, 0), new ShareSession( + new ShareSessionKey(groupId, memberId), cachedSharePartitions, 0L, 0L, 1)) + ) + + when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs( + any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0) + + val shareFetchRequestData = new ShareFetchRequestData(). + setGroupId(groupId). + setMemberId(memberId.toString). + setShareSessionEpoch(1). + setTopics(List(new ShareFetchRequestData.FetchTopic(). + setTopicId(topicId). + setPartitions(List( + new ShareFetchRequestData.FetchPartition() + .setPartitionIndex(0) + .setPartitionMaxBytes(partitionMaxBytes) + .setAcknowledgementBatches(List( + new AcknowledgementBatch() + .setFirstOffset(0) + .setLastOffset(9) + .setAcknowledgeTypes(Collections.singletonList(1.toByte)) + ).asJava) + ).asJava) + ).asJava) + + val shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion) + val request = buildRequest(shareFetchRequest) + kafkaApis = createKafkaApis( + overrideProperties = Map( + GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", + ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", + ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), + raftSupport = true) + kafkaApis.handleShareFetchRequest(request) + val response = verifyNoThrottling[ShareFetchResponse](request) + val responseData = response.data() + + assertEquals(Errors.UNKNOWN_SERVER_ERROR.code, responseData.errorCode) + } + + @Test + def testHandleShareFetchRequestErrorInReadingPartition() : Unit = { + val topicName = "foo" + val topicId = Uuid.randomUuid() + val partitionIndex = 0 + metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) + addTopicToMetadataCache(topicName, 1, topicId = topicId) + 
val memberId : Uuid = Uuid.ZERO_UUID + + val records = memoryRecords(0, 0) + + when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), any())).thenReturn( + CompletableFuture.completedFuture(Map[TopicIdPartition, ShareFetchResponseData.PartitionData]( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) -> + new ShareFetchResponseData.PartitionData() + .setErrorCode(Errors.REPLICA_NOT_AVAILABLE.code) + .setRecords(records) + .setAcquiredRecords(new util.ArrayList(List().asJava)) + ).asJava) + ) + + when(sharePartitionManager.newContext(any(), any(), any(), any())).thenReturn( + new ShareSessionContext(new ShareFetchMetadata(memberId, 0), Map( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) -> + new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes) + ).asJava) + ) + + when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs( + any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0) + + val shareFetchRequestData = new ShareFetchRequestData(). + setGroupId("group"). + setMemberId(memberId.toString). + setShareSessionEpoch(0). + setTopics(List(new ShareFetchRequestData.FetchTopic(). + setTopicId(topicId). 
+ setPartitions(List( + new ShareFetchRequestData.FetchPartition() + .setPartitionIndex(0) + .setPartitionMaxBytes(partitionMaxBytes)).asJava)).asJava) + + val shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion) + val request = buildRequest(shareFetchRequest) + kafkaApis = createKafkaApis( + overrideProperties = Map( + GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", + ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", + ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), + raftSupport = true) + kafkaApis.handleShareFetchRequest(request) + val response = verifyNoThrottling[ShareFetchResponse](request) + val responseData = response.data() + val topicResponses = responseData.responses() + + assertEquals(Errors.NONE.code, responseData.errorCode) + assertEquals(1, topicResponses.size()) + assertEquals(topicId, topicResponses.get(0).topicId) + assertEquals(1, topicResponses.get(0).partitions.size()) + assertEquals(partitionIndex, topicResponses.get(0).partitions.get(0).partitionIndex) + assertEquals(Errors.REPLICA_NOT_AVAILABLE.code, topicResponses.get(0).partitions.get(0).errorCode) + assertEquals(records, topicResponses.get(0).partitions.get(0).records) + assertTrue(topicResponses.get(0).partitions.get(0).acquiredRecords.toArray().isEmpty) + assertArrayEquals(Collections.emptyList().toArray(), topicResponses.get(0).partitions.get(0).acquiredRecords.toArray()) + } + + @Test + def testHandleShareFetchRequestShareSessionNotFoundError() : Unit = { + val topicName = "foo" + val topicId = Uuid.randomUuid() + val partitionIndex = 0 + metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) + addTopicToMetadataCache(topicName, 1, topicId = topicId) + val memberId : Uuid = Uuid.ZERO_UUID + + val groupId = "group" + val records = memoryRecords(10, 0) + + when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), any())).thenReturn( + 
CompletableFuture.completedFuture(Map[TopicIdPartition, ShareFetchResponseData.PartitionData]( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) -> + new ShareFetchResponseData.PartitionData() + .setErrorCode(Errors.NONE.code) + .setRecords(records) + .setAcquiredRecords(new util.ArrayList(List( + new ShareFetchResponseData.AcquiredRecords() + .setFirstOffset(0) + .setLastOffset(9) + .setDeliveryCount(1) + ).asJava)) + ).asJava) + ) + + when(sharePartitionManager.newContext(any(), any(), any(), any())).thenReturn( + new ShareSessionContext(new ShareFetchMetadata(memberId, 0), Map( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) -> + new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes) + ).asJava) + ) + .thenThrow(Errors.SHARE_SESSION_NOT_FOUND.exception) + .thenThrow(Errors.SHARE_SESSION_NOT_FOUND.exception) + + when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs( + any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0) + + var shareFetchRequestData = new ShareFetchRequestData(). + setGroupId(groupId). + setMemberId(memberId.toString). + setShareSessionEpoch(0). + setTopics(List(new ShareFetchRequestData.FetchTopic(). + setTopicId(topicId). + setPartitions(List( + new ShareFetchRequestData.FetchPartition() + .setPartitionIndex(0) + .setPartitionMaxBytes(partitionMaxBytes)).asJava)).asJava) + + var shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion) + var request = buildRequest(shareFetchRequest) + // First share fetch request is to establish the share session with the broker. 
+ kafkaApis = createKafkaApis( + overrideProperties = Map( + GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", + ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", + ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), + raftSupport = true) + kafkaApis.handleShareFetchRequest(request) + var response = verifyNoThrottling[ShareFetchResponse](request) + var responseData = response.data() + val topicResponses = responseData.responses() + + assertEquals(Errors.NONE.code, responseData.errorCode) + assertEquals(1, topicResponses.size()) + assertEquals(topicId, topicResponses.get(0).topicId) + assertEquals(1, topicResponses.get(0).partitions.size()) + assertEquals(partitionIndex, topicResponses.get(0).partitions.get(0).partitionIndex) + assertEquals(Errors.NONE.code, topicResponses.get(0).partitions.get(0).errorCode) + assertEquals(records, topicResponses.get(0).partitions.get(0).records) + assertArrayEquals(expectedAcquiredRecords(0, 9, 1).toArray(), topicResponses.get(0).partitions.get(0).acquiredRecords.toArray()) + + val memberId2 = Uuid.randomUuid() + + // Using wrong member ID. + shareFetchRequestData = new ShareFetchRequestData(). + setGroupId(groupId). + setMemberId(memberId2.toString). + setShareSessionEpoch(1). + setTopics(List(new ShareFetchRequestData.FetchTopic(). + setTopicId(topicId). + setPartitions(List( + new ShareFetchRequestData.FetchPartition() + .setPartitionIndex(0) + .setPartitionMaxBytes(partitionMaxBytes)).asJava)).asJava) + + shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion) + request = buildRequest(shareFetchRequest) + // Second share fetch request carries the wrong member ID and is expected to fail with SHARE_SESSION_NOT_FOUND. + kafkaApis.handleShareFetchRequest(request) + response = verifyNoThrottling[ShareFetchResponse](request) + responseData = response.data() + + assertEquals(Errors.SHARE_SESSION_NOT_FOUND.code, responseData.errorCode) + + // Using wrong group ID. NOTE(review): the request below still sets the original groupId and memberId, so the error appears to come only from the mocked newContext - confirm the intent. 
+ shareFetchRequestData = new ShareFetchRequestData(). + setGroupId(groupId). + setMemberId(memberId.toString). + setShareSessionEpoch(1). + setTopics(List(new ShareFetchRequestData.FetchTopic(). + setTopicId(topicId). + setPartitions(List( + new ShareFetchRequestData.FetchPartition() + .setPartitionIndex(0) + .setPartitionMaxBytes(partitionMaxBytes)).asJava)).asJava) + + shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion) + request = buildRequest(shareFetchRequest) + // Third share fetch request is expected to fail with SHARE_SESSION_NOT_FOUND. + kafkaApis.handleShareFetchRequest(request) + response = verifyNoThrottling[ShareFetchResponse](request) + responseData = response.data() + + assertEquals(Errors.SHARE_SESSION_NOT_FOUND.code, responseData.errorCode) + } + + @Test + def testHandleShareFetchRequestInvalidShareSessionError() : Unit = { + val topicName = "foo" + val topicId = Uuid.randomUuid() + val partitionIndex = 0 + metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) + addTopicToMetadataCache(topicName, 1, topicId = topicId) + val memberId: Uuid = Uuid.ZERO_UUID + + val groupId = "group" + val records = memoryRecords(10, 0) + + when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), any())).thenReturn( + CompletableFuture.completedFuture(Map[TopicIdPartition, ShareFetchResponseData.PartitionData]( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) -> + new ShareFetchResponseData.PartitionData() + .setErrorCode(Errors.NONE.code) + .setRecords(records) + .setAcquiredRecords(new util.ArrayList(List( + new ShareFetchResponseData.AcquiredRecords() + .setFirstOffset(0) + .setLastOffset(9) + .setDeliveryCount(1) + ).asJava)) + ).asJava) + ) + + when(sharePartitionManager.newContext(any(), any(), any(), any())).thenReturn( + new ShareSessionContext(new ShareFetchMetadata(memberId, 0), Map( + new 
TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) -> + new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes) + ).asJava) + ) + .thenThrow(Errors.INVALID_SHARE_SESSION_EPOCH.exception) + + when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs( + any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0) + + var shareFetchRequestData = new ShareFetchRequestData(). + setGroupId(groupId). + setMemberId(memberId.toString). + setShareSessionEpoch(0). + setTopics(List(new ShareFetchRequestData.FetchTopic(). + setTopicId(topicId). + setPartitions(List( + new ShareFetchRequestData.FetchPartition() + .setPartitionIndex(0) + .setPartitionMaxBytes(partitionMaxBytes)).asJava)).asJava) + + var shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion) + var request = buildRequest(shareFetchRequest) + // First share fetch request is to establish the share session with the broker. + kafkaApis = createKafkaApis( + overrideProperties = Map( + GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", + ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", + ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), + raftSupport = true) + kafkaApis.handleShareFetchRequest(request) + var response = verifyNoThrottling[ShareFetchResponse](request) + var responseData = response.data() + val topicResponses = responseData.responses() + + assertEquals(Errors.NONE.code, responseData.errorCode) + assertEquals(1, topicResponses.size()) + assertEquals(topicId, topicResponses.get(0).topicId) + assertEquals(1, topicResponses.get(0).partitions.size()) + assertEquals(partitionIndex, topicResponses.get(0).partitions.get(0).partitionIndex) + assertEquals(Errors.NONE.code, topicResponses.get(0).partitions.get(0).errorCode) + assertEquals(records, topicResponses.get(0).partitions.get(0).records) + assertArrayEquals(expectedAcquiredRecords(0, 9, 1).toArray(), 
topicResponses.get(0).partitions.get(0).acquiredRecords.toArray()) + + shareFetchRequestData = new ShareFetchRequestData(). + setGroupId(groupId). + setMemberId(memberId.toString). + setShareSessionEpoch(2). // Invalid share session epoch, should have 1 for the second request. + setTopics(List(new ShareFetchRequestData.FetchTopic(). + setTopicId(topicId). + setPartitions(List( + new ShareFetchRequestData.FetchPartition() + .setPartitionIndex(0) + .setPartitionMaxBytes(partitionMaxBytes)).asJava)).asJava) + + shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion) + request = buildRequest(shareFetchRequest) + // First share fetch request is to establish the share session with the broker. + kafkaApis.handleShareFetchRequest(request) + response = verifyNoThrottling[ShareFetchResponse](request) + responseData = response.data() + + assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.code, responseData.errorCode) + } + + @Test + def testHandleShareFetchRequestShareSessionSuccessfullyEstablished() : Unit = { + val topicName = "foo" + val topicId = Uuid.randomUuid() + val partitionIndex = 0 + metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) + addTopicToMetadataCache(topicName, 1, topicId = topicId) + val memberId: Uuid = Uuid.randomUuid() + + val groupId = "group" + + val records1 = memoryRecords(10, 0) + val records2 = memoryRecords(10, 10) + val records3 = memoryRecords(10, 20) + + when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), any())).thenReturn( + CompletableFuture.completedFuture(Map[TopicIdPartition, ShareFetchResponseData.PartitionData]( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) -> + new ShareFetchResponseData.PartitionData() + .setErrorCode(Errors.NONE.code) + .setRecords(records1) + .setAcquiredRecords(new util.ArrayList(List( + new ShareFetchResponseData.AcquiredRecords() + .setFirstOffset(0) + 
.setLastOffset(9) + .setDeliveryCount(1) + ).asJava)) + ).asJava) + ).thenReturn( + CompletableFuture.completedFuture(Map[TopicIdPartition, ShareFetchResponseData.PartitionData]( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) -> + new ShareFetchResponseData.PartitionData() + .setErrorCode(Errors.NONE.code) + .setAcknowledgeErrorCode(Errors.NONE.code) + .setRecords(records2) + .setAcquiredRecords(new util.ArrayList(List( + new ShareFetchResponseData.AcquiredRecords() + .setFirstOffset(10) + .setLastOffset(19) + .setDeliveryCount(1) + ).asJava)) + ).asJava) + ).thenReturn( + CompletableFuture.completedFuture(Map[TopicIdPartition, ShareFetchResponseData.PartitionData]( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) -> + new ShareFetchResponseData.PartitionData() + .setErrorCode(Errors.NONE.code) + .setAcknowledgeErrorCode(Errors.NONE.code) + .setRecords(records3) + .setAcquiredRecords(new util.ArrayList(List( + new ShareFetchResponseData.AcquiredRecords() + .setFirstOffset(20) + .setLastOffset(29) + .setDeliveryCount(1) + ).asJava)) + ).asJava) + ) + + when(sharePartitionManager.acknowledge(any(), any(), any())).thenReturn( + CompletableFuture.completedFuture(Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData]( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) -> + new ShareAcknowledgeResponseData.PartitionData() + .setPartitionIndex(partitionIndex) + .setErrorCode(Errors.NONE.code) + ).asJava) + ).thenReturn( + CompletableFuture.completedFuture(Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData]( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) -> + new ShareAcknowledgeResponseData.PartitionData() + .setPartitionIndex(partitionIndex) + .setErrorCode(Errors.NONE.code) + ).asJava) + ) + + val cachedSharePartitions = new ImplicitLinkedHashCollection[CachedSharePartition] + cachedSharePartitions.mustAdd(new 
CachedSharePartition( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)), new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes), false)) + + when(sharePartitionManager.newContext(any(), any(), any(), any())).thenReturn( + new ShareSessionContext(new ShareFetchMetadata(memberId, 0), Map( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) -> + new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes) + ).asJava) + ).thenReturn(new ShareSessionContext(new ShareFetchMetadata(memberId, 0), new ShareSession( + new ShareSessionKey(groupId, memberId), cachedSharePartitions, 0L, 0L, 1)) + ).thenReturn(new ShareSessionContext(new ShareFetchMetadata(memberId, 0), new ShareSession( + new ShareSessionKey(groupId, memberId), cachedSharePartitions, 0L, 10L, 1)) Review Comment: Thanks for the review. Yes, you are right. Actually, the ShareFetchMetadata would include the current epoch in the request, but the ShareSession would contain the next epoch, as the epoch is bumped in the newContext method. I have made the change here, as well as in the other functions. Thanks again for pointing it out! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org