apourchet commented on code in PR #16129: URL: https://github.com/apache/kafka/pull/16129#discussion_r1621479244
########## streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java: ########## @@ -244,18 +271,112 @@ public static Map<ProcessId, KafkaStreamsAssignment> optimizeRackAwareStandbyTas ); LOG.info("Assignment before standby task optimization has cost {}", initialCost); - throw new UnsupportedOperationException("Not yet Implemented."); + final MoveStandbyTaskPredicate moveablePredicate = getStandbyTaskMovePredicate(applicationState); + final BiFunction<KafkaStreamsAssignment, KafkaStreamsAssignment, List<TaskId>> getMovableTasks = (source, destination) -> { + return source.tasks().values().stream() + .filter(task -> task.type() == AssignedTask.Type.STANDBY) + .filter(task -> !destination.tasks().containsKey(task.id())) + .filter(task -> { + final KafkaStreamsState sourceState = kafkaStreamsStates.get(source.processId()); + final KafkaStreamsState destinationState = kafkaStreamsStates.get(source.processId()); + return moveablePredicate.canMoveStandbyTask(sourceState, destinationState, task.id(), kafkaStreamsAssignments); + }) + .map(AssignedTask::id) + .sorted() + .collect(Collectors.toList()); + }; + + final long startTime = System.currentTimeMillis(); + boolean taskMoved = true; + int round = 0; + final RackAwareGraphConstructor<KafkaStreamsAssignment> graphConstructor = RackAwareGraphConstructorFactory.create( + applicationState.assignmentConfigs().rackAwareAssignmentStrategy(), taskIds); + while (taskMoved && round < STANDBY_OPTIMIZER_MAX_ITERATION) { + taskMoved = false; + round++; + for (int i = 0; i < kafkaStreamsAssignments.size(); i++) { + final UUID clientId1 = clientIds.get(i); + final KafkaStreamsAssignment clientState1 = kafkaStreamsAssignments.get(new ProcessId(clientId1)); + for (int j = i + 1; j < kafkaStreamsAssignments.size(); j++) { + final UUID clientId2 = clientIds.get(i); + final KafkaStreamsAssignment clientState2 = kafkaStreamsAssignments.get(new ProcessId(clientId2)); + + final String rack1 = 
clientRacks.get(clientState1.processId().id()).get(); + final String rack2 = clientRacks.get(clientState2.processId().id()).get(); + // Cross rack traffic can not be reduced if racks are the same + if (rack1.equals(rack2)) { + continue; + } + + final List<TaskId> movable1 = getMovableTasks.apply(clientState1, clientState2); + final List<TaskId> movable2 = getMovableTasks.apply(clientState2, clientState1); + + // There's no need to optimize if one is empty because the optimization + // can only swap tasks to keep the client's load balanced + if (movable1.isEmpty() || movable2.isEmpty()) { + continue; + } + + final List<TaskId> taskIdList = Stream.concat(movable1.stream(), movable2.stream()) + .sorted() + .collect(Collectors.toList()); + final List<UUID> clients = Stream.of(clientId1, clientId2).sorted().collect(Collectors.toList()); + + final AssignmentGraph assignmentGraph = buildTaskGraph( + assignmentsByUuid, + clientRacks, + taskIdList, + clients, + topicPartitionsByTaskId, + crossRackTrafficCost, + nonOverlapCost, + false, + false, Review Comment: you're right, good catch! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org