[GitHub] [kafka] showuon commented on a diff in pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

2023-06-15 Thread via GitHub


showuon commented on code in PR #13665:
URL: https://github.com/apache/kafka/pull/13665#discussion_r1230929414


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##
@@ -3114,6 +3149,54 @@ public void testFetchCommittedOffsets() {
 assertEquals(new OffsetAndMetadata(offset, leaderEpoch, metadata), fetchedOffsets.get(t1p));
 }
 
+@Test
+public void testPopulatingOffsetCacheForAssignedPartition() {
+client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+Map committedOffsetsCache = coordinator.committedOffsetsCache();
+// committedOffsetsCache should be empty
+assertTrue(committedOffsetsCache.isEmpty());
+
+long offset = 500L;
+String metadata = "blahblah";
+Optional leaderEpoch = Optional.of(15);
+OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset, leaderEpoch,
+metadata, Errors.NONE);
+
+client.prepareResponse(offsetFetchResponse(Errors.NONE, singletonMap(t1p, data)));
+subscriptions.assignFromUser(singleton(t1p));
+Map fetchedOffsets = coordinator.fetchCommittedOffsets(singleton(t1p),
+time.timer(Long.MAX_VALUE));
+
+assertNotNull(fetchedOffsets);
+OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(offset, leaderEpoch, metadata);
+assertEquals(offsetAndMetadata, fetchedOffsets.get(t1p));
+
+// check committedOffsetsCache is populated
+assertEquals( 1, committedOffsetsCache.size());
+assertEquals(offsetAndMetadata, committedOffsetsCache.get(t1p));
+
+// fetch again with t1p + t2p, but will send fetch for t2p since t1p is in cache
+long offsetPartition2 = 50L;
+String metadataPartition2 = "foobar";
+Optional leaderEpochPartition2 = Optional.of(19909);
+data = new OffsetFetchResponse.PartitionData(offsetPartition2, leaderEpochPartition2,
+metadataPartition2, Errors.NONE);
+client.prepareResponse(offsetFetchResponse(Errors.NONE, singletonMap(t2p, data)));
+
+fetchedOffsets = coordinator.fetchCommittedOffsets(new HashSet<>(Arrays.asList(t1p, t2p)), time.timer(Long.MAX_VALUE));
+
+assertNotNull(fetchedOffsets);
+
+assertEquals(2, fetchedOffsets.size()); // tp1 and tp2 should be returned with tp1 coming from cache
+assertEquals( 1, committedOffsetsCache.size()); // cache size is still 1 since only tp1 is an owned partition

Review Comment:
   nit: additional space in front of `1`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

2023-06-15 Thread via GitHub


showuon commented on code in PR #13665:
URL: https://github.com/apache/kafka/pull/13665#discussion_r1230868853


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##
@@ -2521,11 +2524,36 @@ public void testCommitOffsetMetadata() {
 
 AtomicBoolean success = new AtomicBoolean(false);
 
-Map offsets = singletonMap(t1p, new OffsetAndMetadata(100L, "hello"));
+OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(100L, "hello");
+Map offsets = singletonMap(t1p, offsetAndMetadata);
+Map cache = coordinator.committedOffsetsCache();
+assertTrue(cache.isEmpty());
 coordinator.commitOffsetsAsync(offsets, callback(offsets, success));
 coordinator.invokeCompletedOffsetCommitCallbacks();
 assertTrue(success.get());
 assertEquals(coordinator.inFlightAsyncCommits.get(), 0);

Review Comment:
   I know this is not your change, but please also fix the argument order of this `assertEquals` (expected value first). Thanks.



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##
@@ -3114,6 +3142,54 @@ public void testFetchCommittedOffsets() {
 assertEquals(new OffsetAndMetadata(offset, leaderEpoch, metadata), fetchedOffsets.get(t1p));
 }
 
+@Test
+public void testPopulatingOffsetCacheForAssignedPartition() {
+client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+Map committedOffsetsCache = coordinator.committedOffsetsCache();
+// committedOffsetsCache should be empty
+assertTrue(committedOffsetsCache.isEmpty());
+
+long offset = 500L;
+String metadata = "blahblah";
+Optional leaderEpoch = Optional.of(15);
+OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset, leaderEpoch,
+metadata, Errors.NONE);
+
+client.prepareResponse(offsetFetchResponse(Errors.NONE, singletonMap(t1p, data)));
+subscriptions.assignFromUser(singleton(t1p));
+Map fetchedOffsets = coordinator.fetchCommittedOffsets(singleton(t1p),
+time.timer(Long.MAX_VALUE));
+
+assertNotNull(fetchedOffsets);
+OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(offset, leaderEpoch, metadata);
+assertEquals(offsetAndMetadata, fetchedOffsets.get(t1p));
+
+// check committedOffsetsCache is populated
+assertEquals(committedOffsetsCache.size(), 1);

Review Comment:
   `assertEquals(1, committedOffsetsCache.size());`



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##
@@ -3114,6 +3142,54 @@ public void testFetchCommittedOffsets() {
 assertEquals(new OffsetAndMetadata(offset, leaderEpoch, metadata), fetchedOffsets.get(t1p));
 }
 
+@Test
+public void testPopulatingOffsetCacheForAssignedPartition() {
+client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+Map committedOffsetsCache = coordinator.committedOffsetsCache();
+// committedOffsetsCache should be empty
+assertTrue(committedOffsetsCache.isEmpty());
+
+long offset = 500L;
+String metadata = "blahblah";
+Optional leaderEpoch = Optional.of(15);
+OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset, leaderEpoch,
+metadata, Errors.NONE);
+
+client.prepareResponse(offsetFetchResponse(Errors.NONE, singletonMap(t1p, data)));
+subscriptions.assignFromUser(singleton(t1p));
+Map fetchedOffsets = coordinator.fetchCommittedOffsets(singleton(t1p),
+time.timer(Long.MAX_VALUE));
+
+assertNotNull(fetchedOffsets);
+OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(offset, leaderEpoch, metadata);
+assertEquals(offsetAndMetadata, fetchedOffsets.get(t1p));
+
+// check committedOffsetsCache is populated
+assertEquals(committedOffsetsCache.size(), 1);
+assertEquals(offsetAndMetadata, committedOffsetsCache.get(t1p));
+
+// fetch again with t1p + t2p, but will send fetch for t2p since t1p is in cache
+long offsetPartition2 = 50L;
+String metadataPartition2 = "foobar";
+Optional leaderEpochPartition2 = Optional.of(19909);
+data = new OffsetFetchResponse.PartitionData(offsetPartition2, leaderEpochPartition2,
+metadataPartition2, Errors.NONE);
+client.prepareResponse(offsetFetchResponse(Errors.NONE, singletonMap(t2p, data)));
+
+fetchedOffsets = coordinator.fetchCommittedOffsets(new HashSet<>(Arrays.asList(t1p, t2p)), time.timer(Long.MAX_VALUE));
+
+assertNotNull(fetchedOffsets);
+
+assertEquals(fetched

[GitHub] [kafka] showuon commented on a diff in pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

2023-06-14 Thread via GitHub


showuon commented on code in PR #13665:
URL: https://github.com/apache/kafka/pull/13665#discussion_r1230345113


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -127,6 +127,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
 // hold onto request&future for committed offset requests to enable async calls.
 private PendingCommittedOffsetRequest pendingCommittedOffsetRequest = null;
+// holds the offset metadata for assigned partitions to reduce remote calls thus speeding up fetching partition metadata
+private final Map committedTopicPartitionOffsetsCache;

Review Comment:
   nit: the comment above should mention this is the `committed offset metadata`



##
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##
@@ -922,13 +922,9 @@ public void testCommitsFetchedDuringAssign() {
 
 // fetch offset for two topics
 Map offsets = new HashMap<>();
-offsets.put(tp0, offset1);
-client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE), coordinator);
-assertEquals(offset1, consumer.committed(Collections.singleton(tp0)).get(tp0).offset());
-
-offsets.remove(tp0);
 offsets.put(tp1, offset2);

Review Comment:
   Could we add a comment above explaining why we only need to respond with `tp1, offset2`? Something about `tp0` having already been cached by the previous committed-offset fetch.



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##
@@ -2521,11 +2521,34 @@ public void testCommitOffsetMetadata() {
 
 AtomicBoolean success = new AtomicBoolean(false);
 
-Map offsets = singletonMap(t1p, new OffsetAndMetadata(100L, "hello"));
+OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(100L, "hello");
+Map offsets = singletonMap(t1p, offsetAndMetadata);
 coordinator.commitOffsetsAsync(offsets, callback(offsets, success));
 coordinator.invokeCompletedOffsetCommitCallbacks();
 assertTrue(success.get());
 assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
+Map cache = coordinator.committedOffsetsCache();
+assertEquals(cache.size(), 1);
+assertEquals(cache.get(t1p), offsetAndMetadata);
+}
+
+@Test
+public void testCommitOffsetMetadataSync() {

Review Comment:
   Thanks for adding the sync test



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##
@@ -2521,11 +2521,34 @@ public void testCommitOffsetMetadata() {
 
 AtomicBoolean success = new AtomicBoolean(false);
 
-Map offsets = singletonMap(t1p, new OffsetAndMetadata(100L, "hello"));
+OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(100L, "hello");
+Map offsets = singletonMap(t1p, offsetAndMetadata);
 coordinator.commitOffsetsAsync(offsets, callback(offsets, success));
 coordinator.invokeCompletedOffsetCommitCallbacks();
 assertTrue(success.get());
 assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
+Map cache = coordinator.committedOffsetsCache();
+assertEquals(cache.size(), 1);
+assertEquals(cache.get(t1p), offsetAndMetadata);

Review Comment:
   The `assertEquals` method signature is `assertEquals(int expected, int actual)`. Passing the arguments in that order produces a sensible failure message when the assertion fails.
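
A minimal sketch of why the order matters. It does not use JUnit itself: `check` is a hypothetical stand-in that builds a failure message in a similar "expected ... but was ..." style, so the exact wording is an assumption rather than the library's output.

```java
public class AssertOrderDemo {
    // Hypothetical stand-in for assertEquals(expected, actual):
    // returns a failure message instead of throwing, or null when the values match.
    public static String check(int expected, int actual) {
        if (expected == actual) {
            return null;
        }
        return "expected: <" + expected + "> but was: <" + actual + ">";
    }

    public static void main(String[] args) {
        int cacheSize = 0; // pretend the cache was unexpectedly empty

        // Correct order: the message names 1 as the expectation.
        System.out.println(check(1, cacheSize));

        // Swapped order: the message claims the test expected 0, which is misleading.
        System.out.println(check(cacheSize, 1));
    }
}
```

With the arguments swapped, the assertion still passes and fails in the same cases; only the diagnostic text becomes misleading.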



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##
@@ -2521,11 +2521,34 @@ public void testCommitOffsetMetadata() {
 
 AtomicBoolean success = new AtomicBoolean(false);
 
-Map offsets = singletonMap(t1p, new OffsetAndMetadata(100L, "hello"));
+OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(100L, "hello");
+Map offsets = singletonMap(t1p, offsetAndMetadata);
 coordinator.commitOffsetsAsync(offsets, callback(offsets, success));
 coordinator.invokeCompletedOffsetCommitCallbacks();
 assertTrue(success.get());
 assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
+Map cache = coordinator.committedOffsetsCache();
+assertEquals(cache.size(), 1);
+assertEquals(cache.get(t1p), offsetAndMetadata);

Review Comment:
   Also, could we assert cache is empty before we `commitOffsetsAsync`? i.e. 
   ```
   assertTrue(cache.isEmpty());
   coordinator.commitOffsetsAsync(...)
   ...
   ```



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -351,6 +359,10 @@ private Exception invokePartitionsRevoked(final SortedSet revoke
 final long startMs = time.milliseconds();
 listener.onPartitionsRevoked(revokedPartitions);
 sensors.revokeCallbackSensor.record(time.mi

[GitHub] [kafka] showuon commented on a diff in pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

2023-05-18 Thread via GitHub


showuon commented on code in PR #13665:
URL: https://github.com/apache/kafka/pull/13665#discussion_r1197604865


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -123,6 +124,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
 // hold onto request&future for committed offset requests to enable async calls.
 private PendingCommittedOffsetRequest pendingCommittedOffsetRequest = null;
+// holds the offset metadata for assigned partitions to reduce remote calls thus speeding up fetching partition metadata
+private final Map partitionOffsetsCache;

Review Comment:
   The term `offset` is ambiguous here because we have both `offsets` fetching and `offsets` committing. Please add `committed` to the variable name. Thanks.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -234,6 +238,11 @@ SubscriptionState subscriptionState() {
 return this.subscriptions;
 }
 
+// package private for testing
+Map offsetsCache() {

Review Comment:
   method name: committedOffsetsCache



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -1007,16 +1047,33 @@ public Map 
fetchCommittedOffsets(final Set freshOffsets = 
future.value();
+
+// update cache for assigned partitions that are not cached yet
+for (TopicPartition nonCachedAssignedPartition: nonCachedAssignedPartitions) {
+if (!this.subscriptions.isAssigned(nonCachedAssignedPartition)) {
+// it is possible that the topic is no longer assigned when the response is received,
+// in this case we do not update the cache with the fresh value
+continue;
+}
+
+OffsetAndMetadata offset = freshOffsets.get(nonCachedAssignedPartition);
+if (offset != null) { // it is possible that the offset and metadata were not fetched
+this.partitionOffsetsCache.put(nonCachedAssignedPartition, offset);

Review Comment:
   A high-level comment here.
   You populate the committed-offset cache when fetching offsets. Currently, we fetch committed offsets when the consumer proactively asks for them (which won't happen frequently), and also when the consumer is first assigned a partition and needs to know where to start fetching. After the consumer starts, it periodically commits offsets. That means that if we cache this data, it will be outdated soon after the consumer commits offsets.
   
   On the other hand, if we cache at the place where we commit offsets (i.e. `OffsetCommitResponseHandler`), we should be able to get the latest committed offsets for each partition whenever the consumer commits them. Does that make sense?
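
A minimal sketch of the suggestion above, under stated assumptions: `CommittedOffsetsCacheSketch`, `onCommitSuccess`, and `cached` are hypothetical names, `String` stands in for `TopicPartition`, and `Long` stands in for `OffsetAndMetadata`. It is not the PR's implementation; it only illustrates refreshing the cache on the commit-success path so that it tracks the latest committed offset.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class CommittedOffsetsCacheSketch {
    // Stand-ins: String for TopicPartition, Long for OffsetAndMetadata.
    private final Map<String, Long> cache = new HashMap<>();
    private final Set<String> assigned;

    public CommittedOffsetsCacheSketch(Set<String> assigned) {
        this.assigned = assigned;
    }

    // Hypothetical hook, called once a commit has been acknowledged.
    // Only partitions that are currently assigned are cached.
    public void onCommitSuccess(Map<String, Long> committed) {
        for (Map.Entry<String, Long> entry : committed.entrySet()) {
            if (assigned.contains(entry.getKey())) {
                cache.put(entry.getKey(), entry.getValue());
            }
        }
    }

    // Returns the cached committed offset, or null on a cache miss.
    public Long cached(String partition) {
        return cache.get(partition);
    }
}
```

Because every acknowledged commit overwrites the entry, a later lookup for an owned partition returns the most recent committed value rather than whatever was fetched at assignment time.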



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -222,6 +225,7 @@ public ConsumerCoordinator(GroupRebalanceConfig rebalanceConfig,
 }
 
 this.metadata.requestUpdate();
+this.partitionOffsetsCache = new ConcurrentHashMap<>();

Review Comment:
   The consumer currently has 2 threads: the heartbeat thread and the main consumer thread. From my understanding, the heartbeat thread doesn't commit offsets. If so, using `HashMap` should be fine. Could you help confirm it? Check `AbstractCoordinator#HeartbeatThread`.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -346,6 +355,10 @@ private Exception invokePartitionsRevoked(final SortedSet revoke
 final long startMs = time.milliseconds();
 listener.onPartitionsRevoked(revokedPartitions);
 sensors.revokeCallbackSensor.record(time.milliseconds() - startMs);
+// remove the offset metadata cache for revoked partitions
+for (TopicPartition revokedPartition: revokedPartitions) {
+this.partitionOffsetsCache.remove(revokedPartition);
+}

Review Comment:
   `invokePartitionsRevoked` won't be called in the eager consumer protocol after partition assignment in each rebalance (check `ConsumerCoordinator#onJoinComplete`).
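
If eviction cannot rely on the revocation callback in that case, one possible alternative is to prune the cache against the current assignment whenever a new assignment is applied. This is an illustrative assumption, not something the PR confirms: `CachePruneSketch` and `pruneToAssignment` are hypothetical names, with `String` standing in for `TopicPartition` and `Long` for `OffsetAndMetadata`.

```java
import java.util.Map;
import java.util.Set;

public class CachePruneSketch {
    // Drops every cached entry whose partition is not in the current assignment.
    // keySet() is a live view of the map, so retainAll removes the mappings too.
    public static void pruneToAssignment(Map<String, Long> cache, Set<String> assigned) {
        cache.keySet().retainAll(assigned);
    }
}
```

The cache passed in must be a mutable map, such as a `HashMap`.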


