jolshan commented on code in PR #14789:
URL: https://github.com/apache/kafka/pull/14789#discussion_r1398026217


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java:
##########
@@ -1124,6 +1126,62 @@ public void testFetchMaxPollRecords() {
         assertEquals(5, recordsToTest.get(1).offset());
     }
 
+    /**
+     * KAFKA-15836:
+     * Test that max.poll.records is honoured when consuming from multiple 
topic-partitions and the
+     * fetched records are not aligned on max.poll.records boundaries.
+     *
+     * tp0 has records 1,2,3; tp1 has records 6,7,8
+     * max.poll.records is 2
+     * 
+     * poll 1 should return 1,2
+     * poll 2 should return 3,6
+     * poll 3 should return 7,8
+     * 
+     * Or similar :)
+     */
+    @Test
+    public void testFetchMaxPollRecordsUnaligned() {
+        buildFetcher(2);
+
+        Set<TopicPartition> tps = new HashSet<>();
+        tps.add(tp0);
+        tps.add(tp1);
+        assignFromUser(tps);
+        subscriptions.seek(tp0, 1);
+        subscriptions.seek(tp1, 6);
+
+        client.prepareResponse(fetchResponse2(tidp0, records, 100L, tidp1, 
moreRecords, 100L));
+        client.prepareResponse(fullFetchResponse(tidp0, emptyRecords, 
Errors.NONE, 100L, 0));
+
+        assertEquals(1, sendFetches());
+        consumerClient.poll(time.timer(0));

Review Comment:
   It's a little confusing to reuse the recordsByPartition, fetchedRecords 
values on each call. I wonder if there is a way to modularize or make it 
clearer things are being reset.
   
   I'm also wondering if we could have comments on each of these cases.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to