This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 56a3c6dde92 KAFKA-19485: Added check before sending acknowledgements on initial epoch. (#20135)
56a3c6dde92 is described below

commit 56a3c6dde929763aaf74a801bd043fdd474a8ed2
Author: Shivsundar R <[email protected]>
AuthorDate: Thu Jul 10 04:06:19 2025 -0400

    KAFKA-19485: Added check before sending acknowledgements on initial epoch. (#20135)
    
    https://issues.apache.org/jira/browse/KAFKA-19485
    
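    **Bug:**
    There is a bug in `ShareConsumeRequestManager` where acknowledgements are
    added to the request on the initial `ShareSession` epoch even after that
    epoch has been detected: the acknowledgements are failed with
    `INVALID_SHARE_SESSION_EPOCH`, but they are still passed to
    `addPartitionToFetch` and included in the fetch request.
    Added a fix to include acknowledgements in the request only when the
    session is not on its initial epoch.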
    
    This PR also adds the check at another point in the code where such
    acknowledgements could be sent: the path that adds partitions to the
    request for acknowledgement only (`addPartitionToAcknowledgeOnly`).
    One such case is when metadata is refreshed with empty topic IDs after a
    broker restart, which means leader information is not available for the
    node.
    
    - Consumer subscribed to a partition whose leader was node-0.
    - Broker restart happens and node-0 is elected leader again. Broker
    starts a new `ShareSession`.
    - Background thread sends a fetch request with **non-zero** epoch.
    - Broker responds with `SHARE_SESSION_NOT_FOUND`.
    - Client updates session epoch to 0 once it receives this error.
    - Client updates metadata but receives empty metadata response. (Leader
    unavailable)
    - The application thread finishes processing the previous fetch and sends
    its acknowledgements to be piggybacked on the next fetch.
    - The next fetch sends the piggybacked acknowledgements for the
    previously subscribed partitions, resulting in an error from the broker
    ("`Acknowledge data present on initial epoch`"). (Before this fix, the
    client attempted to send them even if the leader was unavailable.)
    
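    **Fix:** Before sending acknowledgements, check whether the share session
    is on its initial epoch. If it is, complete the acknowledgements with
    `INVALID_SHARE_SESSION_EPOCH` instead of including them in the request.
    Added a unit test covering the above scenario.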
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../internals/ShareConsumeRequestManager.java      | 47 +++++++++++++++++-----
 .../internals/ShareConsumeRequestManagerTest.java  | 41 +++++++++++++++++++
 2 files changed, 78 insertions(+), 10 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
index b36b778546b..e8a40bd66f3 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
@@ -175,22 +175,25 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
 
                 TopicIdPartition tip = new TopicIdPartition(topicId, 
partition);
                 Acknowledgements acknowledgementsToSend = null;
+                boolean canSendAcknowledgements = true;
+
                 Map<TopicIdPartition, Acknowledgements> nodeAcksFromFetchMap = 
fetchAcknowledgementsToSend.get(node.id());
                 if (nodeAcksFromFetchMap != null) {
                     acknowledgementsToSend = nodeAcksFromFetchMap.remove(tip);
+
                     if (acknowledgementsToSend != null) {
-                        if (handler.isNewSession()) {
-                            // Failing the acknowledgements as we cannot have 
piggybacked acknowledgements in the initial ShareFetchRequest.
-                            
acknowledgementsToSend.complete(Errors.INVALID_SHARE_SESSION_EPOCH.exception());
-                            
maybeSendShareAcknowledgeCommitCallbackEvent(Map.of(tip, 
acknowledgementsToSend));
-                        } else {
-                            
metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
-                            
fetchAcknowledgementsInFlight.computeIfAbsent(node.id(), k -> new 
HashMap<>()).put(tip, acknowledgementsToSend);
+                        // Check if the share session epoch is valid for 
sending acknowledgements.
+                        if (!maybeAddAcknowledgements(handler, node, tip, 
acknowledgementsToSend)) {
+                            canSendAcknowledgements = false;
                         }
                     }
                 }
 
-                handler.addPartitionToFetch(tip, acknowledgementsToSend);
+                if (canSendAcknowledgements) {
+                    handler.addPartitionToFetch(tip, acknowledgementsToSend);
+                } else {
+                    handler.addPartitionToFetch(tip, null);
+                }
                 topicNamesMap.putIfAbsent(new IdAndPartition(tip.topicId(), 
tip.partition()), tip.topic());
 
                 log.debug("Added fetch request for partition {} to node {}", 
tip, node.id());
@@ -212,8 +215,10 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                     if (nodeAcksFromFetchMap != null) {
                         nodeAcksFromFetchMap.forEach((tip, acks) -> {
                             if (!isLeaderKnownToHaveChanged(nodeId, tip)) {
-                                
metricsManager.recordAcknowledgementSent(acks.size());
-                                
fetchAcknowledgementsInFlight.computeIfAbsent(node.id(), k -> new 
HashMap<>()).put(tip, acks);
+                                // Check if the share session epoch is valid 
for sending acknowledgements.
+                                if (!maybeAddAcknowledgements(sessionHandler, 
node, tip, acks)) {
+                                    return;
+                                }
 
                                 
sessionHandler.addPartitionToAcknowledgeOnly(tip, acks);
                                 handlerMap.put(node, sessionHandler);
@@ -256,6 +261,28 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
         return new PollResult(requests);
     }
 
+    /**
+     *
+     * @return True if we can add acknowledgements to the share session.
+     * If we cannot add acknowledgements, they are completed with {@link 
Errors#INVALID_SHARE_SESSION_EPOCH} exception.
+     */
+    private boolean maybeAddAcknowledgements(ShareSessionHandler handler,
+                                             Node node,
+                                             TopicIdPartition tip,
+                                             Acknowledgements 
acknowledgements) {
+        if (handler.isNewSession()) {
+            // Failing the acknowledgements as we cannot have piggybacked 
acknowledgements in the initial ShareFetchRequest.
+            log.debug("Cannot send acknowledgements on initial epoch for 
ShareSession for partition {}", tip);
+            
acknowledgements.complete(Errors.INVALID_SHARE_SESSION_EPOCH.exception());
+            maybeSendShareAcknowledgeCommitCallbackEvent(Map.of(tip, 
acknowledgements));
+            return false;
+        } else {
+            metricsManager.recordAcknowledgementSent(acknowledgements.size());
+            fetchAcknowledgementsInFlight.computeIfAbsent(node.id(), k -> new 
HashMap<>()).put(tip, acknowledgements);
+            return true;
+        }
+    }
+
     public void fetch(Map<TopicIdPartition, NodeAcknowledgements> 
acknowledgementsMap,
                       Map<TopicIdPartition, NodeAcknowledgements> 
controlRecordAcknowledgements) {
         if (!fetchMoreRecords) {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
index 5cdde2df6ab..0ce3a524880 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
@@ -1407,8 +1407,49 @@ public class ShareConsumeRequestManagerTest {
 
         shareConsumeRequestManager.fetch(Map.of(tip0, new 
NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
 
+        NetworkClientDelegate.PollResult pollResult = 
shareConsumeRequestManager.sendFetchesReturnPollResult();
+        assertEquals(1, pollResult.unsentRequests.size());
+        ShareFetchRequest.Builder builder = (ShareFetchRequest.Builder) 
pollResult.unsentRequests.get(0).requestBuilder();
+        assertEquals(1, builder.data().topics().size());
+        // We should not add the acknowledgements as part of the request.
+        assertEquals(0, 
builder.data().topics().find(tip0.topicId()).partitions().find(0).acknowledgementBatches().size());
+
+        assertEquals(3, completedAcknowledgements.get(0).get(tip0).size());
+        assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.exception(), 
completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
+    }
+
+    @Test
+    public void 
testPiggybackAcknowledgementsOnInitialShareSessionErrorSubscriptionChange() {
+        buildRequestManager();
+        
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
+
+        assignFromSubscribed(singleton(tp0));
         sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
 
+        fetchRecords();
+
+        // Simulate a broker restart, but no leader change, this resets share 
session epoch to 0.
+        assertEquals(1, sendFetches());
+        assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+        client.prepareResponse(fetchResponseWithTopLevelError(tip0, 
Errors.SHARE_SESSION_NOT_FOUND));
+        networkClientDelegate.poll(time.timer(0));
+
+        // Simulate a metadata update with no topics in the response.
+        client.updateMetadata(
+                RequestTestUtils.metadataUpdateWithIds(1, 
Collections.emptyMap(),
+                        tp -> validLeaderEpoch, null, false));
+
+        // The acknowledgements for the initial fetch from tip0 are processed 
now and sent to the background thread.
+        Acknowledgements acknowledgements = getAcknowledgements(1, 
AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
+        shareConsumeRequestManager.fetch(Map.of(tip0, new 
NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
+
+        assertEquals(0, completedAcknowledgements.size());
+
+        // Next fetch would not include any acknowledgements.
+        NetworkClientDelegate.PollResult pollResult = 
shareConsumeRequestManager.sendFetchesReturnPollResult();
+        assertEquals(0, pollResult.unsentRequests.size());
+
+        // We should fail any waiting acknowledgements for tip-0 as it would 
have a share session epoch equal to 0.
         assertEquals(3, completedAcknowledgements.get(0).get(tip0).size());
         assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.exception(), 
completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
     }

Reply via email to