ShivsundarR commented on code in PR #19295:
URL: https://github.com/apache/kafka/pull/19295#discussion_r2016129362
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java:
##########
@@ -644,8 +645,12 @@ private ShareFetch<K, V> collect(Map<TopicIdPartition, NodeAcknowledgements> ack
 if (currentFetch.isEmpty()) {
 final ShareFetch<K, V> fetch = fetchCollector.collect(fetchBuffer);
 if (fetch.isEmpty()) {
+ // Check for any acknowledgements which could have come from control records (GAP) and include them.
+ Map<TopicIdPartition, NodeAcknowledgements> combinedAcknowledgements = new LinkedHashMap<>(acknowledgementsMap);
+ combinedAcknowledgements.putAll(fetch.takeAcknowledgedRecords());
+
 // Fetch more records and send any waiting acknowledgements
- applicationEventHandler.add(new ShareFetchEvent(acknowledgementsMap));
+ applicationEventHandler.add(new ShareFetchEvent(combinedAcknowledgements));
Review Comment:
Yes :)) turns out it can.
- Some integ tests in this PR - https://github.com/apache/kafka/pull/19261
revealed that in transactions, when the client receives only a control record
(e.g. an abort marker) in the `ShareFetchResponse` (without any non-control
records), the control records in the `ShareCompletedFetch` are never
acknowledged (ideally they would be acknowledged with GAP, indicating that the
client is ignoring them) and are never presented to the consumer application.
- It is expected that control records are skipped and not presented to
the application, but the client should still acknowledge them with GAP
(https://github.com/apache/kafka/blob/84b8fec089682486aa9827a3baffa2513118ce6d/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java#L33)
- Normally these control records are auto-acknowledged with `GAP`, and the
acknowledgements are sent on the next `ShareFetch`/`ShareAcknowledge` request.
But here, because `fetch.isEmpty()` only checks for `numRecords() == 0`, a
fetch that contains only control records counts as empty and we ignore it
(meaning we never acknowledge these control records) -
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java#L598
- In this PR, we add any acknowledgements that came in with the empty fetch
(from control records) to the `ShareFetchEvent` so that they can be sent on
the next poll().
- I agree it looks a bit odd for readability, though. But yeah, there is a
case where this can happen.
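
To make the case above concrete, here is a minimal, self-contained sketch of the idea, not the actual client code. `SketchFetch`, its fields, and the `String` keys and values are hypothetical stand-ins for `ShareFetch`, `TopicIdPartition` and `NodeAcknowledgements`. It also assumes that taking the acknowledgements clears them, which may not match the real `takeAcknowledgedRecords()`.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class EmptyFetchAckSketch {

    /** Hypothetical stand-in for a fetch: records for the application plus auto-generated acks. */
    static final class SketchFetch {
        private final List<String> records;
        private final Map<String, String> acknowledgedRecords;

        SketchFetch(List<String> records, Map<String, String> acknowledgedRecords) {
            this.records = records;
            this.acknowledgedRecords = new LinkedHashMap<>(acknowledgedRecords);
        }

        // Mirrors the behaviour described in the comment: only the record count is checked,
        // so a fetch holding nothing but GAP acknowledgements still reports "empty".
        boolean isEmpty() {
            return records.isEmpty();
        }

        // Assumption for this sketch: taking the acknowledgements also clears them.
        Map<String, String> takeAcknowledgedRecords() {
            Map<String, String> taken = new LinkedHashMap<>(acknowledgedRecords);
            acknowledgedRecords.clear();
            return taken;
        }
    }

    /** Copies the pending acknowledgements, then adds whatever the (possibly empty) fetch carried. */
    static Map<String, String> combine(Map<String, String> pending, SketchFetch fetch) {
        Map<String, String> combined = new LinkedHashMap<>(pending);
        combined.putAll(fetch.takeAcknowledgedRecords());
        return combined;
    }

    public static void main(String[] args) {
        Map<String, String> pending = Map.of("topicA-0", "offset 5: ACCEPT");
        // A fetch that contained only a control record: no records, one GAP acknowledgement.
        SketchFetch fetch = new SketchFetch(List.of(), Map.of("topicB-0", "offset 7: GAP"));

        if (fetch.isEmpty()) {
            // Without the combine step, the GAP acknowledgement would be dropped here.
            System.out.println(combine(pending, fetch));
        }
    }
}
```

Note that `putAll` replaces an existing entry with the same key, so in this sketch an acknowledgement from the empty fetch would overwrite a pending one for the same partition rather than merge with it.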