[GitHub] [kafka] mjsax commented on a diff in pull request #14030: KAFKA-15022: [3/N] use graph to compute rack aware assignment for active stateful tasks

2023-07-26 Thread via GitHub


mjsax commented on code in PR #14030:
URL: https://github.com/apache/kafka/pull/14030#discussion_r1275434335


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java:
##
@@ -69,12 +71,38 @@ public final class AssignmentTestUtils {
 public static final UUID UUID_8 = uuidForInt(8);
 public static final UUID UUID_9 = uuidForInt(9);
 
-public static final TopicPartition TP_0_0 = new TopicPartition("topic0", 
0);
-public static final TopicPartition TP_0_1 = new TopicPartition("topic0", 
1);
-public static final TopicPartition TP_0_2 = new TopicPartition("topic0", 
2);
-public static final TopicPartition TP_1_0 = new TopicPartition("topic1", 
0);
-public static final TopicPartition TP_1_1 = new TopicPartition("topic1", 
1);
-public static final TopicPartition TP_1_2 = new TopicPartition("topic1", 
2);
+public static final String RACK_0 = "rock0";

Review Comment:
   ```suggestion
   public static final String RACK_0 = "rack0";
   ```



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
##
@@ -185,4 +188,213 @@ public boolean validateClientRack() {
 }
 return true;
 }
+
+private int getCost(final TaskId taskId, final UUID processId, final 
boolean inCurrentAssignment, final int trafficCost, final int nonOverlapCost) {
+final Map> clientRacks = 
racksForProcess.get(processId);
+if (clientRacks == null) {
+throw new IllegalStateException("Client " + processId + " doesn't 
exist in processRacks");
+}
+final Optional> clientRackOpt = 
clientRacks.values().stream().filter(Optional::isPresent).findFirst();
+if (!clientRackOpt.isPresent() || !clientRackOpt.get().isPresent()) {
+throw new IllegalStateException("Client " + processId + " doesn't 
have rack configured. Maybe forgot to call canEnableRackAwareAssignor first");
+}
+
+final String clientRack = clientRackOpt.get().get();
+final Set topicPartitions = 
partitionsForTask.get(taskId);
+if (topicPartitions == null || topicPartitions.isEmpty()) {
+throw new IllegalStateException("Task " + taskId + " has no 
TopicPartitions");
+}
+
+int cost = 0;
+for (final TopicPartition tp : topicPartitions) {
+final Set tpRacks = racksForPartition.get(tp);
+if (tpRacks == null || tpRacks.isEmpty()) {
+throw new IllegalStateException("TopicPartition " + tp + " has 
no rack information. Maybe forgot to call canEnableRackAwareAssignor first");
+}
+if (!tpRacks.contains(clientRack)) {
+cost += trafficCost;
+}
+}
+
+if (!inCurrentAssignment) {
+cost += nonOverlapCost;
+}
+
+return cost;
+}
+
+private static int getSinkID(final List clientList, final 
List taskIdList) {
+return clientList.size() + taskIdList.size();
+}
+
+// For testing. canEnableRackAwareAssignor must be called first
+long activeTasksCost(final SortedMap clientStates, 
final SortedSet activeTasks, final int trafficCost, final int 
nonOverlapCost) {

Review Comment:
   Can we add JavaDocs? It's a little unclear what this method does. Also maybe 
move `activeTasks` as first parameter, as they are the main input (all others 
are metadata)?
   ```
   /**
    * Compute the cost for the provided {@code activeTasks}. The passed in active 
tasks must be contained in {@code clientStates}.
    */
   ```



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
##
@@ -74,11 +81,7 @@ public synchronized boolean 
canEnableRackAwareAssignorForActiveTasks() {
 }
 
 return validateTopicPartitionRack();
-}
-
-public boolean canEnableRackAwareAssignorForStandbyTasks() {
-// TODO
-return false;
+// TODO: add changelog topic, standby task validation
 }
 
 // Visible for testing. This method also checks if all TopicPartitions 
exist in cluster

Review Comment:
   Typo one line below: `D[e]scribe`



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java:
##
@@ -69,12 +71,38 @@ public final class AssignmentTestUtils {
 public static final UUID UUID_8 = uuidForInt(8);
 public static final UUID UUID_9 = uuidForInt(9);
 
-public static final TopicPartition TP_0_0 = new TopicPartition("topic0", 
0);
-public static final TopicPartition TP_0_1 = new TopicPartition("topic0", 
1);
-public static final TopicPartition TP_0_2 = new TopicPartition("topic0", 
2);
-public static final TopicPartition TP_1_0 = new TopicPartition("topic1", 
0);
-public static final TopicPartition TP_1_1 = new TopicPartition("topic1", 
1);
-public static final Topic

[GitHub] [kafka] mjsax commented on a diff in pull request #14030: KAFKA-15022: [3/N] use graph to compute rack aware assignment for active stateful tasks

2023-07-24 Thread via GitHub


mjsax commented on code in PR #14030:
URL: https://github.com/apache/kafka/pull/14030#discussion_r1272606193


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
##
@@ -185,4 +193,212 @@ public boolean validateClientRack() {
 }
 return true;
 }
+
+private int getCost(final TaskId taskId, final UUID clientId, final 
boolean inCurrentAssignment, final boolean isStateful) {
+final Map> clientRacks = 
racksForProcess.get(clientId);
+if (clientRacks == null) {
+throw new IllegalStateException("Client " + clientId + " doesn't 
exist in processRacks");
+}
+final Optional> clientRackOpt = 
clientRacks.values().stream().filter(Optional::isPresent).findFirst();
+if (!clientRackOpt.isPresent() || !clientRackOpt.get().isPresent()) {

Review Comment:
   We `filter()` already for `isPresent` -- seems we only need the first check?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
##
@@ -185,4 +193,212 @@ public boolean validateClientRack() {
 }
 return true;
 }
+
+private int getCost(final TaskId taskId, final UUID clientId, final 
boolean inCurrentAssignment, final boolean isStateful) {
+final Map> clientRacks = 
racksForProcess.get(clientId);
+if (clientRacks == null) {
+throw new IllegalStateException("Client " + clientId + " doesn't 
exist in processRacks");
+}
+final Optional> clientRackOpt = 
clientRacks.values().stream().filter(Optional::isPresent).findFirst();
+if (!clientRackOpt.isPresent() || !clientRackOpt.get().isPresent()) {
+throw new IllegalStateException("Client " + clientId + " doesn't 
have rack configured. Maybe forgot to call canEnableRackAwareAssignor first");
+}
+
+final String clientRack = clientRackOpt.get().get();
+final Set topicPartitions = 
partitionsForTask.get(taskId);
+if (topicPartitions == null || topicPartitions.isEmpty()) {
+throw new IllegalStateException("Task " + taskId + " has no 
TopicPartitions");
+}
+
+final int trafficCost = assignmentConfigs.trafficCost == null ? 
(isStateful ? DEFAULT_STATEFUL_TRAFFIC_COST : DEFAULT_STATELESS_TRAFFIC_COST)
+: assignmentConfigs.trafficCost;
+final int nonOverlapCost = assignmentConfigs.nonOverlapCost == null ? 
(isStateful ? DEFAULT_STATEFUL_NON_OVERLAP_COST : 
DEFAULT_STATELESS_NON_OVERLAP_COST)
+: assignmentConfigs.nonOverlapCost;
+
+int cost = 0;
+for (final TopicPartition tp : topicPartitions) {
+final Set tpRacks = racksForPartition.get(tp);
+if (tpRacks == null || tpRacks.isEmpty()) {
+throw new IllegalStateException("TopicPartition " + tp + " has 
no rack information. Maybe forgot to call canEnableRackAwareAssignor first");
+}
+if (!tpRacks.contains(clientRack)) {
+cost += trafficCost;
+}
+}
+
+if (!inCurrentAssignment) {
+cost += nonOverlapCost;
+}
+
+return cost;
+}
+
+private static int getSinkID(final List clientList, final 
List taskIdList) {
+return clientList.size() + taskIdList.size();
+}
+
+// For testing. canEnableRackAwareAssignor must be called first
+long activeTasksCost(final SortedMap clientStates, 
final SortedSet activeTasks, final boolean isStateful) {
+final List clientList = new ArrayList<>(clientStates.keySet());
+final List taskIdList = new ArrayList<>(activeTasks);
+final Map taskClientMap = new HashMap<>();
+final Map originalAssignedTaskNumber = new HashMap<>();
+final Graph graph = constructActiveTaskGraph(activeTasks, 
clientList, taskIdList,
+clientStates, taskClientMap, originalAssignedTaskNumber, 
isStateful);

Review Comment:
   Given that we don't read `taskClientMap` nor `originalAssignedTaskNumber`, 
should we just pass `new HashMap` directly?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
##
@@ -185,4 +193,212 @@ public boolean validateClientRack() {
 }
 return true;
 }
+
+private int getCost(final TaskId taskId, final UUID clientId, final 
boolean inCurrentAssignment, final boolean isStateful) {
+final Map> clientRacks = 
racksForProcess.get(clientId);
+if (clientRacks == null) {
+throw new IllegalStateException("Client " + clientId + " doesn't 
exist in processRacks");
+}
+final Optional> clientRackOpt = 
clientRacks.values().stream().filter(Optional::isPresent).findFirst();
+if (!clientRackOpt.isPresent() || !clientRackOpt.get().isPresent()) {
+throw new IllegalStateException("Client " + clientId + " does

[GitHub] [kafka] mjsax commented on a diff in pull request #14030: KAFKA-15022: [3/N] use graph to compute rack aware assignment for active stateful tasks

2023-07-21 Thread via GitHub


mjsax commented on code in PR #14030:
URL: https://github.com/apache/kafka/pull/14030#discussion_r1271152663


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
##
@@ -185,4 +191,224 @@ public boolean validateClientRack() {
 }
 return true;
 }
+
+private int getCost(final TaskId taskId, final UUID clientId, final 
boolean inCurrentAssignment, final boolean isStateful) {
+final Map> clientRacks = 
racksForProcess.get(clientId);
+if (clientRacks == null) {
+throw new IllegalStateException("Client " + clientId + " doesn't 
exist in processRacks");
+}
+final Optional> clientRackOpt = 
clientRacks.values().stream().filter(Optional::isPresent).findFirst();
+if (!clientRackOpt.isPresent() || !clientRackOpt.get().isPresent()) {
+throw new IllegalStateException("Client " + clientId + " doesn't 
have rack configured. Maybe forgot to call canEnableRackAwareAssignor first");
+}
+
+final String clientRack = clientRackOpt.get().get();
+final Set topicPartitions = 
partitionsForTask.get(taskId);
+if (topicPartitions == null) {
+throw new IllegalStateException("Task " + taskId + " has no 
TopicPartitions");
+}
+
+final int trafficCost = assignmentConfigs.trafficCost == null ? 
(isStateful ? DEFAULT_STATEFUL_TRAFFIC_COST : DEFAULT_STATELESS_TRAFFIC_COST)
+: assignmentConfigs.trafficCost;
+final int nonOverlapCost = assignmentConfigs.nonOverlapCost == null ? 
(isStateful ? DEFAULT_STATEFUL_NON_OVERLAP_COST : 
DEFAULT_STATELESS_NON_OVERLAP_COST)
+: assignmentConfigs.nonOverlapCost;
+
+int cost = 0;
+for (final TopicPartition tp : topicPartitions) {
+final Set tpRacks = racksForPartition.get(tp);
+if (tpRacks == null || tpRacks.isEmpty()) {
+throw new IllegalStateException("TopicPartition " + tp + " has 
no rack information. Maybe forgot to call canEnableRackAwareAssignor first");
+}
+if (!tpRacks.contains(clientRack)) {
+cost += trafficCost;
+}
+}
+
+if (!inCurrentAssignment) {
+cost += nonOverlapCost;
+}
+
+return cost;
+}
+
+// For testing. canEnableRackAwareAssignor must be called first
+long activeTasksCost(final SortedMap clientStates, 
final SortedSet statefulTasks, final boolean isStateful) {
+final List clientList = new ArrayList<>(clientStates.keySet());
+final List taskIdList = new ArrayList<>(statefulTasks);
+final Map taskClientMap = new HashMap<>();
+final Map clientCapacity = new HashMap<>();
+final Graph graph = new Graph<>();
+
+constructStatefulActiveTaskGraph(graph, statefulTasks, clientList, 
taskIdList,
+clientStates, taskClientMap, clientCapacity, isStateful);
+
+final int sourceId = taskIdList.size() + clientList.size();
+final int sinkId = sourceId + 1;
+for (int taskNodeId = 0; taskNodeId < taskIdList.size(); taskNodeId++) 
{
+graph.addEdge(sourceId, taskNodeId, 1, 0, 1);
+}
+for (int i = 0; i < clientList.size(); i++) {
+final int capacity = 
clientCapacity.getOrDefault(clientList.get(i), 0);
+final int clientNodeId = taskIdList.size() + i;
+graph.addEdge(clientNodeId, sinkId, capacity, 0, capacity);
+}
+graph.setSourceNode(sourceId);
+graph.setSinkNode(sinkId);
+return graph.totalCost();
+}
+
+/**
+ * Optimize active stateful task assignment for rack awareness. 
canEnableRackAwareAssignor must be called first
+ * @param clientStates Client states
+ * @param taskIds Tasks to reassign if needed. They must be assigned 
already in clientStates
+ * @param isStateful Whether the tasks are stateful
+ * @return Total cost after optimization
+ */
+public long optimizeActiveTasks(final SortedMap 
clientStates,
+final SortedSet taskIds,
+final boolean isStateful) {
+if (taskIds.isEmpty()) {
+return 0;
+}
+
+final List clientList = new ArrayList<>(clientStates.keySet());

Review Comment:
   Well, if it's useful for `constructStatefulActiveTaskGraph` to have such a 
list, we should construct this list inside `constructStatefulActiveTaskGraph` 
and not pass it in? Otherwise, we leak an optimization from 
`constructStatefulActiveTaskGraph` to the caller, which does not seem ideal?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.

[GitHub] [kafka] mjsax commented on a diff in pull request #14030: KAFKA-15022: [3/N] use graph to compute rack aware assignment for active stateful tasks

2023-07-21 Thread via GitHub


mjsax commented on code in PR #14030:
URL: https://github.com/apache/kafka/pull/14030#discussion_r1271152663


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
##
@@ -185,4 +191,224 @@ public boolean validateClientRack() {
 }
 return true;
 }
+
+private int getCost(final TaskId taskId, final UUID clientId, final 
boolean inCurrentAssignment, final boolean isStateful) {
+final Map> clientRacks = 
racksForProcess.get(clientId);
+if (clientRacks == null) {
+throw new IllegalStateException("Client " + clientId + " doesn't 
exist in processRacks");
+}
+final Optional> clientRackOpt = 
clientRacks.values().stream().filter(Optional::isPresent).findFirst();
+if (!clientRackOpt.isPresent() || !clientRackOpt.get().isPresent()) {
+throw new IllegalStateException("Client " + clientId + " doesn't 
have rack configured. Maybe forgot to call canEnableRackAwareAssignor first");
+}
+
+final String clientRack = clientRackOpt.get().get();
+final Set topicPartitions = 
partitionsForTask.get(taskId);
+if (topicPartitions == null) {
+throw new IllegalStateException("Task " + taskId + " has no 
TopicPartitions");
+}
+
+final int trafficCost = assignmentConfigs.trafficCost == null ? 
(isStateful ? DEFAULT_STATEFUL_TRAFFIC_COST : DEFAULT_STATELESS_TRAFFIC_COST)
+: assignmentConfigs.trafficCost;
+final int nonOverlapCost = assignmentConfigs.nonOverlapCost == null ? 
(isStateful ? DEFAULT_STATEFUL_NON_OVERLAP_COST : 
DEFAULT_STATELESS_NON_OVERLAP_COST)
+: assignmentConfigs.nonOverlapCost;
+
+int cost = 0;
+for (final TopicPartition tp : topicPartitions) {
+final Set tpRacks = racksForPartition.get(tp);
+if (tpRacks == null || tpRacks.isEmpty()) {
+throw new IllegalStateException("TopicPartition " + tp + " has 
no rack information. Maybe forgot to call canEnableRackAwareAssignor first");
+}
+if (!tpRacks.contains(clientRack)) {
+cost += trafficCost;
+}
+}
+
+if (!inCurrentAssignment) {
+cost += nonOverlapCost;
+}
+
+return cost;
+}
+
+// For testing. canEnableRackAwareAssignor must be called first
+long activeTasksCost(final SortedMap clientStates, 
final SortedSet statefulTasks, final boolean isStateful) {
+final List clientList = new ArrayList<>(clientStates.keySet());
+final List taskIdList = new ArrayList<>(statefulTasks);
+final Map taskClientMap = new HashMap<>();
+final Map clientCapacity = new HashMap<>();
+final Graph graph = new Graph<>();
+
+constructStatefulActiveTaskGraph(graph, statefulTasks, clientList, 
taskIdList,
+clientStates, taskClientMap, clientCapacity, isStateful);
+
+final int sourceId = taskIdList.size() + clientList.size();
+final int sinkId = sourceId + 1;
+for (int taskNodeId = 0; taskNodeId < taskIdList.size(); taskNodeId++) 
{
+graph.addEdge(sourceId, taskNodeId, 1, 0, 1);
+}
+for (int i = 0; i < clientList.size(); i++) {
+final int capacity = 
clientCapacity.getOrDefault(clientList.get(i), 0);
+final int clientNodeId = taskIdList.size() + i;
+graph.addEdge(clientNodeId, sinkId, capacity, 0, capacity);
+}
+graph.setSourceNode(sourceId);
+graph.setSinkNode(sinkId);
+return graph.totalCost();
+}
+
+/**
+ * Optimize active stateful task assignment for rack awareness. 
canEnableRackAwareAssignor must be called first
+ * @param clientStates Client states
+ * @param taskIds Tasks to reassign if needed. They must be assigned 
already in clientStates
+ * @param isStateful Whether the tasks are stateful
+ * @return Total cost after optimization
+ */
+public long optimizeActiveTasks(final SortedMap 
clientStates,
+final SortedSet taskIds,
+final boolean isStateful) {
+if (taskIds.isEmpty()) {
+return 0;
+}
+
+final List clientList = new ArrayList<>(clientStates.keySet());

Review Comment:
   Well, if it's useful for `constructStatefulActiveTaskGraph` to have such a 
list, we should construct this list inside `constructStatefulActiveTaskGraph` 
and not pass it in? Otherwise, we leak an optimization from 
`constructStatefulActiveTaskGraph` to the caller, which does not seem ideal?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apach

[GitHub] [kafka] mjsax commented on a diff in pull request #14030: KAFKA-15022: [3/N] use graph to compute rack aware assignment for active stateful tasks

2023-07-21 Thread via GitHub


mjsax commented on code in PR #14030:
URL: https://github.com/apache/kafka/pull/14030#discussion_r1271151967


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
##
@@ -185,4 +191,224 @@ public boolean validateClientRack() {
 }
 return true;
 }
+
+private int getCost(final TaskId taskId, final UUID clientId, final 
boolean inCurrentAssignment, final boolean isStateful) {
+final Map> clientRacks = 
racksForProcess.get(clientId);
+if (clientRacks == null) {
+throw new IllegalStateException("Client " + clientId + " doesn't 
exist in processRacks");
+}
+final Optional> clientRackOpt = 
clientRacks.values().stream().filter(Optional::isPresent).findFirst();
+if (!clientRackOpt.isPresent() || !clientRackOpt.get().isPresent()) {
+throw new IllegalStateException("Client " + clientId + " doesn't 
have rack configured. Maybe forgot to call canEnableRackAwareAssignor first");
+}
+
+final String clientRack = clientRackOpt.get().get();
+final Set topicPartitions = 
partitionsForTask.get(taskId);
+if (topicPartitions == null) {
+throw new IllegalStateException("Task " + taskId + " has no 
TopicPartitions");
+}
+
+final int trafficCost = assignmentConfigs.trafficCost == null ? 
(isStateful ? DEFAULT_STATEFUL_TRAFFIC_COST : DEFAULT_STATELESS_TRAFFIC_COST)
+: assignmentConfigs.trafficCost;
+final int nonOverlapCost = assignmentConfigs.nonOverlapCost == null ? 
(isStateful ? DEFAULT_STATEFUL_NON_OVERLAP_COST : 
DEFAULT_STATELESS_NON_OVERLAP_COST)
+: assignmentConfigs.nonOverlapCost;
+
+int cost = 0;
+for (final TopicPartition tp : topicPartitions) {
+final Set tpRacks = racksForPartition.get(tp);
+if (tpRacks == null || tpRacks.isEmpty()) {
+throw new IllegalStateException("TopicPartition " + tp + " has 
no rack information. Maybe forgot to call canEnableRackAwareAssignor first");
+}
+if (!tpRacks.contains(clientRack)) {
+cost += trafficCost;
+}
+}
+
+if (!inCurrentAssignment) {
+cost += nonOverlapCost;
+}
+
+return cost;
+}
+
+// For testing. canEnableRackAwareAssignor must be called first
+long activeTasksCost(final SortedMap clientStates, 
final SortedSet statefulTasks, final boolean isStateful) {
+final List clientList = new ArrayList<>(clientStates.keySet());
+final List taskIdList = new ArrayList<>(statefulTasks);
+final Map taskClientMap = new HashMap<>();
+final Map clientCapacity = new HashMap<>();
+final Graph graph = new Graph<>();
+
+constructStatefulActiveTaskGraph(graph, statefulTasks, clientList, 
taskIdList,
+clientStates, taskClientMap, clientCapacity, isStateful);
+
+final int sourceId = taskIdList.size() + clientList.size();
+final int sinkId = sourceId + 1;
+for (int taskNodeId = 0; taskNodeId < taskIdList.size(); taskNodeId++) 
{
+graph.addEdge(sourceId, taskNodeId, 1, 0, 1);
+}
+for (int i = 0; i < clientList.size(); i++) {
+final int capacity = 
clientCapacity.getOrDefault(clientList.get(i), 0);
+final int clientNodeId = taskIdList.size() + i;
+graph.addEdge(clientNodeId, sinkId, capacity, 0, capacity);
+}
+graph.setSourceNode(sourceId);
+graph.setSinkNode(sinkId);
+return graph.totalCost();
+}
+
+/**
+ * Optimize active stateful task assignment for rack awareness. 
canEnableRackAwareAssignor must be called first
+ * @param clientStates Client states
+ * @param taskIds Tasks to reassign if needed. They must be assigned 
already in clientStates
+ * @param isStateful Whether the tasks are stateful
+ * @return Total cost after optimization
+ */
+public long optimizeActiveTasks(final SortedMap 
clientStates,
+final SortedSet taskIds,
+final boolean isStateful) {
+if (taskIds.isEmpty()) {
+return 0;
+}
+
+final List clientList = new ArrayList<>(clientStates.keySet());
+final List taskIdList = new ArrayList<>(taskIds);
+final Map taskClientMap = new HashMap<>();
+final Map clientCapacity = new HashMap<>();
+final Graph graph = new Graph<>();
+
+constructStatefulActiveTaskGraph(graph, taskIds, clientList, 
taskIdList,
+clientStates, taskClientMap, clientCapacity, isStateful);
+
+graph.solveMinCostFlow();
+final long cost = graph.totalCost();
+
+assignStatefulActiveTaskFromMinCostFlow(graph, taskIds, clientList, 
taskIdList,
+clientStates, clientCapacity, taskClientMap);
+
+return cost;

[GitHub] [kafka] mjsax commented on a diff in pull request #14030: KAFKA-15022: [3/N] use graph to compute rack aware assignment for active stateful tasks

2023-07-21 Thread via GitHub


mjsax commented on code in PR #14030:
URL: https://github.com/apache/kafka/pull/14030#discussion_r1270106140


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
##
@@ -185,4 +191,224 @@ public boolean validateClientRack() {
 }
 return true;
 }
+
+private int getCost(final TaskId taskId, final UUID clientId, final 
boolean inCurrentAssignment, final boolean isStateful) {
+final Map> clientRacks = 
racksForProcess.get(clientId);
+if (clientRacks == null) {
+throw new IllegalStateException("Client " + clientId + " doesn't 
exist in processRacks");
+}
+final Optional> clientRackOpt = 
clientRacks.values().stream().filter(Optional::isPresent).findFirst();
+if (!clientRackOpt.isPresent() || !clientRackOpt.get().isPresent()) {
+throw new IllegalStateException("Client " + clientId + " doesn't 
have rack configured. Maybe forgot to call canEnableRackAwareAssignor first");
+}
+
+final String clientRack = clientRackOpt.get().get();
+final Set topicPartitions = 
partitionsForTask.get(taskId);
+if (topicPartitions == null) {
+throw new IllegalStateException("Task " + taskId + " has no 
TopicPartitions");
+}
+
+final int trafficCost = assignmentConfigs.trafficCost == null ? 
(isStateful ? DEFAULT_STATEFUL_TRAFFIC_COST : DEFAULT_STATELESS_TRAFFIC_COST)
+: assignmentConfigs.trafficCost;
+final int nonOverlapCost = assignmentConfigs.nonOverlapCost == null ? 
(isStateful ? DEFAULT_STATEFUL_NON_OVERLAP_COST : 
DEFAULT_STATELESS_NON_OVERLAP_COST)
+: assignmentConfigs.nonOverlapCost;
+
+int cost = 0;
+for (final TopicPartition tp : topicPartitions) {
+final Set tpRacks = racksForPartition.get(tp);
+if (tpRacks == null || tpRacks.isEmpty()) {
+throw new IllegalStateException("TopicPartition " + tp + " has 
no rack information. Maybe forgot to call canEnableRackAwareAssignor first");
+}
+if (!tpRacks.contains(clientRack)) {
+cost += trafficCost;
+}
+}
+
+if (!inCurrentAssignment) {
+cost += nonOverlapCost;
+}
+
+return cost;
+}
+
+// For testing. canEnableRackAwareAssignor must be called first
+long activeTasksCost(final SortedMap clientStates, 
final SortedSet statefulTasks, final boolean isStateful) {
+final List clientList = new ArrayList<>(clientStates.keySet());
+final List taskIdList = new ArrayList<>(statefulTasks);
+final Map taskClientMap = new HashMap<>();
+final Map clientCapacity = new HashMap<>();
+final Graph graph = new Graph<>();
+
+constructStatefulActiveTaskGraph(graph, statefulTasks, clientList, 
taskIdList,
+clientStates, taskClientMap, clientCapacity, isStateful);
+
+final int sourceId = taskIdList.size() + clientList.size();
+final int sinkId = sourceId + 1;

Review Comment:
   Looking into `constructStatefulActiveTaskGraph` it does the same thing. 
Sounds error prone. Should we not get source and sink ID from `graph` instead 
of re-computing it? (Seems we do the same thing in two places)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #14030: KAFKA-15022: [3/N] use graph to compute rack aware assignment for active stateful tasks

2023-07-20 Thread via GitHub


mjsax commented on code in PR #14030:
URL: https://github.com/apache/kafka/pull/14030#discussion_r1270094068


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
##
@@ -38,29 +43,34 @@
 
 public class RackAwareTaskAssignor {
 private static final Logger log = 
LoggerFactory.getLogger(RackAwareTaskAssignor.class);
+private static final int DEFAULT_STATEFUL_TRAFFIC_COST = 10;
+private static final int DEFAULT_STATEFUL_NON_OVERLAP_COST = 1;

Review Comment:
   Why is this not zero (as it is for stateless)?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
##
@@ -38,29 +43,34 @@
 
 public class RackAwareTaskAssignor {
 private static final Logger log = 
LoggerFactory.getLogger(RackAwareTaskAssignor.class);
+private static final int DEFAULT_STATEFUL_TRAFFIC_COST = 10;

Review Comment:
   Why is this set to `10`? In particular, why is it higher than for the 
stateless case? In the end, my understanding was that we try to optimize for 
input partitions, and for this case, there is no difference if a task has state 
or not, but only the number of input topic partitions for a task matters (each 
with equal cost).



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java:
##
@@ -268,24 +268,44 @@ public static class AssignmentConfigs {
 public final long probingRebalanceIntervalMs;
 public final List rackAwareAssignmentTags;
 
+// TODO: get from streamsConfig after we add the config

Review Comment:
   I cannot remember such parameters being defined in the KIP. Can you 
elaborate?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
##
@@ -185,4 +191,224 @@ public boolean validateClientRack() {
 }
 return true;
 }
+
+private int getCost(final TaskId taskId, final UUID clientId, final 
boolean inCurrentAssignment, final boolean isStateful) {
+final Map> clientRacks = 
racksForProcess.get(clientId);
+if (clientRacks == null) {
+throw new IllegalStateException("Client " + clientId + " doesn't 
exist in processRacks");
+}
+final Optional> clientRackOpt = 
clientRacks.values().stream().filter(Optional::isPresent).findFirst();
+if (!clientRackOpt.isPresent() || !clientRackOpt.get().isPresent()) {
+throw new IllegalStateException("Client " + clientId + " doesn't 
have rack configured. Maybe forgot to call canEnableRackAwareAssignor first");
+}
+
+final String clientRack = clientRackOpt.get().get();
+final Set topicPartitions = 
partitionsForTask.get(taskId);
+if (topicPartitions == null) {
+throw new IllegalStateException("Task " + taskId + " has no 
TopicPartitions");
+}
+
+final int trafficCost = assignmentConfigs.trafficCost == null ? 
(isStateful ? DEFAULT_STATEFUL_TRAFFIC_COST : DEFAULT_STATELESS_TRAFFIC_COST)
+: assignmentConfigs.trafficCost;
+final int nonOverlapCost = assignmentConfigs.nonOverlapCost == null ? 
(isStateful ? DEFAULT_STATEFUL_NON_OVERLAP_COST : 
DEFAULT_STATELESS_NON_OVERLAP_COST)
+: assignmentConfigs.nonOverlapCost;
+
+int cost = 0;
+for (final TopicPartition tp : topicPartitions) {
+final Set tpRacks = racksForPartition.get(tp);
+if (tpRacks == null || tpRacks.isEmpty()) {
+throw new IllegalStateException("TopicPartition " + tp + " has 
no rack information. Maybe forgot to call canEnableRackAwareAssignor first");
+}
+if (!tpRacks.contains(clientRack)) {
+cost += trafficCost;
+}
+}
+
+if (!inCurrentAssignment) {
+cost += nonOverlapCost;
+}
+
+return cost;
+}
+
+// For testing. canEnableRackAwareAssignor must be called first
+long activeTasksCost(final SortedMap clientStates, 
final SortedSet statefulTasks, final boolean isStateful) {
+final List clientList = new ArrayList<>(clientStates.keySet());
+final List taskIdList = new ArrayList<>(statefulTasks);
+final Map taskClientMap = new HashMap<>();
+final Map clientCapacity = new HashMap<>();
+final Graph graph = new Graph<>();
+
+constructStatefulActiveTaskGraph(graph, statefulTasks, clientList, 
taskIdList,
+clientStates, taskClientMap, clientCapacity, isStateful);
+
+final int sourceId = taskIdList.size() + clientList.size();
+final int sinkId = sourceId + 1;
+for (int taskNodeId = 0; taskNodeId < taskIdList.size(); taskNodeId++) 
{
+graph.addEdge(sourceId, taskNodeId, 1, 0, 1);
+}
+for (int i = 0; i < clientList.size(); i++) {
+final int capacity = 
clie