lihaosky commented on code in PR #14030: URL: https://github.com/apache/kafka/pull/14030#discussion_r1271081266
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java: ########## @@ -185,4 +191,224 @@ public boolean validateClientRack() { } return true; } + + private int getCost(final TaskId taskId, final UUID clientId, final boolean inCurrentAssignment, final boolean isStateful) { + final Map<String, Optional<String>> clientRacks = racksForProcess.get(clientId); + if (clientRacks == null) { + throw new IllegalStateException("Client " + clientId + " doesn't exist in processRacks"); + } + final Optional<Optional<String>> clientRackOpt = clientRacks.values().stream().filter(Optional::isPresent).findFirst(); + if (!clientRackOpt.isPresent() || !clientRackOpt.get().isPresent()) { + throw new IllegalStateException("Client " + clientId + " doesn't have rack configured. Maybe forgot to call canEnableRackAwareAssignor first"); + } + + final String clientRack = clientRackOpt.get().get(); + final Set<TopicPartition> topicPartitions = partitionsForTask.get(taskId); + if (topicPartitions == null) { + throw new IllegalStateException("Task " + taskId + " has no TopicPartitions"); + } + + final int trafficCost = assignmentConfigs.trafficCost == null ? (isStateful ? DEFAULT_STATEFUL_TRAFFIC_COST : DEFAULT_STATELESS_TRAFFIC_COST) + : assignmentConfigs.trafficCost; + final int nonOverlapCost = assignmentConfigs.nonOverlapCost == null ? (isStateful ? DEFAULT_STATEFUL_NON_OVERLAP_COST : DEFAULT_STATELESS_NON_OVERLAP_COST) + : assignmentConfigs.nonOverlapCost; + + int cost = 0; + for (final TopicPartition tp : topicPartitions) { + final Set<String> tpRacks = racksForPartition.get(tp); + if (tpRacks == null || tpRacks.isEmpty()) { + throw new IllegalStateException("TopicPartition " + tp + " has no rack information. 
Maybe forgot to call canEnableRackAwareAssignor first"); + } + if (!tpRacks.contains(clientRack)) { + cost += trafficCost; + } + } + + if (!inCurrentAssignment) { + cost += nonOverlapCost; + } + + return cost; + } + + // For testing. canEnableRackAwareAssignor must be called first + long activeTasksCost(final SortedMap<UUID, ClientState> clientStates, final SortedSet<TaskId> statefulTasks, final boolean isStateful) { Review Comment: Forgot to change it... ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java: ########## @@ -185,4 +191,224 @@ public boolean validateClientRack() { } return true; } + + private int getCost(final TaskId taskId, final UUID clientId, final boolean inCurrentAssignment, final boolean isStateful) { + final Map<String, Optional<String>> clientRacks = racksForProcess.get(clientId); + if (clientRacks == null) { + throw new IllegalStateException("Client " + clientId + " doesn't exist in processRacks"); + } + final Optional<Optional<String>> clientRackOpt = clientRacks.values().stream().filter(Optional::isPresent).findFirst(); + if (!clientRackOpt.isPresent() || !clientRackOpt.get().isPresent()) { + throw new IllegalStateException("Client " + clientId + " doesn't have rack configured. Maybe forgot to call canEnableRackAwareAssignor first"); + } + + final String clientRack = clientRackOpt.get().get(); + final Set<TopicPartition> topicPartitions = partitionsForTask.get(taskId); + if (topicPartitions == null) { + throw new IllegalStateException("Task " + taskId + " has no TopicPartitions"); + } + + final int trafficCost = assignmentConfigs.trafficCost == null ? (isStateful ? DEFAULT_STATEFUL_TRAFFIC_COST : DEFAULT_STATELESS_TRAFFIC_COST) + : assignmentConfigs.trafficCost; + final int nonOverlapCost = assignmentConfigs.nonOverlapCost == null ? (isStateful ? 
DEFAULT_STATEFUL_NON_OVERLAP_COST : DEFAULT_STATELESS_NON_OVERLAP_COST) + : assignmentConfigs.nonOverlapCost; + + int cost = 0; + for (final TopicPartition tp : topicPartitions) { + final Set<String> tpRacks = racksForPartition.get(tp); + if (tpRacks == null || tpRacks.isEmpty()) { + throw new IllegalStateException("TopicPartition " + tp + " has no rack information. Maybe forgot to call canEnableRackAwareAssignor first"); + } + if (!tpRacks.contains(clientRack)) { + cost += trafficCost; + } + } + + if (!inCurrentAssignment) { + cost += nonOverlapCost; + } + + return cost; + } + + // For testing. canEnableRackAwareAssignor must be called first + long activeTasksCost(final SortedMap<UUID, ClientState> clientStates, final SortedSet<TaskId> statefulTasks, final boolean isStateful) { + final List<UUID> clientList = new ArrayList<>(clientStates.keySet()); + final List<TaskId> taskIdList = new ArrayList<>(statefulTasks); + final Map<TaskId, UUID> taskClientMap = new HashMap<>(); + final Map<UUID, Integer> clientCapacity = new HashMap<>(); + final Graph<Integer> graph = new Graph<>(); + + constructStatefulActiveTaskGraph(graph, statefulTasks, clientList, taskIdList, + clientStates, taskClientMap, clientCapacity, isStateful); + + final int sourceId = taskIdList.size() + clientList.size(); + final int sinkId = sourceId + 1; + for (int taskNodeId = 0; taskNodeId < taskIdList.size(); taskNodeId++) { + graph.addEdge(sourceId, taskNodeId, 1, 0, 1); + } + for (int i = 0; i < clientList.size(); i++) { + final int capacity = clientCapacity.getOrDefault(clientList.get(i), 0); + final int clientNodeId = taskIdList.size() + i; + graph.addEdge(clientNodeId, sinkId, capacity, 0, capacity); + } + graph.setSourceNode(sourceId); + graph.setSinkNode(sinkId); + return graph.totalCost(); + } + + /** + * Optimize active stateful task assignment for rack awareness. 
canEnableRackAwareAssignor must be called first + * @param clientStates Client states + * @param taskIds Tasks to reassign if needed. They must be assigned already in clientStates + * @param isStateful Whether the tasks are stateful + * @return Total cost after optimization + */ + public long optimizeActiveTasks(final SortedMap<UUID, ClientState> clientStates, + final SortedSet<TaskId> taskIds, + final boolean isStateful) { + if (taskIds.isEmpty()) { + return 0; + } + + final List<UUID> clientList = new ArrayList<>(clientStates.keySet()); + final List<TaskId> taskIdList = new ArrayList<>(taskIds); + final Map<TaskId, UUID> taskClientMap = new HashMap<>(); + final Map<UUID, Integer> clientCapacity = new HashMap<>(); + final Graph<Integer> graph = new Graph<>(); + + constructStatefulActiveTaskGraph(graph, taskIds, clientList, taskIdList, + clientStates, taskClientMap, clientCapacity, isStateful); + + graph.solveMinCostFlow(); + final long cost = graph.totalCost(); + + assignStatefulActiveTaskFromMinCostFlow(graph, taskIds, clientList, taskIdList, + clientStates, clientCapacity, taskClientMap); + + return cost; + } + + private void constructStatefulActiveTaskGraph(final Graph<Integer> graph, + final SortedSet<TaskId> statefulTasks, + final List<UUID> clientList, + final List<TaskId> taskIdList, + final Map<UUID, ClientState> clientStates, + final Map<TaskId, UUID> taskClientMap, + final Map<UUID, Integer> clientCapacity, Review Comment: Not the same as `ClientState#capacity`. This tracks how many tasks were originally assigned to each client, and we need to maintain this number after optimizing the cost. The reason is that we need to keep the assignment balanced: we assume the assignment passed in by the caller is balanced, and after optimization the number of tasks assigned to each client doesn't change.
So when we construct the graph: from the source to each task, there's 1 edge with capacity 1, flow 1 and cost 0; from each task, there's 1 edge to each client with capacity 1, the computed cost, and flow 0 or 1 (1 means the current assignment); and from each client to the sink, there's one edge with capacity equal to the number of tasks originally assigned to the client, cost 0, and flow equal to the capacity. ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java: ########## @@ -38,29 +43,34 @@ public class RackAwareTaskAssignor { private static final Logger log = LoggerFactory.getLogger(RackAwareTaskAssignor.class); + private static final int DEFAULT_STATEFUL_TRAFFIC_COST = 10; Review Comment: For stateless tasks, there's no `non_overlap_cost`, so the traffic cost doesn't matter as long as it's positive. The reason is that it's less expensive to relocate stateless tasks. For stateful tasks, there's a `non_overlap_cost`, which means there's a cost if you move the task to another client. Since we prefer to move stateful tasks as little as possible, this cost has a non-zero value. The ratio of `traffic_cost` to `non_overlap_cost` expresses how much we favor one over the other. Setting it to 10 basically means we value `traffic_cost` much more than `non_overlap_cost`. Other defaults are also possible. Also, since this class is also going to be used by `StickyAssignor`, my plan is to give `non_overlap_cost` a higher value than `traffic_cost` for the sticky assignor so that it favors stickiness much more. ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java: ########## @@ -185,4 +191,224 @@ public boolean validateClientRack() { } return true; } + + private int getCost(final TaskId taskId, final UUID clientId, final boolean inCurrentAssignment, final boolean isStateful) { Review Comment: Here I meant `processId` using the UUID.
Stateless tasks have a `non_overlap_cost` of 0, while stateful tasks have a positive `non_overlap_cost`. `inCurrentAssignment` means the task is currently assigned to the client referred to by `clientId`. If the task is not in the current assignment, a `non_overlap_cost` may apply. ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java: ########## @@ -185,4 +191,224 @@ public boolean validateClientRack() { } return true; } + + private int getCost(final TaskId taskId, final UUID clientId, final boolean inCurrentAssignment, final boolean isStateful) { + final Map<String, Optional<String>> clientRacks = racksForProcess.get(clientId); + if (clientRacks == null) { + throw new IllegalStateException("Client " + clientId + " doesn't exist in processRacks"); + } + final Optional<Optional<String>> clientRackOpt = clientRacks.values().stream().filter(Optional::isPresent).findFirst(); + if (!clientRackOpt.isPresent() || !clientRackOpt.get().isPresent()) { + throw new IllegalStateException("Client " + clientId + " doesn't have rack configured. Maybe forgot to call canEnableRackAwareAssignor first"); + } + + final String clientRack = clientRackOpt.get().get(); + final Set<TopicPartition> topicPartitions = partitionsForTask.get(taskId); + if (topicPartitions == null) { + throw new IllegalStateException("Task " + taskId + " has no TopicPartitions"); + } + + final int trafficCost = assignmentConfigs.trafficCost == null ? (isStateful ? DEFAULT_STATEFUL_TRAFFIC_COST : DEFAULT_STATELESS_TRAFFIC_COST) + : assignmentConfigs.trafficCost; + final int nonOverlapCost = assignmentConfigs.nonOverlapCost == null ? (isStateful ?
DEFAULT_STATEFUL_NON_OVERLAP_COST : DEFAULT_STATELESS_NON_OVERLAP_COST) + : assignmentConfigs.nonOverlapCost; + + int cost = 0; + for (final TopicPartition tp : topicPartitions) { + final Set<String> tpRacks = racksForPartition.get(tp); + if (tpRacks == null || tpRacks.isEmpty()) { + throw new IllegalStateException("TopicPartition " + tp + " has no rack information. Maybe forgot to call canEnableRackAwareAssignor first"); + } + if (!tpRacks.contains(clientRack)) { + cost += trafficCost; + } + } + + if (!inCurrentAssignment) { + cost += nonOverlapCost; Review Comment: See above explanation of `non_overlap_cost` ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java: ########## @@ -268,24 +268,44 @@ public static class AssignmentConfigs { public final long probingRebalanceIntervalMs; public final List<String> rackAwareAssignmentTags; + // TODO: get from streamsConfig after we add the config Review Comment: It's discussed here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams#KIP925:RackawaretaskassignmentinKafkaStreams-5.PublicInterfaces ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java: ########## @@ -185,4 +191,224 @@ public boolean validateClientRack() { } return true; } + + private int getCost(final TaskId taskId, final UUID clientId, final boolean inCurrentAssignment, final boolean isStateful) { + final Map<String, Optional<String>> clientRacks = racksForProcess.get(clientId); + if (clientRacks == null) { + throw new IllegalStateException("Client " + clientId + " doesn't exist in processRacks"); + } + final Optional<Optional<String>> clientRackOpt = clientRacks.values().stream().filter(Optional::isPresent).findFirst(); + if (!clientRackOpt.isPresent() || !clientRackOpt.get().isPresent()) { + throw new IllegalStateException("Client " + clientId + " doesn't have rack configured. 
Maybe forgot to call canEnableRackAwareAssignor first"); + } + + final String clientRack = clientRackOpt.get().get(); + final Set<TopicPartition> topicPartitions = partitionsForTask.get(taskId); + if (topicPartitions == null) { + throw new IllegalStateException("Task " + taskId + " has no TopicPartitions"); + } + + final int trafficCost = assignmentConfigs.trafficCost == null ? (isStateful ? DEFAULT_STATEFUL_TRAFFIC_COST : DEFAULT_STATELESS_TRAFFIC_COST) + : assignmentConfigs.trafficCost; + final int nonOverlapCost = assignmentConfigs.nonOverlapCost == null ? (isStateful ? DEFAULT_STATEFUL_NON_OVERLAP_COST : DEFAULT_STATELESS_NON_OVERLAP_COST) + : assignmentConfigs.nonOverlapCost; + + int cost = 0; + for (final TopicPartition tp : topicPartitions) { + final Set<String> tpRacks = racksForPartition.get(tp); + if (tpRacks == null || tpRacks.isEmpty()) { + throw new IllegalStateException("TopicPartition " + tp + " has no rack information. Maybe forgot to call canEnableRackAwareAssignor first"); + } + if (!tpRacks.contains(clientRack)) { + cost += trafficCost; + } + } + + if (!inCurrentAssignment) { + cost += nonOverlapCost; + } + + return cost; + } + + // For testing. 
canEnableRackAwareAssignor must be called first + long activeTasksCost(final SortedMap<UUID, ClientState> clientStates, final SortedSet<TaskId> statefulTasks, final boolean isStateful) { + final List<UUID> clientList = new ArrayList<>(clientStates.keySet()); + final List<TaskId> taskIdList = new ArrayList<>(statefulTasks); + final Map<TaskId, UUID> taskClientMap = new HashMap<>(); + final Map<UUID, Integer> clientCapacity = new HashMap<>(); + final Graph<Integer> graph = new Graph<>(); + + constructStatefulActiveTaskGraph(graph, statefulTasks, clientList, taskIdList, + clientStates, taskClientMap, clientCapacity, isStateful); + + final int sourceId = taskIdList.size() + clientList.size(); + final int sinkId = sourceId + 1; + for (int taskNodeId = 0; taskNodeId < taskIdList.size(); taskNodeId++) { + graph.addEdge(sourceId, taskNodeId, 1, 0, 1); + } + for (int i = 0; i < clientList.size(); i++) { + final int capacity = clientCapacity.getOrDefault(clientList.get(i), 0); + final int clientNodeId = taskIdList.size() + i; + graph.addEdge(clientNodeId, sinkId, capacity, 0, capacity); + } + graph.setSourceNode(sourceId); + graph.setSinkNode(sinkId); + return graph.totalCost(); + } + + /** + * Optimize active stateful task assignment for rack awareness. canEnableRackAwareAssignor must be called first + * @param clientStates Client states + * @param taskIds Tasks to reassign if needed. They must be assigned already in clientStates + * @param isStateful Whether the tasks are stateful + * @return Total cost after optimization + */ + public long optimizeActiveTasks(final SortedMap<UUID, ClientState> clientStates, + final SortedSet<TaskId> taskIds, + final boolean isStateful) { + if (taskIds.isEmpty()) { + return 0; + } + + final List<UUID> clientList = new ArrayList<>(clientStates.keySet()); Review Comment: This is just to make index lookups easy later. Since a node id in the graph is an integer (an index), it's easier to get a reference back to the UUID and ClientState using the index.
Otherwise, we need to maintain an index to UUID map I think ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java: ########## @@ -185,4 +191,224 @@ public boolean validateClientRack() { } return true; } + + private int getCost(final TaskId taskId, final UUID clientId, final boolean inCurrentAssignment, final boolean isStateful) { + final Map<String, Optional<String>> clientRacks = racksForProcess.get(clientId); + if (clientRacks == null) { + throw new IllegalStateException("Client " + clientId + " doesn't exist in processRacks"); + } + final Optional<Optional<String>> clientRackOpt = clientRacks.values().stream().filter(Optional::isPresent).findFirst(); + if (!clientRackOpt.isPresent() || !clientRackOpt.get().isPresent()) { + throw new IllegalStateException("Client " + clientId + " doesn't have rack configured. Maybe forgot to call canEnableRackAwareAssignor first"); + } + + final String clientRack = clientRackOpt.get().get(); + final Set<TopicPartition> topicPartitions = partitionsForTask.get(taskId); + if (topicPartitions == null) { + throw new IllegalStateException("Task " + taskId + " has no TopicPartitions"); + } + + final int trafficCost = assignmentConfigs.trafficCost == null ? (isStateful ? DEFAULT_STATEFUL_TRAFFIC_COST : DEFAULT_STATELESS_TRAFFIC_COST) + : assignmentConfigs.trafficCost; + final int nonOverlapCost = assignmentConfigs.nonOverlapCost == null ? (isStateful ? DEFAULT_STATEFUL_NON_OVERLAP_COST : DEFAULT_STATELESS_NON_OVERLAP_COST) + : assignmentConfigs.nonOverlapCost; + + int cost = 0; + for (final TopicPartition tp : topicPartitions) { + final Set<String> tpRacks = racksForPartition.get(tp); + if (tpRacks == null || tpRacks.isEmpty()) { + throw new IllegalStateException("TopicPartition " + tp + " has no rack information. 
Maybe forgot to call canEnableRackAwareAssignor first"); + } + if (!tpRacks.contains(clientRack)) { + cost += trafficCost; + } + } + + if (!inCurrentAssignment) { + cost += nonOverlapCost; + } + + return cost; + } + + // For testing. canEnableRackAwareAssignor must be called first + long activeTasksCost(final SortedMap<UUID, ClientState> clientStates, final SortedSet<TaskId> statefulTasks, final boolean isStateful) { + final List<UUID> clientList = new ArrayList<>(clientStates.keySet()); + final List<TaskId> taskIdList = new ArrayList<>(statefulTasks); + final Map<TaskId, UUID> taskClientMap = new HashMap<>(); + final Map<UUID, Integer> clientCapacity = new HashMap<>(); + final Graph<Integer> graph = new Graph<>(); + + constructStatefulActiveTaskGraph(graph, statefulTasks, clientList, taskIdList, + clientStates, taskClientMap, clientCapacity, isStateful); + + final int sourceId = taskIdList.size() + clientList.size(); + final int sinkId = sourceId + 1; Review Comment: Good idea! ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java: ########## @@ -185,4 +191,224 @@ public boolean validateClientRack() { } return true; } + + private int getCost(final TaskId taskId, final UUID clientId, final boolean inCurrentAssignment, final boolean isStateful) { + final Map<String, Optional<String>> clientRacks = racksForProcess.get(clientId); + if (clientRacks == null) { + throw new IllegalStateException("Client " + clientId + " doesn't exist in processRacks"); + } + final Optional<Optional<String>> clientRackOpt = clientRacks.values().stream().filter(Optional::isPresent).findFirst(); + if (!clientRackOpt.isPresent() || !clientRackOpt.get().isPresent()) { + throw new IllegalStateException("Client " + clientId + " doesn't have rack configured. 
Maybe forgot to call canEnableRackAwareAssignor first"); + } + + final String clientRack = clientRackOpt.get().get(); + final Set<TopicPartition> topicPartitions = partitionsForTask.get(taskId); + if (topicPartitions == null) { + throw new IllegalStateException("Task " + taskId + " has no TopicPartitions"); + } + + final int trafficCost = assignmentConfigs.trafficCost == null ? (isStateful ? DEFAULT_STATEFUL_TRAFFIC_COST : DEFAULT_STATELESS_TRAFFIC_COST) + : assignmentConfigs.trafficCost; + final int nonOverlapCost = assignmentConfigs.nonOverlapCost == null ? (isStateful ? DEFAULT_STATEFUL_NON_OVERLAP_COST : DEFAULT_STATELESS_NON_OVERLAP_COST) + : assignmentConfigs.nonOverlapCost; + + int cost = 0; + for (final TopicPartition tp : topicPartitions) { + final Set<String> tpRacks = racksForPartition.get(tp); + if (tpRacks == null || tpRacks.isEmpty()) { + throw new IllegalStateException("TopicPartition " + tp + " has no rack information. Maybe forgot to call canEnableRackAwareAssignor first"); + } + if (!tpRacks.contains(clientRack)) { + cost += trafficCost; + } + } + + if (!inCurrentAssignment) { + cost += nonOverlapCost; + } + + return cost; + } + + // For testing. 
canEnableRackAwareAssignor must be called first + long activeTasksCost(final SortedMap<UUID, ClientState> clientStates, final SortedSet<TaskId> statefulTasks, final boolean isStateful) { + final List<UUID> clientList = new ArrayList<>(clientStates.keySet()); + final List<TaskId> taskIdList = new ArrayList<>(statefulTasks); + final Map<TaskId, UUID> taskClientMap = new HashMap<>(); + final Map<UUID, Integer> clientCapacity = new HashMap<>(); + final Graph<Integer> graph = new Graph<>(); + + constructStatefulActiveTaskGraph(graph, statefulTasks, clientList, taskIdList, + clientStates, taskClientMap, clientCapacity, isStateful); + + final int sourceId = taskIdList.size() + clientList.size(); + final int sinkId = sourceId + 1; + for (int taskNodeId = 0; taskNodeId < taskIdList.size(); taskNodeId++) { + graph.addEdge(sourceId, taskNodeId, 1, 0, 1); + } + for (int i = 0; i < clientList.size(); i++) { + final int capacity = clientCapacity.getOrDefault(clientList.get(i), 0); + final int clientNodeId = taskIdList.size() + i; + graph.addEdge(clientNodeId, sinkId, capacity, 0, capacity); + } + graph.setSourceNode(sourceId); + graph.setSinkNode(sinkId); + return graph.totalCost(); + } + + /** + * Optimize active stateful task assignment for rack awareness. canEnableRackAwareAssignor must be called first + * @param clientStates Client states + * @param taskIds Tasks to reassign if needed. 
They must be assigned already in clientStates + * @param isStateful Whether the tasks are stateful + * @return Total cost after optimization + */ + public long optimizeActiveTasks(final SortedMap<UUID, ClientState> clientStates, + final SortedSet<TaskId> taskIds, + final boolean isStateful) { + if (taskIds.isEmpty()) { + return 0; + } + + final List<UUID> clientList = new ArrayList<>(clientStates.keySet()); + final List<TaskId> taskIdList = new ArrayList<>(taskIds); Review Comment: Same reason as above: it makes it easy to get a reference back to the TaskId from a node id. ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java: ########## @@ -38,29 +43,34 @@ public class RackAwareTaskAssignor { private static final Logger log = LoggerFactory.getLogger(RackAwareTaskAssignor.class); + private static final int DEFAULT_STATEFUL_TRAFFIC_COST = 10; + private static final int DEFAULT_STATEFUL_NON_OVERLAP_COST = 1; Review Comment: See above -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org