ableegoldman commented on code in PR #15972: URL: https://github.com/apache/kafka/pull/15972#discussion_r1605659120
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -472,23 +482,74 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
      *
      * @param clientMetadataMap the map of process id to client metadata used to build an immutable
      *                          {@code ApplicationState}
-     * @param statefulTasks     the set of {@code TaskId} that correspond to all the stateful
-     *                          tasks that need to be reassigned.
      * @return The {@code ApplicationState} needed by the TaskAssigner to compute new task
      *         assignments.
      */
-    private ApplicationState buildApplicationState(final Map<UUID, ClientMetadata> clientMetadataMap,
-                                                   final Set<TaskId> statefulTasks) {
-        final Set<TaskId> statelessTasks = new HashSet<>();
-        for (final Map.Entry<UUID, ClientMetadata> clientEntry : clientMetadataMap.entrySet()) {
-            final ClientState clientState = clientEntry.getValue().state;
-            statelessTasks.addAll(clientState.statelessActiveTasks());
+    private ApplicationState buildApplicationState(final TopologyMetadata topologyMetadata,
+                                                   final Map<UUID, ClientMetadata> clientMetadataMap,
+                                                   final Map<Subtopology, TopicsInfo> topicGroups,
+                                                   final Cluster cluster) {
+        final Map<Subtopology, Set<String>> sourceTopicsByGroup = new HashMap<>();
+        final Map<Subtopology, Set<String>> changelogTopicsByGroup = new HashMap<>();
+        for (final Map.Entry<Subtopology, TopicsInfo> entry : topicGroups.entrySet()) {
+            final Set<String> sourceTopics = entry.getValue().sourceTopics;
+            final Set<String> changelogTopics = entry.getValue().stateChangelogTopics()
+                .stream().map(t -> t.name).collect(Collectors.toSet());
+            sourceTopicsByGroup.put(entry.getKey(), sourceTopics);
+            changelogTopicsByGroup.put(entry.getKey(), changelogTopics);
         }
+        final Map<TaskId, Set<TopicPartition>> sourcePartitionsForTask =
+            partitionGrouper.partitionGroups(sourceTopicsByGroup, cluster);
+        final Map<TaskId, Set<TopicPartition>> changelogPartitionsForTask =
+            partitionGrouper.partitionGroups(changelogTopicsByGroup, cluster);
+
+        final Set<TaskId> logicalTaskIds = new HashSet<>();
+        final Set<TopicPartition> sourceTopicPartitions = new HashSet<>();
+        sourcePartitionsForTask.forEach((taskId, partitions) -> {
+            logicalTaskIds.add(taskId);
+            sourceTopicPartitions.addAll(partitions);
+        });
+        final Set<TopicPartition> changelogTopicPartitions = new HashSet<>();
+        changelogPartitionsForTask.forEach((taskId, partitions) -> {
+            logicalTaskIds.add(taskId);

Review Comment:
   Note that we'll also want to deduplicate the source-changelog partitions for the rack id computation. We should include them in the source topics/remove them from the changelog topics passed into the `#getRacksForTopicPartitions` call.

   Of course we still need the changelogTopicPartitions as well, so we'll want a third set of `nonSourceChangelogTopicPartitions` that's specifically for the rack id computation.
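To make the three-set idea in the review comment concrete, here is a minimal sketch, not code from the PR. It assumes a source-changelog partition is simply one that appears in both the source set and the changelog set. The nested `TopicPartition` class is a hypothetical stand-in for `org.apache.kafka.common.TopicPartition` so the example compiles without Kafka on the classpath, and `RackPartitionSets`, `nonSourceChangelogPartitions`, and the topic names are invented for illustration.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

final class RackPartitionSets {

    // Hypothetical stand-in for org.apache.kafka.common.TopicPartition,
    // included only so this sketch is self-contained.
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(final String topic, final int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(final Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof TopicPartition)) {
                return false;
            }
            final TopicPartition other = (TopicPartition) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    // Hypothetical helper: returns the changelog partitions that are not also
    // source partitions. Neither input set is modified, so the full changelog
    // set stays available for everything other than the rack lookup.
    static Set<TopicPartition> nonSourceChangelogPartitions(
            final Set<TopicPartition> sourceTopicPartitions,
            final Set<TopicPartition> changelogTopicPartitions) {
        final Set<TopicPartition> result = new HashSet<>(changelogTopicPartitions);
        result.removeAll(sourceTopicPartitions);
        return result;
    }

    public static void main(final String[] args) {
        final Set<TopicPartition> source = new HashSet<>(Arrays.asList(
            new TopicPartition("input", 0),
            new TopicPartition("table-input", 0)));
        // "table-input-0" doubles as a changelog (a source-changelog partition).
        final Set<TopicPartition> changelog = new HashSet<>(Arrays.asList(
            new TopicPartition("table-input", 0),
            new TopicPartition("app-store-changelog", 0)));

        final Set<TopicPartition> nonSourceChangelog =
            nonSourceChangelogPartitions(source, changelog);

        System.out.println("rack lookup, source side:    " + source);
        System.out.println("rack lookup, changelog side: " + nonSourceChangelog);
        System.out.println("full changelog set (kept):   " + changelog);
    }
}
```

With sets like these, the rack lookup would see each source-changelog partition once, on the source side, while the unmodified changelog set remains available wherever the complete list is needed.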