showuon commented on code in PR #13665: URL: https://github.com/apache/kafka/pull/13665#discussion_r1197604865
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -123,6 +124,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     // hold onto request&future for committed offset requests to enable async calls.
     private PendingCommittedOffsetRequest pendingCommittedOffsetRequest = null;

+    // holds the offset metadata for assigned partitions to reduce remote calls thus speeding up fetching partition metadata
+    private final Map<TopicPartition, OffsetAndMetadata> partitionOffsetsCache;

Review Comment:
   The term `offset` might be confusing here, because we have both `offsets` fetching and `offsets` committing. Please add `committed` to the variable name. Thanks.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -234,6 +238,11 @@ SubscriptionState subscriptionState() {
         return this.subscriptions;
     }

+    // package private for testing
+    Map<TopicPartition, OffsetAndMetadata> offsetsCache() {

Review Comment:
   Method name: `committedOffsetsCache`.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1007,16 +1047,33 @@ public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(final Set<To
         if (pendingCommittedOffsetRequest != null) {
             future = pendingCommittedOffsetRequest.response;
         } else {
-            future = sendOffsetFetchRequest(partitions);
-            pendingCommittedOffsetRequest = new PendingCommittedOffsetRequest(partitions, generationForOffsetRequest, future);
+            future = sendOffsetFetchRequest(nonCachedPartitions);
+            pendingCommittedOffsetRequest = new PendingCommittedOffsetRequest(nonCachedPartitions, generationForOffsetRequest, future);
         }
         client.poll(future, timer);

         if (future.isDone()) {
             pendingCommittedOffsetRequest = null;

             if (future.succeeded()) {
-                return future.value();
+                Map<TopicPartition, OffsetAndMetadata> freshOffsets = future.value();
+
+                // update cache for assigned partitions that are not cached yet
+                for (TopicPartition nonCachedAssignedPartition: nonCachedAssignedPartitions) {
+                    if (!this.subscriptions.isAssigned(nonCachedAssignedPartition)) {
+                        // it is possible that the topic is no longer assigned when the response is received,
+                        // in this case we do not update the cache with the fresh value
+                        continue;
+                    }
+
+                    OffsetAndMetadata offset = freshOffsets.get(nonCachedAssignedPartition);
+                    if (offset != null) { // it is possible that the offset and metadata were not fetched
+                        this.partitionOffsetsCache.put(nonCachedAssignedPartition, offset);

Review Comment:
   A high-level comment here: you added a committed-offset cache on the fetch path. Currently, we fetch committed offsets when the consumer proactively fetches them (which doesn't happen frequently), and when the consumer is first assigned a partition, so it knows where to start fetching. After the consumer starts, it commits offsets periodically. That means a cache populated here will be outdated soon after the consumer commits. On the other hand, if we cache at the place where offsets are committed (i.e. `OffsetCommitResponseHandler`), we should always have the latest committed offsets for each partition whenever the consumer commits. Does that make sense?

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -222,6 +225,7 @@ public ConsumerCoordinator(GroupRebalanceConfig rebalanceConfig,
         }

         this.metadata.requestUpdate();
+        this.partitionOffsetsCache = new ConcurrentHashMap<>();

Review Comment:
   The consumer currently has two threads: one for heartbeats and the main consumer thread. From my understanding, the heartbeat thread does not commit offsets. If so, a plain HashMap should be fine. Could you help confirm this? Check `AbstractCoordinator#HeartbeatThread`.
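
   A minimal sketch of the commit-path caching suggested above, under stated assumptions: this is hypothetical illustration code, not the actual Kafka implementation; `TopicPartition` and `OffsetAndMetadata` here are simplified stand-ins for the real client classes, and the method names (`onCommitSuccess`, `cachedCommitted`, `onPartitionsLost`) are invented for the sketch. It updates the cache when a commit succeeds, serves fetches from the cache, and invalidates entries for partitions that are no longer assigned:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class CommittedOffsetsCacheSketch {

    // Simplified stand-ins for the Kafka client classes of the same names.
    public record TopicPartition(String topic, int partition) {}
    public record OffsetAndMetadata(long offset, String metadata) {}

    // Plain HashMap, assuming only the main consumer thread commits offsets
    // (the heartbeat thread does not), per the review discussion above.
    private final Map<TopicPartition, OffsetAndMetadata> committedOffsetsCache = new HashMap<>();

    // Would be called from the commit-response handling path on success,
    // so the cache always reflects the latest committed offsets.
    public void onCommitSuccess(Map<TopicPartition, OffsetAndMetadata> committed) {
        committedOffsetsCache.putAll(committed);
    }

    // Fetch path: answer from the cache. A real implementation would send a
    // remote OffsetFetch request only for partitions missing from the result.
    public Map<TopicPartition, OffsetAndMetadata> cachedCommitted(Set<TopicPartition> partitions) {
        Map<TopicPartition, OffsetAndMetadata> result = new HashMap<>();
        for (TopicPartition tp : partitions) {
            OffsetAndMetadata cached = committedOffsetsCache.get(tp);
            if (cached != null) {
                result.put(tp, cached);
            }
        }
        return result;
    }

    // Rebalance-time invalidation: drop entries for partitions that are no
    // longer assigned, so stale offsets are never served after reassignment.
    public void onPartitionsLost(Set<TopicPartition> lost) {
        committedOffsetsCache.keySet().removeAll(lost);
    }
}
```

   The design point of the sketch is that the cache is written on the commit path rather than the fetch path, so it cannot go stale between periodic commits.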
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -346,6 +355,10 @@ private Exception invokePartitionsRevoked(final SortedSet<TopicPartition> revoke
             final long startMs = time.milliseconds();
             listener.onPartitionsRevoked(revokedPartitions);
             sensors.revokeCallbackSensor.record(time.milliseconds() - startMs);
+            // remove the offset metadata cache for revoked partitions
+            for (TopicPartition revokedPartition: revokedPartitions) {
+                this.partitionOffsetsCache.remove(revokedPartition);
+            }

Review Comment:
   `invokePartitionsRevoked` won't happen in the eager consumer protocol after partition assignment in each rebalance (check `ConsumerCoordinator#onJoinComplete`).

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org