chirag-wadhwa5 commented on code in PR #16456: URL: https://github.com/apache/kafka/pull/16456#discussion_r1699474736
########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -4406,6 +4414,2341 @@ class KafkaApisTest extends Logging { assertEquals("broker2", node.host) } + @Test + def testHandleShareFetchRequestSuccessWithoutAcknowledgements(): Unit = { + val topicName = "foo" + val topicId = Uuid.randomUuid() + val partitionIndex = 0 + metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) + addTopicToMetadataCache(topicName, 1, topicId = topicId) + val memberId : Uuid = Uuid.ZERO_UUID + + val shareSessionEpoch = 0 + + val records = memoryRecords(10, 0) + + when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), any())).thenReturn( + CompletableFuture.completedFuture(Map[TopicIdPartition, ShareFetchResponseData.PartitionData]( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) -> + new ShareFetchResponseData.PartitionData() + .setErrorCode(Errors.NONE.code) + .setAcknowledgeErrorCode(Errors.NONE.code) + .setRecords(records) + .setAcquiredRecords(new util.ArrayList(List( + new ShareFetchResponseData.AcquiredRecords() + .setFirstOffset(0) + .setLastOffset(9) + .setDeliveryCount(1) + ).asJava)) + ).asJava) + ) + + when(sharePartitionManager.newContext(any(), any(), any(), any())).thenReturn( + new ShareSessionContext(new ShareFetchMetadata(memberId, shareSessionEpoch), Map( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) -> + new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes) + ).asJava) + ) + + when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs( + any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0) + + val shareFetchRequestData = new ShareFetchRequestData(). + setGroupId("group"). + setMemberId(memberId.toString). + setShareSessionEpoch(shareSessionEpoch). + setTopics(List(new ShareFetchRequestData.FetchTopic(). + setTopicId(topicId). 
+ setPartitions(List( + new ShareFetchRequestData.FetchPartition() + .setPartitionIndex(partitionIndex) + .setPartitionMaxBytes(partitionMaxBytes)).asJava)).asJava) + + val shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion) + val request = buildRequest(shareFetchRequest) + kafkaApis = createKafkaApis( + overrideProperties = Map( + GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", + ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", + ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), + raftSupport = true) + kafkaApis.handleShareFetchRequest(request) + val response = verifyNoThrottling[ShareFetchResponse](request) + val responseData = response.data() + val topicResponses = responseData.responses() + + assertEquals(Errors.NONE.code, responseData.errorCode) + assertEquals(1, topicResponses.size()) + assertEquals(topicId, topicResponses.get(0).topicId) + assertEquals(1, topicResponses.get(0).partitions.size()) + assertEquals(partitionIndex, topicResponses.get(0).partitions.get(0).partitionIndex) + assertEquals(Errors.NONE.code, topicResponses.get(0).partitions.get(0).errorCode) + assertEquals(records, topicResponses.get(0).partitions.get(0).records) + assertArrayEquals(expectedAcquiredRecords(0, 9, 1).toArray(), topicResponses.get(0).partitions.get(0).acquiredRecords.toArray()) + } + + @Test + def testHandleShareFetchRequestInvalidRequestOnInitialEpoch() : Unit = { + val topicName = "foo" + val topicId = Uuid.randomUuid() + metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) + addTopicToMetadataCache(topicName, 1, topicId = topicId) + val memberId : Uuid = Uuid.ZERO_UUID + + val groupId = "group" + val partitionIndex = 0 + + val records = memoryRecords(10, 0) + + when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), any())).thenReturn( + CompletableFuture.completedFuture(Map[TopicIdPartition, 
ShareFetchResponseData.PartitionData]( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) -> + new ShareFetchResponseData.PartitionData() + .setErrorCode(Errors.NONE.code) + .setAcknowledgeErrorCode(Errors.NONE.code) + .setRecords(records) + .setAcquiredRecords(new util.ArrayList(List( + new ShareFetchResponseData.AcquiredRecords() + .setFirstOffset(0) + .setLastOffset(9) + .setDeliveryCount(1) + ).asJava)) + ).asJava) + ) + + val cachedSharePartitions = new ImplicitLinkedHashCollection[CachedSharePartition] + cachedSharePartitions.mustAdd(new CachedSharePartition( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)), new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes), false)) + + when(sharePartitionManager.newContext(any(), any(), any(), any())).thenReturn( + new ShareSessionContext(new ShareFetchMetadata(memberId, 0), Map( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) -> + new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes) + ).asJava) + ).thenReturn(new ShareSessionContext(new ShareFetchMetadata(memberId, 0), new ShareSession( + new ShareSessionKey(groupId, memberId), cachedSharePartitions, 0L, 0L, 1 + ))) + + when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs( + any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0) + + var shareFetchRequestData = new ShareFetchRequestData(). + setGroupId(groupId). + setMemberId(memberId.toString). + setShareSessionEpoch(0). + setTopics(List(new ShareFetchRequestData.FetchTopic(). + setTopicId(topicId). 
+ setPartitions(List( + new ShareFetchRequestData.FetchPartition() + .setPartitionIndex(partitionIndex) + .setPartitionMaxBytes(partitionMaxBytes) + setAcknowledgementBatches(List( + new AcknowledgementBatch() + .setFirstOffset(0) + .setLastOffset(9) + .setAcknowledgeTypes(Collections.singletonList(1.toByte)) + ).asJava) + ).asJava) + ).asJava) + + var shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion) + var request = buildRequest(shareFetchRequest) + kafkaApis = createKafkaApis( + overrideProperties = Map( + GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", + ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", + ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), + raftSupport = true) + kafkaApis.handleShareFetchRequest(request) + var response = verifyNoThrottling[ShareFetchResponse](request) + var responseData = response.data() + + assertEquals(Errors.INVALID_REQUEST.code, responseData.errorCode) + + // Testing whether the subsequent request with the incremented share session epoch works or not. + shareFetchRequestData = new ShareFetchRequestData(). + setGroupId(groupId). + setMemberId(memberId.toString). + setShareSessionEpoch(1). + setTopics(List(new ShareFetchRequestData.FetchTopic(). + setTopicId(topicId). 
+ setPartitions(List( + new ShareFetchRequestData.FetchPartition() + .setPartitionIndex(0) + .setPartitionMaxBytes(partitionMaxBytes) + ).asJava) + ).asJava) + + shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion) + request = buildRequest(shareFetchRequest) + kafkaApis.handleShareFetchRequest(request) + response = verifyNoThrottling[ShareFetchResponse](request) + responseData = response.data() + val topicResponses = responseData.responses() + + assertEquals(Errors.NONE.code, responseData.errorCode) + assertEquals(1, topicResponses.size()) + assertEquals(topicId, topicResponses.get(0).topicId) + assertEquals(1, topicResponses.get(0).partitions.size()) + assertEquals(partitionIndex, topicResponses.get(0).partitions.get(0).partitionIndex) + assertEquals(Errors.NONE.code, topicResponses.get(0).partitions.get(0).errorCode) + assertEquals(records, topicResponses.get(0).partitions.get(0).records) + assertArrayEquals(expectedAcquiredRecords(0, 9, 1).toArray(), topicResponses.get(0).partitions.get(0).acquiredRecords.toArray()) + } + + @Test + def testHandleShareFetchRequestInvalidRequestOnFinalEpoch() : Unit = { + val topicName = "foo" + val topicId = Uuid.randomUuid() + val partitionIndex = 0 + metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) + addTopicToMetadataCache(topicName, 1, topicId = topicId) + val memberId : Uuid = Uuid.ZERO_UUID + + val groupId = "group" + + val records = memoryRecords(10, 0) + + when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), any())).thenReturn( + CompletableFuture.completedFuture(Map[TopicIdPartition, ShareFetchResponseData.PartitionData]( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) -> + new ShareFetchResponseData.PartitionData() + .setErrorCode(Errors.NONE.code) + .setAcknowledgeErrorCode(Errors.NONE.code) + .setRecords(records) + .setAcquiredRecords(new util.ArrayList(List( + new 
ShareFetchResponseData.AcquiredRecords() + .setFirstOffset(0) + .setLastOffset(9) + .setDeliveryCount(1) + ).asJava)) + ).asJava) + ) + + when(sharePartitionManager.newContext(any(), any(), any(), any())).thenReturn( + new ShareSessionContext(new ShareFetchMetadata(memberId, 0), Map( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) -> + new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes) + ).asJava) + ).thenThrow(Errors.INVALID_REQUEST.exception) + + when(sharePartitionManager.releaseAcquiredRecords(any(), any())).thenReturn( + CompletableFuture.completedFuture(Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData]( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) -> + new ShareAcknowledgeResponseData.PartitionData() + .setPartitionIndex(partitionIndex) + .setErrorCode(Errors.NONE.code) + ).asJava) + ) + + when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs( + any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0) + + var shareFetchRequestData = new ShareFetchRequestData(). + setGroupId(groupId). + setMemberId(memberId.toString). + setShareSessionEpoch(0). + setTopics(List(new ShareFetchRequestData.FetchTopic(). + setTopicId(topicId). 
+ setPartitions(List( + new ShareFetchRequestData.FetchPartition() + .setPartitionIndex(0) + .setPartitionMaxBytes(partitionMaxBytes)).asJava)).asJava) + + var shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion) + var request = buildRequest(shareFetchRequest) + kafkaApis = createKafkaApis( + overrideProperties = Map( + GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", + ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", + ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), + raftSupport = true) + kafkaApis.handleShareFetchRequest(request) + var response = verifyNoThrottling[ShareFetchResponse](request) + var responseData = response.data() + val topicResponses = responseData.responses() + + assertEquals(Errors.NONE.code, responseData.errorCode) + assertEquals(1, topicResponses.size()) + assertEquals(topicId, topicResponses.get(0).topicId) + assertEquals(1, topicResponses.get(0).partitions.size()) + assertEquals(partitionIndex, topicResponses.get(0).partitions.get(0).partitionIndex) + assertEquals(Errors.NONE.code, topicResponses.get(0).partitions.get(0).errorCode) + assertEquals(records, topicResponses.get(0).partitions.get(0).records) + assertArrayEquals(expectedAcquiredRecords(0, 9, 1).toArray(), topicResponses.get(0).partitions.get(0).acquiredRecords.toArray()) + + shareFetchRequestData = new ShareFetchRequestData(). + setGroupId(groupId). + setMemberId(memberId.toString). + setShareSessionEpoch(-1). + setTopics(List(new ShareFetchRequestData.FetchTopic(). + setTopicId(topicId). 
+ setPartitions(List( + new ShareFetchRequestData.FetchPartition() + .setPartitionIndex(0) + .setPartitionMaxBytes(partitionMaxBytes) + .setAcknowledgementBatches(List( + new AcknowledgementBatch() + .setFirstOffset(0) + .setLastOffset(9) + .setAcknowledgeTypes(Collections.singletonList(1.toByte)) + ).asJava) + ).asJava) + ).asJava) + + shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion) + request = buildRequest(shareFetchRequest) + kafkaApis.handleShareFetchRequest(request) + response = verifyNoThrottling[ShareFetchResponse](request) + responseData = response.data() + + assertEquals(Errors.INVALID_REQUEST.code, responseData.errorCode) + } + + @Test + def testHandleShareFetchRequestInvalidRequestTopicNotPresent() : Unit = { + val topicName1 = "foo1" + val topicId1 = Uuid.randomUuid() + val topicId2 = Uuid.randomUuid() + // topic 2 is not added to the metadata cache + metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) + addTopicToMetadataCache(topicName1, 1, topicId = topicId1) + val memberId : Uuid = Uuid.ZERO_UUID + + val groupId = "group" + + when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs( + any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0) + + val shareFetchRequestData = new ShareFetchRequestData(). + setGroupId(groupId). + setMemberId(memberId.toString). + setShareSessionEpoch(0). + setTopics(List( + new ShareFetchRequestData.FetchTopic(). + setTopicId(topicId1). + setPartitions(List( + new ShareFetchRequestData.FetchPartition() + .setPartitionIndex(0) + .setPartitionMaxBytes(partitionMaxBytes) + ).asJava), + new ShareFetchRequestData.FetchTopic(). + setTopicId(topicId2). 
+ setPartitions(List( + new ShareFetchRequestData.FetchPartition() + .setPartitionIndex(0) + .setPartitionMaxBytes(partitionMaxBytes) + ).asJava) + ).asJava) + + val shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion) + val request = buildRequest(shareFetchRequest) + kafkaApis = createKafkaApis( + overrideProperties = Map( + GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", + ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", + ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), + raftSupport = true) + kafkaApis.handleShareFetchRequest(request) + val response = verifyNoThrottling[ShareFetchResponse](request) + val responseData = response.data() + + assertEquals(Errors.INVALID_REQUEST.code, responseData.errorCode) + } + + @Test + def testHandleShareFetchRequestInvalidRequestTopicPartitionNotPresent() : Unit = { + val topicName = "foo" + val topicId = Uuid.randomUuid() + metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) + // Adding only 1 partition to the metadata cache + addTopicToMetadataCache(topicName, 1, topicId = topicId) + val memberId : Uuid = Uuid.ZERO_UUID + + val groupId = "group" + + when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs( + any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0) + + val shareFetchRequestData = new ShareFetchRequestData(). + setGroupId(groupId). + setMemberId(memberId.toString). + setShareSessionEpoch(0). + setTopics(List( + new ShareFetchRequestData.FetchTopic(). + setTopicId(topicId). 
+ setPartitions(List( + new ShareFetchRequestData.FetchPartition() + .setPartitionIndex(0) + .setPartitionMaxBytes(partitionMaxBytes), + new ShareFetchRequestData.FetchPartition() + .setPartitionIndex(1) // according to metadata cache, this topic has only 1 partition, but request is trying to fetch from 2 partitions + .setPartitionMaxBytes(partitionMaxBytes) + ).asJava) + ).asJava) + + val shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion) + val request = buildRequest(shareFetchRequest) + kafkaApis = createKafkaApis( + overrideProperties = Map( + GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", + ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", + ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), + raftSupport = true) + kafkaApis.handleShareFetchRequest(request) + val response = verifyNoThrottling[ShareFetchResponse](request) + val responseData = response.data() + + assertEquals(Errors.INVALID_REQUEST.code, responseData.errorCode) + } + + @Test + def testHandleShareFetchRequestFetchThrowsException() : Unit = { + val topicName = "foo" + val topicId = Uuid.randomUuid() + val partitionIndex = 0 + metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) + addTopicToMetadataCache(topicName, 1, topicId = topicId) + val memberId : Uuid = Uuid.ZERO_UUID + + when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), any())).thenReturn( + FutureUtils.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception()) + ) + + when(sharePartitionManager.newContext(any(), any(), any(), any())).thenReturn( + new ShareSessionContext(new ShareFetchMetadata(memberId, 0), Map( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) -> + new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes) + ).asJava) + ) + + when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs( + any[RequestChannel.Request](), anyDouble, 
anyLong)).thenReturn(0) + + val shareFetchRequestData = new ShareFetchRequestData(). + setGroupId("group"). + setMemberId(memberId.toString). + setShareSessionEpoch(0). + setTopics(List(new ShareFetchRequestData.FetchTopic(). + setTopicId(topicId). + setPartitions(List( + new ShareFetchRequestData.FetchPartition() + .setPartitionIndex(0) + .setPartitionMaxBytes(partitionMaxBytes)).asJava)).asJava) + + val shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion) + val request = buildRequest(shareFetchRequest) + kafkaApis = createKafkaApis( + overrideProperties = Map( + GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", + ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", + ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), + raftSupport = true) + kafkaApis.handleShareFetchRequest(request) + val response = verifyNoThrottling[ShareFetchResponse](request) + val responseData = response.data() + + assertEquals(Errors.UNKNOWN_SERVER_ERROR.code, responseData.errorCode) + } + + @Test + def testHandleShareFetchRequestAcknowledgeThrowsException() : Unit = { + val topicName = "foo" + val topicId = Uuid.randomUuid() + val partitionIndex = 0 + metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) + addTopicToMetadataCache(topicName, 1, topicId = topicId) + val memberId : Uuid = Uuid.ZERO_UUID + + val groupId = "group" + + val records = memoryRecords(10, 0) + + when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), any())).thenReturn( + CompletableFuture.completedFuture(Map[TopicIdPartition, ShareFetchResponseData.PartitionData]( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) -> + new ShareFetchResponseData.PartitionData() + .setErrorCode(Errors.NONE.code) + .setRecords(records) + .setAcquiredRecords(new util.ArrayList(List( + new ShareFetchResponseData.AcquiredRecords() + .setFirstOffset(0) + 
.setLastOffset(9) + .setDeliveryCount(1) + ).asJava)) + ).asJava) + ) + + when(sharePartitionManager.acknowledge(any(), any(), any())).thenReturn( + FutureUtils.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception()) + ) + + val cachedSharePartitions = new ImplicitLinkedHashCollection[CachedSharePartition] + cachedSharePartitions.mustAdd(new CachedSharePartition( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)), new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes), false)) + + when(sharePartitionManager.newContext(any(), any(), any(), any())) + .thenReturn(new ShareSessionContext(new ShareFetchMetadata(memberId, 0), new ShareSession( + new ShareSessionKey(groupId, memberId), cachedSharePartitions, 0L, 0L, 1)) + ) + + when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs( + any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0) + + val shareFetchRequestData = new ShareFetchRequestData(). + setGroupId(groupId). + setMemberId(memberId.toString). + setShareSessionEpoch(1). + setTopics(List(new ShareFetchRequestData.FetchTopic(). + setTopicId(topicId). 
+ setPartitions(List( + new ShareFetchRequestData.FetchPartition() + .setPartitionIndex(0) + .setPartitionMaxBytes(partitionMaxBytes) + .setAcknowledgementBatches(List( + new AcknowledgementBatch() + .setFirstOffset(0) + .setLastOffset(9) + .setAcknowledgeTypes(Collections.singletonList(1.toByte)) + ).asJava) + ).asJava) + ).asJava) + + val shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion) + val request = buildRequest(shareFetchRequest) + kafkaApis = createKafkaApis( + overrideProperties = Map( + GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", + ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", + ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), + raftSupport = true) + kafkaApis.handleShareFetchRequest(request) + val response = verifyNoThrottling[ShareFetchResponse](request) + val responseData = response.data() + + assertEquals(Errors.UNKNOWN_SERVER_ERROR.code, responseData.errorCode) + } + + @Test + def testHandleShareFetchRequestFetchAndAcknowledgeThrowsException() : Unit = { + val topicName = "foo" + val topicId = Uuid.randomUuid() + val partitionIndex = 0 + metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) + addTopicToMetadataCache(topicName, 1, topicId = topicId) + val memberId : Uuid = Uuid.ZERO_UUID + + val groupId = "group" + + when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), any())).thenReturn( + FutureUtils.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception()) + ) + + when(sharePartitionManager.acknowledge(any(), any(), any())).thenReturn( + CompletableFuture.supplyAsync(() => { + throw Errors.UNKNOWN_SERVER_ERROR.exception() + }) + ) + + val cachedSharePartitions = new ImplicitLinkedHashCollection[CachedSharePartition] + cachedSharePartitions.mustAdd(new CachedSharePartition( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)), new 
ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes), false)) + + when(sharePartitionManager.newContext(any(), any(), any(), any())) + .thenReturn(new ShareSessionContext(new ShareFetchMetadata(memberId, 0), new ShareSession( + new ShareSessionKey(groupId, memberId), cachedSharePartitions, 0L, 0L, 1)) + ) + + when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs( + any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0) + + val shareFetchRequestData = new ShareFetchRequestData(). + setGroupId(groupId). + setMemberId(memberId.toString). + setShareSessionEpoch(1). + setTopics(List(new ShareFetchRequestData.FetchTopic(). + setTopicId(topicId). + setPartitions(List( + new ShareFetchRequestData.FetchPartition() + .setPartitionIndex(0) + .setPartitionMaxBytes(partitionMaxBytes) + .setAcknowledgementBatches(List( + new AcknowledgementBatch() + .setFirstOffset(0) + .setLastOffset(9) + .setAcknowledgeTypes(Collections.singletonList(1.toByte)) + ).asJava) + ).asJava) + ).asJava) + + val shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion) + val request = buildRequest(shareFetchRequest) + kafkaApis = createKafkaApis( + overrideProperties = Map( + GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", + ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", + ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), + raftSupport = true) + kafkaApis.handleShareFetchRequest(request) + val response = verifyNoThrottling[ShareFetchResponse](request) + val responseData = response.data() + + assertEquals(Errors.UNKNOWN_SERVER_ERROR.code, responseData.errorCode) + } + + @Test + def testHandleShareFetchRequestErrorInReadingPartition() : Unit = { + val topicName = "foo" + val topicId = Uuid.randomUuid() + val partitionIndex = 0 + metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) + addTopicToMetadataCache(topicName, 1, topicId = topicId) + 
val memberId : Uuid = Uuid.ZERO_UUID + + val records = memoryRecords(0, 0) + + when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), any())).thenReturn( + CompletableFuture.completedFuture(Map[TopicIdPartition, ShareFetchResponseData.PartitionData]( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) -> + new ShareFetchResponseData.PartitionData() + .setErrorCode(Errors.REPLICA_NOT_AVAILABLE.code) + .setRecords(records) + .setAcquiredRecords(new util.ArrayList(List().asJava)) + ).asJava) + ) + + when(sharePartitionManager.newContext(any(), any(), any(), any())).thenReturn( + new ShareSessionContext(new ShareFetchMetadata(memberId, 0), Map( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) -> + new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes) + ).asJava) + ) + + when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs( + any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0) + + val shareFetchRequestData = new ShareFetchRequestData(). + setGroupId("group"). + setMemberId(memberId.toString). + setShareSessionEpoch(0). + setTopics(List(new ShareFetchRequestData.FetchTopic(). + setTopicId(topicId). 
+ setPartitions(List( + new ShareFetchRequestData.FetchPartition() + .setPartitionIndex(0) + .setPartitionMaxBytes(partitionMaxBytes)).asJava)).asJava) + + val shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion) + val request = buildRequest(shareFetchRequest) + kafkaApis = createKafkaApis( + overrideProperties = Map( + GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", + ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", + ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), + raftSupport = true) + kafkaApis.handleShareFetchRequest(request) + val response = verifyNoThrottling[ShareFetchResponse](request) + val responseData = response.data() + val topicResponses = responseData.responses() + + assertEquals(Errors.NONE.code, responseData.errorCode) + assertEquals(1, topicResponses.size()) + assertEquals(topicId, topicResponses.get(0).topicId) + assertEquals(1, topicResponses.get(0).partitions.size()) + assertEquals(partitionIndex, topicResponses.get(0).partitions.get(0).partitionIndex) + assertEquals(Errors.REPLICA_NOT_AVAILABLE.code, topicResponses.get(0).partitions.get(0).errorCode) + assertEquals(records, topicResponses.get(0).partitions.get(0).records) + assertTrue(topicResponses.get(0).partitions.get(0).acquiredRecords.toArray().isEmpty) + assertArrayEquals(Collections.emptyList().toArray(), topicResponses.get(0).partitions.get(0).acquiredRecords.toArray()) + } + + @Test + def testHandleShareFetchRequestShareSessionNotFoundError() : Unit = { + val topicName = "foo" + val topicId = Uuid.randomUuid() + val partitionIndex = 0 + metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) + addTopicToMetadataCache(topicName, 1, topicId = topicId) + val memberId : Uuid = Uuid.ZERO_UUID + + val groupId = "group" + val records = memoryRecords(10, 0) + + when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), any())).thenReturn( + 
CompletableFuture.completedFuture(Map[TopicIdPartition, ShareFetchResponseData.PartitionData]( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) -> + new ShareFetchResponseData.PartitionData() + .setErrorCode(Errors.NONE.code) + .setRecords(records) + .setAcquiredRecords(new util.ArrayList(List( + new ShareFetchResponseData.AcquiredRecords() + .setFirstOffset(0) + .setLastOffset(9) + .setDeliveryCount(1) + ).asJava)) + ).asJava) + ) + + when(sharePartitionManager.newContext(any(), any(), any(), any())).thenReturn( + new ShareSessionContext(new ShareFetchMetadata(memberId, 0), Map( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) -> + new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes) + ).asJava) + ) + .thenThrow(Errors.SHARE_SESSION_NOT_FOUND.exception) + .thenThrow(Errors.SHARE_SESSION_NOT_FOUND.exception) + + when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs( + any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0) + + var shareFetchRequestData = new ShareFetchRequestData(). + setGroupId(groupId). + setMemberId(memberId.toString). + setShareSessionEpoch(0). + setTopics(List(new ShareFetchRequestData.FetchTopic(). + setTopicId(topicId). + setPartitions(List( + new ShareFetchRequestData.FetchPartition() + .setPartitionIndex(0) + .setPartitionMaxBytes(partitionMaxBytes)).asJava)).asJava) + + var shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion) + var request = buildRequest(shareFetchRequest) + // First share fetch request is to establish the share session with the broker. 
+ kafkaApis = createKafkaApis( + overrideProperties = Map( + GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", + ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", + ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), + raftSupport = true) + kafkaApis.handleShareFetchRequest(request) + var response = verifyNoThrottling[ShareFetchResponse](request) + var responseData = response.data() + val topicResponses = responseData.responses() + + assertEquals(Errors.NONE.code, responseData.errorCode) + assertEquals(1, topicResponses.size()) + assertEquals(topicId, topicResponses.get(0).topicId) + assertEquals(1, topicResponses.get(0).partitions.size()) + assertEquals(partitionIndex, topicResponses.get(0).partitions.get(0).partitionIndex) + assertEquals(Errors.NONE.code, topicResponses.get(0).partitions.get(0).errorCode) + assertEquals(records, topicResponses.get(0).partitions.get(0).records) + assertArrayEquals(expectedAcquiredRecords(0, 9, 1).toArray(), topicResponses.get(0).partitions.get(0).acquiredRecords.toArray()) + + val memberId2 = Uuid.randomUuid() + + // Using wrong member ID. + shareFetchRequestData = new ShareFetchRequestData(). + setGroupId(groupId). + setMemberId(memberId2.toString). + setShareSessionEpoch(1). + setTopics(List(new ShareFetchRequestData.FetchTopic(). + setTopicId(topicId). + setPartitions(List( + new ShareFetchRequestData.FetchPartition() + .setPartitionIndex(0) + .setPartitionMaxBytes(partitionMaxBytes)).asJava)).asJava) + + shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion) + request = buildRequest(shareFetchRequest) + // First share fetch request is to establish the share session with the broker. + kafkaApis.handleShareFetchRequest(request) + response = verifyNoThrottling[ShareFetchResponse](request) + responseData = response.data() + + assertEquals(Errors.SHARE_SESSION_NOT_FOUND.code, responseData.errorCode) + + // Using wrong groupId ID. 
Review Comment: Thanks for the review. I'm sorry, but I don't understand how returning a ShareSessionContext with the wrong group/member ID will help. The entire logic to handle the groupID/memberID is in newContext, and if a wrong groupID/memberID is provided in the request, newContext throws an error, which is what this test tries to mimic. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org