ableegoldman commented on code in PR #16033:
URL: https://github.com/apache/kafka/pull/16033#discussion_r1618282249


##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##########
@@ -74,20 +209,240 @@ public static Map<ProcessId, KafkaStreamsAssignment> 
optimizeRackAwareStandbyTas
         final ApplicationState applicationState,
         final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments
     ) {
+        if (!hasValidRackInformation(applicationState)) {
+            throw new IllegalStateException("Cannot perform rack-aware 
assignment optimizations with invalid rack information.");
+        }
+
+        final int crossRackTrafficCost= 
applicationState.assignmentConfigs().rackAwareTrafficCost();
+        final int nonOverlapCost = 
applicationState.assignmentConfigs().rackAwareNonOverlapCost();
+        final long currentCost = computeTaskCost(
+            applicationState.allTasks(),
+            applicationState.kafkaStreamsStates(false),
+            crossRackTrafficCost,
+            nonOverlapCost,
+            true,
+            true
+        );
+        LOG.info("Assignment before standby task optimization has cost {}", 
currentCost);
         throw new UnsupportedOperationException("Not Implemented.");
     }
 
+    private static long computeTaskCost(final Set<TaskInfo> tasks,
+                                        final Map<ProcessId, 
KafkaStreamsState> clients,
+                                        final int crossRackTrafficCost,
+                                        final int nonOverlapCost,
+                                        final boolean hasReplica,
+                                        final boolean isStandby) {
+        if (tasks.isEmpty()) {
+            return 0;
+        }
+
+        final List<UUID> clientIds = 
clients.keySet().stream().map(ProcessId::id).collect(
+            Collectors.toList());
+
+        final List<TaskId> taskIds = 
tasks.stream().map(TaskInfo::id).collect(Collectors.toList());
+        final Map<TaskId, Set<TaskTopicPartition>> topicPartitionsByTaskId = 
tasks.stream().collect(
+            Collectors.toMap(TaskInfo::id, TaskInfo::topicPartitions));
+
+        final Map<UUID, Optional<String>> clientRacks = 
clients.values().stream().collect(
+            Collectors.toMap(state -> state.processId().id(), 
KafkaStreamsState::rackId));
+
+        final Map<UUID, Set<TaskId>> taskIdsByProcess = 
clients.values().stream().collect(
+            Collectors.toMap(state -> state.processId().id(), state -> {
+                if (isStandby) {
+                    return state.previousStandbyTasks();
+                }
+                return state.previousActiveTasks();
+            })
+        );
+
+        final RackAwareGraphConstructor<UUID> graphConstructor = new 
MinTrafficGraphConstructor<>();
+        final AssignmentGraph assignmentGraph = buildTaskGraph(clientIds, 
clientRacks, taskIds, taskIdsByProcess, topicPartitionsByTaskId,
+            crossRackTrafficCost, nonOverlapCost, hasReplica, isStandby, 
graphConstructor);
+        return assignmentGraph.graph.totalCost();
+    }
+
+    private static AssignmentGraph buildTaskGraph(final List<UUID> clientIds,
+                                                 final Map<UUID, 
Optional<String>> clientRacks,
+                                                 final List<TaskId> taskIds,
+                                                 final Map<UUID, Set<TaskId>> 
taskIdsByProcess,

Review Comment:
   I just realized that the wording of my comment here was potentially 
misleading. Sorry about that. What I meant by this
   
   > However both callers of this method construct the tasksIdsByProcess map 
based on the previous task assignment, not the current one.
   
   was not that the variable name should be updated to 
`previousTaskIdsByProcess`, but rather the contents of the map passed in by the 
callers should be updated to pass in the current assignment, not the previous 
assignment. Sorry for all the confusion around this. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to