[GitHub] [kafka] lihaosky commented on a diff in pull request #14097: KAFKA-15022: [4/N] use client tag assignor for rack aware standby task assignment

2023-07-27 Thread via GitHub


lihaosky commented on code in PR #14097:
URL: https://github.com/apache/kafka/pull/14097#discussion_r1277119690


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
##
@@ -63,11 +62,11 @@ public RackAwareTaskAssignor(final Cluster fullMetadata,
                                  final AssignmentConfigs assignmentConfigs) {
         this.fullMetadata = fullMetadata;
         this.partitionsForTask = partitionsForTask;
-        this.racksForProcessConsumer = racksForProcessConsumer;
         this.internalTopicManager = internalTopicManager;
         this.assignmentConfigs = assignmentConfigs;
         this.racksForPartition = new HashMap<>();
         this.racksForProcess = new HashMap<>();
+        validateClientRack(racksForProcessConsumer);

Review Comment:
   Yeah. Reverted and introduced `validClientRack` to track it
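
   The validation this refers to can be sketched roughly as follows. This is a hypothetical standalone version (the real method lives in `RackAwareTaskAssignor` and its exact signature may differ): it collapses the per-consumer rack info into a single rack per process, and rejects a process whose consumers report inconsistent or missing racks.

   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Optional;
   import java.util.UUID;

   public class RackValidationSketch {
       // Hypothetical sketch, not the PR's exact code: collapse per-consumer
       // rack info (processId -> consumerId -> rack) into one rack per process.
       public static Map<UUID, String> validateClientRack(
               final Map<UUID, Map<String, Optional<String>>> racksForProcessConsumer) {
           final Map<UUID, String> racksForProcess = new HashMap<>();
           for (final Map.Entry<UUID, Map<String, Optional<String>>> process : racksForProcessConsumer.entrySet()) {
               String rack = null;
               for (final Optional<String> consumerRack : process.getValue().values()) {
                   if (!consumerRack.isPresent()) {
                       // A consumer without rack info makes the process unusable
                       // for rack-aware assignment.
                       throw new IllegalStateException("Missing rack for a consumer of process " + process.getKey());
                   }
                   if (rack == null) {
                       rack = consumerRack.get();
                   } else if (!rack.equals(consumerRack.get())) {
                       // Consumers of the same process must agree on the rack.
                       throw new IllegalStateException("Inconsistent racks for process " + process.getKey());
                   }
               }
               if (rack != null) {
                   racksForProcess.put(process.getKey(), rack);
               }
           }
           return racksForProcess;
       }
   }
   ```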



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lihaosky commented on a diff in pull request #14097: KAFKA-15022: [4/N] use client tag assignor for rack aware standby task assignment

2023-07-26 Thread via GitHub


lihaosky commented on code in PR #14097:
URL: https://github.com/apache/kafka/pull/14097#discussion_r1275599868


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignorTest.java:
##
@@ -313,20 +315,145 @@ public void shouldDistributeStandbyTasksWhenActiveTasksAreLocatedOnSameZone() {
         );
     }
 
+    @Test
+    public void shouldDistributeStandbyTasksUsingFunctionAndSupplierTags() {
+        final RackAwareTaskAssignor rackAwareTaskAssignor = mock(RackAwareTaskAssignor.class);
+        when(rackAwareTaskAssignor.canEnableRackAwareAssignor()).thenReturn(true);
+        final Map<UUID, String> racksForProcess = mkMap(
+            mkEntry(UUID_1, "rack1"),
+            mkEntry(UUID_2, "rack2"),
+            mkEntry(UUID_3, "rack3"),
+            mkEntry(UUID_4, "rack1"),
+            mkEntry(UUID_5, "rack2"),
+            mkEntry(UUID_6, "rack3"),
+            mkEntry(UUID_7, "rack1"),
+            mkEntry(UUID_8, "rack2"),
+            mkEntry(UUID_9, "rack3")
+        );
+        when(rackAwareTaskAssignor.racksForProcess()).thenReturn(racksForProcess);
+        final AssignmentConfigs assignmentConfigs = newAssignmentConfigs(2);
+        standbyTaskAssignor = StandbyTaskAssignorFactory.create(assignmentConfigs, rackAwareTaskAssignor);
+        verify(rackAwareTaskAssignor, times(1)).racksForProcess();
+
+        final Map<UUID, ClientState> clientStates = mkMap(
+            mkEntry(UUID_1, createClientStateWithCapacity(UUID_1, 2, mkMap(), TASK_0_0, TASK_1_0)),
+            mkEntry(UUID_2, createClientStateWithCapacity(UUID_2, 2, mkMap(), TASK_0_1, TASK_1_1)),
+            mkEntry(UUID_3, createClientStateWithCapacity(UUID_3, 2, mkMap(), TASK_0_2, TASK_1_2)),
+
+            mkEntry(UUID_4, createClientStateWithCapacity(UUID_4, 2, mkMap())),
+            mkEntry(UUID_5, createClientStateWithCapacity(UUID_5, 2, mkMap())),
+            mkEntry(UUID_6, createClientStateWithCapacity(UUID_6, 2, mkMap())),
+
+            mkEntry(UUID_7, createClientStateWithCapacity(UUID_7, 2, mkMap())),
+            mkEntry(UUID_8, createClientStateWithCapacity(UUID_8, 2, mkMap())),
+            mkEntry(UUID_9, createClientStateWithCapacity(UUID_9, 2, mkMap()))
+        );
+
+        final Map<UUID, ClientState> clientStatesWithTags = mkMap(
+            mkEntry(UUID_1, createClientStateWithCapacity(UUID_1, 2, mkMap(mkEntry(ZONE_TAG, ZONE_1)), TASK_0_0, TASK_1_0)),
+            mkEntry(UUID_2, createClientStateWithCapacity(UUID_2, 2, mkMap(mkEntry(ZONE_TAG, ZONE_2)), TASK_0_1, TASK_1_1)),
+            mkEntry(UUID_3, createClientStateWithCapacity(UUID_3, 2, mkMap(mkEntry(ZONE_TAG, ZONE_3)), TASK_0_2, TASK_1_2)),
+
+            mkEntry(UUID_4, createClientStateWithCapacity(UUID_4, 2, mkMap(mkEntry(ZONE_TAG, ZONE_1)))),
+            mkEntry(UUID_5, createClientStateWithCapacity(UUID_5, 2, mkMap(mkEntry(ZONE_TAG, ZONE_2)))),
+            mkEntry(UUID_6, createClientStateWithCapacity(UUID_6, 2, mkMap(mkEntry(ZONE_TAG, ZONE_3)))),
+
+            mkEntry(UUID_7, createClientStateWithCapacity(UUID_7, 2, mkMap(mkEntry(ZONE_TAG, ZONE_1)))),
+            mkEntry(UUID_8, createClientStateWithCapacity(UUID_8, 2, mkMap(mkEntry(ZONE_TAG, ZONE_2)))),
+            mkEntry(UUID_9, createClientStateWithCapacity(UUID_9, 2, mkMap(mkEntry(ZONE_TAG, ZONE_3))))
+        );
+
+        final Set<TaskId> allActiveTasks = findAllActiveTasks(clientStates);
+
+        standbyTaskAssignor.assign(clientStates, allActiveTasks, allActiveTasks, assignmentConfigs);
+
+        final AssignmentConfigs assignmentConfigsWithTags = newAssignmentConfigs(2, ZONE_TAG);
+        standbyTaskAssignor = new ClientTagAwareStandbyTaskAssignor();
+        standbyTaskAssignor.assign(clientStatesWithTags, allActiveTasks, allActiveTasks, assignmentConfigsWithTags);
+
+        Stream.of(clientStates, clientStatesWithTags).forEach(
+            cs -> {
+                assertTrue(cs.values().stream().allMatch(ClientState::reachedCapacity));
+                Stream.of(UUID_1, UUID_2, UUID_3)
+                    .forEach(client -> assertStandbyTaskCountForClientEqualsTo(cs, client, 0));
+                Stream.of(UUID_4, UUID_5, UUID_6, UUID_7, UUID_8, UUID_9)
+                    .forEach(client -> assertStandbyTaskCountForClientEqualsTo(cs, client, 2));
+                assertTotalNumberOfStandbyTasksEqualsTo(cs, 12);
+
+                assertTrue(
+                    standbyClientsHonorRackAwareness(
+                        TASK_0_0,
+                        cs,
+                        singletonList(
+                            mkSet(UUID_6, UUID_8)
Review Comment:
   `TASK_0_0`'s active task is on `UUID_1`, which is in `ZONE_1`, so its standby can be placed in `ZONE_2` or `ZONE_3`, i.e. on `UUID_2, 5, 6, 8, 9`. The algorithm eventually picked 6 and 8. The same holds for the other tasks as well: they are all placed on processes in a different zone from their active task.
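
   The placement constraint described here can be illustrated with a small hypothetical sketch (`standbyCandidates` is not part of the PR; it just encodes "a standby must land on a client in a different rack/zone than the active"):

   ```java
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;
   import java.util.UUID;

   public class StandbyZoneCheck {
       // Hypothetical helper: given each client's rack (zone), return the
       // clients eligible to host a standby for a task whose active copy runs
       // on activeClient, i.e. every other client on a different rack.
       public static Set<UUID> standbyCandidates(final UUID activeClient,
                                                 final Map<UUID, String> rackForClient) {
           final String activeRack = rackForClient.get(activeClient);
           final Set<UUID> candidates = new HashSet<>();
           for (final Map.Entry<UUID, String> entry : rackForClient.entrySet()) {
               if (!entry.getKey().equals(activeClient) && !entry.getValue().equals(activeRack)) {
                   candidates.add(entry.getKey());
               }
           }
           return candidates;
       }
   }
   ```

   Which of the eligible candidates is ultimately chosen then depends on the assignor's load and capacity considerations, not only on the zone constraint.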




[GitHub] [kafka] lihaosky commented on a diff in pull request #14097: KAFKA-15022: [4/N] use client tag assignor for rack aware standby task assignment

2023-07-26 Thread via GitHub


lihaosky commented on code in PR #14097:
URL: https://github.com/apache/kafka/pull/14097#discussion_r1275577058


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
##
@@ -48,23 +49,25 @@ public class RackAwareTaskAssignor {
 
     private final Cluster fullMetadata;
     private final Map<TaskId, Set<TopicPartition>> partitionsForTask;
-    private final Map<UUID, Map<String, Optional<String>>> racksForProcess;
+    private final Map<UUID, Map<String, Optional<String>>> racksForProcessConsumer;
     private final AssignmentConfigs assignmentConfigs;
     private final Map<TopicPartition, Set<String>> racksForPartition;
+    private final Map<UUID, String> racksForProcess;
     private final InternalTopicManager internalTopicManager;
 
     public RackAwareTaskAssignor(final Cluster fullMetadata,
                                  final Map<TaskId, Set<TopicPartition>> partitionsForTask,
                                  final Map<Subtopology, Set<TaskId>> tasksForTopicGroup,
-                                 final Map<UUID, Map<String, Optional<String>>> racksForProcess,
+                                 final Map<UUID, Map<String, Optional<String>>> racksForProcessConsumer,
                                  final InternalTopicManager internalTopicManager,
                                  final AssignmentConfigs assignmentConfigs) {
         this.fullMetadata = fullMetadata;
         this.partitionsForTask = partitionsForTask;
-        this.racksForProcess = racksForProcess;
+        this.racksForProcessConsumer = racksForProcessConsumer;

Review Comment:
   Sounds good. We can check it here


