ableegoldman commented on code in PR #16033:
URL: https://github.com/apache/kafka/pull/16033#discussion_r1618033785


##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##########
@@ -74,20 +198,220 @@ public static Map<ProcessId, KafkaStreamsAssignment> 
optimizeRackAwareStandbyTas
         final ApplicationState applicationState,
         final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments
     ) {
+        if (!hasValidRackInformation(applicationState)) {
+            LOG.warn("Cannot optimize standby tasks with invalid rack 
information.");
+            return kafkaStreamsAssignments;
+        }
+
+        final int crossRackTrafficCost = 
applicationState.assignmentConfigs().rackAwareTrafficCost();
+        final int nonOverlapCost = 
applicationState.assignmentConfigs().rackAwareNonOverlapCost();
+        final long currentCost = computeTaskCost(
+            applicationState.allTasks(),
+            applicationState.kafkaStreamsStates(false),
+            crossRackTrafficCost,
+            nonOverlapCost,
+            true,
+            true
+        );
+        LOG.info("Assignment before standby task optimization has cost {}", 
currentCost);
         throw new UnsupportedOperationException("Not Implemented.");
     }
 
+    private static long computeTaskCost(final Set<TaskInfo> tasks,
+                                        final Map<ProcessId, 
KafkaStreamsState> clients,
+                                        final int crossRackTrafficCost,
+                                        final int nonOverlapCost,
+                                        final boolean hasReplica,
+                                        final boolean isStandby) {
+        if (tasks.isEmpty()) {
+            return 0;
+        }
+
+        final List<UUID> clientIds = 
clients.keySet().stream().map(ProcessId::id).collect(
+            Collectors.toList());
+
+        final List<TaskId> taskIds = 
tasks.stream().map(TaskInfo::id).collect(Collectors.toList());
+        final Map<TaskId, Set<TaskTopicPartition>> topicPartitionsByTaskId = 
tasks.stream().collect(
+            Collectors.toMap(TaskInfo::id, TaskInfo::topicPartitions));
+
+        final Map<UUID, Optional<String>> clientRacks = 
clients.values().stream().collect(
+            Collectors.toMap(state -> state.processId().id(), 
KafkaStreamsState::rackId));
+
+        final Map<UUID, Set<TaskId>> taskIdsByProcess = 
clients.values().stream().collect(
+            Collectors.toMap(state -> state.processId().id(), state -> {
+                if (isStandby) {
+                    return state.previousStandbyTasks();
+                }
+                return state.previousActiveTasks();
+            })
+        );
+
+        final RackAwareGraphConstructor<UUID> graphConstructor = new 
MinTrafficGraphConstructor<>();
+        final AssignmentGraph assignmentGraph = buildTaskGraph(clientIds, 
clientRacks, taskIds, taskIdsByProcess, topicPartitionsByTaskId,
+            crossRackTrafficCost, nonOverlapCost, hasReplica, isStandby, 
graphConstructor);
+        return assignmentGraph.graph.totalCost();
+    }
+
+    private static AssignmentGraph buildTaskGraph(final List<UUID> clientIds,
+                                                  final Map<UUID, 
Optional<String>> clientRacks,
+                                                  final List<TaskId> taskIds,
+                                                  final Map<UUID, Set<TaskId>> 
previousTaskIdsByProcess,
+                                                  final Map<TaskId, 
Set<TaskTopicPartition>> topicPartitionsByTaskId,
+                                                  final int 
crossRackTrafficCost,
+                                                  final int nonOverlapCost,
+                                                  final boolean hasReplica,
+                                                  final boolean isStandby,
+                                                  final 
RackAwareGraphConstructor<UUID> graphConstructor) {
+        final Map<UUID, UUID> clientsUuidByUuid = 
clientIds.stream().collect(Collectors.toMap(id -> id, id -> id));
+        final Map<TaskId, UUID> clientByTask = new HashMap<>();
+        final Map<UUID, Integer> taskCountByClient = new HashMap<>();
+        final Graph<Integer> graph = graphConstructor.constructTaskGraph(
+            clientIds,
+            taskIds,
+            clientsUuidByUuid,
+            clientByTask,
+            taskCountByClient,
+            (processId, taskId) -> {
+                return 
previousTaskIdsByProcess.get(processId).contains(taskId);
+            },
+            (taskId, processId, inCurrentAssignment, unused0, unused1, 
unused2) -> {
+                final int assignmentChangeCost = !inCurrentAssignment ? 
nonOverlapCost : 0;
+                final Optional<String> clientRack = clientRacks.get(processId);
+                final Set<TaskTopicPartition> topicPartitions = 
topicPartitionsByTaskId.get(taskId).stream().filter(tp -> {
+                    return isStandby ? tp.isChangelog() : true;
+                }).collect(Collectors.toSet());
+                return assignmentChangeCost + 
getCrossRackTrafficCost(topicPartitions, clientRack, crossRackTrafficCost);
+            },
+            crossRackTrafficCost,
+            nonOverlapCost,
+            hasReplica,
+            isStandby
+        );
+        return new AssignmentGraph(graph, clientByTask, taskCountByClient);
+    }
+
+    /**
+     * This internal structure is used to keep track of the graph solving 
outputs alongside the graph
+     * structure itself.
+     */
+    private static final class AssignmentGraph {
+        public final Graph<Integer> graph;
+        public final Map<TaskId, UUID> clientByTask;
+        public final Map<UUID, Integer> taskCountByClient;
+
+        public AssignmentGraph(final Graph<Integer> graph,
+                               final Map<TaskId, UUID> clientByTask,
+                               final Map<UUID, Integer> taskCountByClient) {
+            this.graph = graph;
+            this.clientByTask = clientByTask;
+            this.taskCountByClient = taskCountByClient;
+        }
+    }
+
     /**
-     * Return a "no-op" assignment that just copies the previous assignment of 
tasks to KafkaStreams clients
      *
-     * @param applicationState the metadata and other info describing the 
current application state
+     * @return the traffic cost of assigning this {@param task} to the client 
{@param streamsState}.
+     */
+    private static int getCrossRackTrafficCost(final Set<TaskTopicPartition> 
topicPartitions,
+                                               final Optional<String> 
clientRack,
+                                               final int crossRackTrafficCost) 
{
+        if (!clientRack.isPresent()) {
+            throw new IllegalStateException("Client doesn't have rack 
configured.");
+        }
+
+        int cost = 0;
+        for (final TaskTopicPartition topicPartition : topicPartitions) {
+            final Optional<Set<String>> topicPartitionRacks = 
topicPartition.rackIds();
+            if (topicPartitionRacks == null || 
!topicPartitionRacks.isPresent()) {
+                throw new IllegalStateException("TopicPartition " + 
topicPartition + " has no rack information.");
+            }
+
+            if (topicPartitionRacks.get().contains(clientRack.get())) {
+                continue;
+            }
+
+            cost += crossRackTrafficCost;
+        }
+        return cost;
+    }
+
+    /**
+     * This function returns whether the current application state has the 
required rack information
+     * to make assignment decisions with.
+     * This includes ensuring that every client has a known rack id, and that 
every topic partition for
+     * every logical task that needs to be assigned also has known rack ids.
+     * If a logical task has no source topic partitions, it will return false.
+     * If standby tasks are configured, but a logical task has no changelog 
topic partitions, it will return false.
      *
-     * @return a new map containing an assignment that replicates exactly the 
previous assignment reported
-     *         in the applicationState
+     * @return whether rack-aware assignment decisions can be made for this 
application.
      */
-    public static Map<ProcessId, KafkaStreamsAssignment> identityAssignment(
-        final ApplicationState applicationState
-    ) {
-        throw new UnsupportedOperationException("Not Implemented.");
+    public static boolean hasValidRackInformation(final ApplicationState 
applicationState) {

Review Comment:
   This needs to be private; otherwise it becomes part of the public API, which
   we don't want (and, if I understand correctly, don't need).



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##########
@@ -16,14 +16,63 @@
  */
 package org.apache.kafka.streams.processor.assignment;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 import java.util.SortedSet;
+import java.util.UUID;
+import java.util.stream.Collectors;
 import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.AssignedTask;
+import org.apache.kafka.streams.processor.internals.assignment.Graph;
+import 
org.apache.kafka.streams.processor.internals.assignment.MinTrafficGraphConstructor;
+import 
org.apache.kafka.streams.processor.internals.assignment.RackAwareGraphConstructor;
+import 
org.apache.kafka.streams.processor.internals.assignment.RackAwareGraphConstructorFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A set of utilities to help implement task assignment via the {@link 
TaskAssignor}
  */
 public final class TaskAssignmentUtils {
+    private static final Logger LOG = 
LoggerFactory.getLogger(TaskAssignmentUtils.class);
+
+    private TaskAssignmentUtils() {}
+
+    /**
+     * Return a "no-op" assignment that just copies the previous assignment of 
tasks to KafkaStreams clients
+     *
+     * @param applicationState the metadata and other info describing the 
current application state
+     *
+     * @return a new map containing an assignment that replicates exactly the 
previous assignment reported
+     *         in the applicationState
+     */
+    public static Map<ProcessId, KafkaStreamsAssignment> identityAssignment(
+        final ApplicationState applicationState
+    ) {

Review Comment:
   Kafka Streams formatting: the first parameter always goes on the same line as
   the method name (no matter how ridiculously long that line gets).
   
   ```suggestion
       public static Map<ProcessId, KafkaStreamsAssignment> identityAssignment(final ApplicationState applicationState) {
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##########
@@ -58,7 +112,85 @@ public static Map<ProcessId, KafkaStreamsAssignment> 
optimizeRackAwareActiveTask
         final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments,
         final SortedSet<TaskId> tasks

Review Comment:
   I think we're still missing the javadocs for this method?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to