lihaosky commented on code in PR #14030:
URL: https://github.com/apache/kafka/pull/14030#discussion_r1271539596


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
##########
@@ -185,4 +191,224 @@ public boolean validateClientRack() {
         }
         return true;
     }
+
+    private int getCost(final TaskId taskId, final UUID clientId, final boolean inCurrentAssignment, final boolean isStateful) {
+        final Map<String, Optional<String>> clientRacks = racksForProcess.get(clientId);
+        if (clientRacks == null) {
+            throw new IllegalStateException("Client " + clientId + " doesn't exist in processRacks");
+        }
+        final Optional<Optional<String>> clientRackOpt = clientRacks.values().stream().filter(Optional::isPresent).findFirst();
+        if (!clientRackOpt.isPresent() || !clientRackOpt.get().isPresent()) {
+            throw new IllegalStateException("Client " + clientId + " doesn't have rack configured. Maybe forgot to call canEnableRackAwareAssignor first");
+        }
+
+        final String clientRack = clientRackOpt.get().get();
+        final Set<TopicPartition> topicPartitions = partitionsForTask.get(taskId);
+        if (topicPartitions == null) {
+            throw new IllegalStateException("Task " + taskId + " has no TopicPartitions");
+        }
+
+        final int trafficCost = assignmentConfigs.trafficCost == null ? (isStateful ? DEFAULT_STATEFUL_TRAFFIC_COST : DEFAULT_STATELESS_TRAFFIC_COST)
+            : assignmentConfigs.trafficCost;
+        final int nonOverlapCost = assignmentConfigs.nonOverlapCost == null ? (isStateful ? DEFAULT_STATEFUL_NON_OVERLAP_COST : DEFAULT_STATELESS_NON_OVERLAP_COST)
+            : assignmentConfigs.nonOverlapCost;
+
+        int cost = 0;
+        for (final TopicPartition tp : topicPartitions) {
+            final Set<String> tpRacks = racksForPartition.get(tp);
+            if (tpRacks == null || tpRacks.isEmpty()) {
+                throw new IllegalStateException("TopicPartition " + tp + " has no rack information. Maybe forgot to call canEnableRackAwareAssignor first");
+            }
+            if (!tpRacks.contains(clientRack)) {
+                cost += trafficCost;
+            }
+        }
+
+        if (!inCurrentAssignment) {
+            cost += nonOverlapCost;
+        }
+
+        return cost;
+    }
+
+    // For testing. canEnableRackAwareAssignor must be called first
+    long activeTasksCost(final SortedMap<UUID, ClientState> clientStates, final SortedSet<TaskId> statefulTasks, final boolean isStateful) {
+        final List<UUID> clientList = new ArrayList<>(clientStates.keySet());
+        final List<TaskId> taskIdList = new ArrayList<>(statefulTasks);
+        final Map<TaskId, UUID> taskClientMap = new HashMap<>();
+        final Map<UUID, Integer> clientCapacity = new HashMap<>();
+        final Graph<Integer> graph = new Graph<>();
+
+        constructStatefulActiveTaskGraph(graph, statefulTasks, clientList, taskIdList,
+            clientStates, taskClientMap, clientCapacity, isStateful);
+
+        final int sourceId = taskIdList.size() + clientList.size();
+        final int sinkId = sourceId + 1;
+        for (int taskNodeId = 0; taskNodeId < taskIdList.size(); taskNodeId++) {
+            graph.addEdge(sourceId, taskNodeId, 1, 0, 1);
+        }
+        for (int i = 0; i < clientList.size(); i++) {
+            final int capacity = clientCapacity.getOrDefault(clientList.get(i), 0);
+            final int clientNodeId = taskIdList.size() + i;
+            graph.addEdge(clientNodeId, sinkId, capacity, 0, capacity);
+        }
+        graph.setSourceNode(sourceId);
+        graph.setSinkNode(sinkId);
+        return graph.totalCost();
+    }
+
+    /**
+     * Optimize active stateful task assignment for rack awareness. canEnableRackAwareAssignor must be called first
+     * @param clientStates Client states
+     * @param taskIds Tasks to reassign if needed. They must be assigned already in clientStates
+     * @param isStateful Whether the tasks are stateful
+     * @return Total cost after optimization
+     */
+    public long optimizeActiveTasks(final SortedMap<UUID, ClientState> clientStates,
+                                    final SortedSet<TaskId> taskIds,
+                                    final boolean isStateful) {
+        if (taskIds.isEmpty()) {
+            return 0;
+        }
+
+        final List<UUID> clientList = new ArrayList<>(clientStates.keySet());

Review Comment:
   It's also used on line 286. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to