chirag-wadhwa5 commented on code in PR #16456:
URL: https://github.com/apache/kafka/pull/16456#discussion_r1699474736


##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -4406,6 +4414,2341 @@ class KafkaApisTest extends Logging {
     assertEquals("broker2", node.host)
   }
 
+  @Test
+  def testHandleShareFetchRequestSuccessWithoutAcknowledgements(): Unit = {
+    val topicName = "foo"
+    val topicId = Uuid.randomUuid()
+    val partitionIndex = 0
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
+    addTopicToMetadataCache(topicName, 1, topicId = topicId)
+    val memberId : Uuid = Uuid.ZERO_UUID
+
+    val shareSessionEpoch = 0
+
+    val records = memoryRecords(10, 0)
+
+    when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), 
any())).thenReturn(
+      CompletableFuture.completedFuture(Map[TopicIdPartition, 
ShareFetchResponseData.PartitionData](
+        new TopicIdPartition(topicId, new TopicPartition(topicName, 
partitionIndex)) ->
+                  new ShareFetchResponseData.PartitionData()
+                    .setErrorCode(Errors.NONE.code)
+                    .setAcknowledgeErrorCode(Errors.NONE.code)
+                    .setRecords(records)
+                    .setAcquiredRecords(new util.ArrayList(List(
+                      new ShareFetchResponseData.AcquiredRecords()
+                        .setFirstOffset(0)
+                        .setLastOffset(9)
+                        .setDeliveryCount(1)
+                      ).asJava))
+      ).asJava)
+    )
+
+    when(sharePartitionManager.newContext(any(), any(), any(), 
any())).thenReturn(
+      new ShareSessionContext(new ShareFetchMetadata(memberId, 
shareSessionEpoch), Map(
+        new TopicIdPartition(topicId, new TopicPartition(topicName, 
partitionIndex)) ->
+          new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes)
+      ).asJava)
+    )
+
+    when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
+      any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
+
+    val shareFetchRequestData = new ShareFetchRequestData().
+      setGroupId("group").
+      setMemberId(memberId.toString).
+      setShareSessionEpoch(shareSessionEpoch).
+      setTopics(List(new ShareFetchRequestData.FetchTopic().
+        setTopicId(topicId).
+        setPartitions(List(
+          new ShareFetchRequestData.FetchPartition()
+            .setPartitionIndex(partitionIndex)
+            .setPartitionMaxBytes(partitionMaxBytes)).asJava)).asJava)
+
+    val shareFetchRequest = new 
ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion)
+    val request = buildRequest(shareFetchRequest)
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(
+        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
+        ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
+        ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
+      raftSupport = true)
+    kafkaApis.handleShareFetchRequest(request)
+    val response = verifyNoThrottling[ShareFetchResponse](request)
+    val responseData = response.data()
+    val topicResponses = responseData.responses()
+
+    assertEquals(Errors.NONE.code, responseData.errorCode)
+    assertEquals(1, topicResponses.size())
+    assertEquals(topicId, topicResponses.get(0).topicId)
+    assertEquals(1, topicResponses.get(0).partitions.size())
+    assertEquals(partitionIndex, 
topicResponses.get(0).partitions.get(0).partitionIndex)
+    assertEquals(Errors.NONE.code, 
topicResponses.get(0).partitions.get(0).errorCode)
+    assertEquals(records, topicResponses.get(0).partitions.get(0).records)
+    assertArrayEquals(expectedAcquiredRecords(0, 9, 1).toArray(), 
topicResponses.get(0).partitions.get(0).acquiredRecords.toArray())
+  }
+
+  @Test
+  def testHandleShareFetchRequestInvalidRequestOnInitialEpoch() : Unit = {
+    val topicName = "foo"
+    val topicId = Uuid.randomUuid()
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
+    addTopicToMetadataCache(topicName, 1, topicId = topicId)
+    val memberId : Uuid = Uuid.ZERO_UUID
+
+    val groupId = "group"
+    val partitionIndex = 0
+
+    val records = memoryRecords(10, 0)
+
+    when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), 
any())).thenReturn(
+      CompletableFuture.completedFuture(Map[TopicIdPartition, 
ShareFetchResponseData.PartitionData](
+        new TopicIdPartition(topicId, new TopicPartition(topicName, 
partitionIndex)) ->
+          new ShareFetchResponseData.PartitionData()
+            .setErrorCode(Errors.NONE.code)
+            .setAcknowledgeErrorCode(Errors.NONE.code)
+            .setRecords(records)
+            .setAcquiredRecords(new util.ArrayList(List(
+              new ShareFetchResponseData.AcquiredRecords()
+                .setFirstOffset(0)
+                .setLastOffset(9)
+                .setDeliveryCount(1)
+            ).asJava))
+      ).asJava)
+    )
+
+    val cachedSharePartitions = new 
ImplicitLinkedHashCollection[CachedSharePartition]
+    cachedSharePartitions.mustAdd(new CachedSharePartition(
+      new TopicIdPartition(topicId, new TopicPartition(topicName, 
partitionIndex)), new ShareFetchRequest.SharePartitionData(topicId, 
partitionMaxBytes), false))
+
+    when(sharePartitionManager.newContext(any(), any(), any(), 
any())).thenReturn(
+      new ShareSessionContext(new ShareFetchMetadata(memberId, 0), Map(
+        new TopicIdPartition(topicId, new TopicPartition(topicName, 
partitionIndex)) ->
+          new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes)
+      ).asJava)
+    ).thenReturn(new ShareSessionContext(new ShareFetchMetadata(memberId, 0), 
new ShareSession(
+      new ShareSessionKey(groupId, memberId), cachedSharePartitions, 0L, 0L, 1
+    )))
+
+    when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
+      any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
+
+    var shareFetchRequestData = new ShareFetchRequestData().
+      setGroupId(groupId).
+      setMemberId(memberId.toString).
+      setShareSessionEpoch(0).
+      setTopics(List(new ShareFetchRequestData.FetchTopic().
+        setTopicId(topicId).
+        setPartitions(List(
+          new ShareFetchRequestData.FetchPartition()
+            .setPartitionIndex(partitionIndex)
+            .setPartitionMaxBytes(partitionMaxBytes)
+            setAcknowledgementBatches(List(
+            new AcknowledgementBatch()
+              .setFirstOffset(0)
+              .setLastOffset(9)
+              .setAcknowledgeTypes(Collections.singletonList(1.toByte))
+          ).asJava)
+        ).asJava)
+      ).asJava)
+
+    var shareFetchRequest = new 
ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion)
+    var request = buildRequest(shareFetchRequest)
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(
+        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
+        ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
+        ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
+      raftSupport = true)
+    kafkaApis.handleShareFetchRequest(request)
+    var response = verifyNoThrottling[ShareFetchResponse](request)
+    var responseData = response.data()
+
+    assertEquals(Errors.INVALID_REQUEST.code, responseData.errorCode)
+
+    // Testing whether the subsequent request with the incremented share 
session epoch works or not.
+    shareFetchRequestData = new ShareFetchRequestData().
+      setGroupId(groupId).
+      setMemberId(memberId.toString).
+      setShareSessionEpoch(1).
+      setTopics(List(new ShareFetchRequestData.FetchTopic().
+        setTopicId(topicId).
+        setPartitions(List(
+          new ShareFetchRequestData.FetchPartition()
+            .setPartitionIndex(0)
+            .setPartitionMaxBytes(partitionMaxBytes)
+        ).asJava)
+      ).asJava)
+
+    shareFetchRequest = new 
ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion)
+    request = buildRequest(shareFetchRequest)
+    kafkaApis.handleShareFetchRequest(request)
+    response = verifyNoThrottling[ShareFetchResponse](request)
+    responseData = response.data()
+    val topicResponses = responseData.responses()
+
+    assertEquals(Errors.NONE.code, responseData.errorCode)
+    assertEquals(1, topicResponses.size())
+    assertEquals(topicId, topicResponses.get(0).topicId)
+    assertEquals(1, topicResponses.get(0).partitions.size())
+    assertEquals(partitionIndex, 
topicResponses.get(0).partitions.get(0).partitionIndex)
+    assertEquals(Errors.NONE.code, 
topicResponses.get(0).partitions.get(0).errorCode)
+    assertEquals(records, topicResponses.get(0).partitions.get(0).records)
+    assertArrayEquals(expectedAcquiredRecords(0, 9, 1).toArray(), 
topicResponses.get(0).partitions.get(0).acquiredRecords.toArray())
+  }
+
+  @Test
+  def testHandleShareFetchRequestInvalidRequestOnFinalEpoch() : Unit = {
+    val topicName = "foo"
+    val topicId = Uuid.randomUuid()
+    val partitionIndex = 0
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
+    addTopicToMetadataCache(topicName, 1, topicId = topicId)
+    val memberId : Uuid = Uuid.ZERO_UUID
+
+    val groupId = "group"
+
+    val records = memoryRecords(10, 0)
+
+    when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), 
any())).thenReturn(
+      CompletableFuture.completedFuture(Map[TopicIdPartition, 
ShareFetchResponseData.PartitionData](
+        new TopicIdPartition(topicId, new TopicPartition(topicName, 
partitionIndex)) ->
+          new ShareFetchResponseData.PartitionData()
+            .setErrorCode(Errors.NONE.code)
+            .setAcknowledgeErrorCode(Errors.NONE.code)
+            .setRecords(records)
+            .setAcquiredRecords(new util.ArrayList(List(
+              new ShareFetchResponseData.AcquiredRecords()
+                .setFirstOffset(0)
+                .setLastOffset(9)
+                .setDeliveryCount(1)
+            ).asJava))
+      ).asJava)
+    )
+
+    when(sharePartitionManager.newContext(any(), any(), any(), 
any())).thenReturn(
+      new ShareSessionContext(new ShareFetchMetadata(memberId, 0), Map(
+        new TopicIdPartition(topicId, new TopicPartition(topicName, 
partitionIndex)) ->
+          new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes)
+      ).asJava)
+    ).thenThrow(Errors.INVALID_REQUEST.exception)
+
+    when(sharePartitionManager.releaseAcquiredRecords(any(), 
any())).thenReturn(
+      CompletableFuture.completedFuture(Map[TopicIdPartition, 
ShareAcknowledgeResponseData.PartitionData](
+        new TopicIdPartition(topicId, new TopicPartition(topicName, 
partitionIndex)) ->
+          new ShareAcknowledgeResponseData.PartitionData()
+            .setPartitionIndex(partitionIndex)
+            .setErrorCode(Errors.NONE.code)
+      ).asJava)
+    )
+
+    when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
+      any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
+
+    var shareFetchRequestData = new ShareFetchRequestData().
+      setGroupId(groupId).
+      setMemberId(memberId.toString).
+      setShareSessionEpoch(0).
+      setTopics(List(new ShareFetchRequestData.FetchTopic().
+        setTopicId(topicId).
+        setPartitions(List(
+          new ShareFetchRequestData.FetchPartition()
+            .setPartitionIndex(0)
+            .setPartitionMaxBytes(partitionMaxBytes)).asJava)).asJava)
+
+    var shareFetchRequest = new 
ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion)
+    var request = buildRequest(shareFetchRequest)
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(
+        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
+        ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
+        ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
+      raftSupport = true)
+    kafkaApis.handleShareFetchRequest(request)
+    var response = verifyNoThrottling[ShareFetchResponse](request)
+    var responseData = response.data()
+    val topicResponses = responseData.responses()
+
+    assertEquals(Errors.NONE.code, responseData.errorCode)
+    assertEquals(1, topicResponses.size())
+    assertEquals(topicId, topicResponses.get(0).topicId)
+    assertEquals(1, topicResponses.get(0).partitions.size())
+    assertEquals(partitionIndex, 
topicResponses.get(0).partitions.get(0).partitionIndex)
+    assertEquals(Errors.NONE.code, 
topicResponses.get(0).partitions.get(0).errorCode)
+    assertEquals(records, topicResponses.get(0).partitions.get(0).records)
+    assertArrayEquals(expectedAcquiredRecords(0, 9, 1).toArray(), 
topicResponses.get(0).partitions.get(0).acquiredRecords.toArray())
+
+    shareFetchRequestData = new ShareFetchRequestData().
+      setGroupId(groupId).
+      setMemberId(memberId.toString).
+      setShareSessionEpoch(-1).
+      setTopics(List(new ShareFetchRequestData.FetchTopic().
+        setTopicId(topicId).
+        setPartitions(List(
+          new ShareFetchRequestData.FetchPartition()
+            .setPartitionIndex(0)
+            .setPartitionMaxBytes(partitionMaxBytes)
+            .setAcknowledgementBatches(List(
+              new AcknowledgementBatch()
+                .setFirstOffset(0)
+                .setLastOffset(9)
+                .setAcknowledgeTypes(Collections.singletonList(1.toByte))
+            ).asJava)
+        ).asJava)
+      ).asJava)
+
+    shareFetchRequest = new 
ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion)
+    request = buildRequest(shareFetchRequest)
+    kafkaApis.handleShareFetchRequest(request)
+    response = verifyNoThrottling[ShareFetchResponse](request)
+    responseData = response.data()
+
+    assertEquals(Errors.INVALID_REQUEST.code, responseData.errorCode)
+  }
+
+  @Test
+  def testHandleShareFetchRequestInvalidRequestTopicNotPresent() : Unit = {
+    val topicName1 = "foo1"
+    val topicId1 = Uuid.randomUuid()
+    val topicId2 = Uuid.randomUuid()
+    // topic 2 is not added to the metadata cache
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
+    addTopicToMetadataCache(topicName1, 1, topicId = topicId1)
+    val memberId : Uuid = Uuid.ZERO_UUID
+
+    val groupId = "group"
+
+    when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
+      any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
+
+    val shareFetchRequestData = new ShareFetchRequestData().
+      setGroupId(groupId).
+      setMemberId(memberId.toString).
+      setShareSessionEpoch(0).
+      setTopics(List(
+        new ShareFetchRequestData.FetchTopic().
+          setTopicId(topicId1).
+          setPartitions(List(
+            new ShareFetchRequestData.FetchPartition()
+              .setPartitionIndex(0)
+              .setPartitionMaxBytes(partitionMaxBytes)
+          ).asJava),
+        new ShareFetchRequestData.FetchTopic().
+          setTopicId(topicId2).
+          setPartitions(List(
+            new ShareFetchRequestData.FetchPartition()
+              .setPartitionIndex(0)
+              .setPartitionMaxBytes(partitionMaxBytes)
+          ).asJava)
+      ).asJava)
+
+    val shareFetchRequest = new 
ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion)
+    val request = buildRequest(shareFetchRequest)
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(
+        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
+        ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
+        ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
+      raftSupport = true)
+    kafkaApis.handleShareFetchRequest(request)
+    val response = verifyNoThrottling[ShareFetchResponse](request)
+    val responseData = response.data()
+
+    assertEquals(Errors.INVALID_REQUEST.code, responseData.errorCode)
+  }
+
+  @Test
+  def testHandleShareFetchRequestInvalidRequestTopicPartitionNotPresent() : 
Unit = {
+    val topicName = "foo"
+    val topicId = Uuid.randomUuid()
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
+    // Adding only 1 partition to the metadata cache
+    addTopicToMetadataCache(topicName, 1, topicId = topicId)
+    val memberId : Uuid = Uuid.ZERO_UUID
+
+    val groupId = "group"
+
+    when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
+      any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
+
+    val shareFetchRequestData = new ShareFetchRequestData().
+      setGroupId(groupId).
+      setMemberId(memberId.toString).
+      setShareSessionEpoch(0).
+      setTopics(List(
+        new ShareFetchRequestData.FetchTopic().
+          setTopicId(topicId).
+          setPartitions(List(
+            new ShareFetchRequestData.FetchPartition()
+              .setPartitionIndex(0)
+              .setPartitionMaxBytes(partitionMaxBytes),
+            new ShareFetchRequestData.FetchPartition()
+              .setPartitionIndex(1) // according to metadata cache, this topic 
has only 1 partition, but request is trying to fetch from 2 partitions
+              .setPartitionMaxBytes(partitionMaxBytes)
+          ).asJava)
+      ).asJava)
+
+    val shareFetchRequest = new 
ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion)
+    val request = buildRequest(shareFetchRequest)
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(
+        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
+        ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
+        ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
+      raftSupport = true)
+    kafkaApis.handleShareFetchRequest(request)
+    val response = verifyNoThrottling[ShareFetchResponse](request)
+    val responseData = response.data()
+
+    assertEquals(Errors.INVALID_REQUEST.code, responseData.errorCode)
+  }
+
+  @Test
+  def testHandleShareFetchRequestFetchThrowsException() : Unit = {
+    val topicName = "foo"
+    val topicId = Uuid.randomUuid()
+    val partitionIndex = 0
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
+    addTopicToMetadataCache(topicName, 1, topicId = topicId)
+    val memberId : Uuid = Uuid.ZERO_UUID
+
+    when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), 
any())).thenReturn(
+      FutureUtils.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception())
+    )
+
+    when(sharePartitionManager.newContext(any(), any(), any(), 
any())).thenReturn(
+      new ShareSessionContext(new ShareFetchMetadata(memberId, 0), Map(
+        new TopicIdPartition(topicId, new TopicPartition(topicName, 
partitionIndex)) ->
+          new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes)
+      ).asJava)
+    )
+
+    when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
+      any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
+
+    val shareFetchRequestData = new ShareFetchRequestData().
+      setGroupId("group").
+      setMemberId(memberId.toString).
+      setShareSessionEpoch(0).
+      setTopics(List(new ShareFetchRequestData.FetchTopic().
+        setTopicId(topicId).
+        setPartitions(List(
+          new ShareFetchRequestData.FetchPartition()
+            .setPartitionIndex(0)
+            .setPartitionMaxBytes(partitionMaxBytes)).asJava)).asJava)
+
+    val shareFetchRequest = new 
ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion)
+    val request = buildRequest(shareFetchRequest)
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(
+        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
+        ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
+        ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
+      raftSupport = true)
+    kafkaApis.handleShareFetchRequest(request)
+    val response = verifyNoThrottling[ShareFetchResponse](request)
+    val responseData = response.data()
+
+    assertEquals(Errors.UNKNOWN_SERVER_ERROR.code, responseData.errorCode)
+  }
+
+  @Test
+  def testHandleShareFetchRequestAcknowledgeThrowsException() : Unit = {
+    val topicName = "foo"
+    val topicId = Uuid.randomUuid()
+    val partitionIndex = 0
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
+    addTopicToMetadataCache(topicName, 1, topicId = topicId)
+    val memberId : Uuid = Uuid.ZERO_UUID
+
+    val groupId = "group"
+
+    val records = memoryRecords(10, 0)
+
+    when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), 
any())).thenReturn(
+      CompletableFuture.completedFuture(Map[TopicIdPartition, 
ShareFetchResponseData.PartitionData](
+        new TopicIdPartition(topicId, new TopicPartition(topicName, 
partitionIndex)) ->
+          new ShareFetchResponseData.PartitionData()
+            .setErrorCode(Errors.NONE.code)
+            .setRecords(records)
+            .setAcquiredRecords(new util.ArrayList(List(
+              new ShareFetchResponseData.AcquiredRecords()
+                .setFirstOffset(0)
+                .setLastOffset(9)
+                .setDeliveryCount(1)
+            ).asJava))
+      ).asJava)
+    )
+
+    when(sharePartitionManager.acknowledge(any(), any(), any())).thenReturn(
+      FutureUtils.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception())
+    )
+
+    val cachedSharePartitions = new 
ImplicitLinkedHashCollection[CachedSharePartition]
+    cachedSharePartitions.mustAdd(new CachedSharePartition(
+      new TopicIdPartition(topicId, new TopicPartition(topicName, 
partitionIndex)), new ShareFetchRequest.SharePartitionData(topicId, 
partitionMaxBytes), false))
+
+    when(sharePartitionManager.newContext(any(), any(), any(), any()))
+      .thenReturn(new ShareSessionContext(new ShareFetchMetadata(memberId, 0), 
new ShareSession(
+      new ShareSessionKey(groupId, memberId), cachedSharePartitions, 0L, 0L, 
1))
+    )
+
+    when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
+      any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
+
+    val shareFetchRequestData = new ShareFetchRequestData().
+      setGroupId(groupId).
+      setMemberId(memberId.toString).
+      setShareSessionEpoch(1).
+      setTopics(List(new ShareFetchRequestData.FetchTopic().
+        setTopicId(topicId).
+        setPartitions(List(
+          new ShareFetchRequestData.FetchPartition()
+            .setPartitionIndex(0)
+            .setPartitionMaxBytes(partitionMaxBytes)
+            .setAcknowledgementBatches(List(
+              new AcknowledgementBatch()
+                .setFirstOffset(0)
+                .setLastOffset(9)
+                .setAcknowledgeTypes(Collections.singletonList(1.toByte))
+            ).asJava)
+        ).asJava)
+      ).asJava)
+
+    val shareFetchRequest = new 
ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion)
+    val request = buildRequest(shareFetchRequest)
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(
+        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
+        ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
+        ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
+      raftSupport = true)
+    kafkaApis.handleShareFetchRequest(request)
+    val response = verifyNoThrottling[ShareFetchResponse](request)
+    val responseData = response.data()
+
+    assertEquals(Errors.UNKNOWN_SERVER_ERROR.code, responseData.errorCode)
+  }
+
+  @Test
+  def testHandleShareFetchRequestFetchAndAcknowledgeThrowsException() : Unit = 
{
+    val topicName = "foo"
+    val topicId = Uuid.randomUuid()
+    val partitionIndex = 0
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
+    addTopicToMetadataCache(topicName, 1, topicId = topicId)
+    val memberId : Uuid = Uuid.ZERO_UUID
+
+    val groupId = "group"
+
+    when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), 
any())).thenReturn(
+      FutureUtils.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception())
+    )
+
+    when(sharePartitionManager.acknowledge(any(), any(), any())).thenReturn(
+      CompletableFuture.supplyAsync(() => {
+        throw Errors.UNKNOWN_SERVER_ERROR.exception()
+      })
+    )
+
+    val cachedSharePartitions = new 
ImplicitLinkedHashCollection[CachedSharePartition]
+    cachedSharePartitions.mustAdd(new CachedSharePartition(
+      new TopicIdPartition(topicId, new TopicPartition(topicName, 
partitionIndex)), new ShareFetchRequest.SharePartitionData(topicId, 
partitionMaxBytes), false))
+
+    when(sharePartitionManager.newContext(any(), any(), any(), any()))
+      .thenReturn(new ShareSessionContext(new ShareFetchMetadata(memberId, 0), 
new ShareSession(
+        new ShareSessionKey(groupId, memberId), cachedSharePartitions, 0L, 0L, 
1))
+      )
+
+    when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
+      any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
+
+    val shareFetchRequestData = new ShareFetchRequestData().
+      setGroupId(groupId).
+      setMemberId(memberId.toString).
+      setShareSessionEpoch(1).
+      setTopics(List(new ShareFetchRequestData.FetchTopic().
+        setTopicId(topicId).
+        setPartitions(List(
+          new ShareFetchRequestData.FetchPartition()
+            .setPartitionIndex(0)
+            .setPartitionMaxBytes(partitionMaxBytes)
+            .setAcknowledgementBatches(List(
+              new AcknowledgementBatch()
+                .setFirstOffset(0)
+                .setLastOffset(9)
+                .setAcknowledgeTypes(Collections.singletonList(1.toByte))
+            ).asJava)
+        ).asJava)
+      ).asJava)
+
+    val shareFetchRequest = new 
ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion)
+    val request = buildRequest(shareFetchRequest)
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(
+        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
+        ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
+        ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
+      raftSupport = true)
+    kafkaApis.handleShareFetchRequest(request)
+    val response = verifyNoThrottling[ShareFetchResponse](request)
+    val responseData = response.data()
+
+    assertEquals(Errors.UNKNOWN_SERVER_ERROR.code, responseData.errorCode)
+  }
+
+  @Test
+  def testHandleShareFetchRequestErrorInReadingPartition() : Unit = {
+    val topicName = "foo"
+    val topicId = Uuid.randomUuid()
+    val partitionIndex = 0
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
+    addTopicToMetadataCache(topicName, 1, topicId = topicId)
+    val memberId : Uuid = Uuid.ZERO_UUID
+
+    val records = memoryRecords(0, 0)
+
+    when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), 
any())).thenReturn(
+      CompletableFuture.completedFuture(Map[TopicIdPartition, 
ShareFetchResponseData.PartitionData](
+        new TopicIdPartition(topicId, new TopicPartition(topicName, 
partitionIndex)) ->
+          new ShareFetchResponseData.PartitionData()
+            .setErrorCode(Errors.REPLICA_NOT_AVAILABLE.code)
+            .setRecords(records)
+            .setAcquiredRecords(new util.ArrayList(List().asJava))
+      ).asJava)
+    )
+
+    when(sharePartitionManager.newContext(any(), any(), any(), 
any())).thenReturn(
+      new ShareSessionContext(new ShareFetchMetadata(memberId, 0), Map(
+        new TopicIdPartition(topicId, new TopicPartition(topicName, 
partitionIndex)) ->
+          new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes)
+      ).asJava)
+    )
+
+    when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
+      any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
+
+    val shareFetchRequestData = new ShareFetchRequestData().
+      setGroupId("group").
+      setMemberId(memberId.toString).
+      setShareSessionEpoch(0).
+      setTopics(List(new ShareFetchRequestData.FetchTopic().
+        setTopicId(topicId).
+        setPartitions(List(
+          new ShareFetchRequestData.FetchPartition()
+            .setPartitionIndex(0)
+            .setPartitionMaxBytes(partitionMaxBytes)).asJava)).asJava)
+
+    val shareFetchRequest = new 
ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion)
+    val request = buildRequest(shareFetchRequest)
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(
+        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
+        ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
+        ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
+      raftSupport = true)
+    kafkaApis.handleShareFetchRequest(request)
+    val response = verifyNoThrottling[ShareFetchResponse](request)
+    val responseData = response.data()
+    val topicResponses = responseData.responses()
+
+    assertEquals(Errors.NONE.code, responseData.errorCode)
+    assertEquals(1, topicResponses.size())
+    assertEquals(topicId, topicResponses.get(0).topicId)
+    assertEquals(1, topicResponses.get(0).partitions.size())
+    assertEquals(partitionIndex, 
topicResponses.get(0).partitions.get(0).partitionIndex)
+    assertEquals(Errors.REPLICA_NOT_AVAILABLE.code, 
topicResponses.get(0).partitions.get(0).errorCode)
+    assertEquals(records, topicResponses.get(0).partitions.get(0).records)
+    
assertTrue(topicResponses.get(0).partitions.get(0).acquiredRecords.toArray().isEmpty)
+    assertArrayEquals(Collections.emptyList().toArray(), 
topicResponses.get(0).partitions.get(0).acquiredRecords.toArray())
+  }
+
+  @Test
+  def testHandleShareFetchRequestShareSessionNotFoundError() : Unit = {
+    val topicName = "foo"
+    val topicId = Uuid.randomUuid()
+    val partitionIndex = 0
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
+    addTopicToMetadataCache(topicName, 1, topicId = topicId)
+    val memberId : Uuid = Uuid.ZERO_UUID
+
+    val groupId = "group"
+    val records = memoryRecords(10, 0)
+
+    when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), 
any())).thenReturn(
+      CompletableFuture.completedFuture(Map[TopicIdPartition, 
ShareFetchResponseData.PartitionData](
+        new TopicIdPartition(topicId, new TopicPartition(topicName, 
partitionIndex)) ->
+          new ShareFetchResponseData.PartitionData()
+            .setErrorCode(Errors.NONE.code)
+            .setRecords(records)
+            .setAcquiredRecords(new util.ArrayList(List(
+              new ShareFetchResponseData.AcquiredRecords()
+                .setFirstOffset(0)
+                .setLastOffset(9)
+                .setDeliveryCount(1)
+            ).asJava))
+      ).asJava)
+    )
+
+    when(sharePartitionManager.newContext(any(), any(), any(), 
any())).thenReturn(
+      new ShareSessionContext(new ShareFetchMetadata(memberId, 0), Map(
+        new TopicIdPartition(topicId, new TopicPartition(topicName, 
partitionIndex)) ->
+          new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes)
+      ).asJava)
+    )
+      .thenThrow(Errors.SHARE_SESSION_NOT_FOUND.exception)
+      .thenThrow(Errors.SHARE_SESSION_NOT_FOUND.exception)
+
+    when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
+      any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
+
+    var shareFetchRequestData = new ShareFetchRequestData().
+      setGroupId(groupId).
+      setMemberId(memberId.toString).
+      setShareSessionEpoch(0).
+      setTopics(List(new ShareFetchRequestData.FetchTopic().
+        setTopicId(topicId).
+        setPartitions(List(
+          new ShareFetchRequestData.FetchPartition()
+            .setPartitionIndex(0)
+            .setPartitionMaxBytes(partitionMaxBytes)).asJava)).asJava)
+
+    var shareFetchRequest = new 
ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion)
+    var request = buildRequest(shareFetchRequest)
+    // First share fetch request is to establish the share session with the 
broker.
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(
+        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
+        ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
+        ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
+      raftSupport = true)
+    kafkaApis.handleShareFetchRequest(request)
+    var response = verifyNoThrottling[ShareFetchResponse](request)
+    var responseData = response.data()
+    val topicResponses = responseData.responses()
+
+    assertEquals(Errors.NONE.code, responseData.errorCode)
+    assertEquals(1, topicResponses.size())
+    assertEquals(topicId, topicResponses.get(0).topicId)
+    assertEquals(1, topicResponses.get(0).partitions.size())
+    assertEquals(partitionIndex, 
topicResponses.get(0).partitions.get(0).partitionIndex)
+    assertEquals(Errors.NONE.code, 
topicResponses.get(0).partitions.get(0).errorCode)
+    assertEquals(records, topicResponses.get(0).partitions.get(0).records)
+    assertArrayEquals(expectedAcquiredRecords(0, 9, 1).toArray(), 
topicResponses.get(0).partitions.get(0).acquiredRecords.toArray())
+
+    val memberId2 = Uuid.randomUuid()
+
+    // Using wrong member ID.
+    shareFetchRequestData = new ShareFetchRequestData().
+      setGroupId(groupId).
+      setMemberId(memberId2.toString).
+      setShareSessionEpoch(1).
+      setTopics(List(new ShareFetchRequestData.FetchTopic().
+        setTopicId(topicId).
+        setPartitions(List(
+          new ShareFetchRequestData.FetchPartition()
+            .setPartitionIndex(0)
+            .setPartitionMaxBytes(partitionMaxBytes)).asJava)).asJava)
+
+    shareFetchRequest = new 
ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion)
+    request = buildRequest(shareFetchRequest)
+    // First share fetch request is to establish the share session with the 
broker.
+    kafkaApis.handleShareFetchRequest(request)
+    response = verifyNoThrottling[ShareFetchResponse](request)
+    responseData = response.data()
+
+    assertEquals(Errors.SHARE_SESSION_NOT_FOUND.code, responseData.errorCode)
+
+    // Using wrong groupId ID.

Review Comment:
   Thanks for the review. I'm sorry, but I don't understand how returning a 
ShareSessionContext with the wrong group/member ID will help. The entire logic 
to handle the group/member ID is in newContext, and if a wrong group/member ID 
is provided in the request, newContext throws an error, which is what this 
test tries to mimic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to