lihaosky commented on code in PR #14097: URL: https://github.com/apache/kafka/pull/14097#discussion_r1275599868
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignorTest.java: ########## @@ -313,20 +315,145 @@ public void shouldDistributeStandbyTasksWhenActiveTasksAreLocatedOnSameZone() { ); } + @Test + public void shouldDistributeStandbyTasksUsingFunctionAndSupplierTags() { + final RackAwareTaskAssignor rackAwareTaskAssignor = mock(RackAwareTaskAssignor.class); + when(rackAwareTaskAssignor.canEnableRackAwareAssignor()).thenReturn(true); + final Map<UUID, String> racksForProcess = mkMap( + mkEntry(UUID_1, "rack1"), + mkEntry(UUID_2, "rack2"), + mkEntry(UUID_3, "rack3"), + mkEntry(UUID_4, "rack1"), + mkEntry(UUID_5, "rack2"), + mkEntry(UUID_6, "rack3"), + mkEntry(UUID_7, "rack1"), + mkEntry(UUID_8, "rack2"), + mkEntry(UUID_9, "rack3") + ); + when(rackAwareTaskAssignor.racksForProcess()).thenReturn(racksForProcess); + final AssignmentConfigs assignmentConfigs = newAssignmentConfigs(2); + standbyTaskAssignor = StandbyTaskAssignorFactory.create(assignmentConfigs, rackAwareTaskAssignor); + verify(rackAwareTaskAssignor, times(1)).racksForProcess(); + + final Map<UUID, ClientState> clientStates = mkMap( + mkEntry(UUID_1, createClientStateWithCapacity(UUID_1, 2, mkMap(), TASK_0_0, TASK_1_0)), + mkEntry(UUID_2, createClientStateWithCapacity(UUID_2, 2, mkMap(), TASK_0_1, TASK_1_1)), + mkEntry(UUID_3, createClientStateWithCapacity(UUID_3, 2, mkMap(), TASK_0_2, TASK_1_2)), + + mkEntry(UUID_4, createClientStateWithCapacity(UUID_4, 2, mkMap())), + mkEntry(UUID_5, createClientStateWithCapacity(UUID_5, 2, mkMap())), + mkEntry(UUID_6, createClientStateWithCapacity(UUID_6, 2, mkMap())), + + mkEntry(UUID_7, createClientStateWithCapacity(UUID_7, 2, mkMap())), + mkEntry(UUID_8, createClientStateWithCapacity(UUID_8, 2, mkMap())), + mkEntry(UUID_9, createClientStateWithCapacity(UUID_9, 2, mkMap())) + ); + + final Map<UUID, ClientState> clientStatesWithTags = mkMap( + mkEntry(UUID_1, createClientStateWithCapacity(UUID_1, 2, 
mkMap(mkEntry(ZONE_TAG, ZONE_1)), TASK_0_0, TASK_1_0)), + mkEntry(UUID_2, createClientStateWithCapacity(UUID_2, 2, mkMap(mkEntry(ZONE_TAG, ZONE_2)), TASK_0_1, TASK_1_1)), + mkEntry(UUID_3, createClientStateWithCapacity(UUID_3, 2, mkMap(mkEntry(ZONE_TAG, ZONE_3)), TASK_0_2, TASK_1_2)), + + mkEntry(UUID_4, createClientStateWithCapacity(UUID_4, 2, mkMap(mkEntry(ZONE_TAG, ZONE_1)))), + mkEntry(UUID_5, createClientStateWithCapacity(UUID_5, 2, mkMap(mkEntry(ZONE_TAG, ZONE_2)))), + mkEntry(UUID_6, createClientStateWithCapacity(UUID_6, 2, mkMap(mkEntry(ZONE_TAG, ZONE_3)))), + + mkEntry(UUID_7, createClientStateWithCapacity(UUID_7, 2, mkMap(mkEntry(ZONE_TAG, ZONE_1)))), + mkEntry(UUID_8, createClientStateWithCapacity(UUID_8, 2, mkMap(mkEntry(ZONE_TAG, ZONE_2)))), + mkEntry(UUID_9, createClientStateWithCapacity(UUID_9, 2, mkMap(mkEntry(ZONE_TAG, ZONE_3)))) + ); + + final Set<TaskId> allActiveTasks = findAllActiveTasks(clientStates); + + standbyTaskAssignor.assign(clientStates, allActiveTasks, allActiveTasks, assignmentConfigs); + + final AssignmentConfigs assignmentConfigsWithTags = newAssignmentConfigs(2, ZONE_TAG); + standbyTaskAssignor = new ClientTagAwareStandbyTaskAssignor(); + standbyTaskAssignor.assign(clientStatesWithTags, allActiveTasks, allActiveTasks, assignmentConfigsWithTags); + + Stream.of(clientStates, clientStatesWithTags).forEach( + cs -> { + assertTrue(cs.values().stream().allMatch(ClientState::reachedCapacity)); + Stream.of(UUID_1, UUID_2, UUID_3) + .forEach(client -> assertStandbyTaskCountForClientEqualsTo(cs, client, 0)); + Stream.of(UUID_4, UUID_5, UUID_6, UUID_7, UUID_8, UUID_9) + .forEach(client -> assertStandbyTaskCountForClientEqualsTo(cs, client, 2)); + assertTotalNumberOfStandbyTasksEqualsTo(cs, 12); + + assertTrue( + standbyClientsHonorRackAwareness( + TASK_0_0, + cs, + singletonList( + mkSet(UUID_6, UUID_8) Review Comment: `TASK_0_0`'s active task is in `UUID_1` which has `ZONE_1`. 
So its standby can be put to `ZONE_2` or `ZONE_3` which are `UUID_2, 5, 6, 8, 9`. The algorithm eventually picked 6 and 8. It's the same for the other tasks as well. They are all assigned to processes in a different zone from their active tasks. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignorTest.java: ########## @@ -313,20 +315,145 @@ public void shouldDistributeStandbyTasksWhenActiveTasksAreLocatedOnSameZone() { ); } + @Test + public void shouldDistributeStandbyTasksUsingFunctionAndSupplierTags() { + final RackAwareTaskAssignor rackAwareTaskAssignor = mock(RackAwareTaskAssignor.class); + when(rackAwareTaskAssignor.canEnableRackAwareAssignor()).thenReturn(true); + final Map<UUID, String> racksForProcess = mkMap( + mkEntry(UUID_1, "rack1"), + mkEntry(UUID_2, "rack2"), + mkEntry(UUID_3, "rack3"), + mkEntry(UUID_4, "rack1"), + mkEntry(UUID_5, "rack2"), + mkEntry(UUID_6, "rack3"), + mkEntry(UUID_7, "rack1"), + mkEntry(UUID_8, "rack2"), + mkEntry(UUID_9, "rack3") + ); + when(rackAwareTaskAssignor.racksForProcess()).thenReturn(racksForProcess); + final AssignmentConfigs assignmentConfigs = newAssignmentConfigs(2); + standbyTaskAssignor = StandbyTaskAssignorFactory.create(assignmentConfigs, rackAwareTaskAssignor); + verify(rackAwareTaskAssignor, times(1)).racksForProcess(); + + final Map<UUID, ClientState> clientStates = mkMap( + mkEntry(UUID_1, createClientStateWithCapacity(UUID_1, 2, mkMap(), TASK_0_0, TASK_1_0)), + mkEntry(UUID_2, createClientStateWithCapacity(UUID_2, 2, mkMap(), TASK_0_1, TASK_1_1)), + mkEntry(UUID_3, createClientStateWithCapacity(UUID_3, 2, mkMap(), TASK_0_2, TASK_1_2)), + + mkEntry(UUID_4, createClientStateWithCapacity(UUID_4, 2, mkMap())), + mkEntry(UUID_5, createClientStateWithCapacity(UUID_5, 2, mkMap())), + mkEntry(UUID_6, createClientStateWithCapacity(UUID_6, 2, mkMap())), + + mkEntry(UUID_7, createClientStateWithCapacity(UUID_7, 2, mkMap())), + mkEntry(UUID_8, 
createClientStateWithCapacity(UUID_8, 2, mkMap())), + mkEntry(UUID_9, createClientStateWithCapacity(UUID_9, 2, mkMap())) + ); + + final Map<UUID, ClientState> clientStatesWithTags = mkMap( + mkEntry(UUID_1, createClientStateWithCapacity(UUID_1, 2, mkMap(mkEntry(ZONE_TAG, ZONE_1)), TASK_0_0, TASK_1_0)), + mkEntry(UUID_2, createClientStateWithCapacity(UUID_2, 2, mkMap(mkEntry(ZONE_TAG, ZONE_2)), TASK_0_1, TASK_1_1)), + mkEntry(UUID_3, createClientStateWithCapacity(UUID_3, 2, mkMap(mkEntry(ZONE_TAG, ZONE_3)), TASK_0_2, TASK_1_2)), + + mkEntry(UUID_4, createClientStateWithCapacity(UUID_4, 2, mkMap(mkEntry(ZONE_TAG, ZONE_1)))), + mkEntry(UUID_5, createClientStateWithCapacity(UUID_5, 2, mkMap(mkEntry(ZONE_TAG, ZONE_2)))), + mkEntry(UUID_6, createClientStateWithCapacity(UUID_6, 2, mkMap(mkEntry(ZONE_TAG, ZONE_3)))), + + mkEntry(UUID_7, createClientStateWithCapacity(UUID_7, 2, mkMap(mkEntry(ZONE_TAG, ZONE_1)))), + mkEntry(UUID_8, createClientStateWithCapacity(UUID_8, 2, mkMap(mkEntry(ZONE_TAG, ZONE_2)))), + mkEntry(UUID_9, createClientStateWithCapacity(UUID_9, 2, mkMap(mkEntry(ZONE_TAG, ZONE_3)))) + ); + + final Set<TaskId> allActiveTasks = findAllActiveTasks(clientStates); + + standbyTaskAssignor.assign(clientStates, allActiveTasks, allActiveTasks, assignmentConfigs); + + final AssignmentConfigs assignmentConfigsWithTags = newAssignmentConfigs(2, ZONE_TAG); + standbyTaskAssignor = new ClientTagAwareStandbyTaskAssignor(); Review Comment: The second assignor uses `ZONE_TAG` from `clientTags()`. Its corresponding processIds have the same mapping of zones as the racks. Therefore, the first assignor we created from racks and the second assignor we created from zones should result in the same eventual assignment. The code below verifies that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org. For queries about this service, please contact Infrastructure at: us...@infra.apache.org