mjsax commented on code in PR #14150: URL: https://github.com/apache/kafka/pull/14150#discussion_r1284793025
########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -755,6 +756,29 @@ public class StreamsConfig extends AbstractConfig { public static final String DEFAULT_CLIENT_SUPPLIER_CONFIG = "default.client.supplier"; public static final String DEFAULT_CLIENT_SUPPLIER_DOC = "Client supplier class that implements the <code>org.apache.kafka.streams.KafkaClientSupplier</code> interface."; + public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_NONE = "NONE"; + public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC = "MIN_TRAFFIC"; + + /** {@code } rack.aware.assignment.strategy */ + @SuppressWarnings("WeakerAccess") + public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG = "rack.aware.assignment.strategy"; + public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_DOC = "The strategy we use for rack aware assignment. Rack aware assignment will take client.rack and racks of TopicPartition into account when assigning" + + " tasks to minimize cross rack traffic. 
Valid settings are : <code>" + RACK_AWARE_ASSIGNMENT_STRATEGY_NONE + "</code>, which will disable rack aware assignment; <code>" + RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC + + "</code>, which will compute minimum cross rack traffic assignment"; Review Comment: ```suggestion + "</code>, which will compute minimum cross rack traffic assignment."; ``` ########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -755,6 +756,29 @@ public class StreamsConfig extends AbstractConfig { public static final String DEFAULT_CLIENT_SUPPLIER_CONFIG = "default.client.supplier"; public static final String DEFAULT_CLIENT_SUPPLIER_DOC = "Client supplier class that implements the <code>org.apache.kafka.streams.KafkaClientSupplier</code> interface."; + public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_NONE = "NONE"; + public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC = "MIN_TRAFFIC"; + + /** {@code } rack.aware.assignment.strategy */ + @SuppressWarnings("WeakerAccess") + public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG = "rack.aware.assignment.strategy"; + public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_DOC = "The strategy we use for rack aware assignment. Rack aware assignment will take client.rack and racks of TopicPartition into account when assigning" Review Comment: ```suggestion public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_DOC = "The strategy we use for rack aware assignment. 
Rack aware assignment will take <code>client.rack</code> and <code>racks</code> of <code>TopicPartition</code> into account when assigning" ``` ########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -755,6 +756,29 @@ public class StreamsConfig extends AbstractConfig { public static final String DEFAULT_CLIENT_SUPPLIER_CONFIG = "default.client.supplier"; public static final String DEFAULT_CLIENT_SUPPLIER_DOC = "Client supplier class that implements the <code>org.apache.kafka.streams.KafkaClientSupplier</code> interface."; + public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_NONE = "NONE"; + public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC = "MIN_TRAFFIC"; + + /** {@code } rack.aware.assignment.strategy */ + @SuppressWarnings("WeakerAccess") + public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG = "rack.aware.assignment.strategy"; + public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_DOC = "The strategy we use for rack aware assignment. Rack aware assignment will take client.rack and racks of TopicPartition into account when assigning" + + " tasks to minimize cross rack traffic. Valid settings are : <code>" + RACK_AWARE_ASSIGNMENT_STRATEGY_NONE + "</code>, which will disable rack aware assignment; <code>" + RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC + + "</code>, which will compute minimum cross rack traffic assignment"; + + @SuppressWarnings("WeakerAccess") + public static final String RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG = "rack.aware.assignment.traffic_cost"; + public static final String RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_DOC = "Cost associated with cross rack traffic. This config and <code>rack.aware.assignment.non_overlap_cost</code> controls whether the " + + "optimization algorithm favors minimizing cross rack traffic or minimize the movement of tasks in existing assignment. 
If set a larger value " + RackAwareTaskAssignor.class.getName() + " will " + + "optimize minimizing cross rack traffic"; Review Comment: ```suggestion + "optimize for minimizing cross rack traffic."; ``` ########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -755,6 +756,29 @@ public class StreamsConfig extends AbstractConfig { public static final String DEFAULT_CLIENT_SUPPLIER_CONFIG = "default.client.supplier"; public static final String DEFAULT_CLIENT_SUPPLIER_DOC = "Client supplier class that implements the <code>org.apache.kafka.streams.KafkaClientSupplier</code> interface."; + public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_NONE = "NONE"; + public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC = "MIN_TRAFFIC"; + + /** {@code } rack.aware.assignment.strategy */ + @SuppressWarnings("WeakerAccess") + public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG = "rack.aware.assignment.strategy"; + public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_DOC = "The strategy we use for rack aware assignment. Rack aware assignment will take client.rack and racks of TopicPartition into account when assigning" + + " tasks to minimize cross rack traffic. Valid settings are : <code>" + RACK_AWARE_ASSIGNMENT_STRATEGY_NONE + "</code>, which will disable rack aware assignment; <code>" + RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC + + "</code>, which will compute minimum cross rack traffic assignment"; + + @SuppressWarnings("WeakerAccess") + public static final String RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG = "rack.aware.assignment.traffic_cost"; + public static final String RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_DOC = "Cost associated with cross rack traffic. This config and <code>rack.aware.assignment.non_overlap_cost</code> controls whether the " + + "optimization algorithm favors minimizing cross rack traffic or minimize the movement of tasks in existing assignment. 
If set a larger value " + RackAwareTaskAssignor.class.getName() + " will " + + "optimize minimizing cross rack traffic"; + + @SuppressWarnings("WeakerAccess") + public static final String RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG = "rack.aware.assignment.non_overlap_cost"; + public static final String RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_DOC = "Cost associated with moving tasks from existing assignment. This config and <code>" + RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG + "</code> controls whether the " + + "optimization algorithm favors minimizing cross rack traffic or minimize the movement of tasks in existing assignment. If set a larger value " + RackAwareTaskAssignor.class.getName() + " will " + + "optimize to maintain existing assignment"; Review Comment: ```suggestion + "optimize to maintain the existing assignment."; ``` ########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -755,6 +756,29 @@ public class StreamsConfig extends AbstractConfig { public static final String DEFAULT_CLIENT_SUPPLIER_CONFIG = "default.client.supplier"; public static final String DEFAULT_CLIENT_SUPPLIER_DOC = "Client supplier class that implements the <code>org.apache.kafka.streams.KafkaClientSupplier</code> interface."; + public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_NONE = "NONE"; + public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC = "MIN_TRAFFIC"; + + /** {@code } rack.aware.assignment.strategy */ + @SuppressWarnings("WeakerAccess") + public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG = "rack.aware.assignment.strategy"; + public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_DOC = "The strategy we use for rack aware assignment. Rack aware assignment will take client.rack and racks of TopicPartition into account when assigning" + + " tasks to minimize cross rack traffic. 
Valid settings are : <code>" + RACK_AWARE_ASSIGNMENT_STRATEGY_NONE + "</code>, which will disable rack aware assignment; <code>" + RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC + + "</code>, which will compute minimum cross rack traffic assignment"; + + @SuppressWarnings("WeakerAccess") + public static final String RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG = "rack.aware.assignment.traffic_cost"; + public static final String RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_DOC = "Cost associated with cross rack traffic. This config and <code>rack.aware.assignment.non_overlap_cost</code> controls whether the " + + "optimization algorithm favors minimizing cross rack traffic or minimize the movement of tasks in existing assignment. If set a larger value " + RackAwareTaskAssignor.class.getName() + " will " + + "optimize minimizing cross rack traffic"; + + @SuppressWarnings("WeakerAccess") + public static final String RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG = "rack.aware.assignment.non_overlap_cost"; + public static final String RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_DOC = "Cost associated with moving tasks from existing assignment. This config and <code>" + RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG + "</code> controls whether the " + + "optimization algorithm favors minimizing cross rack traffic or minimize the movement of tasks in existing assignment. If set a larger value " + RackAwareTaskAssignor.class.getName() + " will " Review Comment: ```suggestion + "optimization algorithm favors minimizing cross rack traffic or minimize the movement of tasks in existing assignment. 
If set a larger value <code>" + RackAwareTaskAssignor.class.getName() + "</code> will " ``` ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java: ########## @@ -62,6 +63,7 @@ boolean canMove(final ClientState source, private static final Logger log = LoggerFactory.getLogger(RackAwareTaskAssignor.class); private static final int SOURCE_ID = -1; + private static final int STANDBY_OPTIMIZER_MAX_ITERATION = 4; Review Comment: Add a comment that we picked 4 based on some testing, but it's still more or less random. ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java: ########## @@ -340,6 +338,9 @@ public long optimizeActiveTasks(final SortedSet<TaskId> activeTasks, return 0; } + log.info("Assignment before active optimization is {}\n with cost {}", clientStates, Review Comment: ```suggestion log.info("Assignment before active task optimization is {}\n with cost {}", clientStates, ``` ########## streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java: ########## @@ -1378,6 +1380,48 @@ public void shouldNotEnableAnyOptimizationsWithNoOptimizationConfig() { assertEquals(0, configs.size()); } + @Test + public void shouldReturnDefaultRackAwareAssignmentConfig() { + final String strategy = streamsConfig.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG); + assertEquals("NONE", strategy); + } + + @Test + public void shouldtSetMinTrafficRackAwareAssignmentConfig() { + props.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC); + assertEquals("MIN_TRAFFIC", new StreamsConfig(props).getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG)); Review Comment: It seems to be invalid to enable the assignor without setting traffic and movement costs?
########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -755,6 +756,29 @@ public class StreamsConfig extends AbstractConfig { public static final String DEFAULT_CLIENT_SUPPLIER_CONFIG = "default.client.supplier"; public static final String DEFAULT_CLIENT_SUPPLIER_DOC = "Client supplier class that implements the <code>org.apache.kafka.streams.KafkaClientSupplier</code> interface."; + public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_NONE = "NONE"; + public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC = "MIN_TRAFFIC"; + + /** {@code } rack.aware.assignment.strategy */ + @SuppressWarnings("WeakerAccess") + public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG = "rack.aware.assignment.strategy"; + public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_DOC = "The strategy we use for rack aware assignment. Rack aware assignment will take client.rack and racks of TopicPartition into account when assigning" + + " tasks to minimize cross rack traffic. Valid settings are : <code>" + RACK_AWARE_ASSIGNMENT_STRATEGY_NONE + "</code>, which will disable rack aware assignment; <code>" + RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC + + "</code>, which will compute minimum cross rack traffic assignment"; + + @SuppressWarnings("WeakerAccess") + public static final String RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG = "rack.aware.assignment.traffic_cost"; + public static final String RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_DOC = "Cost associated with cross rack traffic. This config and <code>rack.aware.assignment.non_overlap_cost</code> controls whether the " + + "optimization algorithm favors minimizing cross rack traffic or minimize the movement of tasks in existing assignment. If set a larger value " + RackAwareTaskAssignor.class.getName() + " will " Review Comment: ```suggestion + "optimization algorithm favors minimizing cross rack traffic or minimize the movement of tasks in existing assignment. 
If set a larger value <code>" + RackAwareTaskAssignor.class.getName() + "</code> will " ``` ########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -755,6 +756,29 @@ public class StreamsConfig extends AbstractConfig { public static final String DEFAULT_CLIENT_SUPPLIER_CONFIG = "default.client.supplier"; public static final String DEFAULT_CLIENT_SUPPLIER_DOC = "Client supplier class that implements the <code>org.apache.kafka.streams.KafkaClientSupplier</code> interface."; + public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_NONE = "NONE"; + public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC = "MIN_TRAFFIC"; + + /** {@code } rack.aware.assignment.strategy */ + @SuppressWarnings("WeakerAccess") + public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG = "rack.aware.assignment.strategy"; + public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_DOC = "The strategy we use for rack aware assignment. Rack aware assignment will take client.rack and racks of TopicPartition into account when assigning" + + " tasks to minimize cross rack traffic. Valid settings are : <code>" + RACK_AWARE_ASSIGNMENT_STRATEGY_NONE + "</code>, which will disable rack aware assignment; <code>" + RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC Review Comment: ```suggestion + " tasks to minimize cross rack traffic. Valid settings are : <code>" + RACK_AWARE_ASSIGNMENT_STRATEGY_NONE + "</code> (default), which will disable rack aware assignment; <code>" + RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC ``` ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java: ########## @@ -267,25 +267,46 @@ public static class AssignmentConfigs { public final int numStandbyReplicas; public final long probingRebalanceIntervalMs; public final List<String> rackAwareAssignmentTags; + public final Integer rackAwareAssignmentTrafficCost; Review Comment: Why are we using `Integer` instead of `int`?
########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -890,6 +914,22 @@ public class StreamsConfig extends AbstractConfig { in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, EXACTLY_ONCE_V2), Importance.MEDIUM, PROCESSING_GUARANTEE_DOC) + .define(RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, + Type.STRING, + RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, + in(RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC), + Importance.MEDIUM, + RACK_AWARE_ASSIGNMENT_STRATEGY_DOC) + .define(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG, Review Comment: `TRAFFIC` should be after `TAGS` ########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -890,6 +914,22 @@ public class StreamsConfig extends AbstractConfig { in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, EXACTLY_ONCE_V2), Importance.MEDIUM, PROCESSING_GUARANTEE_DOC) + .define(RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, + Type.STRING, + RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, + in(RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC), + Importance.MEDIUM, + RACK_AWARE_ASSIGNMENT_STRATEGY_DOC) + .define(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG, + Type.INT, + null, + Importance.MEDIUM, + RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_DOC) + .define(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG, Review Comment: Should be before `RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG,` ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java: ########## @@ -368,46 +370,63 @@ public long optimizeStandbyTasks(final SortedMap<UUID, ClientState> clientStates final List<UUID> clientList = new ArrayList<>(clientStates.keySet()); final SortedSet<TaskId> standbyTasks = new TreeSet<>(); - for (int i = 0; i < clientList.size(); i++) { - final ClientState clientState1 = clientStates.get(clientList.get(i)); - standbyTasks.addAll(clientState1.standbyTasks()); - for (int j = i + 1; j < clientList.size(); j++) { 
- final ClientState clientState2 = clientStates.get(clientList.get(j)); - - final String rack1 = racksForProcess.get(clientState1.processId()); - final String rack2 = racksForProcess.get(clientState2.processId()); - // Cross rack traffic can not be reduced if racks are the same - if (rack1.equals(rack2)) { - continue; - } - - final List<TaskId> movable1 = getMovableTasks.apply(clientState1, clientState2); - final List<TaskId> movable2 = getMovableTasks.apply(clientState2, clientState1); - - // There's no needed to optimize if one is empty because the optimization - // can only swap tasks to keep the client's load balanced - if (movable1.isEmpty() || movable2.isEmpty()) { - continue; - } - - final List<TaskId> taskIdList = Stream.concat(movable1.stream(), movable2.stream()) - .sorted() - .collect(Collectors.toList()); + clientStates.values().forEach(clientState -> standbyTasks.addAll(clientState.standbyTasks())); + + log.info("Assignment before standby optimization is {}\n with cost {}", clientStates, Review Comment: ```suggestion log.info("Assignment before standby task optimization is {}\n with cost {}", clientStates, ``` ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java: ########## @@ -353,6 +354,7 @@ public long optimizeActiveTasks(final SortedSet<TaskId> activeTasks, assignTaskFromMinCostFlow(graph, clientList, taskIdList, clientStates, originalAssignedTaskNumber, taskClientMap, ClientState::assignActive, ClientState::unassignActive, ClientState::hasActiveTask); + log.info("Assignment after active optimization is {}\n with cost {}", clientStates, cost); Review Comment: ```suggestion log.info("Assignment after active task optimization is {}\n with cost {}", clientStates, cost); ``` ########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -890,6 +914,22 @@ public class StreamsConfig extends AbstractConfig { in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, 
EXACTLY_ONCE_V2), Importance.MEDIUM, PROCESSING_GUARANTEE_DOC) + .define(RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, + Type.STRING, + RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, + in(RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC), + Importance.MEDIUM, + RACK_AWARE_ASSIGNMENT_STRATEGY_DOC) + .define(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG, + Type.INT, + null, Review Comment: Should we have any default value? If not, how can we give guidance to users how to set it and document it? If not, it would be very difficult for users to set. Same for non-overlap cost below. ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java: ########## @@ -368,46 +370,63 @@ public long optimizeStandbyTasks(final SortedMap<UUID, ClientState> clientStates final List<UUID> clientList = new ArrayList<>(clientStates.keySet()); final SortedSet<TaskId> standbyTasks = new TreeSet<>(); - for (int i = 0; i < clientList.size(); i++) { - final ClientState clientState1 = clientStates.get(clientList.get(i)); - standbyTasks.addAll(clientState1.standbyTasks()); - for (int j = i + 1; j < clientList.size(); j++) { - final ClientState clientState2 = clientStates.get(clientList.get(j)); - - final String rack1 = racksForProcess.get(clientState1.processId()); - final String rack2 = racksForProcess.get(clientState2.processId()); - // Cross rack traffic can not be reduced if racks are the same - if (rack1.equals(rack2)) { - continue; - } - - final List<TaskId> movable1 = getMovableTasks.apply(clientState1, clientState2); - final List<TaskId> movable2 = getMovableTasks.apply(clientState2, clientState1); - - // There's no needed to optimize if one is empty because the optimization - // can only swap tasks to keep the client's load balanced - if (movable1.isEmpty() || movable2.isEmpty()) { - continue; - } - - final List<TaskId> taskIdList = Stream.concat(movable1.stream(), movable2.stream()) - .sorted() - 
.collect(Collectors.toList()); + clientStates.values().forEach(clientState -> standbyTasks.addAll(clientState.standbyTasks())); + + log.info("Assignment before standby optimization is {}\n with cost {}", clientStates, + standByTasksCost(standbyTasks, clientStates, trafficCost, nonOverlapCost)); + + boolean taskMoved = true; + int round = 0; + while (taskMoved && round < STANDBY_OPTIMIZER_MAX_ITERATION) { + taskMoved = false; + round++; + for (int i = 0; i < clientList.size(); i++) { + final ClientState clientState1 = clientStates.get(clientList.get(i)); + for (int j = i + 1; j < clientList.size(); j++) { + final ClientState clientState2 = clientStates.get(clientList.get(j)); + + final String rack1 = racksForProcess.get(clientState1.processId()); + final String rack2 = racksForProcess.get(clientState2.processId()); + // Cross rack traffic can not be reduced if racks are the same + if (rack1.equals(rack2)) { + continue; + } - final Map<TaskId, UUID> taskClientMap = new HashMap<>(); - final List<UUID> clients = Stream.of(clientList.get(i), clientList.get(j)).sorted().collect( - Collectors.toList()); - final Map<UUID, Integer> originalAssignedTaskNumber = new HashMap<>(); + final List<TaskId> movable1 = getMovableTasks.apply(clientState1, clientState2); + final List<TaskId> movable2 = getMovableTasks.apply(clientState2, clientState1); - final Graph<Integer> graph = constructTaskGraph(clients, taskIdList, clientStates, taskClientMap, originalAssignedTaskNumber, - ClientState::hasStandbyTask, trafficCost, nonOverlapCost, true, true); - graph.solveMinCostFlow(); + // There's no needed to optimize if one is empty because the optimization + // can only swap tasks to keep the client's load balanced + if (movable1.isEmpty() || movable2.isEmpty()) { + continue; + } - assignTaskFromMinCostFlow(graph, clients, taskIdList, clientStates, originalAssignedTaskNumber, - taskClientMap, ClientState::assignStandby, ClientState::unassignStandby, ClientState::hasStandbyTask); + final 
List<TaskId> taskIdList = Stream.concat(movable1.stream(), + movable2.stream()) + .sorted() + .collect(Collectors.toList()); + + final Map<TaskId, UUID> taskClientMap = new HashMap<>(); + final List<UUID> clients = Stream.of(clientList.get(i), clientList.get(j)) + .sorted().collect( + Collectors.toList()); + final Map<UUID, Integer> originalAssignedTaskNumber = new HashMap<>(); + + final Graph<Integer> graph = constructTaskGraph(clients, taskIdList, + clientStates, taskClientMap, originalAssignedTaskNumber, + ClientState::hasStandbyTask, trafficCost, nonOverlapCost, true, true); + graph.solveMinCostFlow(); + + taskMoved |= assignTaskFromMinCostFlow(graph, clients, taskIdList, clientStates, + originalAssignedTaskNumber, + taskClientMap, ClientState::assignStandby, ClientState::unassignStandby, + ClientState::hasStandbyTask); + } } } - return standByTasksCost(standbyTasks, clientStates, trafficCost, nonOverlapCost); + final long cost = standByTasksCost(standbyTasks, clientStates, trafficCost, nonOverlapCost); + log.info("Assignment after {} rounds of standby optimization is {}\n with cost {}", round, clientStates, cost); Review Comment: ```suggestion log.info("Assignment after {} rounds of standby task optimization is {}\n with cost {}", round, clientStates, cost); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org