ShivsundarR commented on code in PR #19295:
URL: https://github.com/apache/kafka/pull/19295#discussion_r2022176927
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java:
##########
@@ -2905,4 +2729,48 @@ public void add(BackgroundEvent event) {
}
}
}
+
+ @Test
+ void testFetchWithControlRecords() {
+ buildRequestManager();
+
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
+
+ Map<TopicIdPartition, NodeAcknowledgements> nodeAcknowledgementsMap =
new HashMap<>();
+
+ Acknowledgements acknowledgements = Acknowledgements.empty();
+ acknowledgements.add(1L, AcknowledgeType.ACCEPT);
+ nodeAcknowledgementsMap.put(tip0, new NodeAcknowledgements(0,
acknowledgements));
+
+ Map<TopicIdPartition, NodeAcknowledgements>
nodeAcknowledgementsControlRecordMap = new HashMap<>();
+
+ Acknowledgements controlAcknowledgements = Acknowledgements.empty();
+ controlAcknowledgements.addGap(2L);
+ nodeAcknowledgementsControlRecordMap.put(tip0, new
NodeAcknowledgements(0, controlAcknowledgements));
+
+ shareConsumeRequestManager.fetch(nodeAcknowledgementsMap,
nodeAcknowledgementsControlRecordMap);
+
+ Map<TopicIdPartition, Acknowledgements> fetchAcksToSend =
shareConsumeRequestManager.getFetchAcknowledgementsToSend(0);
+ System.out.println(fetchAcksToSend);
+ assertEquals(1, fetchAcksToSend.size());
+ assertEquals(AcknowledgeType.ACCEPT,
fetchAcksToSend.get(tip0).get(1L));
+ assertEquals(2, fetchAcksToSend.get(tip0).size());
+ assertNull(fetchAcksToSend.get(tip0).get(3L));
+ }
+
+ void sendFetchAndVerifyResponse(MemoryRecords records,
+
List<ShareFetchResponseData.AcquiredRecords> acquiredRecords,
+ Errors... error) {
Review Comment:
There was one test which needed this, the change was missed when I pushed
the commit. I have updated the PR with one usage of this varargs part. In
future if we add tests with both top level error and acknowledge error, this
might be useful as well.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]