machi1990 commented on code in PR #13665:
URL: https://github.com/apache/kafka/pull/13665#discussion_r1230620093


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -3114,6 +3137,70 @@ public void testFetchCommittedOffsets() {
         assertEquals(new OffsetAndMetadata(offset, leaderEpoch, metadata), 
fetchedOffsets.get(t1p));
     }
 
+    @Test
+    public void testPopulatingOffsetCacheForAssignedPartition() {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        Map<TopicPartition, OffsetAndMetadata> committedOffsetsCache = 
coordinator.committedOffsetsCache();
+        // committedOffsetsCache should be empty
+        assertEquals(committedOffsetsCache.size(), 0);
+
+        long offset = 500L;
+        String metadata = "blahblah";
+        Optional<Integer> leaderEpoch = Optional.of(15);
+        OffsetFetchResponse.PartitionData data = new 
OffsetFetchResponse.PartitionData(offset, leaderEpoch,
+                metadata, Errors.NONE);
+
+        client.prepareResponse(offsetFetchResponse(Errors.NONE, 
singletonMap(t1p, data)));
+        subscriptions.assignFromUser(singleton(t1p));
+        Map<TopicPartition, OffsetAndMetadata> fetchedOffsets = 
coordinator.fetchCommittedOffsets(singleton(t1p),
+                time.timer(Long.MAX_VALUE));
+
+        assertNotNull(fetchedOffsets);
+        OffsetAndMetadata expected = new OffsetAndMetadata(offset, 
leaderEpoch, metadata);
+        assertEquals(expected, fetchedOffsets.get(t1p));
+
+        // check committedOffsetsCache is populated
+        assertEquals(committedOffsetsCache.size(), 1);
+        assertEquals(expected, committedOffsetsCache.get(t1p));
+    }
+
+    @Test
+    public void testReturningCachedOffsetForAssignedPartition() {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        Map<TopicPartition, OffsetAndMetadata> committedOffsetsCache = 
coordinator.committedOffsetsCache();
+
+        long offset = 500L;
+        String metadata = "blahblah";
+        Optional<Integer> leaderEpoch = Optional.of(15);
+        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(offset, 
leaderEpoch, metadata);
+        committedOffsetsCache.put(t1p, offsetAndMetadata);

Review Comment:
   Yes, actually this test case already somehow covers what's tested in the 
above! I'll merge the two cases. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to