mjsax commented on code in PR #14030:
URL: https://github.com/apache/kafka/pull/14030#discussion_r1275434335


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java:
##########
@@ -69,12 +71,38 @@ public final class AssignmentTestUtils {
     public static final UUID UUID_8 = uuidForInt(8);
     public static final UUID UUID_9 = uuidForInt(9);
 
-    public static final TopicPartition TP_0_0 = new TopicPartition("topic0", 
0);
-    public static final TopicPartition TP_0_1 = new TopicPartition("topic0", 
1);
-    public static final TopicPartition TP_0_2 = new TopicPartition("topic0", 
2);
-    public static final TopicPartition TP_1_0 = new TopicPartition("topic1", 
0);
-    public static final TopicPartition TP_1_1 = new TopicPartition("topic1", 
1);
-    public static final TopicPartition TP_1_2 = new TopicPartition("topic1", 
2);
+    public static final String RACK_0 = "rock0";

Review Comment:
   ```suggestion
       public static final String RACK_0 = "rack0";
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
##########
@@ -185,4 +188,213 @@ public boolean validateClientRack() {
         }
         return true;
     }
+
+    private int getCost(final TaskId taskId, final UUID processId, final 
boolean inCurrentAssignment, final int trafficCost, final int nonOverlapCost) {
+        final Map<String, Optional<String>> clientRacks = 
racksForProcess.get(processId);
+        if (clientRacks == null) {
+            throw new IllegalStateException("Client " + processId + " doesn't 
exist in processRacks");
+        }
+        final Optional<Optional<String>> clientRackOpt = 
clientRacks.values().stream().filter(Optional::isPresent).findFirst();
+        if (!clientRackOpt.isPresent() || !clientRackOpt.get().isPresent()) {
+            throw new IllegalStateException("Client " + processId + " doesn't 
have rack configured. Maybe forgot to call canEnableRackAwareAssignor first");
+        }
+
+        final String clientRack = clientRackOpt.get().get();
+        final Set<TopicPartition> topicPartitions = 
partitionsForTask.get(taskId);
+        if (topicPartitions == null || topicPartitions.isEmpty()) {
+            throw new IllegalStateException("Task " + taskId + " has no 
TopicPartitions");
+        }
+
+        int cost = 0;
+        for (final TopicPartition tp : topicPartitions) {
+            final Set<String> tpRacks = racksForPartition.get(tp);
+            if (tpRacks == null || tpRacks.isEmpty()) {
+                throw new IllegalStateException("TopicPartition " + tp + " has 
no rack information. Maybe forgot to call canEnableRackAwareAssignor first");
+            }
+            if (!tpRacks.contains(clientRack)) {
+                cost += trafficCost;
+            }
+        }
+
+        if (!inCurrentAssignment) {
+            cost += nonOverlapCost;
+        }
+
+        return cost;
+    }
+
+    private static int getSinkID(final List<UUID> clientList, final 
List<TaskId> taskIdList) {
+        return clientList.size() + taskIdList.size();
+    }
+
+    // For testing. canEnableRackAwareAssignor must be called first
+    long activeTasksCost(final SortedMap<UUID, ClientState> clientStates, 
final SortedSet<TaskId> activeTasks, final int trafficCost, final int 
nonOverlapCost) {

Review Comment:
   Can we add JavaDocs? It's a little unclear what this method does. Also, maybe 
move `activeTasks` to be the first parameter, as it is the main input (all 
others are metadata)?
   ```
   /**
    * Compute the cost for the provided {@code activeTasks}. The passed-in active
    * tasks must be contained in {@code clientStates}.
    */
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
##########
@@ -74,11 +81,7 @@ public synchronized boolean 
canEnableRackAwareAssignorForActiveTasks() {
         }
 
         return validateTopicPartitionRack();
-    }
-
-    public boolean canEnableRackAwareAssignorForStandbyTasks() {
-        // TODO
-        return false;
+        // TODO: add changelog topic, standby task validation
     }
 
     // Visible for testing. This method also checks if all TopicPartitions 
exist in cluster

Review Comment:
   Typo one line below: `D[e]scribe`



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java:
##########
@@ -69,12 +71,38 @@ public final class AssignmentTestUtils {
     public static final UUID UUID_8 = uuidForInt(8);
     public static final UUID UUID_9 = uuidForInt(9);
 
-    public static final TopicPartition TP_0_0 = new TopicPartition("topic0", 
0);
-    public static final TopicPartition TP_0_1 = new TopicPartition("topic0", 
1);
-    public static final TopicPartition TP_0_2 = new TopicPartition("topic0", 
2);
-    public static final TopicPartition TP_1_0 = new TopicPartition("topic1", 
0);
-    public static final TopicPartition TP_1_1 = new TopicPartition("topic1", 
1);
-    public static final TopicPartition TP_1_2 = new TopicPartition("topic1", 
2);
+    public static final String RACK_0 = "rock0";

Review Comment:
   Same below.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
##########
@@ -185,4 +191,224 @@ public boolean validateClientRack() {
         }
         return true;
     }
+
+    private int getCost(final TaskId taskId, final UUID clientId, final 
boolean inCurrentAssignment, final boolean isStateful) {
+        final Map<String, Optional<String>> clientRacks = 
racksForProcess.get(clientId);
+        if (clientRacks == null) {
+            throw new IllegalStateException("Client " + clientId + " doesn't 
exist in processRacks");
+        }
+        final Optional<Optional<String>> clientRackOpt = 
clientRacks.values().stream().filter(Optional::isPresent).findFirst();
+        if (!clientRackOpt.isPresent() || !clientRackOpt.get().isPresent()) {
+            throw new IllegalStateException("Client " + clientId + " doesn't 
have rack configured. Maybe forgot to call canEnableRackAwareAssignor first");
+        }
+
+        final String clientRack = clientRackOpt.get().get();
+        final Set<TopicPartition> topicPartitions = 
partitionsForTask.get(taskId);
+        if (topicPartitions == null) {
+            throw new IllegalStateException("Task " + taskId + " has no 
TopicPartitions");
+        }
+
+        final int trafficCost = assignmentConfigs.trafficCost == null ? 
(isStateful ? DEFAULT_STATEFUL_TRAFFIC_COST : DEFAULT_STATELESS_TRAFFIC_COST)
+            : assignmentConfigs.trafficCost;
+        final int nonOverlapCost = assignmentConfigs.nonOverlapCost == null ? 
(isStateful ? DEFAULT_STATEFUL_NON_OVERLAP_COST : 
DEFAULT_STATELESS_NON_OVERLAP_COST)
+            : assignmentConfigs.nonOverlapCost;
+
+        int cost = 0;
+        for (final TopicPartition tp : topicPartitions) {
+            final Set<String> tpRacks = racksForPartition.get(tp);
+            if (tpRacks == null || tpRacks.isEmpty()) {
+                throw new IllegalStateException("TopicPartition " + tp + " has 
no rack information. Maybe forgot to call canEnableRackAwareAssignor first");
+            }
+            if (!tpRacks.contains(clientRack)) {
+                cost += trafficCost;
+            }
+        }
+
+        if (!inCurrentAssignment) {
+            cost += nonOverlapCost;

Review Comment:
   Could work.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to