showuon commented on code in PR #13665:
URL: https://github.com/apache/kafka/pull/13665#discussion_r1230929414


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -3114,6 +3149,54 @@ public void testFetchCommittedOffsets() {
         assertEquals(new OffsetAndMetadata(offset, leaderEpoch, metadata), 
fetchedOffsets.get(t1p));
     }
 
+    @Test
+    public void testPopulatingOffsetCacheForAssignedPartition() {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        Map<TopicPartition, OffsetAndMetadata> committedOffsetsCache = 
coordinator.committedOffsetsCache();
+        // committedOffsetsCache should be empty
+        assertTrue(committedOffsetsCache.isEmpty());
+
+        long offset = 500L;
+        String metadata = "blahblah";
+        Optional<Integer> leaderEpoch = Optional.of(15);
+        OffsetFetchResponse.PartitionData data = new 
OffsetFetchResponse.PartitionData(offset, leaderEpoch,
+                metadata, Errors.NONE);
+
+        client.prepareResponse(offsetFetchResponse(Errors.NONE, 
singletonMap(t1p, data)));
+        subscriptions.assignFromUser(singleton(t1p));
+        Map<TopicPartition, OffsetAndMetadata> fetchedOffsets = 
coordinator.fetchCommittedOffsets(singleton(t1p),
+                time.timer(Long.MAX_VALUE));
+
+        assertNotNull(fetchedOffsets);
+        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(offset, 
leaderEpoch, metadata);
+        assertEquals(offsetAndMetadata, fetchedOffsets.get(t1p));
+
+        // check committedOffsetsCache is populated
+        assertEquals( 1, committedOffsetsCache.size());
+        assertEquals(offsetAndMetadata, committedOffsetsCache.get(t1p));
+
+        // fetch again with t1p + t2p, but will send fetch for t2p since t1p 
is in cache
+        long offsetPartition2 = 50L;
+        String metadataPartition2 = "foobar";
+        Optional<Integer> leaderEpochPartition2 = Optional.of(19909);
+        data = new OffsetFetchResponse.PartitionData(offsetPartition2, 
leaderEpochPartition2,
+                metadataPartition2, Errors.NONE);
+        client.prepareResponse(offsetFetchResponse(Errors.NONE, 
singletonMap(t2p, data)));
+
+        fetchedOffsets = coordinator.fetchCommittedOffsets(new 
HashSet<>(Arrays.asList(t1p, t2p)), time.timer(Long.MAX_VALUE));
+
+        assertNotNull(fetchedOffsets);
+
+        assertEquals(2, fetchedOffsets.size()); // tp1 and tp2 should be 
returned with tp1 coming from cache
+        assertEquals( 1, committedOffsetsCache.size()); // cache size is still 
1 since only tp1 is an owned partition

Review Comment:
   nit: additional space in front of `1`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to