chirag-wadhwa5 commented on code in PR #16456:
URL: https://github.com/apache/kafka/pull/16456#discussion_r1699487912


##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -4406,6 +4414,2341 @@ class KafkaApisTest extends Logging {
     assertEquals("broker2", node.host)
   }
 
+  /**
+   * Sends a single ShareFetch request (share session epoch 0, no acknowledgement batches) and
+   * checks that the partition data handed back by the mocked share partition manager shows up
+   * unchanged in the response.
+   */
+  @Test
+  def testHandleShareFetchRequestSuccessWithoutAcknowledgements(): Unit = {
+    val topicName = "foo"
+    val topicId = Uuid.randomUuid()
+    val partitionIndex = 0
+    val shareSessionEpoch = 0
+    val memberId : Uuid = Uuid.ZERO_UUID
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
+    addTopicToMetadataCache(topicName, 1, topicId = topicId)
+
+    val records = memoryRecords(10, 0)
+    val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex))
+
+    // Mocked fetch result: offsets 0-9 acquired with delivery count 1, no errors.
+    val acquiredBatch = new ShareFetchResponseData.AcquiredRecords()
+      .setFirstOffset(0)
+      .setLastOffset(9)
+      .setDeliveryCount(1)
+    val fetchedPartitionData = new ShareFetchResponseData.PartitionData()
+      .setErrorCode(Errors.NONE.code)
+      .setAcknowledgeErrorCode(Errors.NONE.code)
+      .setRecords(records)
+      .setAcquiredRecords(new util.ArrayList(List(acquiredBatch).asJava))
+    when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), any())).thenReturn(
+      CompletableFuture.completedFuture(
+        Map[TopicIdPartition, ShareFetchResponseData.PartitionData](
+          topicIdPartition -> fetchedPartitionData
+        ).asJava)
+    )
+
+    // Mocked session context built from a plain partition map.
+    when(sharePartitionManager.newContext(any(), any(), any(), any())).thenReturn(
+      new ShareSessionContext(
+        new ShareFetchMetadata(memberId, shareSessionEpoch),
+        Map(topicIdPartition -> new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes)).asJava)
+    )
+
+    when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
+      any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
+
+    val fetchPartition = new ShareFetchRequestData.FetchPartition()
+      .setPartitionIndex(partitionIndex)
+      .setPartitionMaxBytes(partitionMaxBytes)
+    val fetchTopic = new ShareFetchRequestData.FetchTopic()
+      .setTopicId(topicId)
+      .setPartitions(List(fetchPartition).asJava)
+    val shareFetchRequestData = new ShareFetchRequestData()
+      .setGroupId("group")
+      .setMemberId(memberId.toString)
+      .setShareSessionEpoch(shareSessionEpoch)
+      .setTopics(List(fetchTopic).asJava)
+
+    val shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData)
+      .build(ApiKeys.SHARE_FETCH.latestVersion)
+    val request = buildRequest(shareFetchRequest)
+    val shareGroupOverrides = Map(
+      GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
+      ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
+      ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true")
+    kafkaApis = createKafkaApis(overrideProperties = shareGroupOverrides, raftSupport = true)
+    kafkaApis.handleShareFetchRequest(request)
+
+    val responseData = verifyNoThrottling[ShareFetchResponse](request).data()
+    val topicResponses = responseData.responses()
+
+    assertEquals(Errors.NONE.code, responseData.errorCode)
+    assertEquals(1, topicResponses.size())
+    val topicResponse = topicResponses.get(0)
+    assertEquals(topicId, topicResponse.topicId)
+    assertEquals(1, topicResponse.partitions.size())
+    val partitionResponse = topicResponse.partitions.get(0)
+    assertEquals(partitionIndex, partitionResponse.partitionIndex)
+    assertEquals(Errors.NONE.code, partitionResponse.errorCode)
+    assertEquals(records, partitionResponse.records)
+    assertArrayEquals(expectedAcquiredRecords(0, 9, 1).toArray(), partitionResponse.acquiredRecords.toArray())
+  }
+
+  /**
+   * First request: share session epoch 0 together with an acknowledgement batch; the test
+   * expects a top-level INVALID_REQUEST error.
+   * Second request: epoch 1 without acknowledgements; the test expects it to succeed and to
+   * carry the records returned by the mocked share partition manager.
+   */
+  @Test
+  def testHandleShareFetchRequestInvalidRequestOnInitialEpoch() : Unit = {
+    val topicName = "foo"
+    val topicId = Uuid.randomUuid()
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
+    addTopicToMetadataCache(topicName, 1, topicId = topicId)
+    val memberId : Uuid = Uuid.ZERO_UUID
+
+    val groupId = "group"
+    val partitionIndex = 0
+
+    val records = memoryRecords(10, 0)
+
+    when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), any())).thenReturn(
+      CompletableFuture.completedFuture(Map[TopicIdPartition, ShareFetchResponseData.PartitionData](
+        new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) ->
+          new ShareFetchResponseData.PartitionData()
+            .setErrorCode(Errors.NONE.code)
+            .setAcknowledgeErrorCode(Errors.NONE.code)
+            .setRecords(records)
+            .setAcquiredRecords(new util.ArrayList(List(
+              new ShareFetchResponseData.AcquiredRecords()
+                .setFirstOffset(0)
+                .setLastOffset(9)
+                .setDeliveryCount(1)
+            ).asJava))
+      ).asJava)
+    )
+
+    val cachedSharePartitions = new ImplicitLinkedHashCollection[CachedSharePartition]
+    cachedSharePartitions.mustAdd(new CachedSharePartition(
+      new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)),
+      new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes), false))
+
+    // Consecutive stubbing: the first newContext call yields a context built from a partition
+    // map, the second one a context backed by a ShareSession holding the cached partition.
+    when(sharePartitionManager.newContext(any(), any(), any(), any())).thenReturn(
+      new ShareSessionContext(new ShareFetchMetadata(memberId, 0), Map(
+        new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) ->
+          new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes)
+      ).asJava)
+    ).thenReturn(new ShareSessionContext(new ShareFetchMetadata(memberId, 0), new ShareSession(
+      new ShareSessionKey(groupId, memberId), cachedSharePartitions, 0L, 0L, 1
+    )))
+
+    when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
+      any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
+
+    // Initial request (epoch 0) that already carries acknowledgements.
+    val initialRequestData = new ShareFetchRequestData().
+      setGroupId(groupId).
+      setMemberId(memberId.toString).
+      setShareSessionEpoch(0).
+      setTopics(List(new ShareFetchRequestData.FetchTopic().
+        setTopicId(topicId).
+        setPartitions(List(
+          new ShareFetchRequestData.FetchPartition()
+            .setPartitionIndex(partitionIndex)
+            .setPartitionMaxBytes(partitionMaxBytes)
+            .setAcknowledgementBatches(List(
+              new AcknowledgementBatch()
+                .setFirstOffset(0)
+                .setLastOffset(9)
+                .setAcknowledgeTypes(Collections.singletonList(1.toByte))
+            ).asJava)
+        ).asJava)
+      ).asJava)
+
+    val initialShareFetchRequest = new ShareFetchRequest.Builder(initialRequestData)
+      .build(ApiKeys.SHARE_FETCH.latestVersion)
+    val initialRequest = buildRequest(initialShareFetchRequest)
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(
+        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
+        ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
+        ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
+      raftSupport = true)
+    kafkaApis.handleShareFetchRequest(initialRequest)
+    val initialResponseData = verifyNoThrottling[ShareFetchResponse](initialRequest).data()
+
+    assertEquals(Errors.INVALID_REQUEST.code, initialResponseData.errorCode)
+
+    // Testing whether the subsequent request with the incremented share session epoch works or not.
+    val followUpRequestData = new ShareFetchRequestData().
+      setGroupId(groupId).
+      setMemberId(memberId.toString).
+      setShareSessionEpoch(1).
+      setTopics(List(new ShareFetchRequestData.FetchTopic().
+        setTopicId(topicId).
+        setPartitions(List(
+          new ShareFetchRequestData.FetchPartition()
+            .setPartitionIndex(partitionIndex)
+            .setPartitionMaxBytes(partitionMaxBytes)
+        ).asJava)
+      ).asJava)
+
+    val followUpShareFetchRequest = new ShareFetchRequest.Builder(followUpRequestData)
+      .build(ApiKeys.SHARE_FETCH.latestVersion)
+    val followUpRequest = buildRequest(followUpShareFetchRequest)
+    kafkaApis.handleShareFetchRequest(followUpRequest)
+    val followUpResponseData = verifyNoThrottling[ShareFetchResponse](followUpRequest).data()
+    val topicResponses = followUpResponseData.responses()
+
+    assertEquals(Errors.NONE.code, followUpResponseData.errorCode)
+    assertEquals(1, topicResponses.size())
+    assertEquals(topicId, topicResponses.get(0).topicId)
+    assertEquals(1, topicResponses.get(0).partitions.size())
+    assertEquals(partitionIndex, topicResponses.get(0).partitions.get(0).partitionIndex)
+    assertEquals(Errors.NONE.code, topicResponses.get(0).partitions.get(0).errorCode)
+    assertEquals(records, topicResponses.get(0).partitions.get(0).records)
+    assertArrayEquals(expectedAcquiredRecords(0, 9, 1).toArray(),
+      topicResponses.get(0).partitions.get(0).acquiredRecords.toArray())
+  }
+
+  /**
+   * First request (epoch 0, no acknowledgements) is expected to succeed. For the second request
+   * (epoch -1, carrying an acknowledgement batch) the mocked newContext throws INVALID_REQUEST,
+   * and the test checks that this error is reported at the top level of the response.
+   */
+  @Test
+  def testHandleShareFetchRequestInvalidRequestOnFinalEpoch() : Unit = {
+    val topicName = "foo"
+    val topicId = Uuid.randomUuid()
+    val partitionIndex = 0
+    val groupId = "group"
+    val memberId : Uuid = Uuid.ZERO_UUID
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
+    addTopicToMetadataCache(topicName, 1, topicId = topicId)
+
+    val records = memoryRecords(10, 0)
+    val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex))
+
+    // Mocked fetch result: offsets 0-9 acquired with delivery count 1.
+    val fetchedPartitionData = new ShareFetchResponseData.PartitionData()
+      .setErrorCode(Errors.NONE.code)
+      .setAcknowledgeErrorCode(Errors.NONE.code)
+      .setRecords(records)
+      .setAcquiredRecords(new util.ArrayList(List(
+        new ShareFetchResponseData.AcquiredRecords()
+          .setFirstOffset(0)
+          .setLastOffset(9)
+          .setDeliveryCount(1)
+      ).asJava))
+    when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), any())).thenReturn(
+      CompletableFuture.completedFuture(
+        Map[TopicIdPartition, ShareFetchResponseData.PartitionData](
+          topicIdPartition -> fetchedPartitionData
+        ).asJava)
+    )
+
+    // First newContext call returns a usable context, the second one throws.
+    when(sharePartitionManager.newContext(any(), any(), any(), any())).thenReturn(
+      new ShareSessionContext(
+        new ShareFetchMetadata(memberId, 0),
+        Map(topicIdPartition -> new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes)).asJava)
+    ).thenThrow(Errors.INVALID_REQUEST.exception)
+
+    val releasedPartitionData = new ShareAcknowledgeResponseData.PartitionData()
+      .setPartitionIndex(partitionIndex)
+      .setErrorCode(Errors.NONE.code)
+    when(sharePartitionManager.releaseAcquiredRecords(any(), any())).thenReturn(
+      CompletableFuture.completedFuture(
+        Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData](
+          topicIdPartition -> releasedPartitionData
+        ).asJava)
+    )
+
+    when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
+      any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
+
+    // Request 1: epoch 0, plain fetch.
+    val firstRequestData = new ShareFetchRequestData()
+      .setGroupId(groupId)
+      .setMemberId(memberId.toString)
+      .setShareSessionEpoch(0)
+      .setTopics(List(
+        new ShareFetchRequestData.FetchTopic()
+          .setTopicId(topicId)
+          .setPartitions(List(
+            new ShareFetchRequestData.FetchPartition()
+              .setPartitionIndex(0)
+              .setPartitionMaxBytes(partitionMaxBytes)
+          ).asJava)
+      ).asJava)
+
+    val firstShareFetchRequest = new ShareFetchRequest.Builder(firstRequestData)
+      .build(ApiKeys.SHARE_FETCH.latestVersion)
+    val firstRequest = buildRequest(firstShareFetchRequest)
+    val shareGroupOverrides = Map(
+      GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
+      ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
+      ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true")
+    kafkaApis = createKafkaApis(overrideProperties = shareGroupOverrides, raftSupport = true)
+    kafkaApis.handleShareFetchRequest(firstRequest)
+    val firstResponseData = verifyNoThrottling[ShareFetchResponse](firstRequest).data()
+    val topicResponses = firstResponseData.responses()
+
+    assertEquals(Errors.NONE.code, firstResponseData.errorCode)
+    assertEquals(1, topicResponses.size())
+    val topicResponse = topicResponses.get(0)
+    assertEquals(topicId, topicResponse.topicId)
+    assertEquals(1, topicResponse.partitions.size())
+    val partitionResponse = topicResponse.partitions.get(0)
+    assertEquals(partitionIndex, partitionResponse.partitionIndex)
+    assertEquals(Errors.NONE.code, partitionResponse.errorCode)
+    assertEquals(records, partitionResponse.records)
+    assertArrayEquals(expectedAcquiredRecords(0, 9, 1).toArray(), partitionResponse.acquiredRecords.toArray())
+
+    // Request 2: epoch -1 with an acknowledgement batch for offsets 0-9.
+    val secondRequestData = new ShareFetchRequestData()
+      .setGroupId(groupId)
+      .setMemberId(memberId.toString)
+      .setShareSessionEpoch(-1)
+      .setTopics(List(
+        new ShareFetchRequestData.FetchTopic()
+          .setTopicId(topicId)
+          .setPartitions(List(
+            new ShareFetchRequestData.FetchPartition()
+              .setPartitionIndex(0)
+              .setPartitionMaxBytes(partitionMaxBytes)
+              .setAcknowledgementBatches(List(
+                new AcknowledgementBatch()
+                  .setFirstOffset(0)
+                  .setLastOffset(9)
+                  .setAcknowledgeTypes(Collections.singletonList(1.toByte))
+              ).asJava)
+          ).asJava)
+      ).asJava)
+
+    val secondShareFetchRequest = new ShareFetchRequest.Builder(secondRequestData)
+      .build(ApiKeys.SHARE_FETCH.latestVersion)
+    val secondRequest = buildRequest(secondShareFetchRequest)
+    kafkaApis.handleShareFetchRequest(secondRequest)
+    val secondResponseData = verifyNoThrottling[ShareFetchResponse](secondRequest).data()
+
+    assertEquals(Errors.INVALID_REQUEST.code, secondResponseData.errorCode)
+  }
+
+  /**
+   * Requests two topic ids of which only the first is registered in the metadata cache; the
+   * test expects a top-level INVALID_REQUEST error.
+   */
+  @Test
+  def testHandleShareFetchRequestInvalidRequestTopicNotPresent() : Unit = {
+    val topicName1 = "foo1"
+    val topicId1 = Uuid.randomUuid()
+    // topicId2 is deliberately never registered in the metadata cache.
+    val topicId2 = Uuid.randomUuid()
+    val groupId = "group"
+    val memberId : Uuid = Uuid.ZERO_UUID
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
+    addTopicToMetadataCache(topicName1, 1, topicId = topicId1)
+
+    when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
+      any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
+
+    val knownTopic = new ShareFetchRequestData.FetchTopic()
+      .setTopicId(topicId1)
+      .setPartitions(List(
+        new ShareFetchRequestData.FetchPartition()
+          .setPartitionIndex(0)
+          .setPartitionMaxBytes(partitionMaxBytes)
+      ).asJava)
+    val unknownTopic = new ShareFetchRequestData.FetchTopic()
+      .setTopicId(topicId2)
+      .setPartitions(List(
+        new ShareFetchRequestData.FetchPartition()
+          .setPartitionIndex(0)
+          .setPartitionMaxBytes(partitionMaxBytes)
+      ).asJava)
+    val shareFetchRequestData = new ShareFetchRequestData()
+      .setGroupId(groupId)
+      .setMemberId(memberId.toString)
+      .setShareSessionEpoch(0)
+      .setTopics(List(knownTopic, unknownTopic).asJava)
+
+    val shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData)
+      .build(ApiKeys.SHARE_FETCH.latestVersion)
+    val request = buildRequest(shareFetchRequest)
+    val shareGroupOverrides = Map(
+      GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
+      ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
+      ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true")
+    kafkaApis = createKafkaApis(overrideProperties = shareGroupOverrides, raftSupport = true)
+    kafkaApis.handleShareFetchRequest(request)
+
+    val responseData = verifyNoThrottling[ShareFetchResponse](request).data()
+    assertEquals(Errors.INVALID_REQUEST.code, responseData.errorCode)
+  }
+
+  /**
+   * Requests partitions 0 and 1 of a topic that is registered in the metadata cache with a
+   * single partition; the test expects a top-level INVALID_REQUEST error.
+   */
+  @Test
+  def testHandleShareFetchRequestInvalidRequestTopicPartitionNotPresent() : Unit = {
+    val topicName = "foo"
+    val topicId = Uuid.randomUuid()
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
+    // Adding only 1 partition to the metadata cache
+    addTopicToMetadataCache(topicName, 1, topicId = topicId)
+    val memberId : Uuid = Uuid.ZERO_UUID
+
+    val groupId = "group"
+
+    when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
+      any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
+
+    val shareFetchRequestData = new ShareFetchRequestData().
+      setGroupId(groupId).
+      setMemberId(memberId.toString).
+      setShareSessionEpoch(0).
+      setTopics(List(
+        new ShareFetchRequestData.FetchTopic().
+          setTopicId(topicId).
+          setPartitions(List(
+            new ShareFetchRequestData.FetchPartition()
+              .setPartitionIndex(0)
+              .setPartitionMaxBytes(partitionMaxBytes),
+            // According to the metadata cache, this topic has only 1 partition, but the
+            // request is trying to fetch from 2 partitions.
+            new ShareFetchRequestData.FetchPartition()
+              .setPartitionIndex(1)
+              .setPartitionMaxBytes(partitionMaxBytes)
+          ).asJava)
+      ).asJava)
+
+    val shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData)
+      .build(ApiKeys.SHARE_FETCH.latestVersion)
+    val request = buildRequest(shareFetchRequest)
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(
+        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
+        ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
+        ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
+      raftSupport = true)
+    kafkaApis.handleShareFetchRequest(request)
+    val response = verifyNoThrottling[ShareFetchResponse](request)
+    val responseData = response.data()
+
+    assertEquals(Errors.INVALID_REQUEST.code, responseData.errorCode)
+  }
+
+  /**
+   * The mocked fetchMessages returns a future that has failed with UNKNOWN_SERVER_ERROR; the
+   * test expects that error code at the top level of the ShareFetch response.
+   */
+  @Test
+  def testHandleShareFetchRequestFetchThrowsException() : Unit = {
+    val topicName = "foo"
+    val topicId = Uuid.randomUuid()
+    val partitionIndex = 0
+    val memberId : Uuid = Uuid.ZERO_UUID
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
+    addTopicToMetadataCache(topicName, 1, topicId = topicId)
+
+    when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), any())).thenReturn(
+      FutureUtils.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception())
+    )
+
+    val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex))
+    when(sharePartitionManager.newContext(any(), any(), any(), any())).thenReturn(
+      new ShareSessionContext(
+        new ShareFetchMetadata(memberId, 0),
+        Map(topicIdPartition -> new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes)).asJava)
+    )
+
+    when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
+      any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
+
+    val fetchPartition = new ShareFetchRequestData.FetchPartition()
+      .setPartitionIndex(0)
+      .setPartitionMaxBytes(partitionMaxBytes)
+    val fetchTopic = new ShareFetchRequestData.FetchTopic()
+      .setTopicId(topicId)
+      .setPartitions(List(fetchPartition).asJava)
+    val shareFetchRequestData = new ShareFetchRequestData()
+      .setGroupId("group")
+      .setMemberId(memberId.toString)
+      .setShareSessionEpoch(0)
+      .setTopics(List(fetchTopic).asJava)
+
+    val shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData)
+      .build(ApiKeys.SHARE_FETCH.latestVersion)
+    val request = buildRequest(shareFetchRequest)
+    val shareGroupOverrides = Map(
+      GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
+      ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
+      ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true")
+    kafkaApis = createKafkaApis(overrideProperties = shareGroupOverrides, raftSupport = true)
+    kafkaApis.handleShareFetchRequest(request)
+
+    val responseData = verifyNoThrottling[ShareFetchResponse](request).data()
+    assertEquals(Errors.UNKNOWN_SERVER_ERROR.code, responseData.errorCode)
+  }
+
+  /**
+   * The mocked fetchMessages succeeds while the mocked acknowledge returns a future failed with
+   * UNKNOWN_SERVER_ERROR. The request (epoch 1) carries an acknowledgement batch, and the test
+   * expects UNKNOWN_SERVER_ERROR at the top level of the response.
+   */
+  @Test
+  def testHandleShareFetchRequestAcknowledgeThrowsException() : Unit = {
+    val topicName = "foo"
+    val topicId = Uuid.randomUuid()
+    val partitionIndex = 0
+    val groupId = "group"
+    val memberId : Uuid = Uuid.ZERO_UUID
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
+    addTopicToMetadataCache(topicName, 1, topicId = topicId)
+
+    val records = memoryRecords(10, 0)
+    val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex))
+
+    // Successful fetch: offsets 0-9 acquired with delivery count 1.
+    val fetchedPartitionData = new ShareFetchResponseData.PartitionData()
+      .setErrorCode(Errors.NONE.code)
+      .setRecords(records)
+      .setAcquiredRecords(new util.ArrayList(List(
+        new ShareFetchResponseData.AcquiredRecords()
+          .setFirstOffset(0)
+          .setLastOffset(9)
+          .setDeliveryCount(1)
+      ).asJava))
+    when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), any())).thenReturn(
+      CompletableFuture.completedFuture(
+        Map[TopicIdPartition, ShareFetchResponseData.PartitionData](
+          topicIdPartition -> fetchedPartitionData
+        ).asJava)
+    )
+
+    // Failing acknowledge.
+    when(sharePartitionManager.acknowledge(any(), any(), any())).thenReturn(
+      FutureUtils.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception())
+    )
+
+    // Session context backed by a ShareSession that caches the single partition.
+    val cachedSharePartitions = new ImplicitLinkedHashCollection[CachedSharePartition]
+    cachedSharePartitions.mustAdd(new CachedSharePartition(
+      topicIdPartition, new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes), false))
+    val shareSession = new ShareSession(
+      new ShareSessionKey(groupId, memberId), cachedSharePartitions, 0L, 0L, 1)
+    when(sharePartitionManager.newContext(any(), any(), any(), any()))
+      .thenReturn(new ShareSessionContext(new ShareFetchMetadata(memberId, 0), shareSession))
+
+    when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
+      any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
+
+    val fetchPartition = new ShareFetchRequestData.FetchPartition()
+      .setPartitionIndex(0)
+      .setPartitionMaxBytes(partitionMaxBytes)
+      .setAcknowledgementBatches(List(
+        new AcknowledgementBatch()
+          .setFirstOffset(0)
+          .setLastOffset(9)
+          .setAcknowledgeTypes(Collections.singletonList(1.toByte))
+      ).asJava)
+    val fetchTopic = new ShareFetchRequestData.FetchTopic()
+      .setTopicId(topicId)
+      .setPartitions(List(fetchPartition).asJava)
+    val shareFetchRequestData = new ShareFetchRequestData()
+      .setGroupId(groupId)
+      .setMemberId(memberId.toString)
+      .setShareSessionEpoch(1)
+      .setTopics(List(fetchTopic).asJava)
+
+    val shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData)
+      .build(ApiKeys.SHARE_FETCH.latestVersion)
+    val request = buildRequest(shareFetchRequest)
+    val shareGroupOverrides = Map(
+      GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
+      ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
+      ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true")
+    kafkaApis = createKafkaApis(overrideProperties = shareGroupOverrides, raftSupport = true)
+    kafkaApis.handleShareFetchRequest(request)
+
+    val responseData = verifyNoThrottling[ShareFetchResponse](request).data()
+    assertEquals(Errors.UNKNOWN_SERVER_ERROR.code, responseData.errorCode)
+  }
+
+  /**
+   * Both the mocked fetchMessages and the mocked acknowledge return futures that have failed
+   * with UNKNOWN_SERVER_ERROR. The request (epoch 1) carries an acknowledgement batch, and the
+   * test expects UNKNOWN_SERVER_ERROR at the top level of the response.
+   */
+  @Test
+  def testHandleShareFetchRequestFetchAndAcknowledgeThrowsException() : Unit = {
+    val topicName = "foo"
+    val topicId = Uuid.randomUuid()
+    val partitionIndex = 0
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
+    addTopicToMetadataCache(topicName, 1, topicId = topicId)
+    val memberId : Uuid = Uuid.ZERO_UUID
+
+    val groupId = "group"
+
+    when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), any())).thenReturn(
+      FutureUtils.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception())
+    )
+
+    // Use an already-failed future (same as the fetch stub above) rather than supplyAsync:
+    // supplyAsync would fail the future on another thread, so the response could be produced
+    // after the synchronous verification below has already run.
+    when(sharePartitionManager.acknowledge(any(), any(), any())).thenReturn(
+      FutureUtils.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception())
+    )
+
+    val cachedSharePartitions = new ImplicitLinkedHashCollection[CachedSharePartition]
+    cachedSharePartitions.mustAdd(new CachedSharePartition(
+      new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)),
+      new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes), false))
+
+    // Session context backed by a ShareSession that caches the single partition.
+    when(sharePartitionManager.newContext(any(), any(), any(), any()))
+      .thenReturn(new ShareSessionContext(new ShareFetchMetadata(memberId, 0), new ShareSession(
+        new ShareSessionKey(groupId, memberId), cachedSharePartitions, 0L, 0L, 1))
+      )
+
+    when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
+      any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
+
+    val shareFetchRequestData = new ShareFetchRequestData().
+      setGroupId(groupId).
+      setMemberId(memberId.toString).
+      setShareSessionEpoch(1).
+      setTopics(List(new ShareFetchRequestData.FetchTopic().
+        setTopicId(topicId).
+        setPartitions(List(
+          new ShareFetchRequestData.FetchPartition()
+            .setPartitionIndex(0)
+            .setPartitionMaxBytes(partitionMaxBytes)
+            .setAcknowledgementBatches(List(
+              new AcknowledgementBatch()
+                .setFirstOffset(0)
+                .setLastOffset(9)
+                .setAcknowledgeTypes(Collections.singletonList(1.toByte))
+            ).asJava)
+        ).asJava)
+      ).asJava)
+
+    val shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData)
+      .build(ApiKeys.SHARE_FETCH.latestVersion)
+    val request = buildRequest(shareFetchRequest)
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(
+        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
+        ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
+        ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
+      raftSupport = true)
+    kafkaApis.handleShareFetchRequest(request)
+    val response = verifyNoThrottling[ShareFetchResponse](request)
+    val responseData = response.data()
+
+    assertEquals(Errors.UNKNOWN_SERVER_ERROR.code, responseData.errorCode)
+  }
+
+  /**
+   * The mocked fetchMessages completes normally but reports REPLICA_NOT_AVAILABLE for the
+   * partition. The test expects no top-level error, the partition-level error code to be
+   * passed through, and an empty list of acquired records.
+   */
+  @Test
+  def testHandleShareFetchRequestErrorInReadingPartition() : Unit = {
+    val topicName = "foo"
+    val topicId = Uuid.randomUuid()
+    val partitionIndex = 0
+    val memberId : Uuid = Uuid.ZERO_UUID
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
+    addTopicToMetadataCache(topicName, 1, topicId = topicId)
+
+    val records = memoryRecords(0, 0)
+    val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex))
+
+    // Partition-level failure with nothing acquired.
+    val failedPartitionData = new ShareFetchResponseData.PartitionData()
+      .setErrorCode(Errors.REPLICA_NOT_AVAILABLE.code)
+      .setRecords(records)
+      .setAcquiredRecords(new util.ArrayList(List().asJava))
+    when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), any())).thenReturn(
+      CompletableFuture.completedFuture(
+        Map[TopicIdPartition, ShareFetchResponseData.PartitionData](
+          topicIdPartition -> failedPartitionData
+        ).asJava)
+    )
+
+    when(sharePartitionManager.newContext(any(), any(), any(), any())).thenReturn(
+      new ShareSessionContext(
+        new ShareFetchMetadata(memberId, 0),
+        Map(topicIdPartition -> new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes)).asJava)
+    )
+
+    when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
+      any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
+
+    val fetchPartition = new ShareFetchRequestData.FetchPartition()
+      .setPartitionIndex(0)
+      .setPartitionMaxBytes(partitionMaxBytes)
+    val fetchTopic = new ShareFetchRequestData.FetchTopic()
+      .setTopicId(topicId)
+      .setPartitions(List(fetchPartition).asJava)
+    val shareFetchRequestData = new ShareFetchRequestData()
+      .setGroupId("group")
+      .setMemberId(memberId.toString)
+      .setShareSessionEpoch(0)
+      .setTopics(List(fetchTopic).asJava)
+
+    val shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData)
+      .build(ApiKeys.SHARE_FETCH.latestVersion)
+    val request = buildRequest(shareFetchRequest)
+    val shareGroupOverrides = Map(
+      GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
+      ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
+      ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true")
+    kafkaApis = createKafkaApis(overrideProperties = shareGroupOverrides, raftSupport = true)
+    kafkaApis.handleShareFetchRequest(request)
+
+    val responseData = verifyNoThrottling[ShareFetchResponse](request).data()
+    val topicResponses = responseData.responses()
+
+    assertEquals(Errors.NONE.code, responseData.errorCode)
+    assertEquals(1, topicResponses.size())
+    val topicResponse = topicResponses.get(0)
+    assertEquals(topicId, topicResponse.topicId)
+    assertEquals(1, topicResponse.partitions.size())
+    val partitionResponse = topicResponse.partitions.get(0)
+    assertEquals(partitionIndex, partitionResponse.partitionIndex)
+    assertEquals(Errors.REPLICA_NOT_AVAILABLE.code, partitionResponse.errorCode)
+    assertEquals(records, partitionResponse.records)
+    assertTrue(partitionResponse.acquiredRecords.toArray().isEmpty)
+    assertArrayEquals(Collections.emptyList().toArray(), partitionResponse.acquiredRecords.toArray())
+  }
+
+  @Test
+  def testHandleShareFetchRequestShareSessionNotFoundError() : Unit = {
+    val topicName = "foo"
+    val topicId = Uuid.randomUuid()
+    val partitionIndex = 0
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
+    addTopicToMetadataCache(topicName, 1, topicId = topicId)
+    val memberId : Uuid = Uuid.ZERO_UUID
+
+    val groupId = "group"
+    val records = memoryRecords(10, 0)
+
+    when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), 
any())).thenReturn(
+      CompletableFuture.completedFuture(Map[TopicIdPartition, 
ShareFetchResponseData.PartitionData](
+        new TopicIdPartition(topicId, new TopicPartition(topicName, 
partitionIndex)) ->
+          new ShareFetchResponseData.PartitionData()
+            .setErrorCode(Errors.NONE.code)
+            .setRecords(records)
+            .setAcquiredRecords(new util.ArrayList(List(
+              new ShareFetchResponseData.AcquiredRecords()
+                .setFirstOffset(0)
+                .setLastOffset(9)
+                .setDeliveryCount(1)
+            ).asJava))
+      ).asJava)
+    )
+
+    when(sharePartitionManager.newContext(any(), any(), any(), 
any())).thenReturn(
+      new ShareSessionContext(new ShareFetchMetadata(memberId, 0), Map(
+        new TopicIdPartition(topicId, new TopicPartition(topicName, 
partitionIndex)) ->
+          new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes)
+      ).asJava)
+    )
+      .thenThrow(Errors.SHARE_SESSION_NOT_FOUND.exception)
+      .thenThrow(Errors.SHARE_SESSION_NOT_FOUND.exception)
+
+    when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
+      any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
+
+    var shareFetchRequestData = new ShareFetchRequestData().
+      setGroupId(groupId).
+      setMemberId(memberId.toString).
+      setShareSessionEpoch(0).
+      setTopics(List(new ShareFetchRequestData.FetchTopic().
+        setTopicId(topicId).
+        setPartitions(List(
+          new ShareFetchRequestData.FetchPartition()
+            .setPartitionIndex(0)
+            .setPartitionMaxBytes(partitionMaxBytes)).asJava)).asJava)
+
+    var shareFetchRequest = new 
ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion)
+    var request = buildRequest(shareFetchRequest)
+    // First share fetch request is to establish the share session with the 
broker.
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(
+        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
+        ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
+        ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
+      raftSupport = true)
+    kafkaApis.handleShareFetchRequest(request)
+    var response = verifyNoThrottling[ShareFetchResponse](request)
+    var responseData = response.data()
+    val topicResponses = responseData.responses()
+
+    assertEquals(Errors.NONE.code, responseData.errorCode)
+    assertEquals(1, topicResponses.size())
+    assertEquals(topicId, topicResponses.get(0).topicId)
+    assertEquals(1, topicResponses.get(0).partitions.size())
+    assertEquals(partitionIndex, 
topicResponses.get(0).partitions.get(0).partitionIndex)
+    assertEquals(Errors.NONE.code, 
topicResponses.get(0).partitions.get(0).errorCode)
+    assertEquals(records, topicResponses.get(0).partitions.get(0).records)
+    assertArrayEquals(expectedAcquiredRecords(0, 9, 1).toArray(), 
topicResponses.get(0).partitions.get(0).acquiredRecords.toArray())
+
+    val memberId2 = Uuid.randomUuid()
+
+    // Using wrong member ID.
+    shareFetchRequestData = new ShareFetchRequestData().
+      setGroupId(groupId).
+      setMemberId(memberId2.toString).
+      setShareSessionEpoch(1).
+      setTopics(List(new ShareFetchRequestData.FetchTopic().
+        setTopicId(topicId).
+        setPartitions(List(
+          new ShareFetchRequestData.FetchPartition()
+            .setPartitionIndex(0)
+            .setPartitionMaxBytes(partitionMaxBytes)).asJava)).asJava)
+
+    shareFetchRequest = new 
ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion)
+    request = buildRequest(shareFetchRequest)
+    // Second share fetch request should fail with SHARE_SESSION_NOT_FOUND.
+    kafkaApis.handleShareFetchRequest(request)
+    response = verifyNoThrottling[ShareFetchResponse](request)
+    responseData = response.data()
+
+    assertEquals(Errors.SHARE_SESSION_NOT_FOUND.code, responseData.errorCode)
+
+    // Using wrong group ID.
+    shareFetchRequestData = new ShareFetchRequestData().
+      setGroupId(groupId).
+      setMemberId(memberId.toString).
+      setShareSessionEpoch(1).
+      setTopics(List(new ShareFetchRequestData.FetchTopic().
+        setTopicId(topicId).
+        setPartitions(List(
+          new ShareFetchRequestData.FetchPartition()
+            .setPartitionIndex(0)
+            .setPartitionMaxBytes(partitionMaxBytes)).asJava)).asJava)
+
+    shareFetchRequest = new 
ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion)
+    request = buildRequest(shareFetchRequest)
+    // Third share fetch request should also fail with SHARE_SESSION_NOT_FOUND.
+    kafkaApis.handleShareFetchRequest(request)
+    response = verifyNoThrottling[ShareFetchResponse](request)
+    responseData = response.data()
+
+    assertEquals(Errors.SHARE_SESSION_NOT_FOUND.code, responseData.errorCode)
+  }
+
+  @Test
+  def testHandleShareFetchRequestInvalidShareSessionError() : Unit = {
+    val topicName = "foo"
+    val topicId = Uuid.randomUuid()
+    val partitionIndex = 0
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
+    addTopicToMetadataCache(topicName, 1, topicId = topicId)
+    val memberId: Uuid = Uuid.ZERO_UUID
+
+    val groupId = "group"
+    val records = memoryRecords(10, 0)
+
+    when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), 
any())).thenReturn(
+      CompletableFuture.completedFuture(Map[TopicIdPartition, 
ShareFetchResponseData.PartitionData](
+        new TopicIdPartition(topicId, new TopicPartition(topicName, 
partitionIndex)) ->
+          new ShareFetchResponseData.PartitionData()
+            .setErrorCode(Errors.NONE.code)
+            .setRecords(records)
+            .setAcquiredRecords(new util.ArrayList(List(
+              new ShareFetchResponseData.AcquiredRecords()
+                .setFirstOffset(0)
+                .setLastOffset(9)
+                .setDeliveryCount(1)
+            ).asJava))
+      ).asJava)
+    )
+
+    when(sharePartitionManager.newContext(any(), any(), any(), 
any())).thenReturn(
+        new ShareSessionContext(new ShareFetchMetadata(memberId, 0), Map(
+          new TopicIdPartition(topicId, new TopicPartition(topicName, 
partitionIndex)) ->
+            new ShareFetchRequest.SharePartitionData(topicId, 
partitionMaxBytes)
+        ).asJava)
+      )
+      .thenThrow(Errors.INVALID_SHARE_SESSION_EPOCH.exception)
+
+    when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
+      any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
+
+    var shareFetchRequestData = new ShareFetchRequestData().
+      setGroupId(groupId).
+      setMemberId(memberId.toString).
+      setShareSessionEpoch(0).
+      setTopics(List(new ShareFetchRequestData.FetchTopic().
+        setTopicId(topicId).
+        setPartitions(List(
+          new ShareFetchRequestData.FetchPartition()
+            .setPartitionIndex(0)
+            .setPartitionMaxBytes(partitionMaxBytes)).asJava)).asJava)
+
+    var shareFetchRequest = new 
ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion)
+    var request = buildRequest(shareFetchRequest)
+    // First share fetch request is to establish the share session with the 
broker.
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(
+        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
+        ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
+        ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
+      raftSupport = true)
+    kafkaApis.handleShareFetchRequest(request)
+    var response = verifyNoThrottling[ShareFetchResponse](request)
+    var responseData = response.data()
+    val topicResponses = responseData.responses()
+
+    assertEquals(Errors.NONE.code, responseData.errorCode)
+    assertEquals(1, topicResponses.size())
+    assertEquals(topicId, topicResponses.get(0).topicId)
+    assertEquals(1, topicResponses.get(0).partitions.size())
+    assertEquals(partitionIndex, 
topicResponses.get(0).partitions.get(0).partitionIndex)
+    assertEquals(Errors.NONE.code, 
topicResponses.get(0).partitions.get(0).errorCode)
+    assertEquals(records, topicResponses.get(0).partitions.get(0).records)
+    assertArrayEquals(expectedAcquiredRecords(0, 9, 1).toArray(), 
topicResponses.get(0).partitions.get(0).acquiredRecords.toArray())
+
+    shareFetchRequestData = new ShareFetchRequestData().
+      setGroupId(groupId).
+      setMemberId(memberId.toString).
+      setShareSessionEpoch(2). // Invalid share session epoch, should have 1 
for the second request.
+      setTopics(List(new ShareFetchRequestData.FetchTopic().
+        setTopicId(topicId).
+        setPartitions(List(
+          new ShareFetchRequestData.FetchPartition()
+            .setPartitionIndex(0)
+            .setPartitionMaxBytes(partitionMaxBytes)).asJava)).asJava)
+
+    shareFetchRequest = new 
ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion)
+    request = buildRequest(shareFetchRequest)
+    // Second share fetch request should fail with INVALID_SHARE_SESSION_EPOCH.
+    kafkaApis.handleShareFetchRequest(request)
+    response = verifyNoThrottling[ShareFetchResponse](request)
+    responseData = response.data()
+
+    assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.code, 
responseData.errorCode)
+  }
+
+  @Test
+  def testHandleShareFetchRequestShareSessionSuccessfullyEstablished() : Unit 
= {
+    val topicName = "foo"
+    val topicId = Uuid.randomUuid()
+    val partitionIndex = 0
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
+    addTopicToMetadataCache(topicName, 1, topicId = topicId)
+    val memberId: Uuid = Uuid.randomUuid()
+
+    val groupId = "group"
+
+    val records1 = memoryRecords(10, 0)
+    val records2 = memoryRecords(10, 10)
+    val records3 = memoryRecords(10, 20)
+
+    when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), 
any())).thenReturn(
+      CompletableFuture.completedFuture(Map[TopicIdPartition, 
ShareFetchResponseData.PartitionData](
+        new TopicIdPartition(topicId, new TopicPartition(topicName, 
partitionIndex)) ->
+          new ShareFetchResponseData.PartitionData()
+            .setErrorCode(Errors.NONE.code)
+            .setRecords(records1)
+            .setAcquiredRecords(new util.ArrayList(List(
+              new ShareFetchResponseData.AcquiredRecords()
+                .setFirstOffset(0)
+                .setLastOffset(9)
+                .setDeliveryCount(1)
+            ).asJava))
+      ).asJava)
+    ).thenReturn(
+      CompletableFuture.completedFuture(Map[TopicIdPartition, 
ShareFetchResponseData.PartitionData](
+        new TopicIdPartition(topicId, new TopicPartition(topicName, 
partitionIndex)) ->
+          new ShareFetchResponseData.PartitionData()
+            .setErrorCode(Errors.NONE.code)
+            .setAcknowledgeErrorCode(Errors.NONE.code)
+            .setRecords(records2)
+            .setAcquiredRecords(new util.ArrayList(List(
+              new ShareFetchResponseData.AcquiredRecords()
+                .setFirstOffset(10)
+                .setLastOffset(19)
+                .setDeliveryCount(1)
+            ).asJava))
+      ).asJava)
+    ).thenReturn(
+      CompletableFuture.completedFuture(Map[TopicIdPartition, 
ShareFetchResponseData.PartitionData](
+        new TopicIdPartition(topicId, new TopicPartition(topicName, 
partitionIndex)) ->
+          new ShareFetchResponseData.PartitionData()
+            .setErrorCode(Errors.NONE.code)
+            .setAcknowledgeErrorCode(Errors.NONE.code)
+            .setRecords(records3)
+            .setAcquiredRecords(new util.ArrayList(List(
+              new ShareFetchResponseData.AcquiredRecords()
+                .setFirstOffset(20)
+                .setLastOffset(29)
+                .setDeliveryCount(1)
+            ).asJava))
+      ).asJava)
+    )
+
+    when(sharePartitionManager.acknowledge(any(), any(), any())).thenReturn(
+      CompletableFuture.completedFuture(Map[TopicIdPartition, 
ShareAcknowledgeResponseData.PartitionData](
+        new TopicIdPartition(topicId, new TopicPartition(topicName, 
partitionIndex)) ->
+          new ShareAcknowledgeResponseData.PartitionData()
+            .setPartitionIndex(partitionIndex)
+            .setErrorCode(Errors.NONE.code)
+      ).asJava)
+    ).thenReturn(
+      CompletableFuture.completedFuture(Map[TopicIdPartition, 
ShareAcknowledgeResponseData.PartitionData](
+        new TopicIdPartition(topicId, new TopicPartition(topicName, 
partitionIndex)) ->
+          new ShareAcknowledgeResponseData.PartitionData()
+            .setPartitionIndex(partitionIndex)
+            .setErrorCode(Errors.NONE.code)
+      ).asJava)
+    )
+
+    val cachedSharePartitions = new 
ImplicitLinkedHashCollection[CachedSharePartition]
+    cachedSharePartitions.mustAdd(new CachedSharePartition(
+      new TopicIdPartition(topicId, new TopicPartition(topicName, 
partitionIndex)), new ShareFetchRequest.SharePartitionData(topicId, 
partitionMaxBytes), false))
+
+    when(sharePartitionManager.newContext(any(), any(), any(), 
any())).thenReturn(
+      new ShareSessionContext(new ShareFetchMetadata(memberId, 0), Map(
+        new TopicIdPartition(topicId, new TopicPartition(topicName, 
partitionIndex)) ->
+          new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes)
+      ).asJava)
+    ).thenReturn(new ShareSessionContext(new ShareFetchMetadata(memberId, 0), 
new ShareSession(
+      new ShareSessionKey(groupId, memberId), cachedSharePartitions, 0L, 0L, 
1))
+    ).thenReturn(new ShareSessionContext(new ShareFetchMetadata(memberId, 0), 
new ShareSession(
+      new ShareSessionKey(groupId, memberId), cachedSharePartitions, 0L, 10L, 
1))

Review Comment:
   Thanks for the review. Yes, you are right. The ShareFetchMetadata 
would include the current epoch in the request, but the ShareSession would 
contain the next epoch, as the epoch is bumped in the newContext method. I have 
made the change here, as well as in the other functions. Thanks again for 
pointing it out!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to