jolshan commented on code in PR #14789: URL: https://github.com/apache/kafka/pull/14789#discussion_r1398026217
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java: ########## @@ -1124,6 +1126,62 @@ public void testFetchMaxPollRecords() { assertEquals(5, recordsToTest.get(1).offset()); } + /** + * KAFKA-15836: + * Test that max.poll.records is honoured when consuming from multiple topic-partitions and the + * fetched records are not aligned on max.poll.records boundaries. + * + * tp0 has records 1,2,3; tp1 has records 6,7,8 + * max.poll.records is 2 + * + * poll 1 should return 1,2 + * poll 2 should return 3,6 + * poll 3 should return 7,8 + * + * Or similar :) + */ + @Test + public void testFetchMaxPollRecordsUnaligned() { + buildFetcher(2); + + Set<TopicPartition> tps = new HashSet<>(); + tps.add(tp0); + tps.add(tp1); + assignFromUser(tps); + subscriptions.seek(tp0, 1); + subscriptions.seek(tp1, 6); + + client.prepareResponse(fetchResponse2(tidp0, records, 100L, tidp1, moreRecords, 100L)); + client.prepareResponse(fullFetchResponse(tidp0, emptyRecords, Errors.NONE, 100L, 0)); + + assertEquals(1, sendFetches()); + consumerClient.poll(time.timer(0)); Review Comment: It's a little confusing to reuse the recordsByPartition, fetchedRecords values on each call. I wonder if there is a way to modularize or make it clearer things are being reset. I'm also wondering if we could have comments on each of these cases. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org