This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e1f45218c96 KAFKA-19485 (II) : Complete any pending acknowledgements
in ShareFetch on an error response. (#20247)
e1f45218c96 is described below
commit e1f45218c963ab6651320448a430f7fa3fd95033
Author: Shivsundar R <[email protected]>
AuthorDate: Thu Jul 31 17:07:44 2025 -0400
KAFKA-19485 (II) : Complete any pending acknowledgements in ShareFetch on
an error response. (#20247)
*What*
Currently, when we receive a top-level error response in ShareFetch, we
log the error, update the share session epoch and proceed to the next
request.
However, any acknowledgements piggybacked on the failed ShareFetch request
are never completed, so the acknowledgement commit callback is not
processed for them.
This PR addresses this by completing these acknowledgements with the
error code from the response.
Reviewers: Andrew Schofield <[email protected]>
---
.../internals/ShareConsumeRequestManager.java | 10 +++++++
.../internals/ShareConsumeRequestManagerTest.java | 33 ++++++++++++++++++++++
2 files changed, 43 insertions(+)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
index e016d0d3984..51e3fb39dfb 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
@@ -758,6 +758,16 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
if (response.error() == Errors.UNKNOWN_TOPIC_ID) {
metadata.requestUpdate(false);
}
+ // Complete any inFlight acknowledgements with the error code
from the response.
+ Map<TopicIdPartition, Acknowledgements>
nodeAcknowledgementsInFlight =
fetchAcknowledgementsInFlight.get(fetchTarget.id());
+ if (nodeAcknowledgementsInFlight != null) {
+ nodeAcknowledgementsInFlight.forEach((tip, acks) -> {
+
acks.complete(Errors.forCode(response.error().code()).exception());
+
metricsManager.recordFailedAcknowledgements(acks.size());
+ });
+
maybeSendShareAcknowledgeCommitCallbackEvent(nodeAcknowledgementsInFlight);
+ nodeAcknowledgementsInFlight.clear();
+ }
return;
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
index 57407260dee..68da71d7767 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
@@ -1455,6 +1455,39 @@ public class ShareConsumeRequestManagerTest {
assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.exception(),
completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
}
+ @Test
+ public void
testPiggybackAcknowledgementsOnInitialShareSession_ShareSessionNotFound() {
+ buildRequestManager();
+
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
+
+ assignFromSubscribed(singleton(tp0));
+ sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
+
+ fetchRecords();
+
+ // The acknowledgements for the initial fetch from tip0 are processed
now and sent to the background thread.
+ Acknowledgements acknowledgements = getAcknowledgements(1,
AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
+ shareConsumeRequestManager.fetch(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
+
+ // We attempt to send the acknowledgements piggybacking on the fetch.
+ assertEquals(1, sendFetches());
+ assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+
+ // Simulate a broker restart, but no leader change, this resets share
session epoch to 0.
+ client.prepareResponse(fetchResponseWithTopLevelError(tip0,
Errors.SHARE_SESSION_NOT_FOUND));
+ networkClientDelegate.poll(time.timer(0));
+
+ // We would complete these acknowledgements with the error code from
the response.
+ assertEquals(3, completedAcknowledgements.get(0).get(tip0).size());
+ assertEquals(Errors.SHARE_SESSION_NOT_FOUND.exception(),
completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
+
+ // Next fetch would proceed as expected and would not include any
acknowledgements.
+ NetworkClientDelegate.PollResult pollResult =
shareConsumeRequestManager.sendFetchesReturnPollResult();
+ assertEquals(1, pollResult.unsentRequests.size());
+ ShareFetchRequest.Builder builder = (ShareFetchRequest.Builder)
pollResult.unsentRequests.get(0).requestBuilder();
+ assertEquals(0,
builder.data().topics().find(topicId).partitions().find(0).acknowledgementBatches().size());
+ }
+
@Test
public void testInvalidDefaultRecordBatch() {
buildRequestManager();