machi1990 commented on code in PR #13665: URL: https://github.com/apache/kafka/pull/13665#discussion_r1230936981
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java: ########## @@ -3114,6 +3149,54 @@ public void testFetchCommittedOffsets() { assertEquals(new OffsetAndMetadata(offset, leaderEpoch, metadata), fetchedOffsets.get(t1p)); } + @Test + public void testPopulatingOffsetCacheForAssignedPartition() { + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + + Map<TopicPartition, OffsetAndMetadata> committedOffsetsCache = coordinator.committedOffsetsCache(); + // committedOffsetsCache should be empty + assertTrue(committedOffsetsCache.isEmpty()); + + long offset = 500L; + String metadata = "blahblah"; + Optional<Integer> leaderEpoch = Optional.of(15); + OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset, leaderEpoch, + metadata, Errors.NONE); + + client.prepareResponse(offsetFetchResponse(Errors.NONE, singletonMap(t1p, data))); + subscriptions.assignFromUser(singleton(t1p)); + Map<TopicPartition, OffsetAndMetadata> fetchedOffsets = coordinator.fetchCommittedOffsets(singleton(t1p), + time.timer(Long.MAX_VALUE)); + + assertNotNull(fetchedOffsets); + OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(offset, leaderEpoch, metadata); + assertEquals(offsetAndMetadata, fetchedOffsets.get(t1p)); + + // check committedOffsetsCache is populated + assertEquals( 1, committedOffsetsCache.size()); + assertEquals(offsetAndMetadata, committedOffsetsCache.get(t1p)); + + // fetch again with t1p + t2p, but will send fetch for t2p since t1p is in cache + long offsetPartition2 = 50L; + String metadataPartition2 = "foobar"; + Optional<Integer> leaderEpochPartition2 = Optional.of(19909); + data = new OffsetFetchResponse.PartitionData(offsetPartition2, leaderEpochPartition2, + metadataPartition2, Errors.NONE); + client.prepareResponse(offsetFetchResponse(Errors.NONE, singletonMap(t2p, data))); + + fetchedOffsets = 
coordinator.fetchCommittedOffsets(new HashSet<>(Arrays.asList(t1p, t2p)), time.timer(Long.MAX_VALUE)); + + assertNotNull(fetchedOffsets); + + assertEquals(2, fetchedOffsets.size()); // tp1 and tp2 should be returned with tp1 coming from cache + assertEquals( 1, committedOffsetsCache.size()); // cache size is still 1 since only tp1 is an owned partition Review Comment: That was a typo made in my IDE. Good catch; it should be corrected now. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org