[GitHub] [kafka] mjsax commented on a diff in pull request #14097: KAFKA-15022: [4/N] use client tag assignor for rack aware standby task assignment
mjsax commented on code in PR #14097: URL: https://github.com/apache/kafka/pull/14097#discussion_r1277018411

## streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignorTest.java:

@@ -313,20 +315,145 @@ public void shouldDistributeStandbyTasksWhenActiveTasksAreLocatedOnSameZone() {
         );
     }

+    @Test
+    public void shouldDistributeStandbyTasksUsingFunctionAndSupplierTags() {
+        final RackAwareTaskAssignor rackAwareTaskAssignor = mock(RackAwareTaskAssignor.class);
+        when(rackAwareTaskAssignor.canEnableRackAwareAssignor()).thenReturn(true);
+        final Map<UUID, String> racksForProcess = mkMap(
+            mkEntry(UUID_1, "rack1"),
+            mkEntry(UUID_2, "rack2"),
+            mkEntry(UUID_3, "rack3"),
+            mkEntry(UUID_4, "rack1"),
+            mkEntry(UUID_5, "rack2"),
+            mkEntry(UUID_6, "rack3"),
+            mkEntry(UUID_7, "rack1"),
+            mkEntry(UUID_8, "rack2"),
+            mkEntry(UUID_9, "rack3")
+        );
+        when(rackAwareTaskAssignor.racksForProcess()).thenReturn(racksForProcess);
+        final AssignmentConfigs assignmentConfigs = newAssignmentConfigs(2);
+        standbyTaskAssignor = StandbyTaskAssignorFactory.create(assignmentConfigs, rackAwareTaskAssignor);
+        verify(rackAwareTaskAssignor, times(1)).racksForProcess();
+
+        final Map<UUID, ClientState> clientStates = mkMap(
+            mkEntry(UUID_1, createClientStateWithCapacity(UUID_1, 2, mkMap(), TASK_0_0, TASK_1_0)),
+            mkEntry(UUID_2, createClientStateWithCapacity(UUID_2, 2, mkMap(), TASK_0_1, TASK_1_1)),
+            mkEntry(UUID_3, createClientStateWithCapacity(UUID_3, 2, mkMap(), TASK_0_2, TASK_1_2)),
+
+            mkEntry(UUID_4, createClientStateWithCapacity(UUID_4, 2, mkMap())),
+            mkEntry(UUID_5, createClientStateWithCapacity(UUID_5, 2, mkMap())),
+            mkEntry(UUID_6, createClientStateWithCapacity(UUID_6, 2, mkMap())),
+
+            mkEntry(UUID_7, createClientStateWithCapacity(UUID_7, 2, mkMap())),
+            mkEntry(UUID_8, createClientStateWithCapacity(UUID_8, 2, mkMap())),
+            mkEntry(UUID_9, createClientStateWithCapacity(UUID_9, 2, mkMap()))
+        );
+
+        final Map<UUID, ClientState> clientStatesWithTags = mkMap(
+            mkEntry(UUID_1, createClientStateWithCapacity(UUID_1, 2, mkMap(mkEntry(ZONE_TAG, ZONE_1)), TASK_0_0, TASK_1_0)),
+            mkEntry(UUID_2, createClientStateWithCapacity(UUID_2, 2, mkMap(mkEntry(ZONE_TAG, ZONE_2)), TASK_0_1, TASK_1_1)),
+            mkEntry(UUID_3, createClientStateWithCapacity(UUID_3, 2, mkMap(mkEntry(ZONE_TAG, ZONE_3)), TASK_0_2, TASK_1_2)),
+
+            mkEntry(UUID_4, createClientStateWithCapacity(UUID_4, 2, mkMap(mkEntry(ZONE_TAG, ZONE_1)))),
+            mkEntry(UUID_5, createClientStateWithCapacity(UUID_5, 2, mkMap(mkEntry(ZONE_TAG, ZONE_2)))),
+            mkEntry(UUID_6, createClientStateWithCapacity(UUID_6, 2, mkMap(mkEntry(ZONE_TAG, ZONE_3)))),
+
+            mkEntry(UUID_7, createClientStateWithCapacity(UUID_7, 2, mkMap(mkEntry(ZONE_TAG, ZONE_1)))),
+            mkEntry(UUID_8, createClientStateWithCapacity(UUID_8, 2, mkMap(mkEntry(ZONE_TAG, ZONE_2)))),
+            mkEntry(UUID_9, createClientStateWithCapacity(UUID_9, 2, mkMap(mkEntry(ZONE_TAG, ZONE_3))))
+        );
+
+        final Set<TaskId> allActiveTasks = findAllActiveTasks(clientStates);
+
+        standbyTaskAssignor.assign(clientStates, allActiveTasks, allActiveTasks, assignmentConfigs);
+
+        final AssignmentConfigs assignmentConfigsWithTags = newAssignmentConfigs(2, ZONE_TAG);
+        standbyTaskAssignor = new ClientTagAwareStandbyTaskAssignor();
+        standbyTaskAssignor.assign(clientStatesWithTags, allActiveTasks, allActiveTasks, assignmentConfigsWithTags);
+
+        Stream.of(clientStates, clientStatesWithTags).forEach(
+            cs -> {
+                assertTrue(cs.values().stream().allMatch(ClientState::reachedCapacity));
+                Stream.of(UUID_1, UUID_2, UUID_3)
+                    .forEach(client -> assertStandbyTaskCountForClientEqualsTo(cs, client, 0));
+                Stream.of(UUID_4, UUID_5, UUID_6, UUID_7, UUID_8, UUID_9)
+                    .forEach(client -> assertStandbyTaskCountForClientEqualsTo(cs, client, 2));
+                assertTotalNumberOfStandbyTasksEqualsTo(cs, 12);
+
+                assertTrue(
+                    standbyClientsHonorRackAwareness(
+                        TASK_0_0,
+                        cs,
+                        singletonList(
+                            mkSet(UUID_6, UUID_8)

Review Comment: Thanks. So my understanding was actually correct.

Follow-up question: should we keep this test as tight as it is right now, or should we only verify that the algorithm does not pick anything in zone_1 (i.e., not `UUID_1, 4, 7`), and thus pass in all _valid_ UUIDs from all different zones instead, i.e., pass in `UUID_2, 5, 6, 8, 9`?
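The loosened variant of the assertion proposed above could look roughly like this. This is a standalone sketch, not the actual test helper: it uses plain `String` ids instead of the test's `UUID_x` constants and replaces `standbyClientsHonorRackAwareness` with a hypothetical helper that only checks the standby clients avoid the active task's zone.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class StandbyZoneCheck {

    // Hypothetical loosened assertion helper: instead of pinning the exact
    // standby owners (e.g. UUID_6 and UUID_8), only verify that no standby
    // client sits in the zone that already hosts the active task.
    static boolean standbysAvoidActiveZone(final String activeZone,
                                           final Set<String> standbyClients,
                                           final Map<String, String> zoneForClient) {
        return standbyClients.stream()
                .noneMatch(client -> activeZone.equals(zoneForClient.get(client)));
    }

    public static void main(final String[] args) {
        final Map<String, String> zoneForClient = new HashMap<>();
        zoneForClient.put("uuid_1", "zone_1");
        zoneForClient.put("uuid_6", "zone_3");
        zoneForClient.put("uuid_8", "zone_2");

        // Standbys on uuid_6/uuid_8 avoid zone_1, so the loosened check passes.
        System.out.println(standbysAvoidActiveZone(
                "zone_1", new HashSet<>(Arrays.asList("uuid_6", "uuid_8")), zoneForClient));
        // A standby on uuid_1 would collide with the active task's zone.
        System.out.println(standbysAvoidActiveZone(
                "zone_1", new HashSet<>(Arrays.asList("uuid_1", "uuid_8")), zoneForClient));
    }
}
```

The trade-off is the one raised in the comment: the tight version also pins down tie-breaking behavior of the algorithm, while the loosened version only pins down the rack-awareness invariant.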
[GitHub] [kafka] mjsax commented on a diff in pull request #14097: KAFKA-15022: [4/N] use client tag assignor for rack aware standby task assignment
mjsax commented on code in PR #14097: URL: https://github.com/apache/kafka/pull/14097#discussion_r1277015051

## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:

@@ -63,11 +62,11 @@ public RackAwareTaskAssignor(final Cluster fullMetadata,
                              final AssignmentConfigs assignmentConfigs) {
         this.fullMetadata = fullMetadata;
         this.partitionsForTask = partitionsForTask;
-        this.racksForProcessConsumer = racksForProcessConsumer;
         this.internalTopicManager = internalTopicManager;
         this.assignmentConfigs = assignmentConfigs;
         this.racksForPartition = new HashMap<>();
         this.racksForProcess = new HashMap<>();
+        validateClientRack(racksForProcessConsumer);

Review Comment: I think we should document that we might throw here, because when we create `RackAwareTaskAssignor` we need to catch this exception to ensure we do not kill the `StreamThread`. Now that I am realizing this, I am actually wondering if it's a good idea to handle it this way. In the end, it might be better for the caller to figure out via a method call whether the `RackAwareTaskAssignor` can be used or not, before we create it. Maybe we need some `static` method into which we pass `racksForProcessConsumer`? Any other idea to make it more natural for the caller? If we think handling the exception is fine for the caller, that is also ok with me. Just asking.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
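The "static method" idea floated above could be sketched as follows. The names here are hypothetical (this is not the actual Kafka API): the caller probes whether the rack information is usable before constructing the assignor, so no exception can escape the constructor and kill the `StreamThread`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class RackValidation {

    // Hypothetical static probe: rack-aware assignment is only usable when
    // every consumer reports a rack id and all consumers of the same process
    // agree on it.
    static boolean canUseRackInformation(
            final Map<String, Map<String, Optional<String>>> racksForProcessConsumer) {
        for (final Map<String, Optional<String>> consumerRacks : racksForProcessConsumer.values()) {
            final Set<String> racks = new HashSet<>();
            for (final Optional<String> rack : consumerRacks.values()) {
                if (!rack.isPresent()) {
                    return false; // a consumer without a rack id disables the feature
                }
                racks.add(rack.get());
            }
            if (racks.size() != 1) {
                return false; // consumers of the same process report different racks
            }
        }
        return true;
    }

    public static void main(final String[] args) {
        // One process, one consumer, one rack: usable.
        System.out.println(canUseRackInformation(
                Collections.singletonMap("p1",
                        Collections.singletonMap("c1", Optional.of("rack1")))));

        // Two consumers of the same process on different racks: not usable.
        final Map<String, Optional<String>> mixed = new HashMap<>();
        mixed.put("c1", Optional.of("rack1"));
        mixed.put("c2", Optional.of("rack2"));
        System.out.println(canUseRackInformation(Collections.singletonMap("p1", mixed)));
    }
}
```

With such a probe the caller can fall back to a non-rack-aware assignor cleanly instead of catching an exception from the constructor.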
[GitHub] [kafka] mjsax commented on a diff in pull request #14097: KAFKA-15022: [4/N] use client tag assignor for rack aware standby task assignment
mjsax commented on code in PR #14097: URL: https://github.com/apache/kafka/pull/14097#discussion_r1277014726

## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:

@@ -63,11 +62,11 @@ public RackAwareTaskAssignor(final Cluster fullMetadata,
                              final AssignmentConfigs assignmentConfigs) {
         this.fullMetadata = fullMetadata;
         this.partitionsForTask = partitionsForTask;
-        this.racksForProcessConsumer = racksForProcessConsumer;

Review Comment: We can also remove `racksForProcessConsumer` from the parameter list.
[GitHub] [kafka] mjsax commented on a diff in pull request #14097: KAFKA-15022: [4/N] use client tag assignor for rack aware standby task assignment
mjsax commented on code in PR #14097: URL: https://github.com/apache/kafka/pull/14097#discussion_r1275580851

## streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignorTest.java:

@@ -313,20 +315,145 @@ public void shouldDistributeStandbyTasksWhenActiveTasksAreLocatedOnSameZone() {
+        final Set<TaskId> allActiveTasks = findAllActiveTasks(clientStates);
+
+        standbyTaskAssignor.assign(clientStates, allActiveTasks, allActiveTasks, assignmentConfigs);
+
+        final AssignmentConfigs assignmentConfigsWithTags = newAssignmentConfigs(2, ZONE_TAG);
+        standbyTaskAssignor = new ClientTagAwareStandbyTaskAssignor();

Review Comment: Why are we creating a second task assignor (which won't use the `rackAwareTaskAssignor` but will be the default)? Are we putting two different cases into a single test case when it should be two test cases?

## streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignorTest.java:

@@ -313,20 +315,145 @@ public void shouldDistributeStandbyTasksWhenActiveTasksAreLocatedOnSameZone() {
         );
     }

+    @Test
+    public void shouldDistributeStandbyTasksUsingFunctionAndSupplierTags() {
+        final RackAwareTaskAssignor rackAwareTaskAssignor = mock(RackAwareTaskAssignor.class);
+        when(rackAwareTaskAssignor.canEnableRackAwareAssignor()).thenReturn(true);
+        final Map<UUID, String> racksForProcess = mkMap(
+            mkEntry(UUID_1, "rack1"),
+            mkEntry(UUID_2, "rack2"),
+            mkEntry(UUID_3, "rack3"),
+            mkEntry(UUID_4, "rack1"),
+            mkEntry(UUID_5, "rack2"),
+            mkEntry(UUID_6, "rack3"),
+            mkEntry(UUID_7, "rack1"),
+            mkEntry(UUID_8, "rack2"),
+            mkEntry(UUID_9, "rack3")
+        );
+        when(rackAwareTaskAssignor.racksForProcess()).thenReturn(racksForProcess);
+        final AssignmentConfigs assignmentConfigs = newAssignmentConfigs(2);
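The question about the second assignor comes down to which code path the test exercises. A simplified, hypothetical sketch of the factory dispatch (standalone types, not the actual `StandbyTaskAssignorFactory` signature) shows why constructing `ClientTagAwareStandbyTaskAssignor` directly bypasses the factory's selection logic:

```java
public class AssignorFactorySketch {

    // Minimal stand-in for the real StandbyTaskAssignor interface.
    interface StandbyTaskAssignor {
        String name();
    }

    // Sketch of the dispatch: the factory hands out the tag-aware assignor
    // when the rack-aware path can be enabled, otherwise the default one.
    static StandbyTaskAssignor create(final boolean canEnableRackAwareAssignor) {
        if (canEnableRackAwareAssignor) {
            return () -> "ClientTagAwareStandbyTaskAssignor"; // rack-aware path
        }
        return () -> "DefaultStandbyTaskAssignor"; // fallback path
    }

    public static void main(final String[] args) {
        System.out.println(create(true).name());
        System.out.println(create(false).name());
    }
}
```

So a test that calls the factory and a test that instantiates the tag-aware assignor directly are verifying two different entry points, which supports splitting them into two test cases.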
[GitHub] [kafka] mjsax commented on a diff in pull request #14097: KAFKA-15022: [4/N] use client tag assignor for rack aware standby task assignment
mjsax commented on code in PR #14097: URL: https://github.com/apache/kafka/pull/14097#discussion_r1275571720

## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:

@@ -48,23 +49,25 @@ public class RackAwareTaskAssignor {

     private final Cluster fullMetadata;
     private final Map<TaskId, Set<TopicPartition>> partitionsForTask;
-    private final Map<UUID, Map<String, Optional<String>>> racksForProcess;
+    private final Map<UUID, Map<String, Optional<String>>> racksForProcessConsumer;
     private final AssignmentConfigs assignmentConfigs;
     private final Map<TopicPartition, Set<String>> racksForPartition;
+    private final Map<UUID, String> racksForProcess;
     private final InternalTopicManager internalTopicManager;

     public RackAwareTaskAssignor(final Cluster fullMetadata,
                                  final Map<TaskId, Set<TopicPartition>> partitionsForTask,
                                  final Map<Subtopology, Set<TaskId>> tasksForTopicGroup,
-                                 final Map<UUID, Map<String, Optional<String>>> racksForProcess,
+                                 final Map<UUID, Map<String, Optional<String>>> racksForProcessConsumer,
                                  final InternalTopicManager internalTopicManager,
                                  final AssignmentConfigs assignmentConfigs) {
         this.fullMetadata = fullMetadata;
         this.partitionsForTask = partitionsForTask;
-        this.racksForProcess = racksForProcess;
+        this.racksForProcessConsumer = racksForProcessConsumer;

Review Comment: Looking into the code, it seems we store `racksForProcessConsumer` only to verify inside `validateClientRack()` that the same `rack.id` is set for all consumers. Thus, I am wondering if we should do this check right away in the constructor and not have it as a member variable to begin with? In the end, we only need the new `racksForProcess` (if it's set).
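The suggestion in the comment above can be sketched as follows. This is a simplified, hypothetical shape (String process ids instead of UUIDs, no Kafka classes): the constructor validates the per-consumer rack map and keeps only the collapsed process-to-rack view, so `racksForProcessConsumer` never becomes a member variable.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class RackAwareStateSketch {

    // Only the collapsed process -> rack view is retained as state.
    private final Map<String, String> racksForProcess = new HashMap<>();

    RackAwareStateSketch(final Map<String, Map<String, Optional<String>>> racksForProcessConsumer) {
        for (final Map.Entry<String, Map<String, Optional<String>>> entry
                : racksForProcessConsumer.entrySet()) {
            final Set<String> racks = new HashSet<>();
            for (final Optional<String> rack : entry.getValue().values()) {
                rack.ifPresent(racks::add);
            }
            // Missing or inconsistent rack ids for one process fail fast,
            // right in the constructor.
            if (racks.size() != 1) {
                throw new IllegalStateException(
                        "Invalid rack ids for process " + entry.getKey());
            }
            racksForProcess.put(entry.getKey(), racks.iterator().next());
        }
    }

    Map<String, String> racksForProcess() {
        return Collections.unmodifiableMap(racksForProcess);
    }

    public static void main(final String[] args) {
        final RackAwareStateSketch state = new RackAwareStateSketch(
                Collections.singletonMap("p1",
                        Collections.singletonMap("c1", Optional.of("rack1"))));
        System.out.println(state.racksForProcess());
    }
}
```

Note this variant still throws from the constructor, which is exactly the caller-ergonomics concern raised in the other thread; combining it with a static pre-check would avoid the exception entirely.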