mjsax commented on code in PR #14150:
URL: https://github.com/apache/kafka/pull/14150#discussion_r1284793025


##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -755,6 +756,29 @@ public class StreamsConfig extends AbstractConfig {
     public static final String DEFAULT_CLIENT_SUPPLIER_CONFIG = 
"default.client.supplier";
     public static final String DEFAULT_CLIENT_SUPPLIER_DOC = "Client supplier 
class that implements the 
<code>org.apache.kafka.streams.KafkaClientSupplier</code> interface.";
 
+    public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_NONE = "NONE";
+    public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC = 
"MIN_TRAFFIC";
+
+    /** {@code } rack.aware.assignment.strategy */
+    @SuppressWarnings("WeakerAccess")
+    public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG = 
"rack.aware.assignment.strategy";
+    public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_DOC = "The 
strategy we use for rack aware assignment. Rack aware assignment will take 
client.rack and racks of TopicPartition into account when assigning"
+        + " tasks to minimize cross rack traffic. Valid settings are : <code>" 
+ RACK_AWARE_ASSIGNMENT_STRATEGY_NONE + "</code>, which will disable rack aware 
assignment; <code>" + RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC
+        + "</code>, which will compute minimum cross rack traffic assignment";

Review Comment:
   ```suggestion
           + "</code>, which will compute minimum cross rack traffic 
assignment.";
   ```



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -755,6 +756,29 @@ public class StreamsConfig extends AbstractConfig {
     public static final String DEFAULT_CLIENT_SUPPLIER_CONFIG = 
"default.client.supplier";
     public static final String DEFAULT_CLIENT_SUPPLIER_DOC = "Client supplier 
class that implements the 
<code>org.apache.kafka.streams.KafkaClientSupplier</code> interface.";
 
+    public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_NONE = "NONE";
+    public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC = 
"MIN_TRAFFIC";
+
+    /** {@code } rack.aware.assignment.strategy */
+    @SuppressWarnings("WeakerAccess")
+    public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG = 
"rack.aware.assignment.strategy";
+    public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_DOC = "The 
strategy we use for rack aware assignment. Rack aware assignment will take 
client.rack and racks of TopicPartition into account when assigning"

Review Comment:
   ```suggestion
       public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_DOC = "The 
strategy we use for rack aware assignment. Rack aware assignment will take 
<code>client.rack</code> and <code>racks</code> of <code>TopicPartition</code> 
into account when assigning"
   ```



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -755,6 +756,29 @@ public class StreamsConfig extends AbstractConfig {
     public static final String DEFAULT_CLIENT_SUPPLIER_CONFIG = 
"default.client.supplier";
     public static final String DEFAULT_CLIENT_SUPPLIER_DOC = "Client supplier 
class that implements the 
<code>org.apache.kafka.streams.KafkaClientSupplier</code> interface.";
 
+    public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_NONE = "NONE";
+    public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC = 
"MIN_TRAFFIC";
+
+    /** {@code } rack.aware.assignment.strategy */
+    @SuppressWarnings("WeakerAccess")
+    public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG = 
"rack.aware.assignment.strategy";
+    public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_DOC = "The 
strategy we use for rack aware assignment. Rack aware assignment will take 
client.rack and racks of TopicPartition into account when assigning"
+        + " tasks to minimize cross rack traffic. Valid settings are : <code>" 
+ RACK_AWARE_ASSIGNMENT_STRATEGY_NONE + "</code>, which will disable rack aware 
assignment; <code>" + RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC
+        + "</code>, which will compute minimum cross rack traffic assignment";
+
+    @SuppressWarnings("WeakerAccess")
+    public static final String RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG = 
"rack.aware.assignment.traffic_cost";
+    public static final String RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_DOC = "Cost 
associated with cross rack traffic. This config and 
<code>rack.aware.assignment.non_overlap_cost</code> controls whether the "
+        + "optimization algorithm favors minimizing cross rack traffic or 
minimize the movement of tasks in existing assignment. If set a larger value " 
+ RackAwareTaskAssignor.class.getName() + " will "
+        + "optimize minimizing cross rack traffic";

Review Comment:
   ```suggestion
           + "optimize for minimizing cross rack traffic.";
   ```



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -755,6 +756,29 @@ public class StreamsConfig extends AbstractConfig {
     public static final String DEFAULT_CLIENT_SUPPLIER_CONFIG = 
"default.client.supplier";
     public static final String DEFAULT_CLIENT_SUPPLIER_DOC = "Client supplier 
class that implements the 
<code>org.apache.kafka.streams.KafkaClientSupplier</code> interface.";
 
+    public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_NONE = "NONE";
+    public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC = 
"MIN_TRAFFIC";
+
+    /** {@code } rack.aware.assignment.strategy */
+    @SuppressWarnings("WeakerAccess")
+    public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG = 
"rack.aware.assignment.strategy";
+    public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_DOC = "The 
strategy we use for rack aware assignment. Rack aware assignment will take 
client.rack and racks of TopicPartition into account when assigning"
+        + " tasks to minimize cross rack traffic. Valid settings are : <code>" 
+ RACK_AWARE_ASSIGNMENT_STRATEGY_NONE + "</code>, which will disable rack aware 
assignment; <code>" + RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC
+        + "</code>, which will compute minimum cross rack traffic assignment";
+
+    @SuppressWarnings("WeakerAccess")
+    public static final String RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG = 
"rack.aware.assignment.traffic_cost";
+    public static final String RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_DOC = "Cost 
associated with cross rack traffic. This config and 
<code>rack.aware.assignment.non_overlap_cost</code> controls whether the "
+        + "optimization algorithm favors minimizing cross rack traffic or 
minimize the movement of tasks in existing assignment. If set a larger value " 
+ RackAwareTaskAssignor.class.getName() + " will "
+        + "optimize minimizing cross rack traffic";
+
+    @SuppressWarnings("WeakerAccess")
+    public static final String RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG = 
"rack.aware.assignment.non_overlap_cost";
+    public static final String RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_DOC = 
"Cost associated with moving tasks from existing assignment. This config and 
<code>" + RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG + "</code> controls whether 
the "
+        + "optimization algorithm favors minimizing cross rack traffic or 
minimize the movement of tasks in existing assignment. If set a larger value " 
+ RackAwareTaskAssignor.class.getName() + " will "
+        + "optimize to maintain existing assignment";

Review Comment:
   ```suggestion
           + "optimize to maintain the existing assignment.";
   ```



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -755,6 +756,29 @@ public class StreamsConfig extends AbstractConfig {
     public static final String DEFAULT_CLIENT_SUPPLIER_CONFIG = 
"default.client.supplier";
     public static final String DEFAULT_CLIENT_SUPPLIER_DOC = "Client supplier 
class that implements the 
<code>org.apache.kafka.streams.KafkaClientSupplier</code> interface.";
 
+    public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_NONE = "NONE";
+    public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC = 
"MIN_TRAFFIC";
+
+    /** {@code } rack.aware.assignment.strategy */
+    @SuppressWarnings("WeakerAccess")
+    public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG = 
"rack.aware.assignment.strategy";
+    public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_DOC = "The 
strategy we use for rack aware assignment. Rack aware assignment will take 
client.rack and racks of TopicPartition into account when assigning"
+        + " tasks to minimize cross rack traffic. Valid settings are : <code>" 
+ RACK_AWARE_ASSIGNMENT_STRATEGY_NONE + "</code>, which will disable rack aware 
assignment; <code>" + RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC
+        + "</code>, which will compute minimum cross rack traffic assignment";
+
+    @SuppressWarnings("WeakerAccess")
+    public static final String RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG = 
"rack.aware.assignment.traffic_cost";
+    public static final String RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_DOC = "Cost 
associated with cross rack traffic. This config and 
<code>rack.aware.assignment.non_overlap_cost</code> controls whether the "
+        + "optimization algorithm favors minimizing cross rack traffic or 
minimize the movement of tasks in existing assignment. If set a larger value " 
+ RackAwareTaskAssignor.class.getName() + " will "
+        + "optimize minimizing cross rack traffic";
+
+    @SuppressWarnings("WeakerAccess")
+    public static final String RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG = 
"rack.aware.assignment.non_overlap_cost";
+    public static final String RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_DOC = 
"Cost associated with moving tasks from existing assignment. This config and 
<code>" + RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG + "</code> controls whether 
the "
+        + "optimization algorithm favors minimizing cross rack traffic or 
minimize the movement of tasks in existing assignment. If set a larger value " 
+ RackAwareTaskAssignor.class.getName() + " will "

Review Comment:
   ```suggestion
           + "optimization algorithm favors minimizing cross rack traffic or 
minimize the movement of tasks in existing assignment. If set a larger value 
<code>" + RackAwareTaskAssignor.class.getName() + "<code/> will "
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
##########
@@ -62,6 +63,7 @@ boolean canMove(final ClientState source,
     private static final Logger log = 
LoggerFactory.getLogger(RackAwareTaskAssignor.class);
 
     private static final int SOURCE_ID = -1;
+    private static final int STANDBY_OPTIMIZER_MAX_ITERATION = 4;

Review Comment:
   Add a comment explaining that we picked 4 based on some testing, but that the 
value is still more or less arbitrary.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
##########
@@ -340,6 +338,9 @@ public long optimizeActiveTasks(final SortedSet<TaskId> 
activeTasks,
             return 0;
         }
 
+        log.info("Assignment before active optimization is {}\n with cost {}", 
clientStates,

Review Comment:
   ```suggestion
           log.info("Assignment before active task optimization is {}\n with 
cost {}", clientStates,
   ```



##########
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##########
@@ -1378,6 +1380,48 @@ public void 
shouldNotEnableAnyOptimizationsWithNoOptimizationConfig() {
         assertEquals(0, configs.size());
     }
 
+    @Test
+    public void shouldReturnDefaultRackAwareAssignmentConfig() {
+        final String strategy = 
streamsConfig.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG);
+        assertEquals("NONE", strategy);
+    }
+
+    @Test
+    public void shouldtSetMinTrafficRackAwareAssignmentConfig() {
+        props.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, 
StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC);
+        assertEquals("MIN_TRAFFIC", new 
StreamsConfig(props).getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG));

Review Comment:
   It seems invalid to enable the assignor without also setting the traffic and 
movement costs?



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -755,6 +756,29 @@ public class StreamsConfig extends AbstractConfig {
     public static final String DEFAULT_CLIENT_SUPPLIER_CONFIG = 
"default.client.supplier";
     public static final String DEFAULT_CLIENT_SUPPLIER_DOC = "Client supplier 
class that implements the 
<code>org.apache.kafka.streams.KafkaClientSupplier</code> interface.";
 
+    public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_NONE = "NONE";
+    public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC = 
"MIN_TRAFFIC";
+
+    /** {@code } rack.aware.assignment.strategy */
+    @SuppressWarnings("WeakerAccess")
+    public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG = 
"rack.aware.assignment.strategy";
+    public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_DOC = "The 
strategy we use for rack aware assignment. Rack aware assignment will take 
client.rack and racks of TopicPartition into account when assigning"
+        + " tasks to minimize cross rack traffic. Valid settings are : <code>" 
+ RACK_AWARE_ASSIGNMENT_STRATEGY_NONE + "</code>, which will disable rack aware 
assignment; <code>" + RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC
+        + "</code>, which will compute minimum cross rack traffic assignment";
+
+    @SuppressWarnings("WeakerAccess")
+    public static final String RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG = 
"rack.aware.assignment.traffic_cost";
+    public static final String RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_DOC = "Cost 
associated with cross rack traffic. This config and 
<code>rack.aware.assignment.non_overlap_cost</code> controls whether the "
+        + "optimization algorithm favors minimizing cross rack traffic or 
minimize the movement of tasks in existing assignment. If set a larger value " 
+ RackAwareTaskAssignor.class.getName() + " will "

Review Comment:
   ```suggestion
           + "optimization algorithm favors minimizing cross rack traffic or 
minimize the movement of tasks in existing assignment. If set a larger value 
<code>" + RackAwareTaskAssignor.class.getName() + "</code> will "
   ```



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -755,6 +756,29 @@ public class StreamsConfig extends AbstractConfig {
     public static final String DEFAULT_CLIENT_SUPPLIER_CONFIG = 
"default.client.supplier";
     public static final String DEFAULT_CLIENT_SUPPLIER_DOC = "Client supplier 
class that implements the 
<code>org.apache.kafka.streams.KafkaClientSupplier</code> interface.";
 
+    public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_NONE = "NONE";
+    public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC = 
"MIN_TRAFFIC";
+
+    /** {@code } rack.aware.assignment.strategy */
+    @SuppressWarnings("WeakerAccess")
+    public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG = 
"rack.aware.assignment.strategy";
+    public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_DOC = "The 
strategy we use for rack aware assignment. Rack aware assignment will take 
client.rack and racks of TopicPartition into account when assigning"
+        + " tasks to minimize cross rack traffic. Valid settings are : <code>" 
+ RACK_AWARE_ASSIGNMENT_STRATEGY_NONE + "</code>, which will disable rack aware 
assignment; <code>" + RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC

Review Comment:
   ```suggestion
           + " tasks to minimize cross rack traffic. Valid settings are : 
<code>" + RACK_AWARE_ASSIGNMENT_STRATEGY_NONE + "</code> (default), which will 
disable rack aware assignment; <code>" + 
RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java:
##########
@@ -267,25 +267,46 @@ public static class AssignmentConfigs {
         public final int numStandbyReplicas;
         public final long probingRebalanceIntervalMs;
         public final List<String> rackAwareAssignmentTags;
+        public final Integer rackAwareAssignmentTrafficCost;

Review Comment:
   Why are we using `Integer` rather than `int`?



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -890,6 +914,22 @@ public class StreamsConfig extends AbstractConfig {
                     in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, 
EXACTLY_ONCE_V2),
                     Importance.MEDIUM,
                     PROCESSING_GUARANTEE_DOC)
+            .define(RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG,
+                    Type.STRING,
+                    RACK_AWARE_ASSIGNMENT_STRATEGY_NONE,
+                    in(RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, 
RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC),
+                    Importance.MEDIUM,
+                    RACK_AWARE_ASSIGNMENT_STRATEGY_DOC)
+            .define(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG,

Review Comment:
   The `TRAFFIC` config should be defined after the `TAGS` config.



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -890,6 +914,22 @@ public class StreamsConfig extends AbstractConfig {
                     in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, 
EXACTLY_ONCE_V2),
                     Importance.MEDIUM,
                     PROCESSING_GUARANTEE_DOC)
+            .define(RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG,
+                    Type.STRING,
+                    RACK_AWARE_ASSIGNMENT_STRATEGY_NONE,
+                    in(RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, 
RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC),
+                    Importance.MEDIUM,
+                    RACK_AWARE_ASSIGNMENT_STRATEGY_DOC)
+            .define(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG,
+                    Type.INT,
+                    null,
+                    Importance.MEDIUM,
+                    RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_DOC)
+            .define(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG,

Review Comment:
   Should be before `RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG,`



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
##########
@@ -368,46 +370,63 @@ public long optimizeStandbyTasks(final SortedMap<UUID, 
ClientState> clientStates
 
         final List<UUID> clientList = new ArrayList<>(clientStates.keySet());
         final SortedSet<TaskId> standbyTasks = new TreeSet<>();
-        for (int i = 0; i < clientList.size(); i++) {
-            final ClientState clientState1 = 
clientStates.get(clientList.get(i));
-            standbyTasks.addAll(clientState1.standbyTasks());
-            for (int j = i + 1; j < clientList.size(); j++) {
-                final ClientState clientState2 = 
clientStates.get(clientList.get(j));
-
-                final String rack1 = 
racksForProcess.get(clientState1.processId());
-                final String rack2 = 
racksForProcess.get(clientState2.processId());
-                // Cross rack traffic can not be reduced if racks are the same
-                if (rack1.equals(rack2)) {
-                    continue;
-                }
-
-                final List<TaskId> movable1 = 
getMovableTasks.apply(clientState1, clientState2);
-                final List<TaskId> movable2 = 
getMovableTasks.apply(clientState2, clientState1);
-
-                // There's no needed to optimize if one is empty because the 
optimization
-                // can only swap tasks to keep the client's load balanced
-                if (movable1.isEmpty() || movable2.isEmpty()) {
-                    continue;
-                }
-
-                final List<TaskId> taskIdList = 
Stream.concat(movable1.stream(), movable2.stream())
-                    .sorted()
-                    .collect(Collectors.toList());
+        clientStates.values().forEach(clientState -> 
standbyTasks.addAll(clientState.standbyTasks()));
+
+        log.info("Assignment before standby optimization is {}\n with cost 
{}", clientStates,

Review Comment:
   ```suggestion
           log.info("Assignment before standby task optimization is {}\n with 
cost {}", clientStates,
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
##########
@@ -353,6 +354,7 @@ public long optimizeActiveTasks(final SortedSet<TaskId> 
activeTasks,
         assignTaskFromMinCostFlow(graph, clientList, taskIdList, clientStates, 
originalAssignedTaskNumber,
             taskClientMap, ClientState::assignActive, 
ClientState::unassignActive, ClientState::hasActiveTask);
 
+        log.info("Assignment after active optimization is {}\n with cost {}", 
clientStates, cost);

Review Comment:
   ```suggestion
           log.info("Assignment after active task optimization is {}\n with 
cost {}", clientStates, cost);
   ```



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -890,6 +914,22 @@ public class StreamsConfig extends AbstractConfig {
                     in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, 
EXACTLY_ONCE_V2),
                     Importance.MEDIUM,
                     PROCESSING_GUARANTEE_DOC)
+            .define(RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG,
+                    Type.STRING,
+                    RACK_AWARE_ASSIGNMENT_STRATEGY_NONE,
+                    in(RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, 
RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC),
+                    Importance.MEDIUM,
+                    RACK_AWARE_ASSIGNMENT_STRATEGY_DOC)
+            .define(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG,
+                    Type.INT,
+                    null,

Review Comment:
   Should we have a default value? If not, how do we guide users on how to set 
it, and how do we document it? Without a default, it would be very difficult for 
users to set.
   
   Same for the non-overlap cost below.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
##########
@@ -368,46 +370,63 @@ public long optimizeStandbyTasks(final SortedMap<UUID, 
ClientState> clientStates
 
         final List<UUID> clientList = new ArrayList<>(clientStates.keySet());
         final SortedSet<TaskId> standbyTasks = new TreeSet<>();
-        for (int i = 0; i < clientList.size(); i++) {
-            final ClientState clientState1 = 
clientStates.get(clientList.get(i));
-            standbyTasks.addAll(clientState1.standbyTasks());
-            for (int j = i + 1; j < clientList.size(); j++) {
-                final ClientState clientState2 = 
clientStates.get(clientList.get(j));
-
-                final String rack1 = 
racksForProcess.get(clientState1.processId());
-                final String rack2 = 
racksForProcess.get(clientState2.processId());
-                // Cross rack traffic can not be reduced if racks are the same
-                if (rack1.equals(rack2)) {
-                    continue;
-                }
-
-                final List<TaskId> movable1 = 
getMovableTasks.apply(clientState1, clientState2);
-                final List<TaskId> movable2 = 
getMovableTasks.apply(clientState2, clientState1);
-
-                // There's no needed to optimize if one is empty because the 
optimization
-                // can only swap tasks to keep the client's load balanced
-                if (movable1.isEmpty() || movable2.isEmpty()) {
-                    continue;
-                }
-
-                final List<TaskId> taskIdList = 
Stream.concat(movable1.stream(), movable2.stream())
-                    .sorted()
-                    .collect(Collectors.toList());
+        clientStates.values().forEach(clientState -> 
standbyTasks.addAll(clientState.standbyTasks()));
+
+        log.info("Assignment before standby optimization is {}\n with cost 
{}", clientStates,
+            standByTasksCost(standbyTasks, clientStates, trafficCost, 
nonOverlapCost));
+
+        boolean taskMoved = true;
+        int round = 0;
+        while (taskMoved && round < STANDBY_OPTIMIZER_MAX_ITERATION) {
+            taskMoved = false;
+            round++;
+            for (int i = 0; i < clientList.size(); i++) {
+                final ClientState clientState1 = 
clientStates.get(clientList.get(i));
+                for (int j = i + 1; j < clientList.size(); j++) {
+                    final ClientState clientState2 = 
clientStates.get(clientList.get(j));
+
+                    final String rack1 = 
racksForProcess.get(clientState1.processId());
+                    final String rack2 = 
racksForProcess.get(clientState2.processId());
+                    // Cross rack traffic can not be reduced if racks are the 
same
+                    if (rack1.equals(rack2)) {
+                        continue;
+                    }
 
-                final Map<TaskId, UUID> taskClientMap = new HashMap<>();
-                final List<UUID> clients = Stream.of(clientList.get(i), 
clientList.get(j)).sorted().collect(
-                    Collectors.toList());
-                final Map<UUID, Integer> originalAssignedTaskNumber = new 
HashMap<>();
+                    final List<TaskId> movable1 = 
getMovableTasks.apply(clientState1, clientState2);
+                    final List<TaskId> movable2 = 
getMovableTasks.apply(clientState2, clientState1);
 
-                final Graph<Integer> graph = constructTaskGraph(clients, 
taskIdList, clientStates, taskClientMap, originalAssignedTaskNumber,
-                    ClientState::hasStandbyTask, trafficCost, nonOverlapCost, 
true, true);
-                graph.solveMinCostFlow();
+                    // There's no needed to optimize if one is empty because 
the optimization
+                    // can only swap tasks to keep the client's load balanced
+                    if (movable1.isEmpty() || movable2.isEmpty()) {
+                        continue;
+                    }
 
-                assignTaskFromMinCostFlow(graph, clients, taskIdList, 
clientStates, originalAssignedTaskNumber,
-                    taskClientMap, ClientState::assignStandby, 
ClientState::unassignStandby, ClientState::hasStandbyTask);
+                    final List<TaskId> taskIdList = 
Stream.concat(movable1.stream(),
+                            movable2.stream())
+                        .sorted()
+                        .collect(Collectors.toList());
+
+                    final Map<TaskId, UUID> taskClientMap = new HashMap<>();
+                    final List<UUID> clients = Stream.of(clientList.get(i), 
clientList.get(j))
+                        .sorted().collect(
+                            Collectors.toList());
+                    final Map<UUID, Integer> originalAssignedTaskNumber = new 
HashMap<>();
+
+                    final Graph<Integer> graph = constructTaskGraph(clients, 
taskIdList,
+                        clientStates, taskClientMap, 
originalAssignedTaskNumber,
+                        ClientState::hasStandbyTask, trafficCost, 
nonOverlapCost, true, true);
+                    graph.solveMinCostFlow();
+
+                    taskMoved |= assignTaskFromMinCostFlow(graph, clients, 
taskIdList, clientStates,
+                        originalAssignedTaskNumber,
+                        taskClientMap, ClientState::assignStandby, 
ClientState::unassignStandby,
+                        ClientState::hasStandbyTask);
+                }
             }
         }
-        return standByTasksCost(standbyTasks, clientStates, trafficCost, 
nonOverlapCost);
+        final long cost = standByTasksCost(standbyTasks, clientStates, 
trafficCost, nonOverlapCost);
+        log.info("Assignment after {} rounds of standby optimization is {}\n 
with cost {}", round, clientStates, cost);

Review Comment:
   ```suggestion
           log.info("Assignment after {} rounds of standby task optimization is 
{}\n with cost {}", round, clientStates, cost);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to