machi1990 commented on code in PR #13665:
URL: https://github.com/apache/kafka/pull/13665#discussion_r1198622895


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1007,16 +1047,33 @@ public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(final Set<To
             if (pendingCommittedOffsetRequest != null) {
                 future = pendingCommittedOffsetRequest.response;
             } else {
-                future = sendOffsetFetchRequest(partitions);
-                pendingCommittedOffsetRequest = new PendingCommittedOffsetRequest(partitions, generationForOffsetRequest, future);
+                future = sendOffsetFetchRequest(nonCachedPartitions);
+                pendingCommittedOffsetRequest = new PendingCommittedOffsetRequest(nonCachedPartitions, generationForOffsetRequest, future);
             }
             client.poll(future, timer);
 
             if (future.isDone()) {
                 pendingCommittedOffsetRequest = null;
 
                 if (future.succeeded()) {
-                    return future.value();
+                    Map<TopicPartition, OffsetAndMetadata> freshOffsets = future.value();
+
+                    // update cache for assigned partitions that are not cached yet
+                    for (TopicPartition nonCachedAssignedPartition: nonCachedAssignedPartitions) {
+                        if (!this.subscriptions.isAssigned(nonCachedAssignedPartition)) {
+                            // it is possible that the topic is no longer assigned when the response is received,
+                            // in this case we do not update the cache with the fresh value
+                            continue;
+                        }
+
+                        OffsetAndMetadata offset = freshOffsets.get(nonCachedAssignedPartition);
+                        if (offset != null) { // it is possible that the offset and metadata were not fetched
+                            this.partitionOffsetsCache.put(nonCachedAssignedPartition, offset);

Review Comment:
   Thank you for this. I get the gist of what you are saying, and it makes 
sense. I'll take another look and revisit it with that new understanding, or 
come back with additional questions if I need more clarification.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
