showuon commented on code in PR #13665:
URL: https://github.com/apache/kafka/pull/13665#discussion_r1197604865


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -123,6 +124,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
     // hold onto request&future for committed offset requests to enable async calls.
     private PendingCommittedOffsetRequest pendingCommittedOffsetRequest = null;
+    // holds the offset metadata for assigned partitions to reduce remote calls thus speeding up fetching partition metadata
+    private final Map<TopicPartition, OffsetAndMetadata> partitionOffsetsCache;

Review Comment:
   The term `offset` might be confusing because we have both `offsets` fetching and `offsets` committing. Please add `committed` to the variable name. Thanks.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -234,6 +238,11 @@ SubscriptionState subscriptionState() {
         return this.subscriptions;
     }
 
+    // package private for testing
+    Map<TopicPartition, OffsetAndMetadata> offsetsCache() {

Review Comment:
   Method name: `committedOffsetsCache`, to match.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1007,16 +1047,33 @@ public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(final Set<To
             if (pendingCommittedOffsetRequest != null) {
                 future = pendingCommittedOffsetRequest.response;
             } else {
-                future = sendOffsetFetchRequest(partitions);
-                pendingCommittedOffsetRequest = new PendingCommittedOffsetRequest(partitions, generationForOffsetRequest, future);
+                future = sendOffsetFetchRequest(nonCachedPartitions);
+                pendingCommittedOffsetRequest = new PendingCommittedOffsetRequest(nonCachedPartitions, generationForOffsetRequest, future);
             }
             client.poll(future, timer);
 
             if (future.isDone()) {
                 pendingCommittedOffsetRequest = null;
 
                 if (future.succeeded()) {
-                    return future.value();
+                    Map<TopicPartition, OffsetAndMetadata> freshOffsets = future.value();
+
+                    // update cache for assigned partitions that are not cached yet
+                    for (TopicPartition nonCachedAssignedPartition: nonCachedAssignedPartitions) {
+                        if (!this.subscriptions.isAssigned(nonCachedAssignedPartition)) {
+                            // it is possible that the topic is no longer assigned when the response is received,
+                            // in this case we do not update the cache with the fresh value
+                            continue;
+                        }
+
+                        OffsetAndMetadata offset = freshOffsets.get(nonCachedAssignedPartition);
+                        if (offset != null) { // it is possible that the offset and metadata were not fetched
+                            this.partitionOffsetsCache.put(nonCachedAssignedPartition, offset);

Review Comment:
   A high-level comment here.
   You populate the committed-offset cache when fetching offsets. Currently, we fetch committed offsets when the consumer proactively requests them (which won't happen frequently), and also when the consumer is first assigned a partition and needs to know where to start fetching. After the consumer starts, it periodically commits offsets. That means if we cache this data on the fetch path, it will become outdated as soon as the consumer commits offsets again.
   
   On the other hand, if we update the cache at the place where we commit offsets (i.e. `OffsetCommitResponseHandler`), we should always have the latest committed offsets for each partition whenever the consumer commits. Does that make sense?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -222,6 +225,7 @@ public ConsumerCoordinator(GroupRebalanceConfig rebalanceConfig,
         }
 
         this.metadata.requestUpdate();
+        this.partitionOffsetsCache = new ConcurrentHashMap<>();

Review Comment:
   The consumer currently has two threads: one heartbeat thread and one main consumer thread. From my understanding, the heartbeat thread doesn't commit offsets. If so, a plain `HashMap` should be fine here. Could you help confirm that? Check `AbstractCoordinator#HeartbeatThread`.
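   The thread-confinement assumption behind this suggestion can be sketched as follows. This is illustrative only: the explicit owner-thread check is not something `ConsumerCoordinator` does, and the class name is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: if only the main consumer thread ever touches the cache
// (the heartbeat thread does not commit or fetch offsets), a plain HashMap is
// enough. The explicit owner-thread check below just documents that assumption;
// it would throw if the confinement assumption were ever violated.
class ThreadConfinedCacheSketch {
    private final Map<String, Long> cache = new HashMap<>();
    private final Thread owner = Thread.currentThread(); // the constructing (main) thread

    void put(String partition, long offset) {
        checkThread();
        cache.put(partition, offset);
    }

    Long get(String partition) {
        checkThread();
        return cache.get(partition);
    }

    private void checkThread() {
        if (Thread.currentThread() != owner)
            throw new IllegalStateException("cache accessed from a non-owner thread");
    }
}
```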



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -346,6 +355,10 @@ private Exception invokePartitionsRevoked(final SortedSet<TopicPartition> revoke
             final long startMs = time.milliseconds();
             listener.onPartitionsRevoked(revokedPartitions);
             sensors.revokeCallbackSensor.record(time.milliseconds() - startMs);
+            // remove the offset metadata cache for revoked partitions
+            for (TopicPartition revokedPartition: revokedPartitions) {
+                this.partitionOffsetsCache.remove(revokedPartition);
+            }

Review Comment:
   `invokePartitionsRevoked` won't be called in the eager consumer protocol after partition assignment in each rebalance (check `ConsumerCoordinator#onJoinComplete`), so evicting only here can leave stale entries behind.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
