mjsax commented on code in PR #14030: URL: https://github.com/apache/kafka/pull/14030#discussion_r1270106140
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java: ########## @@ -185,4 +191,224 @@ public boolean validateClientRack() { } return true; } + + private int getCost(final TaskId taskId, final UUID clientId, final boolean inCurrentAssignment, final boolean isStateful) { + final Map<String, Optional<String>> clientRacks = racksForProcess.get(clientId); + if (clientRacks == null) { + throw new IllegalStateException("Client " + clientId + " doesn't exist in processRacks"); + } + final Optional<Optional<String>> clientRackOpt = clientRacks.values().stream().filter(Optional::isPresent).findFirst(); + if (!clientRackOpt.isPresent() || !clientRackOpt.get().isPresent()) { + throw new IllegalStateException("Client " + clientId + " doesn't have rack configured. Maybe forgot to call canEnableRackAwareAssignor first"); + } + + final String clientRack = clientRackOpt.get().get(); + final Set<TopicPartition> topicPartitions = partitionsForTask.get(taskId); + if (topicPartitions == null) { + throw new IllegalStateException("Task " + taskId + " has no TopicPartitions"); + } + + final int trafficCost = assignmentConfigs.trafficCost == null ? (isStateful ? DEFAULT_STATEFUL_TRAFFIC_COST : DEFAULT_STATELESS_TRAFFIC_COST) + : assignmentConfigs.trafficCost; + final int nonOverlapCost = assignmentConfigs.nonOverlapCost == null ? (isStateful ? DEFAULT_STATEFUL_NON_OVERLAP_COST : DEFAULT_STATELESS_NON_OVERLAP_COST) + : assignmentConfigs.nonOverlapCost; + + int cost = 0; + for (final TopicPartition tp : topicPartitions) { + final Set<String> tpRacks = racksForPartition.get(tp); + if (tpRacks == null || tpRacks.isEmpty()) { + throw new IllegalStateException("TopicPartition " + tp + " has no rack information. 
Maybe forgot to call canEnableRackAwareAssignor first"); + } + if (!tpRacks.contains(clientRack)) { + cost += trafficCost; + } + } + + if (!inCurrentAssignment) { + cost += nonOverlapCost; + } + + return cost; + } + + // For testing. canEnableRackAwareAssignor must be called first + long activeTasksCost(final SortedMap<UUID, ClientState> clientStates, final SortedSet<TaskId> statefulTasks, final boolean isStateful) { + final List<UUID> clientList = new ArrayList<>(clientStates.keySet()); + final List<TaskId> taskIdList = new ArrayList<>(statefulTasks); + final Map<TaskId, UUID> taskClientMap = new HashMap<>(); + final Map<UUID, Integer> clientCapacity = new HashMap<>(); + final Graph<Integer> graph = new Graph<>(); + + constructStatefulActiveTaskGraph(graph, statefulTasks, clientList, taskIdList, + clientStates, taskClientMap, clientCapacity, isStateful); + + final int sourceId = taskIdList.size() + clientList.size(); + final int sinkId = sourceId + 1; Review Comment: Looking into `constructStatefulActiveTaskGraph`, it computes the source and sink IDs the same way. This sounds error-prone. Shouldn't we get the source and sink IDs from `graph` instead of re-computing them? (It seems we do the same computation in two places.) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org