Repository: kafka Updated Branches: refs/heads/trunk ef5d168cc -> 68f42210a
MINOR: add test case for fetching from a compacted topic Author: Jason Gustafson <[email protected]> Reviewers: Guozhang Wang Closes #426 from hachikuji/compacted-topics Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/68f42210 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/68f42210 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/68f42210 Branch: refs/heads/trunk Commit: 68f42210a1c8ce64846ffdc2cdbecc6fa5b87739 Parents: ef5d168 Author: Jason Gustafson <[email protected]> Authored: Wed Nov 4 17:31:34 2015 -0800 Committer: Guozhang Wang <[email protected]> Committed: Wed Nov 4 17:31:34 2015 -0800 ---------------------------------------------------------------------- .../clients/consumer/internals/FetcherTest.java | 29 ++++++++++++++++++++ 1 file changed, 29 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/68f42210/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 8711830..7b1e4cb 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -128,6 +128,35 @@ public class FetcherTest { } } + @Test + public void testFetchNonContinuousRecords() { + // if we are fetching from a compacted topic, there may be gaps in the returned records + // this test verifies the fetcher updates the current fetched/consumed positions correctly for this case + + MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE); + records.append(15L, "key".getBytes(), "value-1".getBytes()); + records.append(20L, "key".getBytes(), "value-2".getBytes()); + records.append(30L, "key".getBytes(), "value-3".getBytes()); + records.close(); + + List<ConsumerRecord<byte[], byte[]>> consumerRecords; + subscriptions.assignFromUser(Arrays.asList(tp)); + subscriptions.seek(tp, 0); + + // normal fetch + fetcher.initFetches(cluster); + client.prepareResponse(fetchResponse(records.buffer(), Errors.NONE.code(), 100L, 0)); + consumerClient.poll(0); + consumerRecords = fetcher.fetchedRecords().get(tp); + assertEquals(3, consumerRecords.size()); + assertEquals(31L, (long) subscriptions.fetched(tp)); // this is the next fetching position + assertEquals(31L, (long) subscriptions.consumed(tp)); + + assertEquals(15L, consumerRecords.get(0).offset()); + assertEquals(20L, consumerRecords.get(1).offset()); + assertEquals(30L, consumerRecords.get(2).offset()); + } + @Test(expected = RecordTooLargeException.class) public void testFetchRecordTooLarge() { subscriptions.assignFromUser(Arrays.asList(tp));
