lihaosky commented on code in PR #14097:
URL: https://github.com/apache/kafka/pull/14097#discussion_r1275599868
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignorTest.java:
##########
@@ -313,20 +315,145 @@ public void
shouldDistributeStandbyTasksWhenActiveTasksAreLocatedOnSameZone() {
);
}
+ @Test
+ public void shouldDistributeStandbyTasksUsingFunctionAndSupplierTags() {
+ final RackAwareTaskAssignor rackAwareTaskAssignor =
mock(RackAwareTaskAssignor.class);
+
when(rackAwareTaskAssignor.canEnableRackAwareAssignor()).thenReturn(true);
+ final Map<UUID, String> racksForProcess = mkMap(
+ mkEntry(UUID_1, "rack1"),
+ mkEntry(UUID_2, "rack2"),
+ mkEntry(UUID_3, "rack3"),
+ mkEntry(UUID_4, "rack1"),
+ mkEntry(UUID_5, "rack2"),
+ mkEntry(UUID_6, "rack3"),
+ mkEntry(UUID_7, "rack1"),
+ mkEntry(UUID_8, "rack2"),
+ mkEntry(UUID_9, "rack3")
+ );
+
when(rackAwareTaskAssignor.racksForProcess()).thenReturn(racksForProcess);
+ final AssignmentConfigs assignmentConfigs = newAssignmentConfigs(2);
+ standbyTaskAssignor =
StandbyTaskAssignorFactory.create(assignmentConfigs, rackAwareTaskAssignor);
+ verify(rackAwareTaskAssignor, times(1)).racksForProcess();
+
+ final Map<UUID, ClientState> clientStates = mkMap(
+ mkEntry(UUID_1, createClientStateWithCapacity(UUID_1, 2, mkMap(),
TASK_0_0, TASK_1_0)),
+ mkEntry(UUID_2, createClientStateWithCapacity(UUID_2, 2, mkMap(),
TASK_0_1, TASK_1_1)),
+ mkEntry(UUID_3, createClientStateWithCapacity(UUID_3, 2, mkMap(),
TASK_0_2, TASK_1_2)),
+
+ mkEntry(UUID_4, createClientStateWithCapacity(UUID_4, 2, mkMap())),
+ mkEntry(UUID_5, createClientStateWithCapacity(UUID_5, 2, mkMap())),
+ mkEntry(UUID_6, createClientStateWithCapacity(UUID_6, 2, mkMap())),
+
+ mkEntry(UUID_7, createClientStateWithCapacity(UUID_7, 2, mkMap())),
+ mkEntry(UUID_8, createClientStateWithCapacity(UUID_8, 2, mkMap())),
+ mkEntry(UUID_9, createClientStateWithCapacity(UUID_9, 2, mkMap()))
+ );
+
+ final Map<UUID, ClientState> clientStatesWithTags = mkMap(
+ mkEntry(UUID_1, createClientStateWithCapacity(UUID_1, 2,
mkMap(mkEntry(ZONE_TAG, ZONE_1)), TASK_0_0, TASK_1_0)),
+ mkEntry(UUID_2, createClientStateWithCapacity(UUID_2, 2,
mkMap(mkEntry(ZONE_TAG, ZONE_2)), TASK_0_1, TASK_1_1)),
+ mkEntry(UUID_3, createClientStateWithCapacity(UUID_3, 2,
mkMap(mkEntry(ZONE_TAG, ZONE_3)), TASK_0_2, TASK_1_2)),
+
+ mkEntry(UUID_4, createClientStateWithCapacity(UUID_4, 2,
mkMap(mkEntry(ZONE_TAG, ZONE_1)))),
+ mkEntry(UUID_5, createClientStateWithCapacity(UUID_5, 2,
mkMap(mkEntry(ZONE_TAG, ZONE_2)))),
+ mkEntry(UUID_6, createClientStateWithCapacity(UUID_6, 2,
mkMap(mkEntry(ZONE_TAG, ZONE_3)))),
+
+ mkEntry(UUID_7, createClientStateWithCapacity(UUID_7, 2,
mkMap(mkEntry(ZONE_TAG, ZONE_1)))),
+ mkEntry(UUID_8, createClientStateWithCapacity(UUID_8, 2,
mkMap(mkEntry(ZONE_TAG, ZONE_2)))),
+ mkEntry(UUID_9, createClientStateWithCapacity(UUID_9, 2,
mkMap(mkEntry(ZONE_TAG, ZONE_3))))
+ );
+
+ final Set<TaskId> allActiveTasks = findAllActiveTasks(clientStates);
+
+ standbyTaskAssignor.assign(clientStates, allActiveTasks,
allActiveTasks, assignmentConfigs);
+
+ final AssignmentConfigs assignmentConfigsWithTags =
newAssignmentConfigs(2, ZONE_TAG);
+ standbyTaskAssignor = new ClientTagAwareStandbyTaskAssignor();
+ standbyTaskAssignor.assign(clientStatesWithTags, allActiveTasks,
allActiveTasks, assignmentConfigsWithTags);
+
+ Stream.of(clientStates, clientStatesWithTags).forEach(
+ cs -> {
+
assertTrue(cs.values().stream().allMatch(ClientState::reachedCapacity));
+ Stream.of(UUID_1, UUID_2, UUID_3)
+ .forEach(client ->
assertStandbyTaskCountForClientEqualsTo(cs, client, 0));
+ Stream.of(UUID_4, UUID_5, UUID_6, UUID_7, UUID_8, UUID_9)
+ .forEach(client ->
assertStandbyTaskCountForClientEqualsTo(cs, client, 2));
+ assertTotalNumberOfStandbyTasksEqualsTo(cs, 12);
+
+ assertTrue(
+ standbyClientsHonorRackAwareness(
+ TASK_0_0,
+ cs,
+ singletonList(
+ mkSet(UUID_6, UUID_8)
Review Comment:
`TASK_0_0`'s active task is in `UUID_1`, which has `ZONE_1`. So its standby
can be put in `ZONE_2` or `ZONE_3`, which are `UUID_2, 5, 6, 8, 9`. The
algorithm eventually picked 6 and 8. It's the same for the other tasks as well: they
are all placed on processes in a different zone from their active tasks.
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignorTest.java:
##########
@@ -313,20 +315,145 @@ public void
shouldDistributeStandbyTasksWhenActiveTasksAreLocatedOnSameZone() {
);
}
+ @Test
+ public void shouldDistributeStandbyTasksUsingFunctionAndSupplierTags() {
+ final RackAwareTaskAssignor rackAwareTaskAssignor =
mock(RackAwareTaskAssignor.class);
+
when(rackAwareTaskAssignor.canEnableRackAwareAssignor()).thenReturn(true);
+ final Map<UUID, String> racksForProcess = mkMap(
+ mkEntry(UUID_1, "rack1"),
+ mkEntry(UUID_2, "rack2"),
+ mkEntry(UUID_3, "rack3"),
+ mkEntry(UUID_4, "rack1"),
+ mkEntry(UUID_5, "rack2"),
+ mkEntry(UUID_6, "rack3"),
+ mkEntry(UUID_7, "rack1"),
+ mkEntry(UUID_8, "rack2"),
+ mkEntry(UUID_9, "rack3")
+ );
+
when(rackAwareTaskAssignor.racksForProcess()).thenReturn(racksForProcess);
+ final AssignmentConfigs assignmentConfigs = newAssignmentConfigs(2);
+ standbyTaskAssignor =
StandbyTaskAssignorFactory.create(assignmentConfigs, rackAwareTaskAssignor);
+ verify(rackAwareTaskAssignor, times(1)).racksForProcess();
+
+ final Map<UUID, ClientState> clientStates = mkMap(
+ mkEntry(UUID_1, createClientStateWithCapacity(UUID_1, 2, mkMap(),
TASK_0_0, TASK_1_0)),
+ mkEntry(UUID_2, createClientStateWithCapacity(UUID_2, 2, mkMap(),
TASK_0_1, TASK_1_1)),
+ mkEntry(UUID_3, createClientStateWithCapacity(UUID_3, 2, mkMap(),
TASK_0_2, TASK_1_2)),
+
+ mkEntry(UUID_4, createClientStateWithCapacity(UUID_4, 2, mkMap())),
+ mkEntry(UUID_5, createClientStateWithCapacity(UUID_5, 2, mkMap())),
+ mkEntry(UUID_6, createClientStateWithCapacity(UUID_6, 2, mkMap())),
+
+ mkEntry(UUID_7, createClientStateWithCapacity(UUID_7, 2, mkMap())),
+ mkEntry(UUID_8, createClientStateWithCapacity(UUID_8, 2, mkMap())),
+ mkEntry(UUID_9, createClientStateWithCapacity(UUID_9, 2, mkMap()))
+ );
+
+ final Map<UUID, ClientState> clientStatesWithTags = mkMap(
+ mkEntry(UUID_1, createClientStateWithCapacity(UUID_1, 2,
mkMap(mkEntry(ZONE_TAG, ZONE_1)), TASK_0_0, TASK_1_0)),
+ mkEntry(UUID_2, createClientStateWithCapacity(UUID_2, 2,
mkMap(mkEntry(ZONE_TAG, ZONE_2)), TASK_0_1, TASK_1_1)),
+ mkEntry(UUID_3, createClientStateWithCapacity(UUID_3, 2,
mkMap(mkEntry(ZONE_TAG, ZONE_3)), TASK_0_2, TASK_1_2)),
+
+ mkEntry(UUID_4, createClientStateWithCapacity(UUID_4, 2,
mkMap(mkEntry(ZONE_TAG, ZONE_1)))),
+ mkEntry(UUID_5, createClientStateWithCapacity(UUID_5, 2,
mkMap(mkEntry(ZONE_TAG, ZONE_2)))),
+ mkEntry(UUID_6, createClientStateWithCapacity(UUID_6, 2,
mkMap(mkEntry(ZONE_TAG, ZONE_3)))),
+
+ mkEntry(UUID_7, createClientStateWithCapacity(UUID_7, 2,
mkMap(mkEntry(ZONE_TAG, ZONE_1)))),
+ mkEntry(UUID_8, createClientStateWithCapacity(UUID_8, 2,
mkMap(mkEntry(ZONE_TAG, ZONE_2)))),
+ mkEntry(UUID_9, createClientStateWithCapacity(UUID_9, 2,
mkMap(mkEntry(ZONE_TAG, ZONE_3))))
+ );
+
+ final Set<TaskId> allActiveTasks = findAllActiveTasks(clientStates);
+
+ standbyTaskAssignor.assign(clientStates, allActiveTasks,
allActiveTasks, assignmentConfigs);
+
+ final AssignmentConfigs assignmentConfigsWithTags =
newAssignmentConfigs(2, ZONE_TAG);
+ standbyTaskAssignor = new ClientTagAwareStandbyTaskAssignor();
Review Comment:
The second assignor uses `ZONE_TAG` from `clientTags()`. Its corresponding
processIds have the same mapping of zones as the racks do. Therefore, the first
assignor we created from racks and the second assignor we created from zones
should result in the same eventual assignment. The code below verifies that.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]