AndrewJSchofield commented on code in PR #17537:
URL: https://github.com/apache/kafka/pull/17537#discussion_r1818812830


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -172,14 +175,53 @@ public PollResult poll(long currentTimeMs) {
                     metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
                 }
                 handler.addPartitionToFetch(tip, acknowledgementsToSend);
+                fetchedPartitions.add(tip);
 
-                log.debug("Added fetch request for partition {} to node {}", partition, node.id());
+                log.debug("Added fetch request for partition {} to node {}", tip, node.id());
             }
         }
 
+        // Map storing the list of partitions to forget in the upcoming request.
+        Map<Node, List<TopicIdPartition>> partitionsToForgetMap = new HashMap<>();
+        Cluster cluster = metadata.fetch();
+        // Iterating over the session handlers to see if there are acknowledgements to be sent for partitions
+        // which are no longer part of the current subscription.
+        sessionHandlers.forEach((nodeId, sessionHandler) -> {
+            Node node = cluster.nodeById(nodeId);
+            if (node != null) {
+                if (nodesWithPendingRequests.contains(node.id())) {
+                    log.trace("Skipping fetch because previous fetch request to {} has not been processed", node.id());
+                } else {
+                    for (TopicIdPartition tip : sessionHandler.sessionPartitions()) {
+                        if (!fetchedPartitions.contains(tip)) {
+                            Acknowledgements acknowledgementsToSend = fetchAcknowledgementsMap.get(tip);
+                            if (acknowledgementsToSend != null) {
+                                metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
+                            }
+                            sessionHandler.addPartitionToFetch(tip, acknowledgementsToSend);
+                            partitionsToForgetMap.putIfAbsent(node, new ArrayList<>());
+                            partitionsToForgetMap.get(node).add(tip);
+
+                            forgottenTopicNames.putIfAbsent(new IdAndPartition(tip.topicId(), tip.partition()), tip.topic());
+                            fetchedPartitions.add(tip);
+                            log.debug("Added fetch request for partition {} to node {}", tip, node.id());

Review Comment:
   This could say "for previously subscribed partition" in the log message to 
make it clear that this is essentially finishing off the processing for a 
partition which is no longer subscribed. It's not really a proper fetch now.
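
   A minimal sketch of how the two messages might differ. The class and method names here are hypothetical and the exact wording is only an assumption; the real code would call `log.debug` with `{}` placeholders rather than build a string.

```java
public class ForgottenPartitionLogSketch {

    // Hypothetical helper: returns the message that would be logged for a partition.
    // stillSubscribed is false for partitions that remain in the share session
    // only so that outstanding acknowledgements can be sent.
    static String describe(String partition, int nodeId, boolean stillSubscribed) {
        if (stillSubscribed) {
            return String.format("Added fetch request for partition %s to node %d", partition, nodeId);
        }
        return String.format(
            "Added fetch request for previously subscribed partition %s to node %d", partition, nodeId);
    }

    public static void main(String[] args) {
        System.out.println(describe("topicA-0", 1, true));
        System.out.println(describe("topicB-2", 1, false));
    }
}
```

   With distinct wording, anyone reading the debug log can tell a regular fetch apart from the clean-up of a partition that has left the subscription.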



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -172,14 +175,53 @@ public PollResult poll(long currentTimeMs) {
                     metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
                 }
                 handler.addPartitionToFetch(tip, acknowledgementsToSend);
+                fetchedPartitions.add(tip);
 
-                log.debug("Added fetch request for partition {} to node {}", partition, node.id());
+                log.debug("Added fetch request for partition {} to node {}", tip, node.id());
             }
         }
 
+        // Map storing the list of partitions to forget in the upcoming request.
+        Map<Node, List<TopicIdPartition>> partitionsToForgetMap = new HashMap<>();
+        Cluster cluster = metadata.fetch();
+        // Iterating over the session handlers to see if there are acknowledgements to be sent for partitions
+        // which are no longer part of the current subscription.
+        sessionHandlers.forEach((nodeId, sessionHandler) -> {
+            Node node = cluster.nodeById(nodeId);
+            if (node != null) {
+                if (nodesWithPendingRequests.contains(node.id())) {
+                    log.trace("Skipping fetch because previous fetch request to {} has not been processed", node.id());
+                } else {
+                    for (TopicIdPartition tip : sessionHandler.sessionPartitions()) {
+                        if (!fetchedPartitions.contains(tip)) {
+                            Acknowledgements acknowledgementsToSend = fetchAcknowledgementsMap.get(tip);
+                            if (acknowledgementsToSend != null) {
+                                metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
+                            }
+                            sessionHandler.addPartitionToFetch(tip, acknowledgementsToSend);
+                            partitionsToForgetMap.putIfAbsent(node, new ArrayList<>());
+                            partitionsToForgetMap.get(node).add(tip);
+
+                            forgottenTopicNames.putIfAbsent(new IdAndPartition(tip.topicId(), tip.partition()), tip.topic());
+                            fetchedPartitions.add(tip);
+                            log.debug("Added fetch request for partition {} to node {}", tip, node.id());
+                        }
+                    }
+                }
+            }
+        });
+
         Map<Node, ShareFetchRequest.Builder> builderMap = new LinkedHashMap<>();
         for (Map.Entry<Node, ShareSessionHandler> entry : handlerMap.entrySet()) {
             builderMap.put(entry.getKey(), entry.getValue().newShareFetchBuilder(groupId, fetchConfig));
+            Node node = entry.getKey();
+            ShareFetchRequest.Builder builder = builderMap.get(entry.getKey());

Review Comment:
   This is a bit odd. It would be better to assign `node` and `builder` 
first, using `entry.getKey()` and `entry.getValue()`, and then add the builder 
to the map, rather than adding the builder to the map and then getting it 
back out immediately afterwards.


