[GitHub] [kafka] mjsax commented on a diff in pull request #14030: KAFKA-15022: [3/N] use graph to compute rack aware assignment for active stateful tasks
mjsax commented on code in PR #14030:
URL: https://github.com/apache/kafka/pull/14030#discussion_r1275434335

## streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java:
@@ -69,12 +71,38 @@ public final class AssignmentTestUtils {
     public static final UUID UUID_8 = uuidForInt(8);
     public static final UUID UUID_9 = uuidForInt(9);
-    public static final TopicPartition TP_0_0 = new TopicPartition("topic0", 0);
-    public static final TopicPartition TP_0_1 = new TopicPartition("topic0", 1);
-    public static final TopicPartition TP_0_2 = new TopicPartition("topic0", 2);
-    public static final TopicPartition TP_1_0 = new TopicPartition("topic1", 0);
-    public static final TopicPartition TP_1_1 = new TopicPartition("topic1", 1);
-    public static final TopicPartition TP_1_2 = new TopicPartition("topic1", 2);
+    public static final String RACK_0 = "rock0";

Review Comment:
```suggestion
    public static final String RACK_0 = "rack0";
```

## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
@@ -185,4 +188,213 @@ public boolean validateClientRack() {
         }
         return true;
     }
+
+    private int getCost(final TaskId taskId, final UUID processId, final boolean inCurrentAssignment, final int trafficCost, final int nonOverlapCost) {
+        final Map<String, Optional<String>> clientRacks = racksForProcess.get(processId);
+        if (clientRacks == null) {
+            throw new IllegalStateException("Client " + processId + " doesn't exist in processRacks");
+        }
+        final Optional<Optional<String>> clientRackOpt = clientRacks.values().stream().filter(Optional::isPresent).findFirst();
+        if (!clientRackOpt.isPresent() || !clientRackOpt.get().isPresent()) {
+            throw new IllegalStateException("Client " + processId + " doesn't have rack configured. Maybe forgot to call canEnableRackAwareAssignor first");
+        }
+
+        final String clientRack = clientRackOpt.get().get();
+        final Set<TopicPartition> topicPartitions = partitionsForTask.get(taskId);
+        if (topicPartitions == null || topicPartitions.isEmpty()) {
+            throw new IllegalStateException("Task " + taskId + " has no TopicPartitions");
+        }
+
+        int cost = 0;
+        for (final TopicPartition tp : topicPartitions) {
+            final Set<String> tpRacks = racksForPartition.get(tp);
+            if (tpRacks == null || tpRacks.isEmpty()) {
+                throw new IllegalStateException("TopicPartition " + tp + " has no rack information. Maybe forgot to call canEnableRackAwareAssignor first");
+            }
+            if (!tpRacks.contains(clientRack)) {
+                cost += trafficCost;
+            }
+        }
+
+        if (!inCurrentAssignment) {
+            cost += nonOverlapCost;
+        }
+
+        return cost;
+    }
+
+    private static int getSinkID(final List<UUID> clientList, final List<TaskId> taskIdList) {
+        return clientList.size() + taskIdList.size();
+    }
+
+    // For testing. canEnableRackAwareAssignor must be called first
+    long activeTasksCost(final SortedMap<UUID, ClientState> clientStates, final SortedSet<TaskId> activeTasks, final int trafficCost, final int nonOverlapCost) {

Review Comment:
Can we add JavaDocs? It's a little unclear what this method does. Also, maybe move `activeTasks` to be the first parameter, as they are the main input (all others are metadata)?
```
/**
 * Compute the cost for the provided {@code activeTasks}. The passed-in active tasks must be contained in {@code clientStates}.
 */
```

## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
@@ -74,11 +81,7 @@ public synchronized boolean canEnableRackAwareAssignorForActiveTasks() {
         }
         return validateTopicPartitionRack();
-    }
-
-    public boolean canEnableRackAwareAssignorForStandbyTasks() {
-        // TODO
-        return false;
+        // TODO: add changelog topic, standby task validation
     }
     // Visible for testing. This method also checks if all TopicPartitions exist in cluster

Review Comment:
Typo one line below: `D[e]scribe`

## streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java:
@@ -69,12 +71,38 @@ public final class AssignmentTestUtils {
     public static final UUID UUID_8 = uuidForInt(8);
     public static final UUID UUID_9 = uuidForInt(9);
-    public static final TopicPartition TP_0_0 = new TopicPartition("topic0", 0);
-    public static final TopicPartition TP_0_1 = new TopicPartition("topic0", 1);
-    public static final TopicPartition TP_0_2 = new TopicPartition("topic0", 2);
-    public static final TopicPartition TP_1_0 = new TopicPartition("topic1", 0);
-    public static final TopicPartition TP_1_1 = new TopicPartition("topic1", 1);
-    public static final Topic
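For readers following the cost discussion, here is a minimal, self-contained sketch of the per-(task, client) cost that `getCost` computes in this revision: `trafficCost` for every input partition with no replica on the client's rack, plus `nonOverlapCost` if the task would move. The class `RackCostSketch` is hypothetical; it assumes racks are plain strings and keys partitions by name instead of `TopicPartition`.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-in for getCost(...): racks are plain strings and
// input partitions are keyed by name instead of TopicPartition.
final class RackCostSketch {

    private RackCostSketch() { }

    /**
     * Cost of placing one task on one client.
     *
     * @param clientRack          rack of the candidate client
     * @param partitionRacks      for each input partition of the task, the racks holding a replica
     * @param inCurrentAssignment whether the task is currently assigned to this client
     * @param trafficCost         added once per input partition with no replica on the client's rack
     * @param nonOverlapCost      added once if the task would move to this client
     */
    static int cost(final String clientRack,
                    final Map<String, Set<String>> partitionRacks,
                    final boolean inCurrentAssignment,
                    final int trafficCost,
                    final int nonOverlapCost) {
        if (partitionRacks == null || partitionRacks.isEmpty()) {
            throw new IllegalStateException("Task has no input partitions");
        }
        int cost = 0;
        for (final Map.Entry<String, Set<String>> entry : partitionRacks.entrySet()) {
            final Set<String> racks = entry.getValue();
            if (racks == null || racks.isEmpty()) {
                throw new IllegalStateException("Partition " + entry.getKey() + " has no rack information");
            }
            if (!racks.contains(clientRack)) {
                cost += trafficCost; // this partition would be fetched across racks
            }
        }
        if (!inCurrentAssignment) {
            cost += nonOverlapCost; // moving the task away from its current owner
        }
        return cost;
    }
}
```

For example, with `trafficCost = 10` and `nonOverlapCost = 1`, a task reading `topic0-0` (replicas on `rack0`, `rack1`) and `topic0-1` (replica on `rack1`) costs 10 + 1 = 11 on a new `rack0` client and 0 on its current `rack1` owner.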
mjsax commented on code in PR #14030:
URL: https://github.com/apache/kafka/pull/14030#discussion_r1272606193

## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
@@ -185,4 +193,212 @@ public boolean validateClientRack() {
         }
         return true;
     }
+
+    private int getCost(final TaskId taskId, final UUID clientId, final boolean inCurrentAssignment, final boolean isStateful) {
+        final Map<String, Optional<String>> clientRacks = racksForProcess.get(clientId);
+        if (clientRacks == null) {
+            throw new IllegalStateException("Client " + clientId + " doesn't exist in processRacks");
+        }
+        final Optional<Optional<String>> clientRackOpt = clientRacks.values().stream().filter(Optional::isPresent).findFirst();
+        if (!clientRackOpt.isPresent() || !clientRackOpt.get().isPresent()) {

Review Comment:
We `filter()` for `isPresent` already -- it seems we only need the first check?

## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
@@ -185,4 +193,212 @@ public boolean validateClientRack() {
         }
         return true;
     }
+
+    private int getCost(final TaskId taskId, final UUID clientId, final boolean inCurrentAssignment, final boolean isStateful) {
+        final Map<String, Optional<String>> clientRacks = racksForProcess.get(clientId);
+        if (clientRacks == null) {
+            throw new IllegalStateException("Client " + clientId + " doesn't exist in processRacks");
+        }
+        final Optional<Optional<String>> clientRackOpt = clientRacks.values().stream().filter(Optional::isPresent).findFirst();
+        if (!clientRackOpt.isPresent() || !clientRackOpt.get().isPresent()) {
+            throw new IllegalStateException("Client " + clientId + " doesn't have rack configured. Maybe forgot to call canEnableRackAwareAssignor first");
+        }
+
+        final String clientRack = clientRackOpt.get().get();
+        final Set<TopicPartition> topicPartitions = partitionsForTask.get(taskId);
+        if (topicPartitions == null || topicPartitions.isEmpty()) {
+            throw new IllegalStateException("Task " + taskId + " has no TopicPartitions");
+        }
+
+        final int trafficCost = assignmentConfigs.trafficCost == null ? (isStateful ? DEFAULT_STATEFUL_TRAFFIC_COST : DEFAULT_STATELESS_TRAFFIC_COST)
+            : assignmentConfigs.trafficCost;
+        final int nonOverlapCost = assignmentConfigs.nonOverlapCost == null ? (isStateful ? DEFAULT_STATEFUL_NON_OVERLAP_COST : DEFAULT_STATELESS_NON_OVERLAP_COST)
+            : assignmentConfigs.nonOverlapCost;
+
+        int cost = 0;
+        for (final TopicPartition tp : topicPartitions) {
+            final Set<String> tpRacks = racksForPartition.get(tp);
+            if (tpRacks == null || tpRacks.isEmpty()) {
+                throw new IllegalStateException("TopicPartition " + tp + " has no rack information. Maybe forgot to call canEnableRackAwareAssignor first");
+            }
+            if (!tpRacks.contains(clientRack)) {
+                cost += trafficCost;
+            }
+        }
+
+        if (!inCurrentAssignment) {
+            cost += nonOverlapCost;
+        }
+
+        return cost;
+    }
+
+    private static int getSinkID(final List<UUID> clientList, final List<TaskId> taskIdList) {
+        return clientList.size() + taskIdList.size();
+    }
+
+    // For testing. canEnableRackAwareAssignor must be called first
+    long activeTasksCost(final SortedMap<UUID, ClientState> clientStates, final SortedSet<TaskId> activeTasks, final boolean isStateful) {
+        final List<UUID> clientList = new ArrayList<>(clientStates.keySet());
+        final List<TaskId> taskIdList = new ArrayList<>(activeTasks);
+        final Map<TaskId, UUID> taskClientMap = new HashMap<>();
+        final Map<UUID, Integer> originalAssignedTaskNumber = new HashMap<>();
+        final Graph<Integer> graph = constructActiveTaskGraph(activeTasks, clientList, taskIdList,
+            clientStates, taskClientMap, originalAssignedTaskNumber, isStateful);

Review Comment:
Given that we read neither `taskClientMap` nor `originalAssignedTaskNumber`, should we just pass `new HashMap<>()` directly?
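To illustrate the `filter()` remark: once `filter(Optional::isPresent)` has run, every remaining element is non-empty, so unwrapping inside the stream leaves a single `Optional<String>` and one check. A sketch, assuming the map shape of `racksForProcess.get(processId)` from the diff; `ClientRackSketch.findClientRack` is a hypothetical name:

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

// Hypothetical helper mirroring the rack lookup in getCost(...): the map shape
// (consumer -> optional rack) follows racksForProcess.get(processId) in the diff.
final class ClientRackSketch {

    private ClientRackSketch() { }

    static String findClientRack(final UUID processId, final Map<String, Optional<String>> consumerRacks) {
        if (consumerRacks == null) {
            throw new IllegalStateException("Client " + processId + " doesn't exist in processRacks");
        }
        // After filter(Optional::isPresent) every remaining element is non-empty, so
        // map(Optional::get) is safe and the result is a plain Optional<String>.
        return consumerRacks.values().stream()
            .filter(Optional::isPresent)
            .map(Optional::get)
            .findFirst()
            .orElseThrow(() -> new IllegalStateException(
                "Client " + processId + " doesn't have rack configured"));
    }
}
```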
## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
@@ -185,4 +193,212 @@ public boolean validateClientRack() {
         }
         return true;
     }
+
+    private int getCost(final TaskId taskId, final UUID clientId, final boolean inCurrentAssignment, final boolean isStateful) {
+        final Map<String, Optional<String>> clientRacks = racksForProcess.get(clientId);
+        if (clientRacks == null) {
+            throw new IllegalStateException("Client " + clientId + " doesn't exist in processRacks");
+        }
+        final Optional<Optional<String>> clientRackOpt = clientRacks.values().stream().filter(Optional::isPresent).findFirst();
+        if (!clientRackOpt.isPresent() || !clientRackOpt.get().isPresent()) {
+            throw new IllegalStateException("Client " + clientId + " does
mjsax commented on code in PR #14030:
URL: https://github.com/apache/kafka/pull/14030#discussion_r1271152663

## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
@@ -185,4 +191,224 @@ public boolean validateClientRack() {
         }
         return true;
     }
+
+    private int getCost(final TaskId taskId, final UUID clientId, final boolean inCurrentAssignment, final boolean isStateful) {
+        final Map<String, Optional<String>> clientRacks = racksForProcess.get(clientId);
+        if (clientRacks == null) {
+            throw new IllegalStateException("Client " + clientId + " doesn't exist in processRacks");
+        }
+        final Optional<Optional<String>> clientRackOpt = clientRacks.values().stream().filter(Optional::isPresent).findFirst();
+        if (!clientRackOpt.isPresent() || !clientRackOpt.get().isPresent()) {
+            throw new IllegalStateException("Client " + clientId + " doesn't have rack configured. Maybe forgot to call canEnableRackAwareAssignor first");
+        }
+
+        final String clientRack = clientRackOpt.get().get();
+        final Set<TopicPartition> topicPartitions = partitionsForTask.get(taskId);
+        if (topicPartitions == null) {
+            throw new IllegalStateException("Task " + taskId + " has no TopicPartitions");
+        }
+
+        final int trafficCost = assignmentConfigs.trafficCost == null ? (isStateful ? DEFAULT_STATEFUL_TRAFFIC_COST : DEFAULT_STATELESS_TRAFFIC_COST)
+            : assignmentConfigs.trafficCost;
+        final int nonOverlapCost = assignmentConfigs.nonOverlapCost == null ? (isStateful ? DEFAULT_STATEFUL_NON_OVERLAP_COST : DEFAULT_STATELESS_NON_OVERLAP_COST)
+            : assignmentConfigs.nonOverlapCost;
+
+        int cost = 0;
+        for (final TopicPartition tp : topicPartitions) {
+            final Set<String> tpRacks = racksForPartition.get(tp);
+            if (tpRacks == null || tpRacks.isEmpty()) {
+                throw new IllegalStateException("TopicPartition " + tp + " has no rack information. Maybe forgot to call canEnableRackAwareAssignor first");
+            }
+            if (!tpRacks.contains(clientRack)) {
+                cost += trafficCost;
+            }
+        }
+
+        if (!inCurrentAssignment) {
+            cost += nonOverlapCost;
+        }
+
+        return cost;
+    }
+
+    // For testing. canEnableRackAwareAssignor must be called first
+    long activeTasksCost(final SortedMap<UUID, ClientState> clientStates, final SortedSet<TaskId> statefulTasks, final boolean isStateful) {
+        final List<UUID> clientList = new ArrayList<>(clientStates.keySet());
+        final List<TaskId> taskIdList = new ArrayList<>(statefulTasks);
+        final Map<TaskId, UUID> taskClientMap = new HashMap<>();
+        final Map<UUID, Integer> clientCapacity = new HashMap<>();
+        final Graph<Integer> graph = new Graph<>();
+
+        constructStatefulActiveTaskGraph(graph, statefulTasks, clientList, taskIdList,
+            clientStates, taskClientMap, clientCapacity, isStateful);
+
+        final int sourceId = taskIdList.size() + clientList.size();
+        final int sinkId = sourceId + 1;
+        for (int taskNodeId = 0; taskNodeId < taskIdList.size(); taskNodeId++) {
+            graph.addEdge(sourceId, taskNodeId, 1, 0, 1);
+        }
+        for (int i = 0; i < clientList.size(); i++) {
+            final int capacity = clientCapacity.getOrDefault(clientList.get(i), 0);
+            final int clientNodeId = taskIdList.size() + i;
+            graph.addEdge(clientNodeId, sinkId, capacity, 0, capacity);
+        }
+        graph.setSourceNode(sourceId);
+        graph.setSinkNode(sinkId);
+        return graph.totalCost();
+    }
+
+    /**
+     * Optimize active stateful task assignment for rack awareness. canEnableRackAwareAssignor must be called first
+     * @param clientStates Client states
+     * @param taskIds Tasks to reassign if needed. They must be assigned already in clientStates
+     * @param isStateful Whether the tasks are stateful
+     * @return Total cost after optimization
+     */
+    public long optimizeActiveTasks(final SortedMap<UUID, ClientState> clientStates,
+                                    final SortedSet<TaskId> taskIds,
+                                    final boolean isStateful) {
+        if (taskIds.isEmpty()) {
+            return 0;
+        }
+
+        final List<UUID> clientList = new ArrayList<>(clientStates.keySet());

Review Comment:
Well, if it's useful for `constructStatefulActiveTaskGraph` to have such a list, shouldn't we construct this list inside `constructStatefulActiveTaskGraph` instead of passing it in? Otherwise, we leak an optimization from `constructStatefulActiveTaskGraph` to the caller, which seems not ideal.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.
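As far as the diff shows, `activeTasksCost` never calls `solveMinCostFlow()`; it encodes the current assignment as a feasible flow and prices it. The sketch below reproduces that idea outside Kafka, assuming `addEdge(u, v, capacity, cost, flow)` is the argument order and that `totalCost()` sums cost × flow over all edges (both inferred from the calls above, not confirmed). The `Edge` class and the cost matrix are hypothetical simplifications, not Kafka's `Graph` API:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: price the current assignment as sum(cost * flow) over a flow
// graph laid out like the diff (tasks, then clients, then source and sink).
final class CurrentAssignmentCostSketch {

    private static final class Edge {
        final int from;
        final int to;
        final int capacity;
        final int cost;
        final int flow;

        Edge(final int from, final int to, final int capacity, final int cost, final int flow) {
            this.from = from;
            this.to = to;
            this.capacity = capacity;
            this.cost = cost;
            this.flow = flow;
        }
    }

    private CurrentAssignmentCostSketch() { }

    /**
     * @param cost       cost[t][c] = cost of running task t on client c
     * @param assignment assignment[t] = index of the client that currently owns task t
     */
    static long currentCost(final int[][] cost, final int[] assignment) {
        final int numTasks = cost.length;
        final int numClients = cost[0].length;
        final int sourceId = numTasks + numClients; // node layout as in the diff
        final int sinkId = sourceId + 1;
        final int[] ownedTasks = new int[numClients];
        final List<Edge> edges = new ArrayList<>();
        for (int t = 0; t < numTasks; t++) {
            edges.add(new Edge(sourceId, t, 1, 0, 1)); // each task carries one unit of flow
            for (int c = 0; c < numClients; c++) {
                final int flow = assignment[t] == c ? 1 : 0; // flow only on the current owner's edge
                edges.add(new Edge(t, numTasks + c, 1, cost[t][c], flow));
            }
            ownedTasks[assignment[t]]++;
        }
        for (int c = 0; c < numClients; c++) {
            // client -> sink: capacity equals the number of tasks the client owns today
            edges.add(new Edge(numTasks + c, sinkId, ownedTasks[c], 0, ownedTasks[c]));
        }
        long total = 0;
        for (final Edge e : edges) {
            total += (long) e.cost * e.flow;
        }
        return total;
    }
}
```

With `cost = {{0, 11}, {10, 1}}`, assigning both tasks to client 0 costs 0 + 10 = 10, while the split `{0, 1}` costs 0 + 1 = 1.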
mjsax commented on code in PR #14030:
URL: https://github.com/apache/kafka/pull/14030#discussion_r1271151967

## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
@@ -185,4 +191,224 @@ public boolean validateClientRack() {
         }
         return true;
     }
+
+    private int getCost(final TaskId taskId, final UUID clientId, final boolean inCurrentAssignment, final boolean isStateful) {
+        final Map<String, Optional<String>> clientRacks = racksForProcess.get(clientId);
+        if (clientRacks == null) {
+            throw new IllegalStateException("Client " + clientId + " doesn't exist in processRacks");
+        }
+        final Optional<Optional<String>> clientRackOpt = clientRacks.values().stream().filter(Optional::isPresent).findFirst();
+        if (!clientRackOpt.isPresent() || !clientRackOpt.get().isPresent()) {
+            throw new IllegalStateException("Client " + clientId + " doesn't have rack configured. Maybe forgot to call canEnableRackAwareAssignor first");
+        }
+
+        final String clientRack = clientRackOpt.get().get();
+        final Set<TopicPartition> topicPartitions = partitionsForTask.get(taskId);
+        if (topicPartitions == null) {
+            throw new IllegalStateException("Task " + taskId + " has no TopicPartitions");
+        }
+
+        final int trafficCost = assignmentConfigs.trafficCost == null ? (isStateful ? DEFAULT_STATEFUL_TRAFFIC_COST : DEFAULT_STATELESS_TRAFFIC_COST)
+            : assignmentConfigs.trafficCost;
+        final int nonOverlapCost = assignmentConfigs.nonOverlapCost == null ? (isStateful ? DEFAULT_STATEFUL_NON_OVERLAP_COST : DEFAULT_STATELESS_NON_OVERLAP_COST)
+            : assignmentConfigs.nonOverlapCost;
+
+        int cost = 0;
+        for (final TopicPartition tp : topicPartitions) {
+            final Set<String> tpRacks = racksForPartition.get(tp);
+            if (tpRacks == null || tpRacks.isEmpty()) {
+                throw new IllegalStateException("TopicPartition " + tp + " has no rack information. Maybe forgot to call canEnableRackAwareAssignor first");
+            }
+            if (!tpRacks.contains(clientRack)) {
+                cost += trafficCost;
+            }
+        }
+
+        if (!inCurrentAssignment) {
+            cost += nonOverlapCost;
+        }
+
+        return cost;
+    }
+
+    // For testing. canEnableRackAwareAssignor must be called first
+    long activeTasksCost(final SortedMap<UUID, ClientState> clientStates, final SortedSet<TaskId> statefulTasks, final boolean isStateful) {
+        final List<UUID> clientList = new ArrayList<>(clientStates.keySet());
+        final List<TaskId> taskIdList = new ArrayList<>(statefulTasks);
+        final Map<TaskId, UUID> taskClientMap = new HashMap<>();
+        final Map<UUID, Integer> clientCapacity = new HashMap<>();
+        final Graph<Integer> graph = new Graph<>();
+
+        constructStatefulActiveTaskGraph(graph, statefulTasks, clientList, taskIdList,
+            clientStates, taskClientMap, clientCapacity, isStateful);
+
+        final int sourceId = taskIdList.size() + clientList.size();
+        final int sinkId = sourceId + 1;
+        for (int taskNodeId = 0; taskNodeId < taskIdList.size(); taskNodeId++) {
+            graph.addEdge(sourceId, taskNodeId, 1, 0, 1);
+        }
+        for (int i = 0; i < clientList.size(); i++) {
+            final int capacity = clientCapacity.getOrDefault(clientList.get(i), 0);
+            final int clientNodeId = taskIdList.size() + i;
+            graph.addEdge(clientNodeId, sinkId, capacity, 0, capacity);
+        }
+        graph.setSourceNode(sourceId);
+        graph.setSinkNode(sinkId);
+        return graph.totalCost();
+    }
+
+    /**
+     * Optimize active stateful task assignment for rack awareness. canEnableRackAwareAssignor must be called first
+     * @param clientStates Client states
+     * @param taskIds Tasks to reassign if needed. They must be assigned already in clientStates
+     * @param isStateful Whether the tasks are stateful
+     * @return Total cost after optimization
+     */
+    public long optimizeActiveTasks(final SortedMap<UUID, ClientState> clientStates,
+                                    final SortedSet<TaskId> taskIds,
+                                    final boolean isStateful) {
+        if (taskIds.isEmpty()) {
+            return 0;
+        }
+
+        final List<UUID> clientList = new ArrayList<>(clientStates.keySet());
+        final List<TaskId> taskIdList = new ArrayList<>(taskIds);
+        final Map<TaskId, UUID> taskClientMap = new HashMap<>();
+        final Map<UUID, Integer> clientCapacity = new HashMap<>();
+        final Graph<Integer> graph = new Graph<>();
+
+        constructStatefulActiveTaskGraph(graph, taskIds, clientList, taskIdList,
+            clientStates, taskClientMap, clientCapacity, isStateful);
+
+        graph.solveMinCostFlow();
+        final long cost = graph.totalCost();
+
+        assignStatefulActiveTaskFromMinCostFlow(graph, taskIds, clientList, taskIdList,
+            clientStates, clientCapacity, taskClientMap);
+
+        return cost;
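`optimizeActiveTasks` solves the flow and then reads the new owners back from the graph. The same bipartite model (source → task with capacity 1, task → client with the `getCost` value, client → sink with the client's capacity) can be solved with successive shortest paths, as sketched below. This is a swapped-in, textbook technique for illustration only; `Graph#solveMinCostFlow` may use a different algorithm, and all names here are hypothetical:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: min-cost task assignment via successive shortest paths
// (Bellman-Ford on the residual graph), using the node layout from the diff.
final class MinCostAssignmentSketch {

    private static final class Edge {
        final int to;
        final int rev; // index of the reverse edge in adj.get(to)
        int cap;
        final int cost;

        Edge(final int to, final int rev, final int cap, final int cost) {
            this.to = to;
            this.rev = rev;
            this.cap = cap;
            this.cost = cost;
        }
    }

    private final List<List<Edge>> adj = new ArrayList<>();

    private MinCostAssignmentSketch(final int numNodes) {
        for (int i = 0; i < numNodes; i++) {
            adj.add(new ArrayList<>());
        }
    }

    private void addEdge(final int u, final int v, final int cap, final int cost) {
        adj.get(u).add(new Edge(v, adj.get(v).size(), cap, cost));
        adj.get(v).add(new Edge(u, adj.get(u).size() - 1, 0, -cost)); // residual back edge
    }

    /** Returns assignment[t] = client index; capacity[c] = max number of tasks client c may own. */
    static int[] assign(final int[][] cost, final int[] capacity) {
        final int numTasks = cost.length;
        final int numClients = capacity.length;
        final int source = numTasks + numClients;
        final int sink = source + 1;
        final MinCostAssignmentSketch g = new MinCostAssignmentSketch(sink + 1);
        for (int t = 0; t < numTasks; t++) {
            g.addEdge(source, t, 1, 0);
            for (int c = 0; c < numClients; c++) {
                g.addEdge(t, numTasks + c, 1, cost[t][c]);
            }
        }
        for (int c = 0; c < numClients; c++) {
            g.addEdge(numTasks + c, sink, capacity[c], 0);
        }
        // Push one unit of flow (one task) per iteration along the cheapest residual path.
        for (int sent = 0; sent < numTasks; sent++) {
            final long[] dist = new long[sink + 1];
            final int[] prevNode = new int[sink + 1];
            final int[] prevEdge = new int[sink + 1];
            Arrays.fill(dist, Long.MAX_VALUE);
            dist[source] = 0;
            boolean updated = true;
            for (int round = 0; round <= sink && updated; round++) {
                updated = false;
                for (int u = 0; u <= sink; u++) {
                    if (dist[u] == Long.MAX_VALUE) {
                        continue;
                    }
                    for (int i = 0; i < g.adj.get(u).size(); i++) {
                        final Edge e = g.adj.get(u).get(i);
                        if (e.cap > 0 && dist[u] + e.cost < dist[e.to]) {
                            dist[e.to] = dist[u] + e.cost;
                            prevNode[e.to] = u;
                            prevEdge[e.to] = i;
                            updated = true;
                        }
                    }
                }
            }
            if (dist[sink] == Long.MAX_VALUE) {
                throw new IllegalStateException("Total client capacity is smaller than the number of tasks");
            }
            for (int v = sink; v != source; v = prevNode[v]) {
                final Edge e = g.adj.get(prevNode[v]).get(prevEdge[v]);
                e.cap -= 1;
                g.adj.get(v).get(e.rev).cap += 1;
            }
        }
        // A task -> client edge with no residual capacity carries the task's unit of flow.
        final int[] assignment = new int[numTasks];
        for (int t = 0; t < numTasks; t++) {
            for (final Edge e : g.adj.get(t)) {
                if (e.to >= numTasks && e.to < source && e.cap == 0) {
                    assignment[t] = e.to - numTasks;
                }
            }
        }
        return assignment;
    }
}
```

For `cost = {{10, 1}, {1, 10}}` and one slot per client, the sketch returns `{1, 0}`: each task lands on the client where it costs 1.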
mjsax commented on code in PR #14030:
URL: https://github.com/apache/kafka/pull/14030#discussion_r1270106140

## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
@@ -185,4 +191,224 @@ public boolean validateClientRack() {
         }
         return true;
     }
+
+    private int getCost(final TaskId taskId, final UUID clientId, final boolean inCurrentAssignment, final boolean isStateful) {
+        final Map<String, Optional<String>> clientRacks = racksForProcess.get(clientId);
+        if (clientRacks == null) {
+            throw new IllegalStateException("Client " + clientId + " doesn't exist in processRacks");
+        }
+        final Optional<Optional<String>> clientRackOpt = clientRacks.values().stream().filter(Optional::isPresent).findFirst();
+        if (!clientRackOpt.isPresent() || !clientRackOpt.get().isPresent()) {
+            throw new IllegalStateException("Client " + clientId + " doesn't have rack configured. Maybe forgot to call canEnableRackAwareAssignor first");
+        }
+
+        final String clientRack = clientRackOpt.get().get();
+        final Set<TopicPartition> topicPartitions = partitionsForTask.get(taskId);
+        if (topicPartitions == null) {
+            throw new IllegalStateException("Task " + taskId + " has no TopicPartitions");
+        }
+
+        final int trafficCost = assignmentConfigs.trafficCost == null ? (isStateful ? DEFAULT_STATEFUL_TRAFFIC_COST : DEFAULT_STATELESS_TRAFFIC_COST)
+            : assignmentConfigs.trafficCost;
+        final int nonOverlapCost = assignmentConfigs.nonOverlapCost == null ? (isStateful ? DEFAULT_STATEFUL_NON_OVERLAP_COST : DEFAULT_STATELESS_NON_OVERLAP_COST)
+            : assignmentConfigs.nonOverlapCost;
+
+        int cost = 0;
+        for (final TopicPartition tp : topicPartitions) {
+            final Set<String> tpRacks = racksForPartition.get(tp);
+            if (tpRacks == null || tpRacks.isEmpty()) {
+                throw new IllegalStateException("TopicPartition " + tp + " has no rack information. Maybe forgot to call canEnableRackAwareAssignor first");
+            }
+            if (!tpRacks.contains(clientRack)) {
+                cost += trafficCost;
+            }
+        }
+
+        if (!inCurrentAssignment) {
+            cost += nonOverlapCost;
+        }
+
+        return cost;
+    }
+
+    // For testing. canEnableRackAwareAssignor must be called first
+    long activeTasksCost(final SortedMap<UUID, ClientState> clientStates, final SortedSet<TaskId> statefulTasks, final boolean isStateful) {
+        final List<UUID> clientList = new ArrayList<>(clientStates.keySet());
+        final List<TaskId> taskIdList = new ArrayList<>(statefulTasks);
+        final Map<TaskId, UUID> taskClientMap = new HashMap<>();
+        final Map<UUID, Integer> clientCapacity = new HashMap<>();
+        final Graph<Integer> graph = new Graph<>();
+
+        constructStatefulActiveTaskGraph(graph, statefulTasks, clientList, taskIdList,
+            clientStates, taskClientMap, clientCapacity, isStateful);
+
+        final int sourceId = taskIdList.size() + clientList.size();
+        final int sinkId = sourceId + 1;

Review Comment:
Looking into `constructStatefulActiveTaskGraph`, it does the same thing. Sounds error-prone. Shouldn't we get the source and sink IDs from `graph` instead of re-computing them? (It seems we do the same thing in two places.)
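One way to address both this comment and the earlier one about passing `clientList` in is to derive every node id from a single object that owns the task and client lists. A sketch with a hypothetical `FlowGraphLayout` class, assuming the layout used in the diff (tasks `0..T-1`, clients `T..T+C-1`, source `T+C`, sink `T+C+1`); the type parameters `T` and `C` stand in for `TaskId` and `UUID`:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.SortedSet;

// Hypothetical helper: computes every node id of the task/client flow graph in one place,
// so building the graph and reading it back cannot disagree on sourceId/sinkId.
final class FlowGraphLayout<T, C> {
    private final List<T> tasks;   // task i is node i
    private final List<C> clients; // client j is node tasks.size() + j
    private final Map<C, Integer> clientIndex = new HashMap<>();

    FlowGraphLayout(final SortedSet<T> tasks, final SortedMap<C, ?> clientStates) {
        this.tasks = Collections.unmodifiableList(new ArrayList<>(tasks));
        this.clients = Collections.unmodifiableList(new ArrayList<>(clientStates.keySet()));
        for (int i = 0; i < clients.size(); i++) {
            clientIndex.put(clients.get(i), i);
        }
    }

    int clientNodeId(final C client) {
        final Integer index = clientIndex.get(client);
        if (index == null) {
            throw new IllegalArgumentException("Unknown client " + client);
        }
        return tasks.size() + index;
    }

    int sourceId() {
        return tasks.size() + clients.size();
    }

    int sinkId() {
        return sourceId() + 1;
    }

    int nodeCount() {
        return sinkId() + 1;
    }

    T taskAt(final int taskNodeId) {
        return tasks.get(taskNodeId);
    }

    C clientAt(final int clientNodeId) {
        return clients.get(clientNodeId - tasks.size());
    }
}
```

Callers would then pass only `clientStates` and the task set, and both the graph builder and `activeTasksCost` would ask the layout for `sourceId()` and `sinkId()`.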
mjsax commented on code in PR #14030:
URL: https://github.com/apache/kafka/pull/14030#discussion_r1270094068

## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
@@ -38,29 +43,34 @@ public class RackAwareTaskAssignor {
     private static final Logger log = LoggerFactory.getLogger(RackAwareTaskAssignor.class);
+    private static final int DEFAULT_STATEFUL_TRAFFIC_COST = 10;
+    private static final int DEFAULT_STATEFUL_NON_OVERLAP_COST = 1;

Review Comment:
Why is this not zero (as it is for stateless)?

## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
@@ -38,29 +43,34 @@ public class RackAwareTaskAssignor {
     private static final Logger log = LoggerFactory.getLogger(RackAwareTaskAssignor.class);
+    private static final int DEFAULT_STATEFUL_TRAFFIC_COST = 10;

Review Comment:
Why is this set to `10`? In particular, why is it higher than for the stateless case? In the end, my understanding was that we try to optimize for input partitions, and in this case it makes no difference whether a task has state or not; only the number of input topic partitions of a task matters (each with equal cost).

## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java:
@@ -268,24 +268,44 @@ public static class AssignmentConfigs {
         public final long probingRebalanceIntervalMs;
         public final List<String> rackAwareAssignmentTags;
+        // TODO: get from streamsConfig after we add the config

Review Comment:
I cannot remember such parameters being defined in the KIP. Can you elaborate?
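The thread does not say why the stateful non-overlap default is 1 rather than 0. One plausible reading, shown as arithmetic below, is that a nonzero value breaks ties in favor of the current owner (avoiding state migration), while the 10:1 ratio still lets rack locality dominate. The scenario and the `CostTradeoffSketch` class are hypothetical; only the two constants come from the diff:

```java
// Hypothetical worked example using the stateful defaults from the diff
// (traffic cost 10, non-overlap cost 1); the two-partition scenario is made up.
final class CostTradeoffSketch {
    static final int TRAFFIC_COST = 10;
    static final int NON_OVERLAP_COST = 1;

    private CostTradeoffSketch() { }

    /** crossRackPartitions = input partitions with no replica on the candidate client's rack. */
    static int cost(final int crossRackPartitions, final boolean currentOwner,
                    final int trafficCost, final int nonOverlapCost) {
        return crossRackPartitions * trafficCost + (currentOwner ? 0 : nonOverlapCost);
    }

    /** Keeping a 2-partition task on its cross-rack owner: 2 * 10 + 0 = 20. */
    static int stayCrossRack() {
        return cost(2, true, TRAFFIC_COST, NON_OVERLAP_COST);
    }

    /** Moving it to a rack-local client: 0 * 10 + 1 = 1, so the move wins. */
    static int moveRackLocal() {
        return cost(0, false, TRAFFIC_COST, NON_OVERLAP_COST);
    }

    /** With nonOverlapCost = 0, staying on vs. moving between rack-local clients is a tie. */
    static boolean tiesWithoutStickiness() {
        return cost(0, true, TRAFFIC_COST, 0) == cost(0, false, TRAFFIC_COST, 0);
    }
}
```

With a non-overlap cost of 0, the solver has no reason to prefer the current owner among equally rack-local clients, so it may shuffle stateful tasks (and their state) for no traffic gain.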
## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
@@ -185,4 +191,224 @@ public boolean validateClientRack() {
         }
         return true;
     }
+
+    private int getCost(final TaskId taskId, final UUID clientId, final boolean inCurrentAssignment, final boolean isStateful) {
+        final Map<String, Optional<String>> clientRacks = racksForProcess.get(clientId);
+        if (clientRacks == null) {
+            throw new IllegalStateException("Client " + clientId + " doesn't exist in processRacks");
+        }
+        final Optional<Optional<String>> clientRackOpt = clientRacks.values().stream().filter(Optional::isPresent).findFirst();
+        if (!clientRackOpt.isPresent() || !clientRackOpt.get().isPresent()) {
+            throw new IllegalStateException("Client " + clientId + " doesn't have rack configured. Maybe forgot to call canEnableRackAwareAssignor first");
+        }
+
+        final String clientRack = clientRackOpt.get().get();
+        final Set<TopicPartition> topicPartitions = partitionsForTask.get(taskId);
+        if (topicPartitions == null) {
+            throw new IllegalStateException("Task " + taskId + " has no TopicPartitions");
+        }
+
+        final int trafficCost = assignmentConfigs.trafficCost == null ? (isStateful ? DEFAULT_STATEFUL_TRAFFIC_COST : DEFAULT_STATELESS_TRAFFIC_COST)
+            : assignmentConfigs.trafficCost;
+        final int nonOverlapCost = assignmentConfigs.nonOverlapCost == null ? (isStateful ? DEFAULT_STATEFUL_NON_OVERLAP_COST : DEFAULT_STATELESS_NON_OVERLAP_COST)
+            : assignmentConfigs.nonOverlapCost;
+
+        int cost = 0;
+        for (final TopicPartition tp : topicPartitions) {
+            final Set<String> tpRacks = racksForPartition.get(tp);
+            if (tpRacks == null || tpRacks.isEmpty()) {
+                throw new IllegalStateException("TopicPartition " + tp + " has no rack information. Maybe forgot to call canEnableRackAwareAssignor first");
+            }
+            if (!tpRacks.contains(clientRack)) {
+                cost += trafficCost;
+            }
+        }
+
+        if (!inCurrentAssignment) {
+            cost += nonOverlapCost;
+        }
+
+        return cost;
+    }
+
+    // For testing. canEnableRackAwareAssignor must be called first
+    long activeTasksCost(final SortedMap<UUID, ClientState> clientStates, final SortedSet<TaskId> statefulTasks, final boolean isStateful) {
+        final List<UUID> clientList = new ArrayList<>(clientStates.keySet());
+        final List<TaskId> taskIdList = new ArrayList<>(statefulTasks);
+        final Map<TaskId, UUID> taskClientMap = new HashMap<>();
+        final Map<UUID, Integer> clientCapacity = new HashMap<>();
+        final Graph<Integer> graph = new Graph<>();
+
+        constructStatefulActiveTaskGraph(graph, statefulTasks, clientList, taskIdList,
+            clientStates, taskClientMap, clientCapacity, isStateful);
+
+        final int sourceId = taskIdList.size() + clientList.size();
+        final int sinkId = sourceId + 1;
+        for (int taskNodeId = 0; taskNodeId < taskIdList.size(); taskNodeId++) {
+            graph.addEdge(sourceId, taskNodeId, 1, 0, 1);
+        }
+        for (int i = 0; i < clientList.size(); i++) {
+            final int capacity = clie