apourchet commented on code in PR #16129:
URL: https://github.com/apache/kafka/pull/16129#discussion_r1621479244


##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##########
@@ -244,18 +271,112 @@ public static Map<ProcessId, KafkaStreamsAssignment> 
optimizeRackAwareStandbyTas
         );
         LOG.info("Assignment before standby task optimization has cost {}", 
initialCost);
 
-        throw new UnsupportedOperationException("Not yet Implemented.");
+        final MoveStandbyTaskPredicate moveablePredicate = 
getStandbyTaskMovePredicate(applicationState);
+        final BiFunction<KafkaStreamsAssignment, KafkaStreamsAssignment, 
List<TaskId>> getMovableTasks = (source, destination) -> {
+            return source.tasks().values().stream()
+                .filter(task -> task.type() == AssignedTask.Type.STANDBY)
+                .filter(task -> !destination.tasks().containsKey(task.id()))
+                .filter(task -> {
+                    final KafkaStreamsState sourceState = 
kafkaStreamsStates.get(source.processId());
+                    final KafkaStreamsState destinationState = 
kafkaStreamsStates.get(source.processId());
+                    return moveablePredicate.canMoveStandbyTask(sourceState, 
destinationState, task.id(), kafkaStreamsAssignments);
+                })
+                .map(AssignedTask::id)
+                .sorted()
+                .collect(Collectors.toList());
+        };
+
+        final long startTime = System.currentTimeMillis();
+        boolean taskMoved = true;
+        int round = 0;
+        final RackAwareGraphConstructor<KafkaStreamsAssignment> 
graphConstructor = RackAwareGraphConstructorFactory.create(
+            
applicationState.assignmentConfigs().rackAwareAssignmentStrategy(), taskIds);
+        while (taskMoved && round < STANDBY_OPTIMIZER_MAX_ITERATION) {
+            taskMoved = false;
+            round++;
+            for (int i = 0; i < kafkaStreamsAssignments.size(); i++) {
+                final UUID clientId1 = clientIds.get(i);
+                final KafkaStreamsAssignment clientState1 = 
kafkaStreamsAssignments.get(new ProcessId(clientId1));
+                for (int j = i + 1; j < kafkaStreamsAssignments.size(); j++) {
+                    final UUID clientId2 = clientIds.get(i);
+                    final KafkaStreamsAssignment clientState2 = 
kafkaStreamsAssignments.get(new ProcessId(clientId2));
+
+                    final String rack1 = 
clientRacks.get(clientState1.processId().id()).get();
+                    final String rack2 = 
clientRacks.get(clientState2.processId().id()).get();
+                    // Cross rack traffic can not be reduced if racks are the 
same
+                    if (rack1.equals(rack2)) {
+                        continue;
+                    }
+
+                    final List<TaskId> movable1 = 
getMovableTasks.apply(clientState1, clientState2);
+                    final List<TaskId> movable2 = 
getMovableTasks.apply(clientState2, clientState1);
+
+                    // There's no need to optimize if one is empty because 
the optimization
+                    // can only swap tasks to keep the client's load balanced
+                    if (movable1.isEmpty() || movable2.isEmpty()) {
+                        continue;
+                    }
+
+                    final List<TaskId> taskIdList = 
Stream.concat(movable1.stream(), movable2.stream())
+                        .sorted()
+                        .collect(Collectors.toList());
+                    final List<UUID> clients = Stream.of(clientId1, 
clientId2).sorted().collect(Collectors.toList());
+
+                    final AssignmentGraph assignmentGraph = buildTaskGraph(
+                        assignmentsByUuid,
+                        clientRacks,
+                        taskIdList,
+                        clients,
+                        topicPartitionsByTaskId,
+                        crossRackTrafficCost,
+                        nonOverlapCost,
+                        false,
+                        false,

Review Comment:
   you're right, good catch!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to