[GitHub] [kafka] mjsax commented on a diff in pull request #14097: KAFKA-15022: [4/N] use client tag assignor for rack aware standby task assignment

2023-07-27 Thread via GitHub


mjsax commented on code in PR #14097:
URL: https://github.com/apache/kafka/pull/14097#discussion_r1277018411


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignorTest.java:
##
@@ -313,20 +315,145 @@ public void shouldDistributeStandbyTasksWhenActiveTasksAreLocatedOnSameZone() {
 );
 }
 
+@Test
+public void shouldDistributeStandbyTasksUsingFunctionAndSupplierTags() {
+    final RackAwareTaskAssignor rackAwareTaskAssignor = mock(RackAwareTaskAssignor.class);
+    when(rackAwareTaskAssignor.canEnableRackAwareAssignor()).thenReturn(true);
+    final Map<UUID, String> racksForProcess = mkMap(
+        mkEntry(UUID_1, "rack1"),
+        mkEntry(UUID_2, "rack2"),
+        mkEntry(UUID_3, "rack3"),
+        mkEntry(UUID_4, "rack1"),
+        mkEntry(UUID_5, "rack2"),
+        mkEntry(UUID_6, "rack3"),
+        mkEntry(UUID_7, "rack1"),
+        mkEntry(UUID_8, "rack2"),
+        mkEntry(UUID_9, "rack3")
+    );
+    when(rackAwareTaskAssignor.racksForProcess()).thenReturn(racksForProcess);
+    final AssignmentConfigs assignmentConfigs = newAssignmentConfigs(2);
+    standbyTaskAssignor = StandbyTaskAssignorFactory.create(assignmentConfigs, rackAwareTaskAssignor);
+    verify(rackAwareTaskAssignor, times(1)).racksForProcess();
+
+    final Map<UUID, ClientState> clientStates = mkMap(
+        mkEntry(UUID_1, createClientStateWithCapacity(UUID_1, 2, mkMap(), TASK_0_0, TASK_1_0)),
+        mkEntry(UUID_2, createClientStateWithCapacity(UUID_2, 2, mkMap(), TASK_0_1, TASK_1_1)),
+        mkEntry(UUID_3, createClientStateWithCapacity(UUID_3, 2, mkMap(), TASK_0_2, TASK_1_2)),
+
+        mkEntry(UUID_4, createClientStateWithCapacity(UUID_4, 2, mkMap())),
+        mkEntry(UUID_5, createClientStateWithCapacity(UUID_5, 2, mkMap())),
+        mkEntry(UUID_6, createClientStateWithCapacity(UUID_6, 2, mkMap())),
+
+        mkEntry(UUID_7, createClientStateWithCapacity(UUID_7, 2, mkMap())),
+        mkEntry(UUID_8, createClientStateWithCapacity(UUID_8, 2, mkMap())),
+        mkEntry(UUID_9, createClientStateWithCapacity(UUID_9, 2, mkMap()))
+    );
+
+    final Map<UUID, ClientState> clientStatesWithTags = mkMap(
+        mkEntry(UUID_1, createClientStateWithCapacity(UUID_1, 2, mkMap(mkEntry(ZONE_TAG, ZONE_1)), TASK_0_0, TASK_1_0)),
+        mkEntry(UUID_2, createClientStateWithCapacity(UUID_2, 2, mkMap(mkEntry(ZONE_TAG, ZONE_2)), TASK_0_1, TASK_1_1)),
+        mkEntry(UUID_3, createClientStateWithCapacity(UUID_3, 2, mkMap(mkEntry(ZONE_TAG, ZONE_3)), TASK_0_2, TASK_1_2)),
+
+        mkEntry(UUID_4, createClientStateWithCapacity(UUID_4, 2, mkMap(mkEntry(ZONE_TAG, ZONE_1)))),
+        mkEntry(UUID_5, createClientStateWithCapacity(UUID_5, 2, mkMap(mkEntry(ZONE_TAG, ZONE_2)))),
+        mkEntry(UUID_6, createClientStateWithCapacity(UUID_6, 2, mkMap(mkEntry(ZONE_TAG, ZONE_3)))),
+
+        mkEntry(UUID_7, createClientStateWithCapacity(UUID_7, 2, mkMap(mkEntry(ZONE_TAG, ZONE_1)))),
+        mkEntry(UUID_8, createClientStateWithCapacity(UUID_8, 2, mkMap(mkEntry(ZONE_TAG, ZONE_2)))),
+        mkEntry(UUID_9, createClientStateWithCapacity(UUID_9, 2, mkMap(mkEntry(ZONE_TAG, ZONE_3))))
+    );
+
+    final Set<TaskId> allActiveTasks = findAllActiveTasks(clientStates);
+
+    standbyTaskAssignor.assign(clientStates, allActiveTasks, allActiveTasks, assignmentConfigs);
+
+    final AssignmentConfigs assignmentConfigsWithTags = newAssignmentConfigs(2, ZONE_TAG);
+    standbyTaskAssignor = new ClientTagAwareStandbyTaskAssignor();
+    standbyTaskAssignor.assign(clientStatesWithTags, allActiveTasks, allActiveTasks, assignmentConfigsWithTags);
+
+    Stream.of(clientStates, clientStatesWithTags).forEach(
+        cs -> {
+            assertTrue(cs.values().stream().allMatch(ClientState::reachedCapacity));
+            Stream.of(UUID_1, UUID_2, UUID_3)
+                .forEach(client -> assertStandbyTaskCountForClientEqualsTo(cs, client, 0));
+            Stream.of(UUID_4, UUID_5, UUID_6, UUID_7, UUID_8, UUID_9)
+                .forEach(client -> assertStandbyTaskCountForClientEqualsTo(cs, client, 2));
+            assertTotalNumberOfStandbyTasksEqualsTo(cs, 12);
+
+            assertTrue(
+                standbyClientsHonorRackAwareness(
+                    TASK_0_0,
+                    cs,
+                    singletonList(
+                        mkSet(UUID_6, UUID_8)

Review Comment:
   > The algorithm eventually picked 6 and 8.
   
   Thanks. So my understanding was actually correct.
   
   Follow-up question: should we keep this test as tight as it is right now, or should we only verify that the algorithm does not pick anything in zone_1 (i.e., not `UUID_1, 4, 7`), and thus pass in all _valid_ UUIDs from the other zones instead, i.e., `UUID_2, 5, 6, 8, 9`?
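
A minimal sketch of that looser variant, reusing the helpers from this test (it assumes `standbyClientsHonorRackAwareness` keeps its current semantics, i.e. the passed set lists the clients that are acceptable standby locations for the task):

```java
// Hypothetical relaxed assertion: instead of pinning the exact pair the
// algorithm happened to pick (UUID_6, UUID_8), only require that no standby
// for TASK_0_0 lands in zone_1 (i.e. not on UUID_1, UUID_4, or UUID_7).
assertTrue(
    standbyClientsHonorRackAwareness(
        TASK_0_0,
        cs,
        singletonList(
            mkSet(UUID_2, UUID_5, UUID_6, UUID_8, UUID_9)
        )
    )
);
```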
   
  

[GitHub] [kafka] mjsax commented on a diff in pull request #14097: KAFKA-15022: [4/N] use client tag assignor for rack aware standby task assignment

2023-07-27 Thread via GitHub


mjsax commented on code in PR #14097:
URL: https://github.com/apache/kafka/pull/14097#discussion_r1277015051


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
##
@@ -63,11 +62,11 @@ public RackAwareTaskAssignor(final Cluster fullMetadata,
                              final AssignmentConfigs assignmentConfigs) {
         this.fullMetadata = fullMetadata;
         this.partitionsForTask = partitionsForTask;
-        this.racksForProcessConsumer = racksForProcessConsumer;
         this.internalTopicManager = internalTopicManager;
         this.assignmentConfigs = assignmentConfigs;
         this.racksForPartition = new HashMap<>();
         this.racksForProcess = new HashMap<>();
+        validateClientRack(racksForProcessConsumer);

Review Comment:
   I think we should document that we might throw here, because when we create a `RackAwareTaskAssignor` we need to catch this exception to ensure we don't kill the `StreamThread`?
   
   Now that I am realizing this, I am actually wondering if it's a good idea to handle it this way? In the end, it might be better for the caller to figure out, via a method call, whether the `RackAwareTaskAssignor` can be used or not before we create it? Maybe we need some `static` method into which we pass `racksForProcessConsumer`? -- Any other idea to make it more natural for the caller?
   
   If we think handling the exception is fine for the caller, that is also ok with me. Just asking.
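
To make the `static` idea concrete, a rough sketch (the method name `canEnableForProcessRacks` is hypothetical, and the rule, a single non-empty `rack.id` shared by all consumers of a process, is assumed from what `validateClientRack()` is described to check elsewhere in this review):

```java
// Hypothetical static pre-check the caller could run before constructing the
// assignor, instead of catching an exception thrown from the constructor
// (and thereby risking killing the StreamThread).
public static boolean canEnableForProcessRacks(final Map<UUID, Map<String, Optional<String>>> racksForProcessConsumer) {
    for (final Map<String, Optional<String>> consumerRacks : racksForProcessConsumer.values()) {
        final Set<String> racks = new HashSet<>();
        for (final Optional<String> rack : consumerRacks.values()) {
            if (!rack.isPresent()) {
                return false; // a consumer without a rack.id
            }
            racks.add(rack.get());
        }
        if (racks.size() != 1) {
            return false; // consumers of the same process disagree on rack.id
        }
    }
    return true;
}
```

The caller would then only do `new RackAwareTaskAssignor(...)` when the pre-check returns `true`, and fall back to the non-rack-aware path otherwise.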







[GitHub] [kafka] mjsax commented on a diff in pull request #14097: KAFKA-15022: [4/N] use client tag assignor for rack aware standby task assignment

2023-07-27 Thread via GitHub


mjsax commented on code in PR #14097:
URL: https://github.com/apache/kafka/pull/14097#discussion_r1277014726


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
##
@@ -63,11 +62,11 @@ public RackAwareTaskAssignor(final Cluster fullMetadata,
                              final AssignmentConfigs assignmentConfigs) {
         this.fullMetadata = fullMetadata;
         this.partitionsForTask = partitionsForTask;
-        this.racksForProcessConsumer = racksForProcessConsumer;

Review Comment:
   We can also remove `racksForProcessConsumer` from the parameter list.






[GitHub] [kafka] mjsax commented on a diff in pull request #14097: KAFKA-15022: [4/N] use client tag assignor for rack aware standby task assignment

2023-07-26 Thread via GitHub


mjsax commented on code in PR #14097:
URL: https://github.com/apache/kafka/pull/14097#discussion_r1275580851


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignorTest.java:
##
@@ -313,20 +315,145 @@ public void shouldDistributeStandbyTasksWhenActiveTasksAreLocatedOnSameZone() {
 );
 }
 
+@Test
+public void shouldDistributeStandbyTasksUsingFunctionAndSupplierTags() {
+    final RackAwareTaskAssignor rackAwareTaskAssignor = mock(RackAwareTaskAssignor.class);
+    when(rackAwareTaskAssignor.canEnableRackAwareAssignor()).thenReturn(true);
+    final Map<UUID, String> racksForProcess = mkMap(
+        mkEntry(UUID_1, "rack1"),
+        mkEntry(UUID_2, "rack2"),
+        mkEntry(UUID_3, "rack3"),
+        mkEntry(UUID_4, "rack1"),
+        mkEntry(UUID_5, "rack2"),
+        mkEntry(UUID_6, "rack3"),
+        mkEntry(UUID_7, "rack1"),
+        mkEntry(UUID_8, "rack2"),
+        mkEntry(UUID_9, "rack3")
+    );
+    when(rackAwareTaskAssignor.racksForProcess()).thenReturn(racksForProcess);
+    final AssignmentConfigs assignmentConfigs = newAssignmentConfigs(2);
+    standbyTaskAssignor = StandbyTaskAssignorFactory.create(assignmentConfigs, rackAwareTaskAssignor);
+    verify(rackAwareTaskAssignor, times(1)).racksForProcess();
+
+    final Map<UUID, ClientState> clientStates = mkMap(
+        mkEntry(UUID_1, createClientStateWithCapacity(UUID_1, 2, mkMap(), TASK_0_0, TASK_1_0)),
+        mkEntry(UUID_2, createClientStateWithCapacity(UUID_2, 2, mkMap(), TASK_0_1, TASK_1_1)),
+        mkEntry(UUID_3, createClientStateWithCapacity(UUID_3, 2, mkMap(), TASK_0_2, TASK_1_2)),
+
+        mkEntry(UUID_4, createClientStateWithCapacity(UUID_4, 2, mkMap())),
+        mkEntry(UUID_5, createClientStateWithCapacity(UUID_5, 2, mkMap())),
+        mkEntry(UUID_6, createClientStateWithCapacity(UUID_6, 2, mkMap())),
+
+        mkEntry(UUID_7, createClientStateWithCapacity(UUID_7, 2, mkMap())),
+        mkEntry(UUID_8, createClientStateWithCapacity(UUID_8, 2, mkMap())),
+        mkEntry(UUID_9, createClientStateWithCapacity(UUID_9, 2, mkMap()))
+    );
+
+    final Map<UUID, ClientState> clientStatesWithTags = mkMap(
+        mkEntry(UUID_1, createClientStateWithCapacity(UUID_1, 2, mkMap(mkEntry(ZONE_TAG, ZONE_1)), TASK_0_0, TASK_1_0)),
+        mkEntry(UUID_2, createClientStateWithCapacity(UUID_2, 2, mkMap(mkEntry(ZONE_TAG, ZONE_2)), TASK_0_1, TASK_1_1)),
+        mkEntry(UUID_3, createClientStateWithCapacity(UUID_3, 2, mkMap(mkEntry(ZONE_TAG, ZONE_3)), TASK_0_2, TASK_1_2)),
+
+        mkEntry(UUID_4, createClientStateWithCapacity(UUID_4, 2, mkMap(mkEntry(ZONE_TAG, ZONE_1)))),
+        mkEntry(UUID_5, createClientStateWithCapacity(UUID_5, 2, mkMap(mkEntry(ZONE_TAG, ZONE_2)))),
+        mkEntry(UUID_6, createClientStateWithCapacity(UUID_6, 2, mkMap(mkEntry(ZONE_TAG, ZONE_3)))),
+
+        mkEntry(UUID_7, createClientStateWithCapacity(UUID_7, 2, mkMap(mkEntry(ZONE_TAG, ZONE_1)))),
+        mkEntry(UUID_8, createClientStateWithCapacity(UUID_8, 2, mkMap(mkEntry(ZONE_TAG, ZONE_2)))),
+        mkEntry(UUID_9, createClientStateWithCapacity(UUID_9, 2, mkMap(mkEntry(ZONE_TAG, ZONE_3))))
+    );
+
+    final Set<TaskId> allActiveTasks = findAllActiveTasks(clientStates);
+
+    standbyTaskAssignor.assign(clientStates, allActiveTasks, allActiveTasks, assignmentConfigs);
+
+    final AssignmentConfigs assignmentConfigsWithTags = newAssignmentConfigs(2, ZONE_TAG);
+    standbyTaskAssignor = new ClientTagAwareStandbyTaskAssignor();

Review Comment:
   Why are we creating a second task assignor (which won't use the `rackAwareTaskAssignor` but will be the default one)? Are we putting two different cases into a single test case when it should really be two test cases?
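
If the split is preferred, one possible shape (test names are made up, and each body would simply reuse the corresponding fixtures and assertions from the test above):

```java
// Hypothetical split: one test per assignor flavor instead of both in one method.
@Test
public void shouldDistributeStandbyTasksUsingProcessRacksFromRackAwareAssignor() {
    // mock RackAwareTaskAssignor, create the assignor via
    // StandbyTaskAssignorFactory.create(...), assign over clientStates (no tags),
    // then run the shared assertions.
}

@Test
public void shouldDistributeStandbyTasksUsingZoneTags() {
    // use new ClientTagAwareStandbyTaskAssignor() with newAssignmentConfigs(2, ZONE_TAG),
    // assign over clientStatesWithTags, then run the shared assertions.
}
```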



[GitHub] [kafka] mjsax commented on a diff in pull request #14097: KAFKA-15022: [4/N] use client tag assignor for rack aware standby task assignment

2023-07-26 Thread via GitHub


mjsax commented on code in PR #14097:
URL: https://github.com/apache/kafka/pull/14097#discussion_r1275571720


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
##
@@ -48,23 +49,25 @@ public class RackAwareTaskAssignor {
 
     private final Cluster fullMetadata;
     private final Map<TaskId, Set<TopicPartition>> partitionsForTask;
-    private final Map<UUID, Map<String, Optional<String>>> racksForProcess;
+    private final Map<UUID, Map<String, Optional<String>>> racksForProcessConsumer;
     private final AssignmentConfigs assignmentConfigs;
     private final Map<TopicPartition, Set<String>> racksForPartition;
+    private final Map<UUID, String> racksForProcess;
     private final InternalTopicManager internalTopicManager;
 
     public RackAwareTaskAssignor(final Cluster fullMetadata,
                                  final Map<TaskId, Set<TopicPartition>> partitionsForTask,
                                  final Map<Subtopology, Set<TaskId>> tasksForTopicGroup,
-                                 final Map<UUID, Map<String, Optional<String>>> racksForProcess,
+                                 final Map<UUID, Map<String, Optional<String>>> racksForProcessConsumer,
                                  final InternalTopicManager internalTopicManager,
                                  final AssignmentConfigs assignmentConfigs) {
         this.fullMetadata = fullMetadata;
         this.partitionsForTask = partitionsForTask;
-        this.racksForProcess = racksForProcess;
+        this.racksForProcessConsumer = racksForProcessConsumer;

Review Comment:
   Looking into the code, it seems we store `racksForProcessConsumer` only to verify that the same `rack.id` is set for all consumers inside `validateClientRack()` -- thus, I am wondering if we should do this check right away in the constructor and not have it as a member variable to begin with? In the end, we only need the new `racksForProcess` (if it's set).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org