machi1990 commented on code in PR #13665: URL: https://github.com/apache/kafka/pull/13665#discussion_r1198622895
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ########## @@ -1007,16 +1047,33 @@ public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(final Set<To if (pendingCommittedOffsetRequest != null) { future = pendingCommittedOffsetRequest.response; } else { - future = sendOffsetFetchRequest(partitions); - pendingCommittedOffsetRequest = new PendingCommittedOffsetRequest(partitions, generationForOffsetRequest, future); + future = sendOffsetFetchRequest(nonCachedPartitions); + pendingCommittedOffsetRequest = new PendingCommittedOffsetRequest(nonCachedPartitions, generationForOffsetRequest, future); } client.poll(future, timer); if (future.isDone()) { pendingCommittedOffsetRequest = null; if (future.succeeded()) { - return future.value(); + Map<TopicPartition, OffsetAndMetadata> freshOffsets = future.value(); + + // update cache for assigned partitions that are not cached yet + for (TopicPartition nonCachedAssignedPartition: nonCachedAssignedPartitions) { + if (!this.subscriptions.isAssigned(nonCachedAssignedPartition)) { + // it is possible that the topic is no longer assigned when the response is received, + // in this case we do not update the cache with the fresh value + continue; + } + + OffsetAndMetadata offset = freshOffsets.get(nonCachedAssignedPartition); + if (offset != null) { // it is possible that the offset and metadata were not fetched + this.partitionOffsetsCache.put(nonCachedAssignedPartition, offset); Review Comment: Thank for you for this, I get the gist of what you are saying and it makes sense. I'll go have a look at this again and re-visit it with that new understanding or I'll come back with additional questions if I need of more understanding. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org