mjsax commented on code in PR #14030: URL: https://github.com/apache/kafka/pull/14030#discussion_r1275434335
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java: ########## @@ -69,12 +71,38 @@ public final class AssignmentTestUtils { public static final UUID UUID_8 = uuidForInt(8); public static final UUID UUID_9 = uuidForInt(9); - public static final TopicPartition TP_0_0 = new TopicPartition("topic0", 0); - public static final TopicPartition TP_0_1 = new TopicPartition("topic0", 1); - public static final TopicPartition TP_0_2 = new TopicPartition("topic0", 2); - public static final TopicPartition TP_1_0 = new TopicPartition("topic1", 0); - public static final TopicPartition TP_1_1 = new TopicPartition("topic1", 1); - public static final TopicPartition TP_1_2 = new TopicPartition("topic1", 2); + public static final String RACK_0 = "rock0"; Review Comment: ```suggestion public static final String RACK_0 = "rack0"; ``` ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java: ########## @@ -185,4 +188,213 @@ public boolean validateClientRack() { } return true; } + + private int getCost(final TaskId taskId, final UUID processId, final boolean inCurrentAssignment, final int trafficCost, final int nonOverlapCost) { + final Map<String, Optional<String>> clientRacks = racksForProcess.get(processId); + if (clientRacks == null) { + throw new IllegalStateException("Client " + processId + " doesn't exist in processRacks"); + } + final Optional<Optional<String>> clientRackOpt = clientRacks.values().stream().filter(Optional::isPresent).findFirst(); + if (!clientRackOpt.isPresent() || !clientRackOpt.get().isPresent()) { + throw new IllegalStateException("Client " + processId + " doesn't have rack configured. 
Maybe forgot to call canEnableRackAwareAssignor first"); + } + + final String clientRack = clientRackOpt.get().get(); + final Set<TopicPartition> topicPartitions = partitionsForTask.get(taskId); + if (topicPartitions == null || topicPartitions.isEmpty()) { + throw new IllegalStateException("Task " + taskId + " has no TopicPartitions"); + } + + int cost = 0; + for (final TopicPartition tp : topicPartitions) { + final Set<String> tpRacks = racksForPartition.get(tp); + if (tpRacks == null || tpRacks.isEmpty()) { + throw new IllegalStateException("TopicPartition " + tp + " has no rack information. Maybe forgot to call canEnableRackAwareAssignor first"); + } + if (!tpRacks.contains(clientRack)) { + cost += trafficCost; + } + } + + if (!inCurrentAssignment) { + cost += nonOverlapCost; + } + + return cost; + } + + private static int getSinkID(final List<UUID> clientList, final List<TaskId> taskIdList) { + return clientList.size() + taskIdList.size(); + } + + // For testing. canEnableRackAwareAssignor must be called first + long activeTasksCost(final SortedMap<UUID, ClientState> clientStates, final SortedSet<TaskId> activeTasks, final int trafficCost, final int nonOverlapCost) { Review Comment: Can we add JavaDocs? It's a little unclear what this method does. Also maybe move `activeTasks` to be the first parameter, as they are the main input (all others are metadata)? ``` /** Compute the cost for the provided {@code activeTasks}. The passed-in active tasks must be contained in {@code clientStates}. */ ``` ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java: ########## @@ -74,11 +81,7 @@ public synchronized boolean canEnableRackAwareAssignorForActiveTasks() { } return validateTopicPartitionRack(); - } - - public boolean canEnableRackAwareAssignorForStandbyTasks() { - // TODO - return false; + // TODO: add changelog topic, standby task validation } // Visible for testing. 
This method also checks if all TopicPartitions exist in cluster Review Comment: Typo one line below: `D[e]scribe` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java: ########## @@ -69,12 +71,38 @@ public final class AssignmentTestUtils { public static final UUID UUID_8 = uuidForInt(8); public static final UUID UUID_9 = uuidForInt(9); - public static final TopicPartition TP_0_0 = new TopicPartition("topic0", 0); - public static final TopicPartition TP_0_1 = new TopicPartition("topic0", 1); - public static final TopicPartition TP_0_2 = new TopicPartition("topic0", 2); - public static final TopicPartition TP_1_0 = new TopicPartition("topic1", 0); - public static final TopicPartition TP_1_1 = new TopicPartition("topic1", 1); - public static final TopicPartition TP_1_2 = new TopicPartition("topic1", 2); + public static final String RACK_0 = "rock0"; Review Comment: Same below. ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java: ########## @@ -185,4 +191,224 @@ public boolean validateClientRack() { } return true; } + + private int getCost(final TaskId taskId, final UUID clientId, final boolean inCurrentAssignment, final boolean isStateful) { + final Map<String, Optional<String>> clientRacks = racksForProcess.get(clientId); + if (clientRacks == null) { + throw new IllegalStateException("Client " + clientId + " doesn't exist in processRacks"); + } + final Optional<Optional<String>> clientRackOpt = clientRacks.values().stream().filter(Optional::isPresent).findFirst(); + if (!clientRackOpt.isPresent() || !clientRackOpt.get().isPresent()) { + throw new IllegalStateException("Client " + clientId + " doesn't have rack configured. 
Maybe forgot to call canEnableRackAwareAssignor first"); + } + + final String clientRack = clientRackOpt.get().get(); + final Set<TopicPartition> topicPartitions = partitionsForTask.get(taskId); + if (topicPartitions == null) { + throw new IllegalStateException("Task " + taskId + " has no TopicPartitions"); + } + + final int trafficCost = assignmentConfigs.trafficCost == null ? (isStateful ? DEFAULT_STATEFUL_TRAFFIC_COST : DEFAULT_STATELESS_TRAFFIC_COST) + : assignmentConfigs.trafficCost; + final int nonOverlapCost = assignmentConfigs.nonOverlapCost == null ? (isStateful ? DEFAULT_STATEFUL_NON_OVERLAP_COST : DEFAULT_STATELESS_NON_OVERLAP_COST) + : assignmentConfigs.nonOverlapCost; + + int cost = 0; + for (final TopicPartition tp : topicPartitions) { + final Set<String> tpRacks = racksForPartition.get(tp); + if (tpRacks == null || tpRacks.isEmpty()) { + throw new IllegalStateException("TopicPartition " + tp + " has no rack information. Maybe forgot to call canEnableRackAwareAssignor first"); + } + if (!tpRacks.contains(clientRack)) { + cost += trafficCost; + } + } + + if (!inCurrentAssignment) { + cost += nonOverlapCost; Review Comment: Could work. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org