This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e1f45218c96 KAFKA-19485 (II) : Complete any pending acknowledgements 
in ShareFetch on an error response. (#20247)
e1f45218c96 is described below

commit e1f45218c963ab6651320448a430f7fa3fd95033
Author: Shivsundar R <[email protected]>
AuthorDate: Thu Jul 31 17:07:44 2025 -0400

    KAFKA-19485 (II) : Complete any pending acknowledgements in ShareFetch on 
an error response. (#20247)
    
    *What*
    Currently, when we receive a top-level error response in ShareFetch, we
    log the error, update the share session epoch and proceed to the next
    request.
    However, any acknowledgements that were in flight with that request are
    not completed, so the acknowledgement commit callback is never invoked
    for them.
    
    This PR addresses this by completing these acknowledgements with the
    error code from the response.
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../internals/ShareConsumeRequestManager.java      | 10 +++++++
 .../internals/ShareConsumeRequestManagerTest.java  | 33 ++++++++++++++++++++++
 2 files changed, 43 insertions(+)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
index e016d0d3984..51e3fb39dfb 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
@@ -758,6 +758,16 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                 if (response.error() == Errors.UNKNOWN_TOPIC_ID) {
                     metadata.requestUpdate(false);
                 }
+                // Complete any inFlight acknowledgements with the error code 
from the response.
+                Map<TopicIdPartition, Acknowledgements> 
nodeAcknowledgementsInFlight = 
fetchAcknowledgementsInFlight.get(fetchTarget.id());
+                if (nodeAcknowledgementsInFlight != null) {
+                    nodeAcknowledgementsInFlight.forEach((tip, acks) -> {
+                        
acks.complete(Errors.forCode(response.error().code()).exception());
+                        
metricsManager.recordFailedAcknowledgements(acks.size());
+                    });
+                    
maybeSendShareAcknowledgeCommitCallbackEvent(nodeAcknowledgementsInFlight);
+                    nodeAcknowledgementsInFlight.clear();
+                }
                 return;
             }
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
index 57407260dee..68da71d7767 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
@@ -1455,6 +1455,39 @@ public class ShareConsumeRequestManagerTest {
         assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.exception(), 
completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
     }
 
+    @Test
+    public void 
testPiggybackAcknowledgementsOnInitialShareSession_ShareSessionNotFound() {
+        buildRequestManager();
+        
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
+
+        assignFromSubscribed(singleton(tp0));
+        sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
+
+        fetchRecords();
+
+        // The acknowledgements for the initial fetch from tip0 are processed 
now and sent to the background thread.
+        Acknowledgements acknowledgements = getAcknowledgements(1, 
AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
+        shareConsumeRequestManager.fetch(Map.of(tip0, new 
NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
+
+        // We attempt to send the acknowledgements piggybacking on the fetch.
+        assertEquals(1, sendFetches());
+        assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+
+        // Simulate a broker restart, but no leader change, this resets share 
session epoch to 0.
+        client.prepareResponse(fetchResponseWithTopLevelError(tip0, 
Errors.SHARE_SESSION_NOT_FOUND));
+        networkClientDelegate.poll(time.timer(0));
+
+        // We would complete these acknowledgements with the error code from 
the response.
+        assertEquals(3, completedAcknowledgements.get(0).get(tip0).size());
+        assertEquals(Errors.SHARE_SESSION_NOT_FOUND.exception(), 
completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
+
+        // Next fetch would proceed as expected and would not include any 
acknowledgements.
+        NetworkClientDelegate.PollResult pollResult = 
shareConsumeRequestManager.sendFetchesReturnPollResult();
+        assertEquals(1, pollResult.unsentRequests.size());
+        ShareFetchRequest.Builder builder = (ShareFetchRequest.Builder) 
pollResult.unsentRequests.get(0).requestBuilder();
+        assertEquals(0, 
builder.data().topics().find(topicId).partitions().find(0).acknowledgementBatches().size());
+    }
+
     @Test
     public void testInvalidDefaultRecordBatch() {
         buildRequestManager();

Reply via email to