Repository: kafka
Updated Branches:
  refs/heads/trunk a931e9954 -> f0152a7fd


KAFKA-5097; Add testFetchAfterPartitionWithFetchedRecordsIsUnassigned

I verified that the test would trigger an `IllegalStateException` if the
`position` call was added back.

Author: Ismael Juma <ism...@juma.me.uk>

Reviewers: Eno Thereska <e...@confluent.io>, Jason Gustafson <ja...@confluent.io>

Closes #2887 from ijuma/kafka-5097-unit-test


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f0152a7f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f0152a7f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f0152a7f

Branch: refs/heads/trunk
Commit: f0152a7fdac2ae4dcac65d5ed24fa201f3d30120
Parents: a931e99
Author: Ismael Juma <ism...@juma.me.uk>
Authored: Thu Apr 27 23:40:33 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Thu Apr 27 23:40:33 2017 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/clients/MockClient.java    |  3 +-
 .../clients/consumer/internals/FetcherTest.java | 45 +++++++++++++++++++-
 2 files changed, 46 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f0152a7f/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index f4141a5..8fff3cc 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -153,7 +153,8 @@ public class MockClient implements KafkaClient {
             short version = nodeApiVersions.usableVersion(request.apiKey(), builder.desiredVersion());
             AbstractRequest abstractRequest = request.requestBuilder().build(version);
             if (!futureResp.requestMatcher.matches(abstractRequest))
-                throw new IllegalStateException("Next in line response did not match expected request");
+                throw new IllegalStateException("Next in line response did not match expected request, request: "
+                        + abstractRequest);
             ClientResponse resp = new ClientResponse(request.makeHeader(version), request.callback(), request.destination(),
                     request.createdTimeMs(), time.milliseconds(), futureResp.disconnected, null, futureResp.responseBody);
             responses.add(resp);

http://git-wip-us.apache.org/repos/asf/kafka/blob/f0152a7f/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 6059180..b41e6ac 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -370,6 +370,45 @@ public class FetcherTest {
         assertEquals(5, records.get(1).offset());
     }
 
+    /**
+     * Test the scenario where a partition with fetched but not consumed records (i.e. max.poll.records is
+     * less than the number of fetched records) is unassigned and a different partition is assigned. This is a
+     * pattern used by Streams state restoration and KAFKA-5097 would have been caught by this test.
+     */
+    @Test
+    public void testFetchAfterPartitionWithFetchedRecordsIsUnassigned() {
+        Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(time), 2);
+
+        List<ConsumerRecord<byte[], byte[]>> records;
+        subscriptions.assignFromUser(singleton(tp1));
+        subscriptions.seek(tp1, 1);
+
+        // Returns 3 records while `max.poll.records` is configured to 2
+        client.prepareResponse(matchesOffset(tp1, 1), fetchResponse(tp1, this.records, Errors.NONE, 100L, 0));
+
+        assertEquals(1, fetcher.sendFetches());
+        consumerClient.poll(0);
+        records = fetcher.fetchedRecords().get(tp1);
+        assertEquals(2, records.size());
+        assertEquals(3L, subscriptions.position(tp1).longValue());
+        assertEquals(1, records.get(0).offset());
+        assertEquals(2, records.get(1).offset());
+
+        subscriptions.assignFromUser(singleton(tp2));
+        client.prepareResponse(matchesOffset(tp2, 4), fetchResponse(tp2, this.nextRecords, Errors.NONE, 100L, 0));
+        subscriptions.seek(tp2, 4);
+
+        assertEquals(1, fetcher.sendFetches());
+        consumerClient.poll(0);
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetcher.fetchedRecords();
+        assertNull(fetchedRecords.get(tp1));
+        records = fetchedRecords.get(tp2);
+        assertEquals(2, records.size());
+        assertEquals(6L, subscriptions.position(tp2).longValue());
+        assertEquals(4, records.get(0).offset());
+        assertEquals(5, records.get(1).offset());
+    }
+
     @Test
     public void testFetchNonContinuousRecords() {
         // if we are fetching from a compacted topic, there may be gaps in the returned records
@@ -1467,7 +1506,11 @@ public class FetcherTest {
     }
 
     private FetchResponse fetchResponse(MemoryRecords records, Errors error, long hw, int throttleTime) {
-        Map<TopicPartition, FetchResponse.PartitionData> partitions = Collections.singletonMap(tp1,
+        return fetchResponse(tp1, records, error, hw, throttleTime);
+    }
+
+    private FetchResponse fetchResponse(TopicPartition tp, MemoryRecords records, Errors error, long hw, int throttleTime) {
+        Map<TopicPartition, FetchResponse.PartitionData> partitions = Collections.singletonMap(tp,
                 new FetchResponse.PartitionData(error, hw, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, records));
         return new FetchResponse(new LinkedHashMap<>(partitions), throttleTime);
     }

Reply via email to