apoorvmittal10 commented on code in PR #17573:
URL: https://github.com/apache/kafka/pull/17573#discussion_r1828527303


##########
core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala:
##########
@@ -138,17 +138,23 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     val topicId = topicIds.get(topic)
     val topicIdPartition = new TopicIdPartition(topicId, new 
TopicPartition(topic, partition))
 
+    val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
+
+    // Send the first share fetch request to initialize the share partition
+    var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, 
ShareRequestMetadata.INITIAL_EPOCH)
+    var acknowledgementsMap: Map[TopicIdPartition, 
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
+    var shareFetchRequest = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap, 10)
+    var shareFetchResponse = 
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+
     initProducer()
     // Producing 10 records to the topic created above
     produceData(topicIdPartition, 10)
 
-    val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
-
-    // Send the share fetch request to fetch the records produced above
-    val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, 
ShareRequestMetadata.INITIAL_EPOCH)
-    val acknowledgementsMap: Map[TopicIdPartition, 
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
-    val shareFetchRequest = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap)
-    val shareFetchResponse = 
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+    // Send the second share fetch request to fetch the records produced above
+    metadata = new ShareRequestMetadata(memberId, 
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
+    acknowledgementsMap = Map.empty
+    shareFetchRequest = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap)
+    shareFetchResponse = 
connectAndReceive[ShareFetchResponse](shareFetchRequest)

Review Comment:
   Cant we define the offset reset startegy while creating KafkaShareConsumer?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to