ableegoldman commented on code in PR #15972:
URL: https://github.com/apache/kafka/pull/15972#discussion_r1605659907


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -472,23 +482,74 @@ public GroupAssignment assign(final Cluster metadata, 
final GroupSubscription gr
      *
      * @param clientMetadataMap the map of process id to client metadata used 
to build an immutable
      *                          {@code ApplicationState}
-     * @param statefulTasks     the set of {@code TaskId} that correspond to 
all the stateful
-     *                          tasks that need to be reassigned.
      * @return The {@code ApplicationState} needed by the TaskAssigner to 
compute new task
      *         assignments.
      */
-    private ApplicationState buildApplicationState(final Map<UUID, 
ClientMetadata> clientMetadataMap,
-                                                   final Set<TaskId> 
statefulTasks) {
-        final Set<TaskId> statelessTasks = new HashSet<>();
-        for (final Map.Entry<UUID, ClientMetadata> clientEntry : 
clientMetadataMap.entrySet()) {
-            final ClientState clientState = clientEntry.getValue().state;
-            statelessTasks.addAll(clientState.statelessActiveTasks());
+    private ApplicationState buildApplicationState(final TopologyMetadata 
topologyMetadata,
+                                                   final Map<UUID, 
ClientMetadata> clientMetadataMap,
+                                                   final Map<Subtopology, 
TopicsInfo> topicGroups,
+                                                   final Cluster cluster) {
+        final Map<Subtopology, Set<String>> sourceTopicsByGroup = new 
HashMap<>();
+        final Map<Subtopology, Set<String>> changelogTopicsByGroup = new 
HashMap<>();
+        for (final Map.Entry<Subtopology, TopicsInfo> entry : 
topicGroups.entrySet()) {
+            final Set<String> sourceTopics = entry.getValue().sourceTopics;
+            final Set<String> changelogTopics = 
entry.getValue().stateChangelogTopics()
+                .stream().map(t -> t.name).collect(Collectors.toSet());
+            sourceTopicsByGroup.put(entry.getKey(), sourceTopics);
+            changelogTopicsByGroup.put(entry.getKey(), changelogTopics);
         }
 
+        final Map<TaskId, Set<TopicPartition>> sourcePartitionsForTask =
+            partitionGrouper.partitionGroups(sourceTopicsByGroup, cluster);
+        final Map<TaskId, Set<TopicPartition>> changelogPartitionsForTask =
+            partitionGrouper.partitionGroups(changelogTopicsByGroup, cluster);
+
+        final Set<TaskId> logicalTaskIds = new HashSet<>();
+        final Set<TopicPartition> sourceTopicPartitions = new HashSet<>();
+        sourcePartitionsForTask.forEach((taskId, partitions) -> {
+            logicalTaskIds.add(taskId);
+            sourceTopicPartitions.addAll(partitions);
+        });
+        final Set<TopicPartition> changelogTopicPartitions = new HashSet<>();
+        changelogPartitionsForTask.forEach((taskId, partitions) -> {
+            logicalTaskIds.add(taskId);

Review Comment:
   To be more precise, I'm imagining something like this:
   
   ```
   final Set<TopicPartition> sourceTopicPartitions = new HashSet<>();
   final Set<TopicPartition> changelogTopicPartitions = new HashSet<>();
   final Set<TopicPartition> nonSourceChangelogTopicPartitions = new 
HashSet<>();
   
   for (final var entry : sourceTopicPartitions.entrySet()) {
       final TaskId task = entry.getKey();
       final Set<TopicPartition> taskSourcePartitions = entry.getValue();
       final Set<TopicPartition> taskChangelogPartitions = 
changelogTopicPartitions.get(taskId);
       final Set<TopicPartition> taskNonSourceChangelogPartitions = new 
HashSet(taskChangelogPartitions);
       taskNonSourceChangelogPartitions.removeAll(taskSourcePartitions);
   
       logicalTaskIds.add(taskId);
       sourceTopicPartitions.addAll(taskSourcePartitions);
       changelogTopicPartitions.addAll(taskChangelogPartitions);
      
nonSourceChangelogTopicPartitions.addAll(taskNonSourceChangelogPartitions);
   }
   ```
   
   Then we pass the `nonSourceChangelogPartitions` into the 
`#getRacksForTopicPartition` instead of the `changelogPartitions` set. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to