machi1990 commented on code in PR #13665:
URL: https://github.com/apache/kafka/pull/13665#discussion_r1198690005


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1007,16 +1047,33 @@ public Map<TopicPartition, OffsetAndMetadata> 
fetchCommittedOffsets(final Set<To
             if (pendingCommittedOffsetRequest != null) {
                 future = pendingCommittedOffsetRequest.response;
             } else {
-                future = sendOffsetFetchRequest(partitions);
-                pendingCommittedOffsetRequest = new 
PendingCommittedOffsetRequest(partitions, generationForOffsetRequest, future);
+                future = sendOffsetFetchRequest(nonCachedPartitions);
+                pendingCommittedOffsetRequest = new 
PendingCommittedOffsetRequest(nonCachedPartitions, generationForOffsetRequest, 
future);
             }
             client.poll(future, timer);
 
             if (future.isDone()) {
                 pendingCommittedOffsetRequest = null;
 
                 if (future.succeeded()) {
-                    return future.value();
+                    Map<TopicPartition, OffsetAndMetadata> freshOffsets = 
future.value();
+
+                    // update cache for assigned partitions that are not 
cached yet
+                    for (TopicPartition nonCachedAssignedPartition: 
nonCachedAssignedPartitions) {
+                        if 
(!this.subscriptions.isAssigned(nonCachedAssignedPartition)) {
+                            // it is possible that the topic is no longer 
assigned when the response is received,
+                            // in this case we do not update the cache with 
the fresh value
+                            continue;
+                        }
+
+                        OffsetAndMetadata offset = 
freshOffsets.get(nonCachedAssignedPartition);
+                        if (offset != null) { // it is possible that the 
offset and metadata were not fetched
+                            
this.partitionOffsetsCache.put(nonCachedAssignedPartition, offset);

Review Comment:
   Hi @showuon I forgot to mention or call your attention on this line 
https://github.com/apache/kafka/blob/401fb417bf60864e6d380f979d268d895c5ad727/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1422
 which already updates the cache. Is that what you've in mind?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to