lihaosky commented on code in PR #14097:
URL: https://github.com/apache/kafka/pull/14097#discussion_r1275599868
##
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignorTest.java:
##
@@ -313,20 +315,145 @@ public void
shouldDistributeStandbyTasksWhenActiveTasksAreLocatedOnSameZone() {
);
}
+@Test
+public void shouldDistributeStandbyTasksUsingFunctionAndSupplierTags() {
+final RackAwareTaskAssignor rackAwareTaskAssignor =
mock(RackAwareTaskAssignor.class);
+
when(rackAwareTaskAssignor.canEnableRackAwareAssignor()).thenReturn(true);
+final Map racksForProcess = mkMap(
+mkEntry(UUID_1, "rack1"),
+mkEntry(UUID_2, "rack2"),
+mkEntry(UUID_3, "rack3"),
+mkEntry(UUID_4, "rack1"),
+mkEntry(UUID_5, "rack2"),
+mkEntry(UUID_6, "rack3"),
+mkEntry(UUID_7, "rack1"),
+mkEntry(UUID_8, "rack2"),
+mkEntry(UUID_9, "rack3")
+);
+
when(rackAwareTaskAssignor.racksForProcess()).thenReturn(racksForProcess);
+final AssignmentConfigs assignmentConfigs = newAssignmentConfigs(2);
+standbyTaskAssignor =
StandbyTaskAssignorFactory.create(assignmentConfigs, rackAwareTaskAssignor);
+verify(rackAwareTaskAssignor, times(1)).racksForProcess();
+
+final Map clientStates = mkMap(
+mkEntry(UUID_1, createClientStateWithCapacity(UUID_1, 2, mkMap(),
TASK_0_0, TASK_1_0)),
+mkEntry(UUID_2, createClientStateWithCapacity(UUID_2, 2, mkMap(),
TASK_0_1, TASK_1_1)),
+mkEntry(UUID_3, createClientStateWithCapacity(UUID_3, 2, mkMap(),
TASK_0_2, TASK_1_2)),
+
+mkEntry(UUID_4, createClientStateWithCapacity(UUID_4, 2, mkMap())),
+mkEntry(UUID_5, createClientStateWithCapacity(UUID_5, 2, mkMap())),
+mkEntry(UUID_6, createClientStateWithCapacity(UUID_6, 2, mkMap())),
+
+mkEntry(UUID_7, createClientStateWithCapacity(UUID_7, 2, mkMap())),
+mkEntry(UUID_8, createClientStateWithCapacity(UUID_8, 2, mkMap())),
+mkEntry(UUID_9, createClientStateWithCapacity(UUID_9, 2, mkMap()))
+);
+
+final Map clientStatesWithTags = mkMap(
+mkEntry(UUID_1, createClientStateWithCapacity(UUID_1, 2,
mkMap(mkEntry(ZONE_TAG, ZONE_1)), TASK_0_0, TASK_1_0)),
+mkEntry(UUID_2, createClientStateWithCapacity(UUID_2, 2,
mkMap(mkEntry(ZONE_TAG, ZONE_2)), TASK_0_1, TASK_1_1)),
+mkEntry(UUID_3, createClientStateWithCapacity(UUID_3, 2,
mkMap(mkEntry(ZONE_TAG, ZONE_3)), TASK_0_2, TASK_1_2)),
+
+mkEntry(UUID_4, createClientStateWithCapacity(UUID_4, 2,
mkMap(mkEntry(ZONE_TAG, ZONE_1,
+mkEntry(UUID_5, createClientStateWithCapacity(UUID_5, 2,
mkMap(mkEntry(ZONE_TAG, ZONE_2,
+mkEntry(UUID_6, createClientStateWithCapacity(UUID_6, 2,
mkMap(mkEntry(ZONE_TAG, ZONE_3,
+
+mkEntry(UUID_7, createClientStateWithCapacity(UUID_7, 2,
mkMap(mkEntry(ZONE_TAG, ZONE_1,
+mkEntry(UUID_8, createClientStateWithCapacity(UUID_8, 2,
mkMap(mkEntry(ZONE_TAG, ZONE_2,
+mkEntry(UUID_9, createClientStateWithCapacity(UUID_9, 2,
mkMap(mkEntry(ZONE_TAG, ZONE_3
+);
+
+final Set allActiveTasks = findAllActiveTasks(clientStates);
+
+standbyTaskAssignor.assign(clientStates, allActiveTasks,
allActiveTasks, assignmentConfigs);
+
+final AssignmentConfigs assignmentConfigsWithTags =
newAssignmentConfigs(2, ZONE_TAG);
+standbyTaskAssignor = new ClientTagAwareStandbyTaskAssignor();
+standbyTaskAssignor.assign(clientStatesWithTags, allActiveTasks,
allActiveTasks, assignmentConfigsWithTags);
+
+Stream.of(clientStates, clientStatesWithTags).forEach(
+cs -> {
+
assertTrue(cs.values().stream().allMatch(ClientState::reachedCapacity));
+Stream.of(UUID_1, UUID_2, UUID_3)
+.forEach(client ->
assertStandbyTaskCountForClientEqualsTo(cs, client, 0));
+Stream.of(UUID_4, UUID_5, UUID_6, UUID_7, UUID_8, UUID_9)
+.forEach(client ->
assertStandbyTaskCountForClientEqualsTo(cs, client, 2));
+assertTotalNumberOfStandbyTasksEqualsTo(cs, 12);
+
+assertTrue(
+standbyClientsHonorRackAwareness(
+TASK_0_0,
+cs,
+singletonList(
+mkSet(UUID_6, UUID_8)
Review Comment:
The active task for `TASK_0_0` is on `UUID_1`, which is in `ZONE_1`, so its
standbys can be placed in `ZONE_2` or `ZONE_3`, i.e. on `UUID_2, 3, 5, 6, 8, 9`.
The algorithm eventually picked 6 and 8. The same holds for the other tasks as
well: they are all placed on processes in a different zone from their active tasks.
##