apoorvmittal10 commented on code in PR #17709:
URL: https://github.com/apache/kafka/pull/17709#discussion_r1834148813
##########
core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala:
##########
@@ -247,13 +248,26 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId,
ShareRequestMetadata.INITIAL_EPOCH)
val acknowledgementsMap: Map[TopicIdPartition,
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
val shareFetchRequest = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap)
- val shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
- val shareFetchResponseData = shareFetchResponse.data()
- assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
- assertEquals(1, shareFetchResponseData.responses().size())
- assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
- assertEquals(3,
shareFetchResponseData.responses().get(0).partitions().size())
+ // For the multi partition fetch request, the response may not be
available in the first attempt
+ // as the share partitions might not be initialized yet. So, we retry
until we get the response.
+ var responses = Seq[ShareFetchResponseData.PartitionData]()
+ TestUtils.waitUntilTrue(() => {
+ val shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+ val shareFetchResponseData = shareFetchResponse.data()
+ assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+ val partitionsCount =
shareFetchResponseData.responses().get(0).partitions().size()
+ if (partitionsCount > 0) {
+ assertEquals(1, shareFetchResponseData.responses().size())
+ assertEquals(topicId,
shareFetchResponseData.responses().get(0).topicId())
+
shareFetchResponseData.responses().get(0).partitions().foreach(partitionData =>
{
+ if (!partitionData.acquiredRecords().isEmpty) {
+ responses = responses :+ partitionData
+ }
+ })
+ }
+ responses.size == 3
Review Comment:
We shouldn't as the problem we are trying to solve here is that when we
enable DefaultStatePersister then we do see a delay in SharePartition getting
initialized, which is supposed to happen. And with multi topic-partition share
fetch call, say tp0 and tp1, there can be scenario where tp0 is initialized and
triggers purgatory's checkAndComplete. Hence share fetch will respond with
acquired records of tp0 only.
I have added the retires here where the test case is considered successful
when all topic-partitions, tp0 and tp1 in this case, respond with acquired
records.
Prior adding topic-partitions in response array I check if the share fetch
response does have acquired records or not.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]