ableegoldman commented on code in PR #16033:
URL: https://github.com/apache/kafka/pull/16033#discussion_r1613111098


##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##########
@@ -74,20 +209,240 @@ public static Map<ProcessId, KafkaStreamsAssignment> 
optimizeRackAwareStandbyTas
         final ApplicationState applicationState,
         final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments
     ) {
+        if (!hasValidRackInformation(applicationState)) {
+            throw new IllegalStateException("Cannot perform rack-aware 
assignment optimizations with invalid rack information.");
+        }
+
+        final int crossRackTrafficCost= 
applicationState.assignmentConfigs().rackAwareTrafficCost();
+        final int nonOverlapCost = 
applicationState.assignmentConfigs().rackAwareNonOverlapCost();
+        final long currentCost = computeTaskCost(
+            applicationState.allTasks(),
+            applicationState.kafkaStreamsStates(false),
+            crossRackTrafficCost,
+            nonOverlapCost,
+            true,
+            true
+        );
+        LOG.info("Assignment before standby task optimization has cost {}", 
currentCost);
         throw new UnsupportedOperationException("Not Implemented.");
     }
 
+    private static long computeTaskCost(final Set<TaskInfo> tasks,
+                                        final Map<ProcessId, 
KafkaStreamsState> clients,
+                                        final int crossRackTrafficCost,
+                                        final int nonOverlapCost,
+                                        final boolean hasReplica,
+                                        final boolean isStandby) {
+        if (tasks.isEmpty()) {
+            return 0;
+        }
+
+        final List<UUID> clientIds = 
clients.keySet().stream().map(ProcessId::id).collect(
+            Collectors.toList());
+
+        final List<TaskId> taskIds = 
tasks.stream().map(TaskInfo::id).collect(Collectors.toList());
+        final Map<TaskId, Set<TaskTopicPartition>> topicPartitionsByTaskId = 
tasks.stream().collect(
+            Collectors.toMap(TaskInfo::id, TaskInfo::topicPartitions));
+
+        final Map<UUID, Optional<String>> clientRacks = 
clients.values().stream().collect(
+            Collectors.toMap(state -> state.processId().id(), 
KafkaStreamsState::rackId));
+
+        final Map<UUID, Set<TaskId>> taskIdsByProcess = 
clients.values().stream().collect(
+            Collectors.toMap(state -> state.processId().id(), state -> {
+                if (isStandby) {
+                    return state.previousStandbyTasks();
+                }
+                return state.previousActiveTasks();
+            })
+        );
+
+        final RackAwareGraphConstructor<UUID> graphConstructor = new 
MinTrafficGraphConstructor<>();
+        final AssignmentGraph assignmentGraph = buildTaskGraph(clientIds, 
clientRacks, taskIds, taskIdsByProcess, topicPartitionsByTaskId,
+            crossRackTrafficCost, nonOverlapCost, hasReplica, isStandby, 
graphConstructor);
+        return assignmentGraph.graph.totalCost();
+    }
+
+    private static AssignmentGraph buildTaskGraph(final List<UUID> clientIds,
+                                                 final Map<UUID, 
Optional<String>> clientRacks,
+                                                 final List<TaskId> taskIds,
+                                                 final Map<UUID, Set<TaskId>> 
taskIdsByProcess,

Review Comment:
   We're passing in the wrong thing here. Ultimately this is used for the 
`hasAssignedTask` predicate by the graph constructors, which means "is this 
task assigned to this client in the current iteration of the assignment". 
However both callers of this method construct the `tasksIdsByProcess` map based 
on the _previous_ task assignment, not the _current_ one. 
   
   We need to be careful in differentiating between previous assignment and 
current assignment. How are we actually keeping track of and/or updating the 
current assignment through these rack-aware optimizations? And would it help if 
KafkaStreamsAssignment had some methods for mutating the set of assigned tasks 
so that we can use the KafkaStreamsAssignment class directly, instead of always 
translating between maps of KafkaStreamsAssignment and maps of 
Set<AssignedTask>? I was thinking about this earlier since it also comes up a 
lot in the StickyTaskAssignor



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##########
@@ -39,6 +91,8 @@ public static Map<ProcessId, KafkaStreamsAssignment> 
defaultStandbyTaskAssignmen
         final ApplicationState applicationState,
         final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments
     ) {
+        final long followupRebalanceDelay = 
applicationState.assignmentConfigs().probingRebalanceIntervalMs();
+        final Instant followupRebalanceDeadline = 
Instant.now().plus(followupRebalanceDelay, ChronoUnit.MILLIS);

Review Comment:
   Was this intentional? FWIW none of the `TaskAssignmentUtils` methods should 
ever schedule followup rebalances 



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##########
@@ -58,7 +112,88 @@ public static Map<ProcessId, KafkaStreamsAssignment> 
optimizeRackAwareActiveTask
         final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments,
         final SortedSet<TaskId> tasks
     ) {
-        throw new UnsupportedOperationException("Not Implemented.");
+        if (tasks.isEmpty()) {
+            return kafkaStreamsAssignments;
+        }
+
+        if (!hasValidRackInformation(applicationState)) {

Review Comment:
   It seems a little silly to have to run this exact check twice, which we are 
essentially doing by throwing an exception if it fails. For example the 
StickyTaskAssignor runs this check and if false, skips calling the rack-aware 
optimization. 
   
   Imo, to best keep the current behavior where it simply skips applying this 
optimization if it doesn't have all of the rack id info it needs, we should 
just return early here instead of throwing an IllegalStateException, and not 
call it at all in the StickyTaskAssignor (or any of the assignors). The point 
is to move any rack-related logic out of the assignors and encapsulate it in 
these utility methods, so they can simply be called without knowing anything 
about how rack aware assignment works. Throwing an exception if you call this 
method without first having run some pre-check that users won't easily know 
about seems a bit cruel



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##########
@@ -74,20 +209,240 @@ public static Map<ProcessId, KafkaStreamsAssignment> 
optimizeRackAwareStandbyTas
         final ApplicationState applicationState,
         final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments
     ) {
+        if (!hasValidRackInformation(applicationState)) {
+            throw new IllegalStateException("Cannot perform rack-aware 
assignment optimizations with invalid rack information.");
+        }
+
+        final int crossRackTrafficCost= 
applicationState.assignmentConfigs().rackAwareTrafficCost();
+        final int nonOverlapCost = 
applicationState.assignmentConfigs().rackAwareNonOverlapCost();
+        final long currentCost = computeTaskCost(
+            applicationState.allTasks(),
+            applicationState.kafkaStreamsStates(false),
+            crossRackTrafficCost,
+            nonOverlapCost,
+            true,
+            true
+        );
+        LOG.info("Assignment before standby task optimization has cost {}", 
currentCost);
         throw new UnsupportedOperationException("Not Implemented.");
     }
 
+    private static long computeTaskCost(final Set<TaskInfo> tasks,
+                                        final Map<ProcessId, 
KafkaStreamsState> clients,
+                                        final int crossRackTrafficCost,
+                                        final int nonOverlapCost,
+                                        final boolean hasReplica,
+                                        final boolean isStandby) {
+        if (tasks.isEmpty()) {
+            return 0;
+        }
+
+        final List<UUID> clientIds = 
clients.keySet().stream().map(ProcessId::id).collect(
+            Collectors.toList());
+
+        final List<TaskId> taskIds = 
tasks.stream().map(TaskInfo::id).collect(Collectors.toList());
+        final Map<TaskId, Set<TaskTopicPartition>> topicPartitionsByTaskId = 
tasks.stream().collect(
+            Collectors.toMap(TaskInfo::id, TaskInfo::topicPartitions));
+
+        final Map<UUID, Optional<String>> clientRacks = 
clients.values().stream().collect(
+            Collectors.toMap(state -> state.processId().id(), 
KafkaStreamsState::rackId));
+
+        final Map<UUID, Set<TaskId>> taskIdsByProcess = 
clients.values().stream().collect(
+            Collectors.toMap(state -> state.processId().id(), state -> {
+                if (isStandby) {
+                    return state.previousStandbyTasks();
+                }
+                return state.previousActiveTasks();
+            })
+        );
+
+        final RackAwareGraphConstructor<UUID> graphConstructor = new 
MinTrafficGraphConstructor<>();
+        final AssignmentGraph assignmentGraph = buildTaskGraph(clientIds, 
clientRacks, taskIds, taskIdsByProcess, topicPartitionsByTaskId,
+            crossRackTrafficCost, nonOverlapCost, hasReplica, isStandby, 
graphConstructor);
+        return assignmentGraph.graph.totalCost();
+    }
+
+    private static AssignmentGraph buildTaskGraph(final List<UUID> clientIds,
+                                                 final Map<UUID, 
Optional<String>> clientRacks,
+                                                 final List<TaskId> taskIds,
+                                                 final Map<UUID, Set<TaskId>> 
taskIdsByProcess,
+                                                 final Map<TaskId, 
Set<TaskTopicPartition>> topicPartitionsByTaskId,
+                                                 final int 
crossRackTrafficCost,
+                                                 final int nonOverlapCost,
+                                                 final boolean hasReplica,
+                                                 final boolean isStandby,
+                                                 final 
RackAwareGraphConstructor<UUID> graphConstructor) {
+        final Map<UUID, UUID> clientsUuidByUuid = 
clientIds.stream().collect(Collectors.toMap(id -> id, id -> id));
+        final Map<TaskId, UUID> clientByTask = new HashMap<>();
+        final Map<UUID, Integer> taskCountByClient = new HashMap<>();
+        final Graph<Integer> graph = graphConstructor.constructTaskGraph(
+            clientIds,
+            taskIds,
+            clientsUuidByUuid,
+            clientByTask,
+            taskCountByClient,
+            (processId, taskId) -> {
+                return taskIdsByProcess.get(processId).contains(taskId);
+            },
+            (taskId, processId, inCurrentAssignment, unused0, unused1, 
unused2) -> {
+                final int assignmentChangeCost = !inCurrentAssignment ? 
nonOverlapCost : 0;
+                final Optional<String> clientRack = clientRacks.get(processId);
+                final Set<TaskTopicPartition> topicPartitions = 
topicPartitionsByTaskId.get(taskId).stream().filter(tp -> {
+                    return isStandby ? tp.isChangelog() : true;
+                }).collect(Collectors.toSet());
+                return (assignmentChangeCost + 
getCrossRackTrafficCost(topicPartitions, clientRack, crossRackTrafficCost));
+            },
+            crossRackTrafficCost,
+            nonOverlapCost,
+            hasReplica,
+            isStandby
+        );
+        return new AssignmentGraph(graph, clientByTask, taskCountByClient);
+    }
+
+    /**
+     * This internal structure is used to keep track of the graph solving 
outputs alongside the graph
+     * structure itself.
+     */
+    private static final class AssignmentGraph {
+        public final Graph<Integer> graph;
+        public final Map<TaskId, UUID> clientByTask;
+        public final Map<UUID, Integer> taskCountByClient;
+
+        public AssignmentGraph(final Graph<Integer> graph,
+                               final Map<TaskId, UUID> clientByTask,
+                               final Map<UUID, Integer> taskCountByClient) {
+            this.graph = graph;
+            this.clientByTask = clientByTask;
+            this.taskCountByClient = taskCountByClient;
+        }
+    }
+
     /**
-     * Return a "no-op" assignment that just copies the previous assignment of 
tasks to KafkaStreams clients
      *
-     * @param applicationState the metadata and other info describing the 
current application state
+     * @return the traffic cost of assigning this {@param task} to the client 
{@param streamsState}.
+     */
+    private static int getCrossRackTrafficCost(final Set<TaskTopicPartition> 
topicPartitions,
+                                               final Optional<String> 
clientRack,
+                                               final int crossRackTrafficCost) 
{
+        if (!clientRack.isPresent()) {
+            throw new IllegalStateException("Client doesn't have rack 
configured.");
+        }
+
+        int cost = 0;
+        for (final TaskTopicPartition topicPartition : topicPartitions) {
+            final Optional<Set<String>> topicPartitionRacks = 
topicPartition.rackIds();
+            if (topicPartitionRacks == null || 
!topicPartitionRacks.isPresent()) {
+                throw new IllegalStateException("TopicPartition " + 
topicPartition + " has no rack information.");
+            }
+
+            if (topicPartitionRacks.get().contains(clientRack.get())) {
+                continue;
+            }
+
+            cost += crossRackTrafficCost;
+        }
+        return cost;
+    }
+
+    /**
+     * This function returns whether the current application state has the 
required rack information
+     * to make assignment decisions with.
+     * This includes ensuring that every client has a known rack id, and that 
every topic partition for
+     * every logical task that needs to be assigned also has known rack ids.
+     * If a logical task has no source topic partitions, it will return false.
+     * If standby tasks are configured, but a logical task has no changelog 
topic partitions, it will return false.
      *
-     * @return a new map containing an assignment that replicates exactly the 
previous assignment reported
-     *         in the applicationState
+     * @return whether rack-aware assignment decisions can be made for this 
application.
      */
-    public static Map<ProcessId, KafkaStreamsAssignment> identityAssignment(
-        final ApplicationState applicationState
-    ) {
-        throw new UnsupportedOperationException("Not Implemented.");
+    public static boolean hasValidRackInformation(final ApplicationState 
applicationState) {
+        for (final KafkaStreamsState state : 
applicationState.kafkaStreamsStates(false).values()) {
+            if (!hasValidRackInformation(state)) {
+                return false;
+            }
+        }
+
+        final boolean hasStandby = 
applicationState.assignmentConfigs().numStandbyReplicas() >= 1;
+        for (final TaskInfo task : applicationState.allTasks()) {
+            if (!hasValidRackInformation(task, hasStandby)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private static boolean hasValidRackInformation(final KafkaStreamsState 
state) {
+        if (!state.rackId().isPresent()) {
+            LOG.error("Client " + state.processId() + " doesn't have rack 
configured.");
+            return false;
+        }
+        return true;
+    }
+
+    private static boolean hasValidRackInformation(final TaskInfo task, final 
boolean hasStandby) {
+        final Map<TopicPartition, Set<String>> racksByTopicPartition = 
task.partitionToRackIds();
+        for (final TopicPartition topicPartition : 
task.sourceTopicPartitions()) {
+            final Set<String> racks = 
racksByTopicPartition.getOrDefault(topicPartition, null);
+            if (racks == null || racks.isEmpty()) {
+                LOG.error("Topic partition {} for task {} does not have racks 
configured.", topicPartition, task.id());
+                return false;
+            }
+        }
+
+        for (final TopicPartition topicPartition : 
task.changelogTopicPartitions()) {
+            final Set<String> racks = 
racksByTopicPartition.getOrDefault(topicPartition, null);
+            if (racks == null || racks.isEmpty()) {
+                LOG.error("Topic partition {} for task {} does not have racks 
configured.", topicPartition, task.id());
+                return false;
+            }
+        }
+
+        if (task.sourceTopicPartitions().isEmpty()) {

Review Comment:
   It really doesn't make sense to include this check here, whether a task has 
source partitions has nothing to do with whether the rack info is valid. Also, 
it shouldn't be up to the user/assignor to verify the inputs to the assignor 
and check conditions such as "every task has at least one source topic 
partition". 



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##########
@@ -74,20 +209,240 @@ public static Map<ProcessId, KafkaStreamsAssignment> 
optimizeRackAwareStandbyTas
         final ApplicationState applicationState,
         final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments
     ) {
+        if (!hasValidRackInformation(applicationState)) {
+            throw new IllegalStateException("Cannot perform rack-aware 
assignment optimizations with invalid rack information.");
+        }
+
+        final int crossRackTrafficCost= 
applicationState.assignmentConfigs().rackAwareTrafficCost();
+        final int nonOverlapCost = 
applicationState.assignmentConfigs().rackAwareNonOverlapCost();
+        final long currentCost = computeTaskCost(
+            applicationState.allTasks(),
+            applicationState.kafkaStreamsStates(false),
+            crossRackTrafficCost,
+            nonOverlapCost,
+            true,
+            true
+        );
+        LOG.info("Assignment before standby task optimization has cost {}", 
currentCost);
         throw new UnsupportedOperationException("Not Implemented.");
     }
 
+    private static long computeTaskCost(final Set<TaskInfo> tasks,
+                                        final Map<ProcessId, 
KafkaStreamsState> clients,
+                                        final int crossRackTrafficCost,
+                                        final int nonOverlapCost,
+                                        final boolean hasReplica,
+                                        final boolean isStandby) {
+        if (tasks.isEmpty()) {
+            return 0;
+        }
+
+        final List<UUID> clientIds = 
clients.keySet().stream().map(ProcessId::id).collect(
+            Collectors.toList());
+
+        final List<TaskId> taskIds = 
tasks.stream().map(TaskInfo::id).collect(Collectors.toList());
+        final Map<TaskId, Set<TaskTopicPartition>> topicPartitionsByTaskId = 
tasks.stream().collect(
+            Collectors.toMap(TaskInfo::id, TaskInfo::topicPartitions));
+
+        final Map<UUID, Optional<String>> clientRacks = 
clients.values().stream().collect(
+            Collectors.toMap(state -> state.processId().id(), 
KafkaStreamsState::rackId));
+
+        final Map<UUID, Set<TaskId>> taskIdsByProcess = 
clients.values().stream().collect(
+            Collectors.toMap(state -> state.processId().id(), state -> {
+                if (isStandby) {
+                    return state.previousStandbyTasks();
+                }
+                return state.previousActiveTasks();
+            })
+        );
+
+        final RackAwareGraphConstructor<UUID> graphConstructor = new 
MinTrafficGraphConstructor<>();
+        final AssignmentGraph assignmentGraph = buildTaskGraph(clientIds, 
clientRacks, taskIds, taskIdsByProcess, topicPartitionsByTaskId,
+            crossRackTrafficCost, nonOverlapCost, hasReplica, isStandby, 
graphConstructor);
+        return assignmentGraph.graph.totalCost();
+    }
+
+    private static AssignmentGraph buildTaskGraph(final List<UUID> clientIds,
+                                                 final Map<UUID, 
Optional<String>> clientRacks,
+                                                 final List<TaskId> taskIds,
+                                                 final Map<UUID, Set<TaskId>> 
taskIdsByProcess,
+                                                 final Map<TaskId, 
Set<TaskTopicPartition>> topicPartitionsByTaskId,
+                                                 final int 
crossRackTrafficCost,
+                                                 final int nonOverlapCost,
+                                                 final boolean hasReplica,
+                                                 final boolean isStandby,
+                                                 final 
RackAwareGraphConstructor<UUID> graphConstructor) {
+        final Map<UUID, UUID> clientsUuidByUuid = 
clientIds.stream().collect(Collectors.toMap(id -> id, id -> id));
+        final Map<TaskId, UUID> clientByTask = new HashMap<>();
+        final Map<UUID, Integer> taskCountByClient = new HashMap<>();
+        final Graph<Integer> graph = graphConstructor.constructTaskGraph(
+            clientIds,
+            taskIds,
+            clientsUuidByUuid,
+            clientByTask,
+            taskCountByClient,
+            (processId, taskId) -> {
+                return taskIdsByProcess.get(processId).contains(taskId);
+            },
+            (taskId, processId, inCurrentAssignment, unused0, unused1, 
unused2) -> {
+                final int assignmentChangeCost = !inCurrentAssignment ? 
nonOverlapCost : 0;
+                final Optional<String> clientRack = clientRacks.get(processId);
+                final Set<TaskTopicPartition> topicPartitions = 
topicPartitionsByTaskId.get(taskId).stream().filter(tp -> {
+                    return isStandby ? tp.isChangelog() : true;
+                }).collect(Collectors.toSet());
+                return (assignmentChangeCost + 
getCrossRackTrafficCost(topicPartitions, clientRack, crossRackTrafficCost));
+            },
+            crossRackTrafficCost,
+            nonOverlapCost,
+            hasReplica,
+            isStandby
+        );
+        return new AssignmentGraph(graph, clientByTask, taskCountByClient);
+    }
+
+    /**
+     * This internal structure is used to keep track of the graph solving 
outputs alongside the graph
+     * structure itself.
+     */
+    private static final class AssignmentGraph {
+        public final Graph<Integer> graph;
+        public final Map<TaskId, UUID> clientByTask;
+        public final Map<UUID, Integer> taskCountByClient;
+
+        public AssignmentGraph(final Graph<Integer> graph,
+                               final Map<TaskId, UUID> clientByTask,
+                               final Map<UUID, Integer> taskCountByClient) {
+            this.graph = graph;
+            this.clientByTask = clientByTask;
+            this.taskCountByClient = taskCountByClient;
+        }
+    }
+
     /**
-     * Return a "no-op" assignment that just copies the previous assignment of 
tasks to KafkaStreams clients
      *
-     * @param applicationState the metadata and other info describing the 
current application state
+     * @return the traffic cost of assigning this {@param task} to the client 
{@param streamsState}.
+     */
+    private static int getCrossRackTrafficCost(final Set<TaskTopicPartition> 
topicPartitions,
+                                               final Optional<String> 
clientRack,
+                                               final int crossRackTrafficCost) 
{
+        if (!clientRack.isPresent()) {
+            throw new IllegalStateException("Client doesn't have rack 
configured.");
+        }
+
+        int cost = 0;
+        for (final TaskTopicPartition topicPartition : topicPartitions) {
+            final Optional<Set<String>> topicPartitionRacks = 
topicPartition.rackIds();
+            if (topicPartitionRacks == null || 
!topicPartitionRacks.isPresent()) {
+                throw new IllegalStateException("TopicPartition " + 
topicPartition + " has no rack information.");
+            }
+
+            if (topicPartitionRacks.get().contains(clientRack.get())) {
+                continue;
+            }
+
+            cost += crossRackTrafficCost;
+        }
+        return cost;
+    }
+
+    /**
+     * This function returns whether the current application state has the 
required rack information
+     * to make assignment decisions with.
+     * This includes ensuring that every client has a known rack id, and that 
every topic partition for
+     * every logical task that needs to be assigned also has known rack ids.
+     * If a logical task has no source topic partitions, it will return false.
+     * If standby tasks are configured, but a logical task has no changelog 
topic partitions, it will return false.
      *
-     * @return a new map containing an assignment that replicates exactly the 
previous assignment reported
-     *         in the applicationState
+     * @return whether rack-aware assignment decisions can be made for this 
application.
      */
-    public static Map<ProcessId, KafkaStreamsAssignment> identityAssignment(
-        final ApplicationState applicationState
-    ) {
-        throw new UnsupportedOperationException("Not Implemented.");
+    public static boolean hasValidRackInformation(final ApplicationState 
applicationState) {
+        for (final KafkaStreamsState state : 
applicationState.kafkaStreamsStates(false).values()) {
+            if (!hasValidRackInformation(state)) {
+                return false;
+            }
+        }
+
+        final boolean hasStandby = 
applicationState.assignmentConfigs().numStandbyReplicas() >= 1;
+        for (final TaskInfo task : applicationState.allTasks()) {
+            if (!hasValidRackInformation(task, hasStandby)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private static boolean hasValidRackInformation(final KafkaStreamsState 
state) {
+        if (!state.rackId().isPresent()) {
+            LOG.error("Client " + state.processId() + " doesn't have rack 
configured.");
+            return false;
+        }
+        return true;
+    }
+
+    private static boolean hasValidRackInformation(final TaskInfo task, final 
boolean hasStandby) {
+        final Map<TopicPartition, Set<String>> racksByTopicPartition = 
task.partitionToRackIds();
+        for (final TopicPartition topicPartition : 
task.sourceTopicPartitions()) {
+            final Set<String> racks = 
racksByTopicPartition.getOrDefault(topicPartition, null);
+            if (racks == null || racks.isEmpty()) {
+                LOG.error("Topic partition {} for task {} does not have racks 
configured.", topicPartition, task.id());
+                return false;
+            }
+        }
+
+        for (final TopicPartition topicPartition : 
task.changelogTopicPartitions()) {
+            final Set<String> racks = 
racksByTopicPartition.getOrDefault(topicPartition, null);
+            if (racks == null || racks.isEmpty()) {
+                LOG.error("Topic partition {} for task {} does not have racks 
configured.", topicPartition, task.id());
+                return false;
+            }
+        }
+
+        if (task.sourceTopicPartitions().isEmpty()) {
+            LOG.error("Task {} has no source TopicPartitions.", task.id());
+            return false;
+        }
+
+        if (hasStandby && task.changelogTopicPartitions().isEmpty()) {
+            LOG.error("Task {} has no changelog TopicPartitions.", task.id());
+            return false;

Review Comment:
   This should not be an error, we can remove the `hasStandby` parameter 
entirely.
   
   For one thing, it shouldn't be an error to have configured an app to use 
standby tasks even if there's no state/changelog topics (Streams just wouldn't 
create those standby tasks). But for another, it's also possible to have a 
standby task with no changelog topics. Not all stateful tasks necessarily have 
to have changelogs (though most do), one can configure a state store with 
"logging disabled" which just means it has no changelog topic. Sometimes people 
play fast and lose with semantics



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##########
@@ -58,7 +112,85 @@ public static Map<ProcessId, KafkaStreamsAssignment> 
optimizeRackAwareActiveTask
         final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments,
         final SortedSet<TaskId> tasks

Review Comment:
   It might be worth including some of the javadocs from 
RackAwareTaskAssignor#optimizeActiveTasks, especially the ones that describe 
what the two relevant configs do/mean. I'll put something in the KIP and you 
can just copy/paste from there



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to