junrao commented on code in PR #16456:
URL: https://github.com/apache/kafka/pull/16456#discussion_r1698763204


##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -4406,6 +4414,2341 @@ class KafkaApisTest extends Logging {
     assertEquals("broker2", node.host)
   }
 
+  /**
+   * Sends a single ShareFetch request that carries no acknowledgement batches and checks that
+   * the response echoes the records and acquired-record range handed back by the mocked
+   * share partition manager, with no top-level or partition-level error.
+   */
+  @Test
+  def testHandleShareFetchRequestSuccessWithoutAcknowledgements(): Unit = {
+    val topicName = "foo"
+    val topicId = Uuid.randomUuid()
+    val partitionIndex = 0
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
+    addTopicToMetadataCache(topicName, 1, topicId = topicId)
+    val memberId: Uuid = Uuid.ZERO_UUID
+    val shareSessionEpoch = 0
+    val records = memoryRecords(10, 0)
+
+    // The one partition every stub and the request below refer to.
+    val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex))
+
+    // Canned fetch result: the records above, acquired as offsets 0..9 with delivery count 1.
+    val fetchedPartitionData = new ShareFetchResponseData.PartitionData()
+      .setErrorCode(Errors.NONE.code)
+      .setAcknowledgeErrorCode(Errors.NONE.code)
+      .setRecords(records)
+      .setAcquiredRecords(new util.ArrayList(List(
+        new ShareFetchResponseData.AcquiredRecords()
+          .setFirstOffset(0)
+          .setLastOffset(9)
+          .setDeliveryCount(1)
+      ).asJava))
+    when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), any())).thenReturn(
+      CompletableFuture.completedFuture(Map[TopicIdPartition, ShareFetchResponseData.PartitionData](
+        topicIdPartition -> fetchedPartitionData
+      ).asJava)
+    )
+
+    // Session context returned for the request: built from the request's member id and epoch
+    // plus the share partition data for the single partition.
+    val sessionContext = new ShareSessionContext(new ShareFetchMetadata(memberId, shareSessionEpoch), Map(
+      topicIdPartition -> new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes)
+    ).asJava)
+    when(sharePartitionManager.newContext(any(), any(), any(), any())).thenReturn(sessionContext)
+
+    // Stub the quota manager to report a throttle time of 0.
+    when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
+      any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
+
+    // Assemble the request bottom-up: partition -> topic -> request data.
+    val fetchPartition = new ShareFetchRequestData.FetchPartition()
+      .setPartitionIndex(partitionIndex)
+      .setPartitionMaxBytes(partitionMaxBytes)
+    val fetchTopic = new ShareFetchRequestData.FetchTopic()
+      .setTopicId(topicId)
+      .setPartitions(List(fetchPartition).asJava)
+    val shareFetchRequestData = new ShareFetchRequestData()
+      .setGroupId("group")
+      .setMemberId(memberId.toString)
+      .setShareSessionEpoch(shareSessionEpoch)
+      .setTopics(List(fetchTopic).asJava)
+
+    val shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion)
+    val request = buildRequest(shareFetchRequest)
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(
+        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
+        ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
+        ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
+      raftSupport = true)
+    kafkaApis.handleShareFetchRequest(request)
+
+    val response = verifyNoThrottling[ShareFetchResponse](request)
+    val responseData = response.data()
+    val topicResponses = responseData.responses()
+
+    assertEquals(Errors.NONE.code, responseData.errorCode)
+    assertEquals(1, topicResponses.size())
+    val topicResponse = topicResponses.get(0)
+    assertEquals(topicId, topicResponse.topicId)
+    assertEquals(1, topicResponse.partitions.size())
+    val partitionResponse = topicResponse.partitions.get(0)
+    assertEquals(partitionIndex, partitionResponse.partitionIndex)
+    assertEquals(Errors.NONE.code, partitionResponse.errorCode)
+    assertEquals(records, partitionResponse.records)
+    assertArrayEquals(expectedAcquiredRecords(0, 9, 1).toArray(), partitionResponse.acquiredRecords.toArray())
+  }
+
+  /**
+   * Exercises two consecutive ShareFetch requests from the same member:
+   *
+   *  1. A request at share session epoch 0 that also carries an acknowledgement batch.
+   *     The test expects the top-level error code to be INVALID_REQUEST.
+   *  2. A follow-up request at epoch 1 without acknowledgements. The test expects it to
+   *     succeed and to return the records and acquired-record range supplied by the mocked
+   *     share partition manager.
+   */
+  @Test
+  def testHandleShareFetchRequestInvalidRequestOnInitialEpoch(): Unit = {
+    val topicName = "foo"
+    val topicId = Uuid.randomUuid()
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
+    addTopicToMetadataCache(topicName, 1, topicId = topicId)
+    val memberId: Uuid = Uuid.ZERO_UUID
+
+    val groupId = "group"
+    val partitionIndex = 0
+
+    val records = memoryRecords(10, 0)
+
+    // Canned fetch result: the records above, acquired as offsets 0..9 with delivery count 1.
+    when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), any())).thenReturn(
+      CompletableFuture.completedFuture(Map[TopicIdPartition, ShareFetchResponseData.PartitionData](
+        new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) ->
+          new ShareFetchResponseData.PartitionData()
+            .setErrorCode(Errors.NONE.code)
+            .setAcknowledgeErrorCode(Errors.NONE.code)
+            .setRecords(records)
+            .setAcquiredRecords(new util.ArrayList(List(
+              new ShareFetchResponseData.AcquiredRecords()
+                .setFirstOffset(0)
+                .setLastOffset(9)
+                .setDeliveryCount(1)
+            ).asJava))
+      ).asJava)
+    )
+
+    val cachedSharePartitions = new ImplicitLinkedHashCollection[CachedSharePartition]
+    cachedSharePartitions.mustAdd(new CachedSharePartition(
+      new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)),
+      new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes), false))
+
+    // Consecutive stubbing: the first newContext call yields a context built from the request's
+    // share partition data; every later call yields a context backed by a ShareSession holding
+    // the cached partition above.
+    // NOTE(review): the trailing ShareSession arguments (0L, 0L, 1) are presumably creation time,
+    // last-used time and epoch -- confirm against the ShareSession constructor.
+    when(sharePartitionManager.newContext(any(), any(), any(), any())).thenReturn(
+      new ShareSessionContext(new ShareFetchMetadata(memberId, 0), Map(
+        new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) ->
+          new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes)
+      ).asJava)
+    ).thenReturn(new ShareSessionContext(new ShareFetchMetadata(memberId, 0), new ShareSession(
+      new ShareSessionKey(groupId, memberId), cachedSharePartitions, 0L, 0L, 1
+    )))
+
+    // Stub the quota manager to report a throttle time of 0.
+    when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
+      any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
+
+    // First request: epoch 0 *with* an acknowledgement batch for offsets 0..9.
+    // NOTE(review): acknowledge type 1 is presumably ACCEPT -- confirm against AcknowledgeType.
+    val initialRequestData = new ShareFetchRequestData()
+      .setGroupId(groupId)
+      .setMemberId(memberId.toString)
+      .setShareSessionEpoch(0)
+      .setTopics(List(new ShareFetchRequestData.FetchTopic()
+        .setTopicId(topicId)
+        .setPartitions(List(
+          new ShareFetchRequestData.FetchPartition()
+            .setPartitionIndex(partitionIndex)
+            .setPartitionMaxBytes(partitionMaxBytes)
+            .setAcknowledgementBatches(List(
+              new AcknowledgementBatch()
+                .setFirstOffset(0)
+                .setLastOffset(9)
+                .setAcknowledgeTypes(Collections.singletonList(1.toByte))
+            ).asJava)
+        ).asJava)
+      ).asJava)
+
+    val initialShareFetchRequest = new ShareFetchRequest.Builder(initialRequestData).build(ApiKeys.SHARE_FETCH.latestVersion)
+    val initialRequest = buildRequest(initialShareFetchRequest)
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(
+        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
+        ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
+        ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
+      raftSupport = true)
+    kafkaApis.handleShareFetchRequest(initialRequest)
+    val initialResponseData = verifyNoThrottling[ShareFetchResponse](initialRequest).data()
+
+    assertEquals(Errors.INVALID_REQUEST.code, initialResponseData.errorCode)
+
+    // Testing whether the subsequent request with the incremented share session epoch works or not.
+    val subsequentRequestData = new ShareFetchRequestData()
+      .setGroupId(groupId)
+      .setMemberId(memberId.toString)
+      .setShareSessionEpoch(1)
+      .setTopics(List(new ShareFetchRequestData.FetchTopic()
+        .setTopicId(topicId)
+        .setPartitions(List(
+          new ShareFetchRequestData.FetchPartition()
+            .setPartitionIndex(partitionIndex)
+            .setPartitionMaxBytes(partitionMaxBytes)
+        ).asJava)
+      ).asJava)
+
+    val subsequentShareFetchRequest = new ShareFetchRequest.Builder(subsequentRequestData).build(ApiKeys.SHARE_FETCH.latestVersion)
+    val subsequentRequest = buildRequest(subsequentShareFetchRequest)
+    kafkaApis.handleShareFetchRequest(subsequentRequest)
+    val subsequentResponseData = verifyNoThrottling[ShareFetchResponse](subsequentRequest).data()
+    val topicResponses = subsequentResponseData.responses()
+
+    assertEquals(Errors.NONE.code, subsequentResponseData.errorCode)
+    assertEquals(1, topicResponses.size())
+    assertEquals(topicId, topicResponses.get(0).topicId)
+    assertEquals(1, topicResponses.get(0).partitions.size())
+    assertEquals(partitionIndex, topicResponses.get(0).partitions.get(0).partitionIndex)
+    assertEquals(Errors.NONE.code, topicResponses.get(0).partitions.get(0).errorCode)
+    assertEquals(records, topicResponses.get(0).partitions.get(0).records)
+    assertArrayEquals(expectedAcquiredRecords(0, 9, 1).toArray(), topicResponses.get(0).partitions.get(0).acquiredRecords.toArray())
+  }
+
+  @Test
+  def testHandleShareFetchRequestInvalidRequestOnFinalEpoch() : Unit = {
+    val topicName = "foo"
+    val topicId = Uuid.randomUuid()
+    val partitionIndex = 0
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
+    addTopicToMetadataCache(topicName, 1, topicId = topicId)
+    val memberId : Uuid = Uuid.ZERO_UUID
+
+    val groupId = "group"
+
+    val records = memoryRecords(10, 0)
+
+    when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), 
any())).thenReturn(
+      CompletableFuture.completedFuture(Map[TopicIdPartition, 
ShareFetchResponseData.PartitionData](
+        new TopicIdPartition(topicId, new TopicPartition(topicName, 
partitionIndex)) ->
+          new ShareFetchResponseData.PartitionData()
+            .setErrorCode(Errors.NONE.code)
+            .setAcknowledgeErrorCode(Errors.NONE.code)
+            .setRecords(records)
+            .setAcquiredRecords(new util.ArrayList(List(
+              new ShareFetchResponseData.AcquiredRecords()
+                .setFirstOffset(0)
+                .setLastOffset(9)
+                .setDeliveryCount(1)
+            ).asJava))
+      ).asJava)
+    )
+
+    when(sharePartitionManager.newContext(any(), any(), any(), 
any())).thenReturn(
+      new ShareSessionContext(new ShareFetchMetadata(memberId, 0), Map(
+        new TopicIdPartition(topicId, new TopicPartition(topicName, 
partitionIndex)) ->
+          new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes)
+      ).asJava)
+    ).thenThrow(Errors.INVALID_REQUEST.exception)
+
+    when(sharePartitionManager.releaseAcquiredRecords(any(), 
any())).thenReturn(
+      CompletableFuture.completedFuture(Map[TopicIdPartition, 
ShareAcknowledgeResponseData.PartitionData](
+        new TopicIdPartition(topicId, new TopicPartition(topicName, 
partitionIndex)) ->
+          new ShareAcknowledgeResponseData.PartitionData()
+            .setPartitionIndex(partitionIndex)
+            .setErrorCode(Errors.NONE.code)
+      ).asJava)
+    )
+
+    when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
+      any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
+
+    var shareFetchRequestData = new ShareFetchRequestData().
+      setGroupId(groupId).
+      setMemberId(memberId.toString).
+      setShareSessionEpoch(0).
+      setTopics(List(new ShareFetchRequestData.FetchTopic().
+        setTopicId(topicId).
+        setPartitions(List(
+          new ShareFetchRequestData.FetchPartition()
+            .setPartitionIndex(0)
+            .setPartitionMaxBytes(partitionMaxBytes)).asJava)).asJava)
+
+    var shareFetchRequest = new 
ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion)
+    var request = buildRequest(shareFetchRequest)
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(
+        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
+        ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
+        ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
+      raftSupport = true)
+    kafkaApis.handleShareFetchRequest(request)
+    var response = verifyNoThrottling[ShareFetchResponse](request)
+    var responseData = response.data()
+    val topicResponses = responseData.responses()
+
+    assertEquals(Errors.NONE.code, responseData.errorCode)
+    assertEquals(1, topicResponses.size())
+    assertEquals(topicId, topicResponses.get(0).topicId)
+    assertEquals(1, topicResponses.get(0).partitions.size())
+    assertEquals(partitionIndex, 
topicResponses.get(0).partitions.get(0).partitionIndex)
+    assertEquals(Errors.NONE.code, 
topicResponses.get(0).partitions.get(0).errorCode)
+    assertEquals(records, topicResponses.get(0).partitions.get(0).records)
+    assertArrayEquals(expectedAcquiredRecords(0, 9, 1).toArray(), 
topicResponses.get(0).partitions.get(0).acquiredRecords.toArray())
+
+    shareFetchRequestData = new ShareFetchRequestData().
+      setGroupId(groupId).
+      setMemberId(memberId.toString).
+      setShareSessionEpoch(-1).
+      setTopics(List(new ShareFetchRequestData.FetchTopic().
+        setTopicId(topicId).
+        setPartitions(List(
+          new ShareFetchRequestData.FetchPartition()
+            .setPartitionIndex(0)
+            .setPartitionMaxBytes(partitionMaxBytes)
+            .setAcknowledgementBatches(List(
+              new AcknowledgementBatch()
+                .setFirstOffset(0)
+                .setLastOffset(9)
+                .setAcknowledgeTypes(Collections.singletonList(1.toByte))
+            ).asJava)
+        ).asJava)
+      ).asJava)

Review Comment:
   Thanks for the explanation. Makes sense. Could we add a comment about this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to