LiamClarkeNZ commented on code in PR #19850:
URL: https://github.com/apache/kafka/pull/19850#discussion_r2403746075


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/BuiltInPartitionerTest.java:
##########
@@ -77,7 +86,7 @@ public void testStickyPartitioning() {
         assertNotEquals(partA, builtInPartitionerA.peekCurrentPartitionInfo(testCluster).partition());
 
         // Check that switching works even when there is one partition.
-        BuiltInPartitioner builtInPartitionerB = new SequentialPartitioner(logContext, TOPIC_B, 1);
+        BuiltInPartitioner builtInPartitionerB = new SequentialPartitioner(logContext, TOPIC_B, 1, false, "");

Review Comment:
   ```suggestion
           BuiltInPartitioner builtInPartitionerB = new SequentialPartitioner(logContext, TOPIC_B, 1, rackAware, clientRackId);
   ```
   
   Just to make the meaning of `false` and `""` obvious at a glance :) 



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/BuiltInPartitionerTest.java:
##########
@@ -58,7 +67,7 @@ public void testStickyPartitioning() {
             Collections.emptySet(), Collections.emptySet());
 
         // Create partitions with "sticky" batch size to accommodate 3 records.
-        BuiltInPartitioner builtInPartitionerA = new SequentialPartitioner(logContext, TOPIC_A, 3);
+        BuiltInPartitioner builtInPartitionerA = new SequentialPartitioner(logContext, TOPIC_A, 3, false, "");

Review Comment:
   ```suggestion
           final boolean rackAware = false;
           final String clientRackId = "";
           BuiltInPartitioner builtInPartitionerA = new SequentialPartitioner(logContext, TOPIC_A, 3, rackAware, clientRackId);
   ```



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java:
##########
@@ -40,8 +41,10 @@ public class BuiltInPartitioner {
     private final Logger log;
     private final String topic;
     private final int stickyBatchSize;
+    private final boolean rackAware;
+    private final String rack;
 
-    private volatile PartitionLoadStats partitionLoadStats = null;
+    private volatile PartitionLoadStatsHolder partitionLoadStats = null;

Review Comment:
   ```suggestion
       private volatile PartitionLoadStatsHolder partitionLoadStatsHolder = null;
   ```
   
   The current name got a bit confusing later on, where the code retrieves
   `partitionLoadStats.inThisRack`, which is a member of `PartitionLoadStatsHolder`
   and is itself of type `PartitionLoadStats`. :D



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/BuiltInPartitionerTest.java:
##########
@@ -144,22 +245,34 @@ public void unavailablePartitionsTest() {
         assertEquals(0, partC);
     }
 
-    @Test
-    public void adaptivePartitionsTest() {
-        BuiltInPartitioner builtInPartitioner = new SequentialPartitioner(logContext, TOPIC_A, 1);
+    @ParameterizedTest
+    // All these cases exclude rack-aware partitioning,
+    // but ensure various combinations of broker and client rack settings don't cause problems.
+    @CsvSource({
+        "false,false,",
+        "true,false,",
+        "false,true,rack0",
+    })
+    public void adaptivePartitionsTest(boolean brokerRacksArePresent, boolean clientRackAware, String clientRack) {
+        BuiltInPartitioner builtInPartitioner = new SequentialPartitioner(logContext, TOPIC_A, 1, clientRackAware, clientRack);
 
         // Simulate partition queue sizes.
         int[] queueSizes = {5, 0, 3, 0, 1};
         int[] partitionIds = new int[queueSizes.length];
+        String[] partitionRacks = new String[queueSizes.length];
         int[] expectedFrequencies = new int[queueSizes.length];
         List<PartitionInfo> allPartitions = new ArrayList<>();
         for (int i = 0; i < partitionIds.length; i++) {
+            final Node leader = NODES[i % NODES.length];
             partitionIds[i] = i;
-            allPartitions.add(new PartitionInfo(TOPIC_A, i, NODES[i % NODES.length], NODES, NODES));
+            if (brokerRacksArePresent) {
+                partitionRacks[i] = leader.rack();
+            }
+            allPartitions.add(new PartitionInfo(TOPIC_A, i, leader, NODES, NODES));
             expectedFrequencies[i] = 6 - queueSizes[i];  // 6 is max(queueSizes) + 1

Review Comment:
   Love the comment :) 
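
For anyone reading along, the arithmetic behind that comment can be checked in isolation. This is only a minimal sketch, assuming the expected frequency is `max(queueSizes) + 1` minus the queue size, exactly as the comment in the diff states. The class name `ExpectedFrequencies` and the helper `expectedFrequencies` are hypothetical and not part of the PR:

```java
import java.util.Arrays;

// Hypothetical helper, not part of the PR: reproduces the
// "6 - queueSizes[i]" arithmetic from the test without the magic number.
public class ExpectedFrequencies {

    static int[] expectedFrequencies(int[] queueSizes) {
        // max(queueSizes) + 1; this is the literal 6 in the test for {5, 0, 3, 0, 1}.
        int maxPlusOne = Arrays.stream(queueSizes).max().orElse(0) + 1;
        int[] result = new int[queueSizes.length];
        for (int i = 0; i < queueSizes.length; i++) {
            // A shorter queue yields a larger expected frequency.
            result[i] = maxPlusOne - queueSizes[i];
        }
        return result;
    }

    public static void main(String[] args) {
        int[] queueSizes = {5, 0, 3, 0, 1};
        // Prints [1, 6, 3, 6, 5]
        System.out.println(Arrays.toString(expectedFrequencies(queueSizes)));
    }
}
```

With the queue sizes used in the test, the partitions with empty queues get the largest value (6) and the fullest one gets the smallest (1).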


