lihaosky commented on code in PR #14030:
URL: https://github.com/apache/kafka/pull/14030#discussion_r1271081266


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
##########
@@ -185,4 +191,224 @@ public boolean validateClientRack() {
         }
         return true;
     }
+
+    private int getCost(final TaskId taskId, final UUID clientId, final 
boolean inCurrentAssignment, final boolean isStateful) {
+        final Map<String, Optional<String>> clientRacks = 
racksForProcess.get(clientId);
+        if (clientRacks == null) {
+            throw new IllegalStateException("Client " + clientId + " doesn't 
exist in processRacks");
+        }
+        final Optional<Optional<String>> clientRackOpt = 
clientRacks.values().stream().filter(Optional::isPresent).findFirst();
+        if (!clientRackOpt.isPresent() || !clientRackOpt.get().isPresent()) {
+            throw new IllegalStateException("Client " + clientId + " doesn't 
have rack configured. Maybe forgot to call canEnableRackAwareAssignor first");
+        }
+
+        final String clientRack = clientRackOpt.get().get();
+        final Set<TopicPartition> topicPartitions = 
partitionsForTask.get(taskId);
+        if (topicPartitions == null) {
+            throw new IllegalStateException("Task " + taskId + " has no 
TopicPartitions");
+        }
+
+        final int trafficCost = assignmentConfigs.trafficCost == null ? 
(isStateful ? DEFAULT_STATEFUL_TRAFFIC_COST : DEFAULT_STATELESS_TRAFFIC_COST)
+            : assignmentConfigs.trafficCost;
+        final int nonOverlapCost = assignmentConfigs.nonOverlapCost == null ? 
(isStateful ? DEFAULT_STATEFUL_NON_OVERLAP_COST : 
DEFAULT_STATELESS_NON_OVERLAP_COST)
+            : assignmentConfigs.nonOverlapCost;
+
+        int cost = 0;
+        for (final TopicPartition tp : topicPartitions) {
+            final Set<String> tpRacks = racksForPartition.get(tp);
+            if (tpRacks == null || tpRacks.isEmpty()) {
+                throw new IllegalStateException("TopicPartition " + tp + " has 
no rack information. Maybe forgot to call canEnableRackAwareAssignor first");
+            }
+            if (!tpRacks.contains(clientRack)) {
+                cost += trafficCost;
+            }
+        }
+
+        if (!inCurrentAssignment) {
+            cost += nonOverlapCost;
+        }
+
+        return cost;
+    }
+
+    // For testing. canEnableRackAwareAssignor must be called first
+    long activeTasksCost(final SortedMap<UUID, ClientState> clientStates, 
final SortedSet<TaskId> statefulTasks, final boolean isStateful) {

Review Comment:
   Forgot to change it...



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
##########
@@ -185,4 +191,224 @@ public boolean validateClientRack() {
         }
         return true;
     }
+
+    private int getCost(final TaskId taskId, final UUID clientId, final 
boolean inCurrentAssignment, final boolean isStateful) {
+        final Map<String, Optional<String>> clientRacks = 
racksForProcess.get(clientId);
+        if (clientRacks == null) {
+            throw new IllegalStateException("Client " + clientId + " doesn't 
exist in processRacks");
+        }
+        final Optional<Optional<String>> clientRackOpt = 
clientRacks.values().stream().filter(Optional::isPresent).findFirst();
+        if (!clientRackOpt.isPresent() || !clientRackOpt.get().isPresent()) {
+            throw new IllegalStateException("Client " + clientId + " doesn't 
have rack configured. Maybe forgot to call canEnableRackAwareAssignor first");
+        }
+
+        final String clientRack = clientRackOpt.get().get();
+        final Set<TopicPartition> topicPartitions = 
partitionsForTask.get(taskId);
+        if (topicPartitions == null) {
+            throw new IllegalStateException("Task " + taskId + " has no 
TopicPartitions");
+        }
+
+        final int trafficCost = assignmentConfigs.trafficCost == null ? 
(isStateful ? DEFAULT_STATEFUL_TRAFFIC_COST : DEFAULT_STATELESS_TRAFFIC_COST)
+            : assignmentConfigs.trafficCost;
+        final int nonOverlapCost = assignmentConfigs.nonOverlapCost == null ? 
(isStateful ? DEFAULT_STATEFUL_NON_OVERLAP_COST : 
DEFAULT_STATELESS_NON_OVERLAP_COST)
+            : assignmentConfigs.nonOverlapCost;
+
+        int cost = 0;
+        for (final TopicPartition tp : topicPartitions) {
+            final Set<String> tpRacks = racksForPartition.get(tp);
+            if (tpRacks == null || tpRacks.isEmpty()) {
+                throw new IllegalStateException("TopicPartition " + tp + " has 
no rack information. Maybe forgot to call canEnableRackAwareAssignor first");
+            }
+            if (!tpRacks.contains(clientRack)) {
+                cost += trafficCost;
+            }
+        }
+
+        if (!inCurrentAssignment) {
+            cost += nonOverlapCost;
+        }
+
+        return cost;
+    }
+
+    // For testing. canEnableRackAwareAssignor must be called first
+    long activeTasksCost(final SortedMap<UUID, ClientState> clientStates, 
final SortedSet<TaskId> statefulTasks, final boolean isStateful) {
+        final List<UUID> clientList = new ArrayList<>(clientStates.keySet());
+        final List<TaskId> taskIdList = new ArrayList<>(statefulTasks);
+        final Map<TaskId, UUID> taskClientMap = new HashMap<>();
+        final Map<UUID, Integer> clientCapacity = new HashMap<>();
+        final Graph<Integer> graph = new Graph<>();
+
+        constructStatefulActiveTaskGraph(graph, statefulTasks, clientList, 
taskIdList,
+            clientStates, taskClientMap, clientCapacity, isStateful);
+
+        final int sourceId = taskIdList.size() + clientList.size();
+        final int sinkId = sourceId + 1;
+        for (int taskNodeId = 0; taskNodeId < taskIdList.size(); taskNodeId++) 
{
+            graph.addEdge(sourceId, taskNodeId, 1, 0, 1);
+        }
+        for (int i = 0; i < clientList.size(); i++) {
+            final int capacity = 
clientCapacity.getOrDefault(clientList.get(i), 0);
+            final int clientNodeId = taskIdList.size() + i;
+            graph.addEdge(clientNodeId, sinkId, capacity, 0, capacity);
+        }
+        graph.setSourceNode(sourceId);
+        graph.setSinkNode(sinkId);
+        return graph.totalCost();
+    }
+
+    /**
+     * Optimize active stateful task assignment for rack awareness. 
canEnableRackAwareAssignor must be called first
+     * @param clientStates Client states
+     * @param taskIds Tasks to reassign if needed. They must be assigned 
already in clientStates
+     * @param isStateful Whether the tasks are stateful
+     * @return Total cost after optimization
+     */
+    public long optimizeActiveTasks(final SortedMap<UUID, ClientState> 
clientStates,
+                                    final SortedSet<TaskId> taskIds,
+                                    final boolean isStateful) {
+        if (taskIds.isEmpty()) {
+            return 0;
+        }
+
+        final List<UUID> clientList = new ArrayList<>(clientStates.keySet());
+        final List<TaskId> taskIdList = new ArrayList<>(taskIds);
+        final Map<TaskId, UUID> taskClientMap = new HashMap<>();
+        final Map<UUID, Integer> clientCapacity = new HashMap<>();
+        final Graph<Integer> graph = new Graph<>();
+
+        constructStatefulActiveTaskGraph(graph, taskIds, clientList, 
taskIdList,
+            clientStates, taskClientMap, clientCapacity, isStateful);
+
+        graph.solveMinCostFlow();
+        final long cost = graph.totalCost();
+
+        assignStatefulActiveTaskFromMinCostFlow(graph, taskIds, clientList, 
taskIdList,
+            clientStates, clientCapacity, taskClientMap);
+
+        return cost;
+    }
+
+    private void constructStatefulActiveTaskGraph(final Graph<Integer> graph,
+                                                  final SortedSet<TaskId> 
statefulTasks,
+                                                  final List<UUID> clientList,
+                                                  final List<TaskId> 
taskIdList,
+                                                  final Map<UUID, ClientState> 
clientStates,
+                                                  final Map<TaskId, UUID> 
taskClientMap,
+                                                  final Map<UUID, Integer> 
clientCapacity,

Review Comment:
   Not the same as `ClientState#capacity`. This is to track how many tasks are 
assigned to each client originally and we need maintain this number after 
optimizing cost. The reason is we need to keep assignment balanced. So we 
assume the assignment passed in from caller is balanced. After optimization, 
the same number of tasks assigned to each client doesn't change.
   
   So when we construct the graph, from source to each task, there's 1 edge 
with capacity 1, flow 1 and cost 0, from each task, there's 1 edge to each 
client with capacity 1, cost computed and flow 0 or 1 (1 means current 
assignment). From each client to sink, there's one edge with capacity equal to 
number of original tasks assigned to the client, cost 0 and flow equal to 
capacity.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
##########
@@ -38,29 +43,34 @@
 
 public class RackAwareTaskAssignor {
     private static final Logger log = 
LoggerFactory.getLogger(RackAwareTaskAssignor.class);
+    private static final int DEFAULT_STATEFUL_TRAFFIC_COST = 10;

Review Comment:
   For stateless tasks, there's no `non_overlap_cost`, so traffic cost doesn't 
matter as long as it's positive. The reason is that it's less expensive to 
relocate stateless tasks.
   
   For stateful tasks, there's `non_overlap_cost` which means there's cost if 
you are moving the task to other clients. Since we prefer not to move stateful 
tasks as much as possible, there's non-zero value for this cost. The value for 
`traffic_cost` vs `non_overlap_cost` means how much we favor one compared to 
another. Setting to 10 basically means we value `traffic_cost` much more than 
`non_overlap_cost`. Other default could also be possible. Also, since this 
class is also going to be used by `StickyAssignor`, my plan is to assign 
`non_overlap_cost` higher value than `traffic_cost` for sticky assignor to 
favor stickiness much more



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
##########
@@ -185,4 +191,224 @@ public boolean validateClientRack() {
         }
         return true;
     }
+
+    private int getCost(final TaskId taskId, final UUID clientId, final 
boolean inCurrentAssignment, final boolean isStateful) {

Review Comment:
   Here I meant `processId` using the UUID.
   
   Stateless tasks have 0 `non_overlap_cost` while stateful tasks have postive 
`non_overlap_cost`
   
   `inCurrentAssignment` means the task is assigned to current client referred 
to by `clientId`. If not in current assignment, there's possible 
`non_overlap_cost`



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
##########
@@ -185,4 +191,224 @@ public boolean validateClientRack() {
         }
         return true;
     }
+
+    private int getCost(final TaskId taskId, final UUID clientId, final 
boolean inCurrentAssignment, final boolean isStateful) {
+        final Map<String, Optional<String>> clientRacks = 
racksForProcess.get(clientId);
+        if (clientRacks == null) {
+            throw new IllegalStateException("Client " + clientId + " doesn't 
exist in processRacks");
+        }
+        final Optional<Optional<String>> clientRackOpt = 
clientRacks.values().stream().filter(Optional::isPresent).findFirst();
+        if (!clientRackOpt.isPresent() || !clientRackOpt.get().isPresent()) {
+            throw new IllegalStateException("Client " + clientId + " doesn't 
have rack configured. Maybe forgot to call canEnableRackAwareAssignor first");
+        }
+
+        final String clientRack = clientRackOpt.get().get();
+        final Set<TopicPartition> topicPartitions = 
partitionsForTask.get(taskId);
+        if (topicPartitions == null) {
+            throw new IllegalStateException("Task " + taskId + " has no 
TopicPartitions");
+        }
+
+        final int trafficCost = assignmentConfigs.trafficCost == null ? 
(isStateful ? DEFAULT_STATEFUL_TRAFFIC_COST : DEFAULT_STATELESS_TRAFFIC_COST)
+            : assignmentConfigs.trafficCost;
+        final int nonOverlapCost = assignmentConfigs.nonOverlapCost == null ? 
(isStateful ? DEFAULT_STATEFUL_NON_OVERLAP_COST : 
DEFAULT_STATELESS_NON_OVERLAP_COST)
+            : assignmentConfigs.nonOverlapCost;
+
+        int cost = 0;
+        for (final TopicPartition tp : topicPartitions) {
+            final Set<String> tpRacks = racksForPartition.get(tp);
+            if (tpRacks == null || tpRacks.isEmpty()) {
+                throw new IllegalStateException("TopicPartition " + tp + " has 
no rack information. Maybe forgot to call canEnableRackAwareAssignor first");
+            }
+            if (!tpRacks.contains(clientRack)) {
+                cost += trafficCost;
+            }
+        }
+
+        if (!inCurrentAssignment) {
+            cost += nonOverlapCost;

Review Comment:
   See above explanation of `non_overlap_cost`



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java:
##########
@@ -268,24 +268,44 @@ public static class AssignmentConfigs {
         public final long probingRebalanceIntervalMs;
         public final List<String> rackAwareAssignmentTags;
 
+        // TODO: get from streamsConfig after we add the config

Review Comment:
   It's discussed here: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams#KIP925:RackawaretaskassignmentinKafkaStreams-5.PublicInterfaces



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
##########
@@ -185,4 +191,224 @@ public boolean validateClientRack() {
         }
         return true;
     }
+
+    private int getCost(final TaskId taskId, final UUID clientId, final 
boolean inCurrentAssignment, final boolean isStateful) {
+        final Map<String, Optional<String>> clientRacks = 
racksForProcess.get(clientId);
+        if (clientRacks == null) {
+            throw new IllegalStateException("Client " + clientId + " doesn't 
exist in processRacks");
+        }
+        final Optional<Optional<String>> clientRackOpt = 
clientRacks.values().stream().filter(Optional::isPresent).findFirst();
+        if (!clientRackOpt.isPresent() || !clientRackOpt.get().isPresent()) {
+            throw new IllegalStateException("Client " + clientId + " doesn't 
have rack configured. Maybe forgot to call canEnableRackAwareAssignor first");
+        }
+
+        final String clientRack = clientRackOpt.get().get();
+        final Set<TopicPartition> topicPartitions = 
partitionsForTask.get(taskId);
+        if (topicPartitions == null) {
+            throw new IllegalStateException("Task " + taskId + " has no 
TopicPartitions");
+        }
+
+        final int trafficCost = assignmentConfigs.trafficCost == null ? 
(isStateful ? DEFAULT_STATEFUL_TRAFFIC_COST : DEFAULT_STATELESS_TRAFFIC_COST)
+            : assignmentConfigs.trafficCost;
+        final int nonOverlapCost = assignmentConfigs.nonOverlapCost == null ? 
(isStateful ? DEFAULT_STATEFUL_NON_OVERLAP_COST : 
DEFAULT_STATELESS_NON_OVERLAP_COST)
+            : assignmentConfigs.nonOverlapCost;
+
+        int cost = 0;
+        for (final TopicPartition tp : topicPartitions) {
+            final Set<String> tpRacks = racksForPartition.get(tp);
+            if (tpRacks == null || tpRacks.isEmpty()) {
+                throw new IllegalStateException("TopicPartition " + tp + " has 
no rack information. Maybe forgot to call canEnableRackAwareAssignor first");
+            }
+            if (!tpRacks.contains(clientRack)) {
+                cost += trafficCost;
+            }
+        }
+
+        if (!inCurrentAssignment) {
+            cost += nonOverlapCost;
+        }
+
+        return cost;
+    }
+
+    // For testing. canEnableRackAwareAssignor must be called first
+    long activeTasksCost(final SortedMap<UUID, ClientState> clientStates, 
final SortedSet<TaskId> statefulTasks, final boolean isStateful) {
+        final List<UUID> clientList = new ArrayList<>(clientStates.keySet());
+        final List<TaskId> taskIdList = new ArrayList<>(statefulTasks);
+        final Map<TaskId, UUID> taskClientMap = new HashMap<>();
+        final Map<UUID, Integer> clientCapacity = new HashMap<>();
+        final Graph<Integer> graph = new Graph<>();
+
+        constructStatefulActiveTaskGraph(graph, statefulTasks, clientList, 
taskIdList,
+            clientStates, taskClientMap, clientCapacity, isStateful);
+
+        final int sourceId = taskIdList.size() + clientList.size();
+        final int sinkId = sourceId + 1;
+        for (int taskNodeId = 0; taskNodeId < taskIdList.size(); taskNodeId++) 
{
+            graph.addEdge(sourceId, taskNodeId, 1, 0, 1);
+        }
+        for (int i = 0; i < clientList.size(); i++) {
+            final int capacity = 
clientCapacity.getOrDefault(clientList.get(i), 0);
+            final int clientNodeId = taskIdList.size() + i;
+            graph.addEdge(clientNodeId, sinkId, capacity, 0, capacity);
+        }
+        graph.setSourceNode(sourceId);
+        graph.setSinkNode(sinkId);
+        return graph.totalCost();
+    }
+
+    /**
+     * Optimize active stateful task assignment for rack awareness. 
canEnableRackAwareAssignor must be called first
+     * @param clientStates Client states
+     * @param taskIds Tasks to reassign if needed. They must be assigned 
already in clientStates
+     * @param isStateful Whether the tasks are stateful
+     * @return Total cost after optimization
+     */
+    public long optimizeActiveTasks(final SortedMap<UUID, ClientState> 
clientStates,
+                                    final SortedSet<TaskId> taskIds,
+                                    final boolean isStateful) {
+        if (taskIds.isEmpty()) {
+            return 0;
+        }
+
+        final List<UUID> clientList = new ArrayList<>(clientStates.keySet());

Review Comment:
   This is just for getting index easily later. Since node id in graph is 
integer (index), it's easier get reference back to UUID and ClientState using 
index. Otherwise, we need to maintain an index to UUID map I think



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
##########
@@ -185,4 +191,224 @@ public boolean validateClientRack() {
         }
         return true;
     }
+
+    private int getCost(final TaskId taskId, final UUID clientId, final 
boolean inCurrentAssignment, final boolean isStateful) {
+        final Map<String, Optional<String>> clientRacks = 
racksForProcess.get(clientId);
+        if (clientRacks == null) {
+            throw new IllegalStateException("Client " + clientId + " doesn't 
exist in processRacks");
+        }
+        final Optional<Optional<String>> clientRackOpt = 
clientRacks.values().stream().filter(Optional::isPresent).findFirst();
+        if (!clientRackOpt.isPresent() || !clientRackOpt.get().isPresent()) {
+            throw new IllegalStateException("Client " + clientId + " doesn't 
have rack configured. Maybe forgot to call canEnableRackAwareAssignor first");
+        }
+
+        final String clientRack = clientRackOpt.get().get();
+        final Set<TopicPartition> topicPartitions = 
partitionsForTask.get(taskId);
+        if (topicPartitions == null) {
+            throw new IllegalStateException("Task " + taskId + " has no 
TopicPartitions");
+        }
+
+        final int trafficCost = assignmentConfigs.trafficCost == null ? 
(isStateful ? DEFAULT_STATEFUL_TRAFFIC_COST : DEFAULT_STATELESS_TRAFFIC_COST)
+            : assignmentConfigs.trafficCost;
+        final int nonOverlapCost = assignmentConfigs.nonOverlapCost == null ? 
(isStateful ? DEFAULT_STATEFUL_NON_OVERLAP_COST : 
DEFAULT_STATELESS_NON_OVERLAP_COST)
+            : assignmentConfigs.nonOverlapCost;
+
+        int cost = 0;
+        for (final TopicPartition tp : topicPartitions) {
+            final Set<String> tpRacks = racksForPartition.get(tp);
+            if (tpRacks == null || tpRacks.isEmpty()) {
+                throw new IllegalStateException("TopicPartition " + tp + " has 
no rack information. Maybe forgot to call canEnableRackAwareAssignor first");
+            }
+            if (!tpRacks.contains(clientRack)) {
+                cost += trafficCost;
+            }
+        }
+
+        if (!inCurrentAssignment) {
+            cost += nonOverlapCost;
+        }
+
+        return cost;
+    }
+
+    // For testing. canEnableRackAwareAssignor must be called first
+    long activeTasksCost(final SortedMap<UUID, ClientState> clientStates, 
final SortedSet<TaskId> statefulTasks, final boolean isStateful) {
+        final List<UUID> clientList = new ArrayList<>(clientStates.keySet());
+        final List<TaskId> taskIdList = new ArrayList<>(statefulTasks);
+        final Map<TaskId, UUID> taskClientMap = new HashMap<>();
+        final Map<UUID, Integer> clientCapacity = new HashMap<>();
+        final Graph<Integer> graph = new Graph<>();
+
+        constructStatefulActiveTaskGraph(graph, statefulTasks, clientList, 
taskIdList,
+            clientStates, taskClientMap, clientCapacity, isStateful);
+
+        final int sourceId = taskIdList.size() + clientList.size();
+        final int sinkId = sourceId + 1;

Review Comment:
   Good idea!



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
##########
@@ -185,4 +191,224 @@ public boolean validateClientRack() {
         }
         return true;
     }
+
+    private int getCost(final TaskId taskId, final UUID clientId, final 
boolean inCurrentAssignment, final boolean isStateful) {
+        final Map<String, Optional<String>> clientRacks = 
racksForProcess.get(clientId);
+        if (clientRacks == null) {
+            throw new IllegalStateException("Client " + clientId + " doesn't 
exist in processRacks");
+        }
+        final Optional<Optional<String>> clientRackOpt = 
clientRacks.values().stream().filter(Optional::isPresent).findFirst();
+        if (!clientRackOpt.isPresent() || !clientRackOpt.get().isPresent()) {
+            throw new IllegalStateException("Client " + clientId + " doesn't 
have rack configured. Maybe forgot to call canEnableRackAwareAssignor first");
+        }
+
+        final String clientRack = clientRackOpt.get().get();
+        final Set<TopicPartition> topicPartitions = 
partitionsForTask.get(taskId);
+        if (topicPartitions == null) {
+            throw new IllegalStateException("Task " + taskId + " has no 
TopicPartitions");
+        }
+
+        final int trafficCost = assignmentConfigs.trafficCost == null ? 
(isStateful ? DEFAULT_STATEFUL_TRAFFIC_COST : DEFAULT_STATELESS_TRAFFIC_COST)
+            : assignmentConfigs.trafficCost;
+        final int nonOverlapCost = assignmentConfigs.nonOverlapCost == null ? 
(isStateful ? DEFAULT_STATEFUL_NON_OVERLAP_COST : 
DEFAULT_STATELESS_NON_OVERLAP_COST)
+            : assignmentConfigs.nonOverlapCost;
+
+        int cost = 0;
+        for (final TopicPartition tp : topicPartitions) {
+            final Set<String> tpRacks = racksForPartition.get(tp);
+            if (tpRacks == null || tpRacks.isEmpty()) {
+                throw new IllegalStateException("TopicPartition " + tp + " has 
no rack information. Maybe forgot to call canEnableRackAwareAssignor first");
+            }
+            if (!tpRacks.contains(clientRack)) {
+                cost += trafficCost;
+            }
+        }
+
+        if (!inCurrentAssignment) {
+            cost += nonOverlapCost;
+        }
+
+        return cost;
+    }
+
+    // For testing. canEnableRackAwareAssignor must be called first
+    long activeTasksCost(final SortedMap<UUID, ClientState> clientStates, 
final SortedSet<TaskId> statefulTasks, final boolean isStateful) {
+        final List<UUID> clientList = new ArrayList<>(clientStates.keySet());
+        final List<TaskId> taskIdList = new ArrayList<>(statefulTasks);
+        final Map<TaskId, UUID> taskClientMap = new HashMap<>();
+        final Map<UUID, Integer> clientCapacity = new HashMap<>();
+        final Graph<Integer> graph = new Graph<>();
+
+        constructStatefulActiveTaskGraph(graph, statefulTasks, clientList, 
taskIdList,
+            clientStates, taskClientMap, clientCapacity, isStateful);
+
+        final int sourceId = taskIdList.size() + clientList.size();
+        final int sinkId = sourceId + 1;
+        for (int taskNodeId = 0; taskNodeId < taskIdList.size(); taskNodeId++) 
{
+            graph.addEdge(sourceId, taskNodeId, 1, 0, 1);
+        }
+        for (int i = 0; i < clientList.size(); i++) {
+            final int capacity = 
clientCapacity.getOrDefault(clientList.get(i), 0);
+            final int clientNodeId = taskIdList.size() + i;
+            graph.addEdge(clientNodeId, sinkId, capacity, 0, capacity);
+        }
+        graph.setSourceNode(sourceId);
+        graph.setSinkNode(sinkId);
+        return graph.totalCost();
+    }
+
+    /**
+     * Optimize active stateful task assignment for rack awareness. 
canEnableRackAwareAssignor must be called first
+     * @param clientStates Client states
+     * @param taskIds Tasks to reassign if needed. They must be assigned 
already in clientStates
+     * @param isStateful Whether the tasks are stateful
+     * @return Total cost after optimization
+     */
+    public long optimizeActiveTasks(final SortedMap<UUID, ClientState> 
clientStates,
+                                    final SortedSet<TaskId> taskIds,
+                                    final boolean isStateful) {
+        if (taskIds.isEmpty()) {
+            return 0;
+        }
+
+        final List<UUID> clientList = new ArrayList<>(clientStates.keySet());
+        final List<TaskId> taskIdList = new ArrayList<>(taskIds);

Review Comment:
   Same reason as above to get reference to taskId from nodeId easily



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
##########
@@ -38,29 +43,34 @@
 
 public class RackAwareTaskAssignor {
     private static final Logger log = 
LoggerFactory.getLogger(RackAwareTaskAssignor.class);
+    private static final int DEFAULT_STATEFUL_TRAFFIC_COST = 10;
+    private static final int DEFAULT_STATEFUL_NON_OVERLAP_COST = 1;

Review Comment:
   See above



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to