Repository: kafka Updated Branches: refs/heads/trunk a931e9954 -> f0152a7fd
KAFKA-5097; Add testFetchAfterPartitionWithFetchedRecordsIsUnassigned I verified that the test would trigger an `IllegalStateException` if the `position` call was added back. Author: Ismael Juma <ism...@juma.me.uk> Reviewers: Eno Thereska <e...@confluent.io>, Jason Gustafson <ja...@confluent.io> Closes #2887 from ijuma/kafka-5097-unit-test Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f0152a7f Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f0152a7f Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f0152a7f Branch: refs/heads/trunk Commit: f0152a7fdac2ae4dcac65d5ed24fa201f3d30120 Parents: a931e99 Author: Ismael Juma <ism...@juma.me.uk> Authored: Thu Apr 27 23:40:33 2017 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Thu Apr 27 23:40:33 2017 -0700 ---------------------------------------------------------------------- .../org/apache/kafka/clients/MockClient.java | 3 +- .../clients/consumer/internals/FetcherTest.java | 45 +++++++++++++++++++- 2 files changed, 46 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/f0152a7f/clients/src/test/java/org/apache/kafka/clients/MockClient.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java index f4141a5..8fff3cc 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java @@ -153,7 +153,8 @@ public class MockClient implements KafkaClient { short version = nodeApiVersions.usableVersion(request.apiKey(), builder.desiredVersion()); AbstractRequest abstractRequest = request.requestBuilder().build(version); if (!futureResp.requestMatcher.matches(abstractRequest)) - throw new IllegalStateException("Next in line response did not match expected request"); + throw new IllegalStateException("Next in line response did not match expected request, request: " + + abstractRequest); ClientResponse resp = new ClientResponse(request.makeHeader(version), request.callback(), request.destination(), request.createdTimeMs(), time.milliseconds(), futureResp.disconnected, null, futureResp.responseBody); responses.add(resp); http://git-wip-us.apache.org/repos/asf/kafka/blob/f0152a7f/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 6059180..b41e6ac 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -370,6 +370,45 @@ public class FetcherTest { assertEquals(5, records.get(1).offset()); } + /** + * Test the scenario where a partition with fetched but not consumed records (i.e. max.poll.records is + * less than the number of fetched records) is unassigned and a different partition is assigned. This is a + * pattern used by Streams state restoration and KAFKA-5097 would have been caught by this test. + */ + @Test + public void testFetchAfterPartitionWithFetchedRecordsIsUnassigned() { + Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(time), 2); + + List<ConsumerRecord<byte[], byte[]>> records; + subscriptions.assignFromUser(singleton(tp1)); + subscriptions.seek(tp1, 1); + + // Returns 3 records while `max.poll.records` is configured to 2 + client.prepareResponse(matchesOffset(tp1, 1), fetchResponse(tp1, this.records, Errors.NONE, 100L, 0)); + + assertEquals(1, fetcher.sendFetches()); + consumerClient.poll(0); + records = fetcher.fetchedRecords().get(tp1); + assertEquals(2, records.size()); + assertEquals(3L, subscriptions.position(tp1).longValue()); + assertEquals(1, records.get(0).offset()); + assertEquals(2, records.get(1).offset()); + + subscriptions.assignFromUser(singleton(tp2)); + client.prepareResponse(matchesOffset(tp2, 4), fetchResponse(tp2, this.nextRecords, Errors.NONE, 100L, 0)); + subscriptions.seek(tp2, 4); + + assertEquals(1, fetcher.sendFetches()); + consumerClient.poll(0); + Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetcher.fetchedRecords(); + assertNull(fetchedRecords.get(tp1)); + records = fetchedRecords.get(tp2); + assertEquals(2, records.size()); + assertEquals(6L, subscriptions.position(tp2).longValue()); + assertEquals(4, records.get(0).offset()); + assertEquals(5, records.get(1).offset()); + } + @Test public void testFetchNonContinuousRecords() { // if we are fetching from a compacted topic, there may be gaps in the returned records @@ -1467,7 +1506,11 @@ public class FetcherTest { } private FetchResponse fetchResponse(MemoryRecords records, Errors error, long hw, int throttleTime) { - Map<TopicPartition, FetchResponse.PartitionData> partitions = Collections.singletonMap(tp1, + return fetchResponse(tp1, records, error, hw, throttleTime); + } + + private FetchResponse fetchResponse(TopicPartition tp, MemoryRecords records, Errors error, long hw, int throttleTime) { + Map<TopicPartition, FetchResponse.PartitionData> partitions = Collections.singletonMap(tp, new FetchResponse.PartitionData(error, hw, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, records)); return new FetchResponse(new LinkedHashMap<>(partitions), throttleTime); }