lihaosky commented on code in PR #14030: URL: https://github.com/apache/kafka/pull/14030#discussion_r1272791951
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java: ########## @@ -145,147 +188,517 @@ public void disableActiveSinceRackMissingInClient() { // False since process1 doesn't have rackId assertFalse(assignor.validateClientRack()); - assertFalse(assignor.canEnableRackAwareAssignorForActiveTasks()); + assertFalse(assignor.canEnableRackAwareAssignor()); } @Test - public void disableActiveSinceRackDiffersInSameProcess() { + public void shouldDisableActiveWhenRackDiffersInSameProcess() { final Map<UUID, Map<String, Optional<String>>> processRacks = new HashMap<>(); // Different consumers in same process have different rack ID. This shouldn't happen. // If happens, there's a bug somewhere - processRacks.computeIfAbsent(process0UUID, k -> new HashMap<>()).put("consumer1", Optional.of("rack1")); - processRacks.computeIfAbsent(process0UUID, k -> new HashMap<>()).put("consumer2", Optional.of("rack2")); + processRacks.computeIfAbsent(UUID_1, k -> new HashMap<>()).put("consumer1", Optional.of("rack1")); + processRacks.computeIfAbsent(UUID_1, k -> new HashMap<>()).put("consumer2", Optional.of("rack2")); final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor( getClusterForTopic0(), - getTaskTopicPartitionMapForTask1(), + getTaskTopicPartitionMapForTask0(), getTopologyGroupTaskMap(), processRacks, mockInternalTopicManager, new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs() ); assertFalse(assignor.validateClientRack()); - assertFalse(assignor.canEnableRackAwareAssignorForActiveTasks()); + assertFalse(assignor.canEnableRackAwareAssignor()); } @Test - public void enableRackAwareAssignorForActiveWithoutDescribingTopics() { + public void shouldEnableRackAwareAssignorForActiveWithoutDescribingTopics() { final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor( getClusterForTopic0(), - getTaskTopicPartitionMapForTask1(), + getTaskTopicPartitionMapForTask0(), 
getTopologyGroupTaskMap(), getProcessRacksForProcess0(), mockInternalTopicManager, new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs() ); // partitionWithoutInfo00 has rackInfo in cluster metadata - assertTrue(assignor.canEnableRackAwareAssignorForActiveTasks()); + assertTrue(assignor.canEnableRackAwareAssignor()); } @Test - public void enableRackAwareAssignorForActiveWithDescribingTopics() { + public void shouldEnableRackAwareAssignorForActiveWithDescribingTopics() { final MockInternalTopicManager spyTopicManager = spy(mockInternalTopicManager); doReturn( Collections.singletonMap( - TOPIC0, + TP_0_NAME, Collections.singletonList( - new TopicPartitionInfo(0, node0, Arrays.asList(replicas1), Collections.emptyList()) + new TopicPartitionInfo(0, NODE_0, Arrays.asList(REPLICA_1), Collections.emptyList()) ) ) - ).when(spyTopicManager).getTopicPartitionInfo(Collections.singleton(TOPIC0)); + ).when(spyTopicManager).getTopicPartitionInfo(Collections.singleton(TP_0_NAME)); final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor( getClusterWithNoNode(), - getTaskTopicPartitionMapForTask1(), + getTaskTopicPartitionMapForTask0(), getTopologyGroupTaskMap(), getProcessRacksForProcess0(), spyTopicManager, new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs() ); - assertTrue(assignor.canEnableRackAwareAssignorForActiveTasks()); + assertTrue(assignor.canEnableRackAwareAssignor()); } @Test - public void disableRackAwareAssignorForActiveWithDescribingTopicsFailure() { + public void shouldDisableRackAwareAssignorForActiveWithDescribingTopicsFailure() { final MockInternalTopicManager spyTopicManager = spy(mockInternalTopicManager); - doThrow(new TimeoutException("Timeout describing topic")).when(spyTopicManager).getTopicPartitionInfo(Collections.singleton(TOPIC0)); + doThrow(new TimeoutException("Timeout describing topic")).when(spyTopicManager).getTopicPartitionInfo(Collections.singleton( + TP_0_NAME)); final RackAwareTaskAssignor 
assignor = new RackAwareTaskAssignor( getClusterWithNoNode(), - getTaskTopicPartitionMapForTask1(), + getTaskTopicPartitionMapForTask0(), getTopologyGroupTaskMap(), getProcessRacksForProcess0(), spyTopicManager, new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs() ); - assertFalse(assignor.canEnableRackAwareAssignorForActiveTasks()); + assertFalse(assignor.canEnableRackAwareAssignor()); assertTrue(assignor.populateTopicsToDiscribe(new HashSet<>())); } + @Test + public void shouldOptimizeEmptyActiveTasks() { + final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor( + getClusterForTopic0And1(), + getTaskTopicPartitionMapForAllTasks(), + getTopologyGroupTaskMap(), + getProcessRacksForAllProcess(), + mockInternalTopicManager, + new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs() + ); + + final ClientState clientState0 = new ClientState(emptySet(), emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1); + + clientState0.assignActiveTasks(mkSet(TASK_0_1, TASK_1_1)); + + final SortedMap<UUID, ClientState> clientStateMap = new TreeMap<>(mkMap( + mkEntry(UUID_1, clientState0) + )); + final SortedSet<TaskId> taskIds = mkSortedSet(); Review Comment: This tests that nothing changes when an empty set of task IDs is passed in. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java: ########## @@ -145,147 +188,517 @@ public void disableActiveSinceRackMissingInClient() { // False since process1 doesn't have rackId assertFalse(assignor.validateClientRack()); - assertFalse(assignor.canEnableRackAwareAssignorForActiveTasks()); + assertFalse(assignor.canEnableRackAwareAssignor()); } @Test - public void disableActiveSinceRackDiffersInSameProcess() { + public void shouldDisableActiveWhenRackDiffersInSameProcess() { final Map<UUID, Map<String, Optional<String>>> processRacks = new HashMap<>(); // Different consumers in same process have different rack ID. This shouldn't happen.
// If happens, there's a bug somewhere - processRacks.computeIfAbsent(process0UUID, k -> new HashMap<>()).put("consumer1", Optional.of("rack1")); - processRacks.computeIfAbsent(process0UUID, k -> new HashMap<>()).put("consumer2", Optional.of("rack2")); + processRacks.computeIfAbsent(UUID_1, k -> new HashMap<>()).put("consumer1", Optional.of("rack1")); + processRacks.computeIfAbsent(UUID_1, k -> new HashMap<>()).put("consumer2", Optional.of("rack2")); final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor( getClusterForTopic0(), - getTaskTopicPartitionMapForTask1(), + getTaskTopicPartitionMapForTask0(), getTopologyGroupTaskMap(), processRacks, mockInternalTopicManager, new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs() ); assertFalse(assignor.validateClientRack()); - assertFalse(assignor.canEnableRackAwareAssignorForActiveTasks()); + assertFalse(assignor.canEnableRackAwareAssignor()); } @Test - public void enableRackAwareAssignorForActiveWithoutDescribingTopics() { + public void shouldEnableRackAwareAssignorForActiveWithoutDescribingTopics() { final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor( getClusterForTopic0(), - getTaskTopicPartitionMapForTask1(), + getTaskTopicPartitionMapForTask0(), getTopologyGroupTaskMap(), getProcessRacksForProcess0(), mockInternalTopicManager, new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs() ); // partitionWithoutInfo00 has rackInfo in cluster metadata - assertTrue(assignor.canEnableRackAwareAssignorForActiveTasks()); + assertTrue(assignor.canEnableRackAwareAssignor()); } @Test - public void enableRackAwareAssignorForActiveWithDescribingTopics() { + public void shouldEnableRackAwareAssignorForActiveWithDescribingTopics() { final MockInternalTopicManager spyTopicManager = spy(mockInternalTopicManager); doReturn( Collections.singletonMap( - TOPIC0, + TP_0_NAME, Collections.singletonList( - new TopicPartitionInfo(0, node0, Arrays.asList(replicas1), 
Collections.emptyList()) + new TopicPartitionInfo(0, NODE_0, Arrays.asList(REPLICA_1), Collections.emptyList()) ) ) - ).when(spyTopicManager).getTopicPartitionInfo(Collections.singleton(TOPIC0)); + ).when(spyTopicManager).getTopicPartitionInfo(Collections.singleton(TP_0_NAME)); final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor( getClusterWithNoNode(), - getTaskTopicPartitionMapForTask1(), + getTaskTopicPartitionMapForTask0(), getTopologyGroupTaskMap(), getProcessRacksForProcess0(), spyTopicManager, new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs() ); - assertTrue(assignor.canEnableRackAwareAssignorForActiveTasks()); + assertTrue(assignor.canEnableRackAwareAssignor()); } @Test - public void disableRackAwareAssignorForActiveWithDescribingTopicsFailure() { + public void shouldDisableRackAwareAssignorForActiveWithDescribingTopicsFailure() { final MockInternalTopicManager spyTopicManager = spy(mockInternalTopicManager); - doThrow(new TimeoutException("Timeout describing topic")).when(spyTopicManager).getTopicPartitionInfo(Collections.singleton(TOPIC0)); + doThrow(new TimeoutException("Timeout describing topic")).when(spyTopicManager).getTopicPartitionInfo(Collections.singleton( + TP_0_NAME)); final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor( getClusterWithNoNode(), - getTaskTopicPartitionMapForTask1(), + getTaskTopicPartitionMapForTask0(), getTopologyGroupTaskMap(), getProcessRacksForProcess0(), spyTopicManager, new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs() ); - assertFalse(assignor.canEnableRackAwareAssignorForActiveTasks()); + assertFalse(assignor.canEnableRackAwareAssignor()); assertTrue(assignor.populateTopicsToDiscribe(new HashSet<>())); } + @Test + public void shouldOptimizeEmptyActiveTasks() { + final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor( + getClusterForTopic0And1(), + getTaskTopicPartitionMapForAllTasks(), + getTopologyGroupTaskMap(), + 
getProcessRacksForAllProcess(), + mockInternalTopicManager, + new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs() + ); + + final ClientState clientState0 = new ClientState(emptySet(), emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1); + + clientState0.assignActiveTasks(mkSet(TASK_0_1, TASK_1_1)); + + final SortedMap<UUID, ClientState> clientStateMap = new TreeMap<>(mkMap( + mkEntry(UUID_1, clientState0) + )); + final SortedSet<TaskId> taskIds = mkSortedSet(); + + if (assignor.canEnableRackAwareAssignor()) { + final long originalCost = assignor.activeTasksCost(clientStateMap, taskIds, stateful); + assertEquals(0, originalCost); + + final long cost = assignor.optimizeActiveTasks(clientStateMap, taskIds, stateful); + assertEquals(0, cost); + } + + assertEquals(mkSet(TASK_0_1, TASK_1_1), clientState0.activeTasks()); + } + + @Test + public void shouldOptimizeActiveTasks() { + final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor( + getClusterForTopic0And1(), + getTaskTopicPartitionMapForAllTasks(), + getTopologyGroupTaskMap(), + getProcessRacksForAllProcess(), + mockInternalTopicManager, + new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs() + ); + + final ClientState clientState0 = new ClientState(emptySet(), emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1); + final ClientState clientState1 = new ClientState(emptySet(), emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1); + final ClientState clientState2 = new ClientState(emptySet(), emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1); + + clientState0.assignActiveTasks(mkSet(TASK_0_1, TASK_1_1)); + clientState1.assignActive(TASK_1_0); + clientState2.assignActive(TASK_0_0); + + // task_0_0 has same rack as UUID_1 + // task_0_1 has same rack as UUID_2 and UUID_3 + // task_1_0 has same rack as UUID_1 and UUID_3 + // task_1_1 has same rack as UUID_2 + // Optimal assignment is UUID_1: {0_0, 1_0}, UUID_2: {1_1}, UUID_3: {0_1} which result in no cross rack traffic + final SortedMap<UUID, 
ClientState> clientStateMap = new TreeMap<>(mkMap( + mkEntry(UUID_1, clientState0), + mkEntry(UUID_2, clientState1), + mkEntry(UUID_3, clientState2) + )); + final SortedSet<TaskId> taskIds = mkSortedSet(TASK_0_0, TASK_0_1, TASK_1_0, TASK_1_1); + + if (assignor.canEnableRackAwareAssignor()) { + int expected = stateful ? 40 : 4; + final long originalCost = assignor.activeTasksCost(clientStateMap, taskIds, stateful); + assertEquals(expected, originalCost); + + expected = stateful ? 4 : 0; + final long cost = assignor.optimizeActiveTasks(clientStateMap, taskIds, stateful); + assertEquals(expected, cost); + } + + assertEquals(mkSet(TASK_0_0, TASK_1_0), clientState0.activeTasks()); + assertEquals(mkSet(TASK_1_1), clientState1.activeTasks()); + assertEquals(mkSet(TASK_0_1), clientState2.activeTasks()); + } + + @Test + public void shouldOptimizeActiveTasksWithWeightOverride() { + final AssignmentConfigs assignmentConfigs = new AssignmentConfigs(1L, 2, 2, 60000L, Collections.emptyList(), 1, 10); + final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor( + getClusterForTopic0And1(), + getTaskTopicPartitionMapForAllTasks(), + getTopologyGroupTaskMap(), + getProcessRacksForAllProcess(), + mockInternalTopicManager, + assignmentConfigs + ); + + final ClientState clientState0 = new ClientState(emptySet(), emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1); + final ClientState clientState1 = new ClientState(emptySet(), emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1); + final ClientState clientState2 = new ClientState(emptySet(), emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1); + + clientState0.assignActiveTasks(mkSet(TASK_0_1, TASK_1_1)); + clientState1.assignActive(TASK_1_0); + clientState2.assignActive(TASK_0_0); + + final SortedMap<UUID, ClientState> clientStateMap = new TreeMap<>(mkMap( + mkEntry(UUID_1, clientState0), + mkEntry(UUID_2, clientState1), + mkEntry(UUID_3, clientState2) + )); + final SortedSet<TaskId> taskIds = mkSortedSet(TASK_0_0, TASK_0_1, TASK_1_0, TASK_1_1); 
+ + // Because non_overlap_cost is very high, this basically will stick to original assignment + if (assignor.canEnableRackAwareAssignor()) { + final long originalCost = assignor.activeTasksCost(clientStateMap, taskIds, stateful); + assertEquals(4, originalCost); Review Comment: In this test, `trafficCost` is 1 and `nonOverlapCost` is 10, so this cost of 4 comes entirely from `trafficCost`. The `nonOverlapCost` of the original assignment should be 0 because no tasks have been moved. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org