ash-at-github commented on code in PR #20170:
URL: https://github.com/apache/kafka/pull/20170#discussion_r2217120785

##########
clients/src/test/java/org/apache/kafka/clients/producer/RoundRobinPartitionerTest.java:
##########
@@ -96,4 +96,24 @@ public void testRoundRobinWithKeyBytes() {
         assertEquals(10, partitionCount.get(1).intValue());
         assertEquals(10, partitionCount.get(2).intValue());
     }
+
+    @Test
+    public void testRoundRobinWithAbortForNewBatch() throws Exception {
+        final String topicA = "topicA";
+        final String topicB = "topicB";
+
+        Cluster testCluster = new Cluster("clusterId", asList(NODES[0]), Collections.emptyList(),
+            Collections.<String>emptySet(), Collections.<String>emptySet());
+
+        Partitioner partitioner = new RoundRobinPartitioner();
+
+        //abort for new batch - previous partition should be returned on subsequent call
+        //simulate three threads producing to two topics, with race condition in producer
+        partitioner.onNewBatch(topicA, testCluster, 7);
+        partitioner.onNewBatch(topicA, testCluster, 8);
+        partitioner.onNewBatch(topicB, testCluster, 1);
+        assertEquals(7, partitioner.partition(topicA, null, null, null, null, testCluster));
+        assertEquals(8, partitioner.partition(topicA, null, null, null, null, testCluster));
+        assertEquals(1, partitioner.partition(topicB, null, null, null, null, testCluster));

Review Comment:
   It's great that this test verifies that the next partition selected matches the enqueued value for each topic. Consider adding tests for edge cases as well, such as an empty queue and error handling.
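
   To make the suggested edge cases concrete, here is a rough sketch of the kind of checks I have in mind. It does not use the Kafka classes: `QueueingRoundRobinSketch` is a hypothetical stand-in that only models the behavior this test appears to rely on (a per-topic queue filled by `onNewBatch` and drained by `partition`, with a round-robin fallback). The fallback order and the `IllegalArgumentException` on a non-positive partition count are assumptions for illustration, not the PR's actual behavior.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in, NOT Kafka's RoundRobinPartitioner. It only models
// "return the enqueued partition if there is one, otherwise round-robin".
class QueueingRoundRobinSketch {
    private final Map<String, Queue<Integer>> pending = new HashMap<>();
    private final Map<String, AtomicInteger> counters = new HashMap<>();

    // Models onNewBatch: remember the partition whose batch was aborted.
    synchronized void onNewBatch(String topic, int prevPartition) {
        pending.computeIfAbsent(topic, t -> new ArrayDeque<>()).add(prevPartition);
    }

    // Models partition(): drain the topic's queue first, then fall back to round-robin.
    synchronized int partition(String topic, int numPartitions) {
        if (numPartitions <= 0) {
            // Assumed error handling, chosen for this sketch only.
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        Queue<Integer> queue = pending.get(topic);
        Integer queued = (queue == null) ? null : queue.poll(); // poll() yields null when empty
        if (queued != null) {
            return queued;
        }
        int next = counters.computeIfAbsent(topic, t -> new AtomicInteger(0)).getAndIncrement();
        return Math.floorMod(next, numPartitions);
    }

    private static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    public static void main(String[] args) {
        QueueingRoundRobinSketch p = new QueueingRoundRobinSketch();

        // Edge case 1: empty queue, so plain round-robin applies.
        check(p.partition("topicA", 3) == 0, "first call should pick partition 0");
        check(p.partition("topicA", 3) == 1, "second call should pick partition 1");

        // Edge case 2: an enqueued value is returned once, then round-robin resumes.
        p.onNewBatch("topicA", 7);
        check(p.partition("topicA", 3) == 7, "enqueued partition should be reused");
        check(p.partition("topicA", 3) == 2, "round-robin should resume after the queue drains");

        // Edge case 3: a pending entry for one topic does not leak into another.
        p.onNewBatch("topicA", 8);
        check(p.partition("topicB", 3) == 0, "topicB should ignore topicA's queue");

        // Edge case 4: error handling for an invalid partition count.
        boolean thrown = false;
        try {
            p.partition("topicA", 0);
        } catch (IllegalArgumentException expected) {
            thrown = true;
        }
        check(thrown, "non-positive partition count should be rejected");
    }
}
```

   In the real test these would be `assertEquals` / `assertThrows` calls against `RoundRobinPartitioner` and a `Cluster` with more than one partition. The sketch is only meant to show which situations are worth covering.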