lihaosky commented on code in PR #14097:
URL: https://github.com/apache/kafka/pull/14097#discussion_r1275599868


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignorTest.java:
##########
@@ -313,20 +315,145 @@ public void shouldDistributeStandbyTasksWhenActiveTasksAreLocatedOnSameZone() {
         );
     }
 
+    @Test
+    public void shouldDistributeStandbyTasksUsingFunctionAndSupplierTags() {
+        final RackAwareTaskAssignor rackAwareTaskAssignor = mock(RackAwareTaskAssignor.class);
+        when(rackAwareTaskAssignor.canEnableRackAwareAssignor()).thenReturn(true);
+        final Map<UUID, String> racksForProcess = mkMap(
+            mkEntry(UUID_1, "rack1"),
+            mkEntry(UUID_2, "rack2"),
+            mkEntry(UUID_3, "rack3"),
+            mkEntry(UUID_4, "rack1"),
+            mkEntry(UUID_5, "rack2"),
+            mkEntry(UUID_6, "rack3"),
+            mkEntry(UUID_7, "rack1"),
+            mkEntry(UUID_8, "rack2"),
+            mkEntry(UUID_9, "rack3")
+        );
+        when(rackAwareTaskAssignor.racksForProcess()).thenReturn(racksForProcess);
+        final AssignmentConfigs assignmentConfigs = newAssignmentConfigs(2);
+        standbyTaskAssignor = StandbyTaskAssignorFactory.create(assignmentConfigs, rackAwareTaskAssignor);
+        verify(rackAwareTaskAssignor, times(1)).racksForProcess();
+
+        final Map<UUID, ClientState> clientStates = mkMap(
+            mkEntry(UUID_1, createClientStateWithCapacity(UUID_1, 2, mkMap(), TASK_0_0, TASK_1_0)),
+            mkEntry(UUID_2, createClientStateWithCapacity(UUID_2, 2, mkMap(), TASK_0_1, TASK_1_1)),
+            mkEntry(UUID_3, createClientStateWithCapacity(UUID_3, 2, mkMap(), TASK_0_2, TASK_1_2)),
+
+            mkEntry(UUID_4, createClientStateWithCapacity(UUID_4, 2, mkMap())),
+            mkEntry(UUID_5, createClientStateWithCapacity(UUID_5, 2, mkMap())),
+            mkEntry(UUID_6, createClientStateWithCapacity(UUID_6, 2, mkMap())),
+
+            mkEntry(UUID_7, createClientStateWithCapacity(UUID_7, 2, mkMap())),
+            mkEntry(UUID_8, createClientStateWithCapacity(UUID_8, 2, mkMap())),
+            mkEntry(UUID_9, createClientStateWithCapacity(UUID_9, 2, mkMap()))
+        );
+
+        final Map<UUID, ClientState> clientStatesWithTags = mkMap(
+            mkEntry(UUID_1, createClientStateWithCapacity(UUID_1, 2, mkMap(mkEntry(ZONE_TAG, ZONE_1)), TASK_0_0, TASK_1_0)),
+            mkEntry(UUID_2, createClientStateWithCapacity(UUID_2, 2, mkMap(mkEntry(ZONE_TAG, ZONE_2)), TASK_0_1, TASK_1_1)),
+            mkEntry(UUID_3, createClientStateWithCapacity(UUID_3, 2, mkMap(mkEntry(ZONE_TAG, ZONE_3)), TASK_0_2, TASK_1_2)),
+
+            mkEntry(UUID_4, createClientStateWithCapacity(UUID_4, 2, mkMap(mkEntry(ZONE_TAG, ZONE_1)))),
+            mkEntry(UUID_5, createClientStateWithCapacity(UUID_5, 2, mkMap(mkEntry(ZONE_TAG, ZONE_2)))),
+            mkEntry(UUID_6, createClientStateWithCapacity(UUID_6, 2, mkMap(mkEntry(ZONE_TAG, ZONE_3)))),
+
+            mkEntry(UUID_7, createClientStateWithCapacity(UUID_7, 2, mkMap(mkEntry(ZONE_TAG, ZONE_1)))),
+            mkEntry(UUID_8, createClientStateWithCapacity(UUID_8, 2, mkMap(mkEntry(ZONE_TAG, ZONE_2)))),
+            mkEntry(UUID_9, createClientStateWithCapacity(UUID_9, 2, mkMap(mkEntry(ZONE_TAG, ZONE_3))))
+        );
+
+        final Set<TaskId> allActiveTasks = findAllActiveTasks(clientStates);
+
+        standbyTaskAssignor.assign(clientStates, allActiveTasks, allActiveTasks, assignmentConfigs);
+
+        final AssignmentConfigs assignmentConfigsWithTags = newAssignmentConfigs(2, ZONE_TAG);
+        standbyTaskAssignor = new ClientTagAwareStandbyTaskAssignor();
+        standbyTaskAssignor.assign(clientStatesWithTags, allActiveTasks, allActiveTasks, assignmentConfigsWithTags);
+
+        Stream.of(clientStates, clientStatesWithTags).forEach(
+            cs -> {
+                assertTrue(cs.values().stream().allMatch(ClientState::reachedCapacity));
+                Stream.of(UUID_1, UUID_2, UUID_3)
+                    .forEach(client -> assertStandbyTaskCountForClientEqualsTo(cs, client, 0));
+                Stream.of(UUID_4, UUID_5, UUID_6, UUID_7, UUID_8, UUID_9)
+                    .forEach(client -> assertStandbyTaskCountForClientEqualsTo(cs, client, 2));
+                assertTotalNumberOfStandbyTasksEqualsTo(cs, 12);
+
+                assertTrue(
+                    standbyClientsHonorRackAwareness(
+                        TASK_0_0,
+                        cs,
+                        singletonList(
+                            mkSet(UUID_6, UUID_8)

Review Comment:
   `TASK_0_0`'s active task is on `UUID_1`, which is in `ZONE_1`. So its standbys can be placed in `ZONE_2` or `ZONE_3`, which are `UUID_2, 5, 6, 8, 9`. The algorithm eventually picked 6 and 8. The same holds for the other tasks: their standbys are all placed on processes in a zone different from their active task's.
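   To make the rule concrete, here is a minimal, self-contained sketch of the candidate filter described above (hypothetical names, not the assignor's actual code): a standby may only land on a process whose rack differs from the rack hosting the task's active replica.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

public class StandbyCandidateSketch {
    public static void main(final String[] args) {
        // Same shape as the test fixture: nine processes spread over three racks.
        final Map<UUID, String> racksForProcess = new HashMap<>();
        final UUID[] ids = new UUID[10]; // ids[1]..ids[9] mirror UUID_1..UUID_9
        for (int i = 1; i <= 9; i++) {
            ids[i] = new UUID(0, i);
            racksForProcess.put(ids[i], "rack" + ((i - 1) % 3 + 1));
        }

        // TASK_0_0's active replica runs on the process mirroring UUID_1 ("rack1"),
        // so every process on a different rack is an eligible standby candidate.
        final String activeRack = racksForProcess.get(ids[1]);
        final Set<UUID> candidates = racksForProcess.entrySet().stream()
            .filter(e -> !e.getValue().equals(activeRack))
            .map(Map.Entry::getKey)
            .collect(Collectors.toSet());

        // Prints the processes on rack2/rack3; the assignor then chooses among
        // them by remaining capacity (in the test run it picked UUID_6 and UUID_8).
        System.out.println(candidates);
    }
}
```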



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignorTest.java:
##########
@@ -313,20 +315,145 @@ public void shouldDistributeStandbyTasksWhenActiveTasksAreLocatedOnSameZone() {
+        final Set<TaskId> allActiveTasks = findAllActiveTasks(clientStates);
+
+        standbyTaskAssignor.assign(clientStates, allActiveTasks, allActiveTasks, assignmentConfigs);
+
+        final AssignmentConfigs assignmentConfigsWithTags = newAssignmentConfigs(2, ZONE_TAG);
+        standbyTaskAssignor = new ClientTagAwareStandbyTaskAssignor();

Review Comment:
   The second assignor uses `ZONE_TAG` from `clientTags()`. Its process IDs have the same zone mapping as the rack mapping above. Therefore, the first assignor, created from rack information, and the second, created from zone tags, should produce the same eventual assignment. The assertions below verify that.
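   As a sketch of what "same eventual assignment" means here (a hypothetical helper, assuming `ClientState.standbyTasks()` exposes each client's standby set), the equivalence could be phrased as:

```java
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.ClientState;

final class AssignmentEquivalenceSketch {
    // Collapse each client's state down to just its standby task set.
    static Map<UUID, Set<TaskId>> standbysByClient(final Map<UUID, ClientState> clientStates) {
        return clientStates.entrySet().stream()
            .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().standbyTasks()));
    }

    // True when the rack-based and the tag-based runs placed every standby identically.
    static boolean sameStandbyAssignment(final Map<UUID, ClientState> rackBased,
                                         final Map<UUID, ClientState> tagBased) {
        return standbysByClient(rackBased).equals(standbysByClient(tagBased));
    }
}
```

   The test above asserts this indirectly, through the per-client standby counts and the rack-awareness checks applied to both client-state maps.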



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
