This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8637b6a0ffb KAFKA-17711: Minor cleanup changes in ShareConsumeRequestManager (#17392)
8637b6a0ffb is described below

commit 8637b6a0ffb04d539c7d681753e4c49d5b04e21a
Author: ShivsundarR <[email protected]>
AuthorDate: Mon Oct 7 12:33:48 2024 -0400

    KAFKA-17711: Minor cleanup changes in ShareConsumeRequestManager (#17392)
    
    What
    Minor cleanup and javadoc changes in ShareConsumeRequestManager.
    
    Reviewers: Andrew Schofield <[email protected]>, Manikumar Reddy <[email protected]>
---
 .../internals/ShareConsumeRequestManager.java      | 63 +++++++++++++---------
 1 file changed, 39 insertions(+), 24 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
index b7f77027b2c..1ca5f02a7d7 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
@@ -208,6 +208,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
             fetchMoreRecords = true;
         }
 
+        // The acknowledgements sent via ShareFetch are stored in this map.
         acknowledgementsMap.forEach((tip, acks) -> 
fetchAcknowledgementsMap.merge(tip, acks, Acknowledgements::merge));
     }
 
@@ -220,38 +221,38 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
      */
     private PollResult processAcknowledgements(long currentTimeMs) {
         List<UnsentRequest> unsentRequests = new ArrayList<>();
-        AtomicBoolean isAsyncDone = new AtomicBoolean();
+        AtomicBoolean isAsyncSent = new AtomicBoolean();
         for (Map.Entry<Integer, Tuple<AcknowledgeRequestState>> requestStates 
: acknowledgeRequestStates.entrySet()) {
             int nodeId = requestStates.getKey();
 
             if (!isNodeFree(nodeId)) {
                 log.trace("Skipping acknowledge request because previous 
request to {} has not been processed, so acks are not sent", nodeId);
             } else {
-                isAsyncDone.set(false);
-                // For commitAsync
-                maybeBuildRequest(requestStates.getValue().getAsyncRequest(), 
currentTimeMs, true, isAsyncDone).ifPresent(unsentRequests::add);
+                isAsyncSent.set(false);
+                // First, the acknowledgements from commitAsync is sent.
+                maybeBuildRequest(requestStates.getValue().getAsyncRequest(), 
currentTimeMs, true, isAsyncSent).ifPresent(unsentRequests::add);
+
                 // Check to ensure we start processing commitSync/close only 
if there are no commitAsync requests left to process.
-                if (isAsyncDone.get()) {
+                if (isAsyncSent.get()) {
+                    if (!isNodeFree(nodeId)) {
+                        log.trace("Skipping acknowledge request because 
previous request to {} has not been processed, so acks are not sent", nodeId);
+                        continue;
+                    }
+
                     // We try to process the close request only if we have 
processed the async and the sync requests for the node.
                     if (requestStates.getValue().getSyncRequestQueue() == 
null) {
-                        if (!isNodeFree(nodeId)) {
-                            log.trace("Skipping acknowledge request because 
previous request to {} has not been processed, so acks are not sent", nodeId);
-                        } else {
-                            AcknowledgeRequestState closeRequestState = 
requestStates.getValue().getCloseRequest();
+                        AcknowledgeRequestState closeRequestState = 
requestStates.getValue().getCloseRequest();
 
-                            maybeBuildRequest(closeRequestState, 
currentTimeMs, false, isAsyncDone).ifPresent(unsentRequests::add);
-                        }
+                        maybeBuildRequest(closeRequestState, currentTimeMs, 
false, isAsyncSent).ifPresent(unsentRequests::add);
                     } else {
+                        // Processing the acknowledgements from commitSync
                         for (AcknowledgeRequestState acknowledgeRequestState : 
requestStates.getValue().getSyncRequestQueue()) {
-                            if (!isNodeFree(nodeId)) {
-                                log.trace("Skipping acknowledge request 
because previous request to {} has not been processed, so acks are not sent", 
nodeId);
-                                break;
-                            }
-                            maybeBuildRequest(acknowledgeRequestState, 
currentTimeMs, false, isAsyncDone).ifPresent(unsentRequests::add);
+                            maybeBuildRequest(acknowledgeRequestState, 
currentTimeMs, false, isAsyncSent).ifPresent(unsentRequests::add);
                         }
                     }
                 }
             }
+
         }
 
         PollResult pollResult = null;
@@ -284,11 +285,20 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
         }
     }
 
+    /**
+     *
+     * @param acknowledgeRequestState Contains the acknowledgements to be sent.
+     * @param currentTimeMs The current time in ms.
+     * @param onCommitAsync Boolean to denote if the acknowledgements came 
from a commitAsync or not.
+     * @param isAsyncSent Boolean to indicate if the async request has been 
sent.
+     *
+     * @return Returns the request if it was built.
+     */
     private Optional<UnsentRequest> maybeBuildRequest(AcknowledgeRequestState 
acknowledgeRequestState,
                                                       long currentTimeMs,
                                                       boolean onCommitAsync,
-                                                      AtomicBoolean 
isAsyncDone) {
-        boolean asyncDone = true;
+                                                      AtomicBoolean 
isAsyncSent) {
+        boolean asyncSent = true;
         try {
             if (acknowledgeRequestState == null || 
(!acknowledgeRequestState.onClose() && acknowledgeRequestState.isEmpty())) {
                 return Optional.empty();
@@ -306,13 +316,13 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
 
             if (!acknowledgeRequestState.canSendRequest(currentTimeMs)) {
                 // We wait for the backoff before we can send this request.
-                asyncDone = false;
+                asyncSent = false;
                 return Optional.empty();
             }
 
             UnsentRequest request = acknowledgeRequestState.buildRequest();
             if (request == null) {
-                asyncDone = false;
+                asyncSent = false;
                 return Optional.empty();
             }
 
@@ -320,14 +330,15 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
             return Optional.of(request);
         } finally {
             if (onCommitAsync) {
-                isAsyncDone.set(asyncDone);
+                isAsyncSent.set(asyncSent);
             }
         }
     }
 
     /**
-     * Prunes the empty acknowledgementRequestStates.
-     * Returns true if there are still any acknowledgements left to be 
processed.
+     * Prunes the empty acknowledgementRequestStates in {@link 
#acknowledgeRequestStates}
+     *
+     * @return Returns true if there are still any acknowledgements left to be 
processed.
      */
     private boolean checkAndRemoveCompletedAcknowledgements() {
         boolean areAnyAcksLeft = false;
@@ -340,10 +351,12 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                 acknowledgeRequestStatePair.getValue().setAsyncRequest(null);
                 areAsyncAcksLeft = false;
             }
+
             if 
(!areRequestStatesInProgress(acknowledgeRequestStatePair.getValue().getSyncRequestQueue()))
 {
                 
acknowledgeRequestStatePair.getValue().nullifySyncRequestQueue();
                 areSyncAcksLeft = false;
             }
+
             if 
(!isRequestStateInProgress(acknowledgeRequestStatePair.getValue().getCloseRequest()))
 {
                 acknowledgeRequestStatePair.getValue().setCloseRequest(null);
             }
@@ -879,8 +892,10 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
         private final AcknowledgeRequestType requestType;
 
         /**
-         * Boolean to indicate if the request has been processed,
+         * Boolean to indicate if the request has been processed.
+         * <p>
          * Set to true once we process the response and do not retry the 
request.
+         * <p>
          * Initialized to false every time we build a request.
          */
         private boolean isProcessed;

Reply via email to