lihaosky commented on code in PR #14030:
URL: https://github.com/apache/kafka/pull/14030#discussion_r1272791951


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java:
##########
@@ -145,147 +188,517 @@ public void disableActiveSinceRackMissingInClient() {
 
         // False since process1 doesn't have rackId
         assertFalse(assignor.validateClientRack());
-        assertFalse(assignor.canEnableRackAwareAssignorForActiveTasks());
+        assertFalse(assignor.canEnableRackAwareAssignor());
     }
 
     @Test
-    public void disableActiveSinceRackDiffersInSameProcess() {
+    public void shouldDisableActiveWhenRackDiffersInSameProcess() {
         final Map<UUID, Map<String, Optional<String>>> processRacks = new 
HashMap<>();
 
         // Different consumers in same process have different rack ID. This 
shouldn't happen.
         // If happens, there's a bug somewhere
-        processRacks.computeIfAbsent(process0UUID, k -> new 
HashMap<>()).put("consumer1", Optional.of("rack1"));
-        processRacks.computeIfAbsent(process0UUID, k -> new 
HashMap<>()).put("consumer2", Optional.of("rack2"));
+        processRacks.computeIfAbsent(UUID_1, k -> new 
HashMap<>()).put("consumer1", Optional.of("rack1"));
+        processRacks.computeIfAbsent(UUID_1, k -> new 
HashMap<>()).put("consumer2", Optional.of("rack2"));
 
         final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
             getClusterForTopic0(),
-            getTaskTopicPartitionMapForTask1(),
+            getTaskTopicPartitionMapForTask0(),
             getTopologyGroupTaskMap(),
             processRacks,
             mockInternalTopicManager,
             new 
AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
         );
 
         assertFalse(assignor.validateClientRack());
-        assertFalse(assignor.canEnableRackAwareAssignorForActiveTasks());
+        assertFalse(assignor.canEnableRackAwareAssignor());
     }
 
     @Test
-    public void enableRackAwareAssignorForActiveWithoutDescribingTopics() {
+    public void 
shouldEnableRackAwareAssignorForActiveWithoutDescribingTopics() {
         final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
             getClusterForTopic0(),
-            getTaskTopicPartitionMapForTask1(),
+            getTaskTopicPartitionMapForTask0(),
             getTopologyGroupTaskMap(),
             getProcessRacksForProcess0(),
             mockInternalTopicManager,
             new 
AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
         );
 
         // partitionWithoutInfo00 has rackInfo in cluster metadata
-        assertTrue(assignor.canEnableRackAwareAssignorForActiveTasks());
+        assertTrue(assignor.canEnableRackAwareAssignor());
     }
 
     @Test
-    public void enableRackAwareAssignorForActiveWithDescribingTopics() {
+    public void shouldEnableRackAwareAssignorForActiveWithDescribingTopics() {
         final MockInternalTopicManager spyTopicManager = 
spy(mockInternalTopicManager);
         doReturn(
             Collections.singletonMap(
-                TOPIC0,
+                TP_0_NAME,
                 Collections.singletonList(
-                    new TopicPartitionInfo(0, node0, Arrays.asList(replicas1), 
Collections.emptyList())
+                    new TopicPartitionInfo(0, NODE_0, 
Arrays.asList(REPLICA_1), Collections.emptyList())
                 )
             )
-        
).when(spyTopicManager).getTopicPartitionInfo(Collections.singleton(TOPIC0));
+        
).when(spyTopicManager).getTopicPartitionInfo(Collections.singleton(TP_0_NAME));
 
         final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
             getClusterWithNoNode(),
-            getTaskTopicPartitionMapForTask1(),
+            getTaskTopicPartitionMapForTask0(),
             getTopologyGroupTaskMap(),
             getProcessRacksForProcess0(),
             spyTopicManager,
             new 
AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
         );
 
-        assertTrue(assignor.canEnableRackAwareAssignorForActiveTasks());
+        assertTrue(assignor.canEnableRackAwareAssignor());
     }
 
     @Test
-    public void disableRackAwareAssignorForActiveWithDescribingTopicsFailure() 
{
+    public void 
shouldDisableRackAwareAssignorForActiveWithDescribingTopicsFailure() {
         final MockInternalTopicManager spyTopicManager = 
spy(mockInternalTopicManager);
-        doThrow(new TimeoutException("Timeout describing 
topic")).when(spyTopicManager).getTopicPartitionInfo(Collections.singleton(TOPIC0));
+        doThrow(new TimeoutException("Timeout describing 
topic")).when(spyTopicManager).getTopicPartitionInfo(Collections.singleton(
+            TP_0_NAME));
 
         final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
             getClusterWithNoNode(),
-            getTaskTopicPartitionMapForTask1(),
+            getTaskTopicPartitionMapForTask0(),
             getTopologyGroupTaskMap(),
             getProcessRacksForProcess0(),
             spyTopicManager,
             new 
AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
         );
 
-        assertFalse(assignor.canEnableRackAwareAssignorForActiveTasks());
+        assertFalse(assignor.canEnableRackAwareAssignor());
         assertTrue(assignor.populateTopicsToDiscribe(new HashSet<>()));
     }
 
+    @Test
+    public void shouldOptimizeEmptyActiveTasks() {
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            getClusterForTopic0And1(),
+            getTaskTopicPartitionMapForAllTasks(),
+            getTopologyGroupTaskMap(),
+            getProcessRacksForAllProcess(),
+            mockInternalTopicManager,
+            new 
AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
+        );
+
+        final ClientState clientState0 = new ClientState(emptySet(), 
emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1);
+
+        clientState0.assignActiveTasks(mkSet(TASK_0_1, TASK_1_1));
+
+        final SortedMap<UUID, ClientState> clientStateMap = new 
TreeMap<>(mkMap(
+            mkEntry(UUID_1, clientState0)
+        ));
+        final SortedSet<TaskId> taskIds = mkSortedSet();

Review Comment:
   This tests that nothing changes when an empty set of task IDs is passed in.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java:
##########
@@ -145,147 +188,517 @@ public void disableActiveSinceRackMissingInClient() {
 
         // False since process1 doesn't have rackId
         assertFalse(assignor.validateClientRack());
-        assertFalse(assignor.canEnableRackAwareAssignorForActiveTasks());
+        assertFalse(assignor.canEnableRackAwareAssignor());
     }
 
     @Test
-    public void disableActiveSinceRackDiffersInSameProcess() {
+    public void shouldDisableActiveWhenRackDiffersInSameProcess() {
         final Map<UUID, Map<String, Optional<String>>> processRacks = new 
HashMap<>();
 
         // Different consumers in same process have different rack ID. This 
shouldn't happen.
         // If happens, there's a bug somewhere
-        processRacks.computeIfAbsent(process0UUID, k -> new 
HashMap<>()).put("consumer1", Optional.of("rack1"));
-        processRacks.computeIfAbsent(process0UUID, k -> new 
HashMap<>()).put("consumer2", Optional.of("rack2"));
+        processRacks.computeIfAbsent(UUID_1, k -> new 
HashMap<>()).put("consumer1", Optional.of("rack1"));
+        processRacks.computeIfAbsent(UUID_1, k -> new 
HashMap<>()).put("consumer2", Optional.of("rack2"));
 
         final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
             getClusterForTopic0(),
-            getTaskTopicPartitionMapForTask1(),
+            getTaskTopicPartitionMapForTask0(),
             getTopologyGroupTaskMap(),
             processRacks,
             mockInternalTopicManager,
             new 
AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
         );
 
         assertFalse(assignor.validateClientRack());
-        assertFalse(assignor.canEnableRackAwareAssignorForActiveTasks());
+        assertFalse(assignor.canEnableRackAwareAssignor());
     }
 
     @Test
-    public void enableRackAwareAssignorForActiveWithoutDescribingTopics() {
+    public void 
shouldEnableRackAwareAssignorForActiveWithoutDescribingTopics() {
         final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
             getClusterForTopic0(),
-            getTaskTopicPartitionMapForTask1(),
+            getTaskTopicPartitionMapForTask0(),
             getTopologyGroupTaskMap(),
             getProcessRacksForProcess0(),
             mockInternalTopicManager,
             new 
AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
         );
 
         // partitionWithoutInfo00 has rackInfo in cluster metadata
-        assertTrue(assignor.canEnableRackAwareAssignorForActiveTasks());
+        assertTrue(assignor.canEnableRackAwareAssignor());
     }
 
     @Test
-    public void enableRackAwareAssignorForActiveWithDescribingTopics() {
+    public void shouldEnableRackAwareAssignorForActiveWithDescribingTopics() {
         final MockInternalTopicManager spyTopicManager = 
spy(mockInternalTopicManager);
         doReturn(
             Collections.singletonMap(
-                TOPIC0,
+                TP_0_NAME,
                 Collections.singletonList(
-                    new TopicPartitionInfo(0, node0, Arrays.asList(replicas1), 
Collections.emptyList())
+                    new TopicPartitionInfo(0, NODE_0, 
Arrays.asList(REPLICA_1), Collections.emptyList())
                 )
             )
-        
).when(spyTopicManager).getTopicPartitionInfo(Collections.singleton(TOPIC0));
+        
).when(spyTopicManager).getTopicPartitionInfo(Collections.singleton(TP_0_NAME));
 
         final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
             getClusterWithNoNode(),
-            getTaskTopicPartitionMapForTask1(),
+            getTaskTopicPartitionMapForTask0(),
             getTopologyGroupTaskMap(),
             getProcessRacksForProcess0(),
             spyTopicManager,
             new 
AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
         );
 
-        assertTrue(assignor.canEnableRackAwareAssignorForActiveTasks());
+        assertTrue(assignor.canEnableRackAwareAssignor());
     }
 
     @Test
-    public void disableRackAwareAssignorForActiveWithDescribingTopicsFailure() 
{
+    public void 
shouldDisableRackAwareAssignorForActiveWithDescribingTopicsFailure() {
         final MockInternalTopicManager spyTopicManager = 
spy(mockInternalTopicManager);
-        doThrow(new TimeoutException("Timeout describing 
topic")).when(spyTopicManager).getTopicPartitionInfo(Collections.singleton(TOPIC0));
+        doThrow(new TimeoutException("Timeout describing 
topic")).when(spyTopicManager).getTopicPartitionInfo(Collections.singleton(
+            TP_0_NAME));
 
         final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
             getClusterWithNoNode(),
-            getTaskTopicPartitionMapForTask1(),
+            getTaskTopicPartitionMapForTask0(),
             getTopologyGroupTaskMap(),
             getProcessRacksForProcess0(),
             spyTopicManager,
             new 
AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
         );
 
-        assertFalse(assignor.canEnableRackAwareAssignorForActiveTasks());
+        assertFalse(assignor.canEnableRackAwareAssignor());
         assertTrue(assignor.populateTopicsToDiscribe(new HashSet<>()));
     }
 
+    @Test
+    public void shouldOptimizeEmptyActiveTasks() {
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            getClusterForTopic0And1(),
+            getTaskTopicPartitionMapForAllTasks(),
+            getTopologyGroupTaskMap(),
+            getProcessRacksForAllProcess(),
+            mockInternalTopicManager,
+            new 
AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
+        );
+
+        final ClientState clientState0 = new ClientState(emptySet(), 
emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1);
+
+        clientState0.assignActiveTasks(mkSet(TASK_0_1, TASK_1_1));
+
+        final SortedMap<UUID, ClientState> clientStateMap = new 
TreeMap<>(mkMap(
+            mkEntry(UUID_1, clientState0)
+        ));
+        final SortedSet<TaskId> taskIds = mkSortedSet();
+
+        if (assignor.canEnableRackAwareAssignor()) {
+            final long originalCost = assignor.activeTasksCost(clientStateMap, 
taskIds, stateful);
+            assertEquals(0, originalCost);
+
+            final long cost = assignor.optimizeActiveTasks(clientStateMap, 
taskIds, stateful);
+            assertEquals(0, cost);
+        }
+
+        assertEquals(mkSet(TASK_0_1, TASK_1_1), clientState0.activeTasks());
+    }
+
+    @Test
+    public void shouldOptimizeActiveTasks() {
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            getClusterForTopic0And1(),
+            getTaskTopicPartitionMapForAllTasks(),
+            getTopologyGroupTaskMap(),
+            getProcessRacksForAllProcess(),
+            mockInternalTopicManager,
+            new 
AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
+        );
+
+        final ClientState clientState0 = new ClientState(emptySet(), 
emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1);
+        final ClientState clientState1 = new ClientState(emptySet(), 
emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1);
+        final ClientState clientState2 = new ClientState(emptySet(), 
emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1);
+
+        clientState0.assignActiveTasks(mkSet(TASK_0_1, TASK_1_1));
+        clientState1.assignActive(TASK_1_0);
+        clientState2.assignActive(TASK_0_0);
+
+        // task_0_0 has same rack as UUID_1
+        // task_0_1 has same rack as UUID_2 and UUID_3
+        // task_1_0 has same rack as UUID_1 and UUID_3
+        // task_1_1 has same rack as UUID_2
+        // Optimal assignment is UUID_1: {0_0, 1_0}, UUID_2: {1_1}, UUID_3: 
{0_1} which result in no cross rack traffic
+        final SortedMap<UUID, ClientState> clientStateMap = new 
TreeMap<>(mkMap(
+            mkEntry(UUID_1, clientState0),
+            mkEntry(UUID_2, clientState1),
+            mkEntry(UUID_3, clientState2)
+        ));
+        final SortedSet<TaskId> taskIds = mkSortedSet(TASK_0_0, TASK_0_1, 
TASK_1_0, TASK_1_1);
+
+        if (assignor.canEnableRackAwareAssignor()) {
+            int expected = stateful ? 40 : 4;
+            final long originalCost = assignor.activeTasksCost(clientStateMap, 
taskIds, stateful);
+            assertEquals(expected, originalCost);
+
+            expected = stateful ? 4 : 0;
+            final long cost = assignor.optimizeActiveTasks(clientStateMap, 
taskIds, stateful);
+            assertEquals(expected, cost);
+        }
+
+        assertEquals(mkSet(TASK_0_0, TASK_1_0), clientState0.activeTasks());
+        assertEquals(mkSet(TASK_1_1), clientState1.activeTasks());
+        assertEquals(mkSet(TASK_0_1), clientState2.activeTasks());
+    }
+
+    @Test
+    public void shouldOptimizeActiveTasksWithWeightOverride() {
+        final AssignmentConfigs assignmentConfigs = new AssignmentConfigs(1L, 
2, 2, 60000L, Collections.emptyList(), 1, 10);
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            getClusterForTopic0And1(),
+            getTaskTopicPartitionMapForAllTasks(),
+            getTopologyGroupTaskMap(),
+            getProcessRacksForAllProcess(),
+            mockInternalTopicManager,
+            assignmentConfigs
+        );
+
+        final ClientState clientState0 = new ClientState(emptySet(), 
emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1);
+        final ClientState clientState1 = new ClientState(emptySet(), 
emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1);
+        final ClientState clientState2 = new ClientState(emptySet(), 
emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1);
+
+        clientState0.assignActiveTasks(mkSet(TASK_0_1, TASK_1_1));
+        clientState1.assignActive(TASK_1_0);
+        clientState2.assignActive(TASK_0_0);
+
+        final SortedMap<UUID, ClientState> clientStateMap = new 
TreeMap<>(mkMap(
+            mkEntry(UUID_1, clientState0),
+            mkEntry(UUID_2, clientState1),
+            mkEntry(UUID_3, clientState2)
+        ));
+        final SortedSet<TaskId> taskIds = mkSortedSet(TASK_0_0, TASK_0_1, 
TASK_1_0, TASK_1_1);
+
+        // Because non_overlap_cost is very high, this basically will stick to 
original assignment
+        if (assignor.canEnableRackAwareAssignor()) {
+            final long originalCost = assignor.activeTasksCost(clientStateMap, 
taskIds, stateful);
+            assertEquals(4, originalCost);

Review Comment:
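   In this test, `trafficCost` is 1 and `nonOverlapCost` is 10, so this cost of 4 
comes entirely from traffic cost: each of the four tasks starts on a client in a 
different rack, contributing 1 each. The non-overlap cost of the original 
assignment should be 0 because no tasks have been moved yet.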



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to