ash-at-github commented on code in PR #20170:
URL: https://github.com/apache/kafka/pull/20170#discussion_r2217120785


##########
clients/src/test/java/org/apache/kafka/clients/producer/RoundRobinPartitionerTest.java:
##########
@@ -96,4 +96,24 @@ public void testRoundRobinWithKeyBytes() {
         assertEquals(10, partitionCount.get(1).intValue());
         assertEquals(10, partitionCount.get(2).intValue());
     }
+
+    @Test
+    public void testRoundRobinWithAbortForNewBatch() throws Exception {
+        final String topicA = "topicA";
+        final String topicB = "topicB";
+
+        Cluster testCluster = new Cluster("clusterId", asList(NODES[0]), Collections.emptyList(),
+                Collections.<String>emptySet(), Collections.<String>emptySet());
+
+        Partitioner partitioner = new RoundRobinPartitioner();
+
+        //abort for new batch - previous partition should be returned on subsequent call
+        //simulate three threads producing to two topics, with race condition in producer
+        partitioner.onNewBatch(topicA, testCluster, 7);
+        partitioner.onNewBatch(topicA, testCluster, 8);
+        partitioner.onNewBatch(topicB, testCluster, 1);
+        assertEquals(7, partitioner.partition(topicA, null, null, null, null, testCluster));
+        assertEquals(8, partitioner.partition(topicA, null, null, null, null, testCluster));
+        assertEquals(1, partitioner.partition(topicB, null, null, null, null, testCluster));

Review Comment:
   It's great that this test verifies that the next partition selected matches
   the enqueued value for each topic. Consider also adding tests for edge cases,
   such as calling `partition()` when nothing has been enqueued for the topic
   (empty queue), and for error handling.
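
To illustrate the kind of edge cases I mean, here is a rough, self-contained sketch. `QueueFallbackSketch` is a hypothetical stand-in, not the `RoundRobinPartitioner` in this PR: it assumes a per-topic queue of aborted partitions with a fall-back to a round-robin counter when the queue is empty, and it assumes a non-positive partition count is rejected with `IllegalArgumentException`. The real tests would need to follow whatever the actual implementation does.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model for discussion only; NOT Kafka's RoundRobinPartitioner.
class QueueFallbackSketch {
    // Partitions remembered from aborted batches, one FIFO queue per topic.
    private final Map<String, Queue<Integer>> aborted = new ConcurrentHashMap<>();
    // Plain round-robin counters, one per topic.
    private final Map<String, AtomicInteger> counters = new ConcurrentHashMap<>();

    // Remember the partition chosen for a batch that was aborted.
    void onNewBatch(String topic, int prevPartition) {
        aborted.computeIfAbsent(topic, t -> new ConcurrentLinkedQueue<>()).add(prevPartition);
    }

    // Return a remembered partition if one is queued; otherwise round-robin.
    int partition(String topic, int numPartitions) {
        if (numPartitions <= 0) {
            // Assumed error-handling behavior, chosen for this sketch.
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        Queue<Integer> queue = aborted.get(topic);
        Integer queued = (queue == null) ? null : queue.poll();
        if (queued != null) {
            return queued;
        }
        int next = counters.computeIfAbsent(topic, t -> new AtomicInteger(0)).getAndIncrement();
        // Mask the sign bit so the result stays non-negative after counter overflow.
        return (next & 0x7fffffff) % numPartitions;
    }
}
```

Against a model like this, the extra cases would be: a topic with nothing enqueued, a queue that has been drained by an earlier call, and an invalid partition count.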



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.