ShivsundarR commented on code in PR #17537:
URL: https://github.com/apache/kafka/pull/17537#discussion_r1818854329


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -172,14 +175,53 @@ public PollResult poll(long currentTimeMs) {
                     
metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
                 }
                 handler.addPartitionToFetch(tip, acknowledgementsToSend);
+                fetchedPartitions.add(tip);
 
-                log.debug("Added fetch request for partition {} to node {}", 
partition, node.id());
+                log.debug("Added fetch request for partition {} to node {}", 
tip, node.id());
             }
         }
 
+        // Map storing the list of partitions to forget in the upcoming 
request.
+        Map<Node, List<TopicIdPartition>> partitionsToForgetMap = new 
HashMap<>();
+        Cluster cluster = metadata.fetch();
+        // Iterating over the session handlers to see if there are 
acknowledgements to be sent for partitions
+        // which are no longer part of the current subscription.
+        sessionHandlers.forEach((nodeId, sessionHandler) -> {
+            Node node = cluster.nodeById(nodeId);
+            if (node != null) {
+                if (nodesWithPendingRequests.contains(node.id())) {
+                    log.trace("Skipping fetch because previous fetch request 
to {} has not been processed", node.id());
+                } else {
+                    for (TopicIdPartition tip : 
sessionHandler.sessionPartitions()) {
+                        if (!fetchedPartitions.contains(tip)) {
+                            Acknowledgements acknowledgementsToSend = 
fetchAcknowledgementsMap.get(tip);
+                            if (acknowledgementsToSend != null) {
+                                
metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
+                            }
+                            sessionHandler.addPartitionToFetch(tip, 
acknowledgementsToSend);
+                            partitionsToForgetMap.putIfAbsent(node, new 
ArrayList<>());
+                            partitionsToForgetMap.get(node).add(tip);
+
+                            forgottenTopicNames.putIfAbsent(new 
IdAndPartition(tip.topicId(), tip.partition()), tip.topic());
+                            fetchedPartitions.add(tip);
+                            log.debug("Added fetch request for partition {} to 
node {}", tip, node.id());
+                        }
+                    }
+                }
+            }
+        });
+
         Map<Node, ShareFetchRequest.Builder> builderMap = new 
LinkedHashMap<>();
         for (Map.Entry<Node, ShareSessionHandler> entry : 
handlerMap.entrySet()) {
             builderMap.put(entry.getKey(), 
entry.getValue().newShareFetchBuilder(groupId, fetchConfig));
+            Node node = entry.getKey();
+            ShareFetchRequest.Builder builder = builderMap.get(entry.getKey());

Review Comment:
   Right yeah makes sense, thanks. I have updated the PR now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to