dajac commented on code in PR #20730:
URL: https://github.com/apache/kafka/pull/20730#discussion_r2452278306


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java:
##########
@@ -304,6 +304,101 @@ private static boolean computeAssignmentDifferenceForOneSubtopology(final String
         return hasUnreleasedTasks;
     }
 
+    /**
+     * Takes the current currentAssignment and the targetAssignment, and generates three
+     * collections:
+     *
+     * - the resultAssignedTasks: the tasks that are assigned in both the current and target
+     * assignments.
+     * - the resultTasksPendingRevocation: the tasks that are assigned in the current
+     * assignment but not in the target assignment.
+     * - the resultTasksPendingAssignment: the tasks that are assigned in the target assignment but
+     * not in the current assignment, and can be assigned currently (i.e., they are not owned by
+     * another member, as defined by the `isUnreleasedTask` predicate).
+     *
+     * Epoch Handling:
+     * - For tasks in resultAssignedTasks and resultTasksPendingRevocation, the epoch from currentAssignment is preserved.
+     * - For tasks in resultTasksPendingAssignment, the targetAssignmentEpoch is used.
+     */
+    private boolean computeAssignmentDifferenceWithEpoch(Map<String, Map<Integer, Integer>> currentAssignment,

Review Comment:
   These new methods are basically copies of the existing ones; the only difference is that they operate on `Map<String, Map<Integer, Integer>>` instead of `Map<String, Set<Integer>>`. This is a bit unfortunate, but I guess we don't really have a choice unless we use `Map<String, Map<Integer, Integer>>` everywhere.
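
   For readers following along, the split described in the Javadoc above can be sketched roughly as below. This is a minimal, self-contained illustration and not the PR's code: the class `EpochDiffSketch`, the `Diff` record, the `diff` method, and the assumption that the target assignment arrives as a `Map<String, Set<Integer>>` with a single `targetEpoch` are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.BiPredicate;

public class EpochDiffSketch {

    /** Hypothetical holder for the three result collections (subtopology -> partition -> epoch). */
    public record Diff(Map<String, Map<Integer, Integer>> assigned,
                       Map<String, Map<Integer, Integer>> pendingRevocation,
                       Map<String, Map<Integer, Integer>> pendingAssignment,
                       boolean hasUnreleasedTasks) { }

    public static Diff diff(Map<String, Map<Integer, Integer>> current,
                            Map<String, Set<Integer>> target,      // assumed shape of the target
                            int targetEpoch,
                            BiPredicate<String, Integer> isUnreleasedTask) {
        Map<String, Map<Integer, Integer>> assigned = new HashMap<>();
        Map<String, Map<Integer, Integer>> pendingRevocation = new HashMap<>();
        Map<String, Map<Integer, Integer>> pendingAssignment = new HashMap<>();
        boolean hasUnreleasedTasks = false;

        // Tasks the member currently owns keep their existing epoch, whether
        // they stay assigned or move to pending revocation.
        for (Map.Entry<String, Map<Integer, Integer>> e : current.entrySet()) {
            Set<Integer> targetPartitions = target.getOrDefault(e.getKey(), Set.of());
            for (Map.Entry<Integer, Integer> task : e.getValue().entrySet()) {
                Map<String, Map<Integer, Integer>> bucket =
                    targetPartitions.contains(task.getKey()) ? assigned : pendingRevocation;
                bucket.computeIfAbsent(e.getKey(), k -> new HashMap<>())
                      .put(task.getKey(), task.getValue());
            }
        }

        // Tasks that are new to the member get the target assignment epoch,
        // unless another member has not released them yet.
        for (Map.Entry<String, Set<Integer>> e : target.entrySet()) {
            Map<Integer, Integer> owned = current.getOrDefault(e.getKey(), Map.of());
            for (Integer partition : e.getValue()) {
                if (owned.containsKey(partition)) {
                    continue;
                }
                if (isUnreleasedTask.test(e.getKey(), partition)) {
                    hasUnreleasedTasks = true;
                } else {
                    pendingAssignment.computeIfAbsent(e.getKey(), k -> new HashMap<>())
                                     .put(partition, targetEpoch);
                }
            }
        }
        return new Diff(assigned, pendingRevocation, pendingAssignment, hasUnreleasedTasks);
    }
}
```

   The only place the epoch-carrying variant differs from a `Set`-based diff in this sketch is the value written into each bucket, which is why the two versions end up looking so similar.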



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1670,7 +1699,7 @@ private static void throwIfStreamsGroupMemberEpochIsInvalid(
             // If the member comes with the previous epoch and has a subset of the current assignment partitions,
             // we accept it because the response with the bumped epoch may have been lost.
             if (receivedMemberEpoch != member.previousMemberEpoch()
-                || !areOwnedTasksContainedInAssignedTasks(ownedActiveTasks, member.assignedTasks().activeTasks())

Review Comment:
   Is `areOwnedTasksContainedInAssignedTasks` still used, or could we remove it?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2157,6 +2186,16 @@ private List<StreamsGroupHeartbeatResponseData.TaskIds> createStreamsGroupHeartb
             .collect(Collectors.toList());
     }
 
+    private List<StreamsGroupHeartbeatResponseData.TaskIds> createStreamsGroupHeartbeatResponseTaskIdsFromEpochs(

Review Comment:
   nit: could this be `static`?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TasksTupleWithEpochs.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * An immutable tuple containing active, standby and warm-up tasks with assignment epochs.
+ * <p>
+ * Active tasks include epoch information to support fencing of zombie commits.
+ * Standby and warmup tasks do not have epochs as they don't commit offsets.
+ *
+ * @param activeTasksWithEpochs Active tasks with their assignment epochs.
+ *                              The outer map key is the subtopology ID, the inner map key is the partition ID,
+ *                              and the inner map value is the assignment epoch.
+ * @param standbyTasks          Standby tasks.
+ *                              The key of the map is the subtopology ID, and the value is the set of partition IDs.
+ * @param warmupTasks           Warm-up tasks.
+ *                              The key of the map is the subtopology ID, and the value is the set of partition IDs.
+ */
+public record TasksTupleWithEpochs(Map<String, Map<Integer, Integer>> activeTasksWithEpochs,
+                                   Map<String, Set<Integer>> standbyTasks,
+                                   Map<String, Set<Integer>> warmupTasks) {
+
+    public TasksTupleWithEpochs {
+        activeTasksWithEpochs = Collections.unmodifiableMap(Objects.requireNonNull(activeTasksWithEpochs));
+        standbyTasks = Collections.unmodifiableMap(Objects.requireNonNull(standbyTasks));
+        warmupTasks = Collections.unmodifiableMap(Objects.requireNonNull(warmupTasks));
+    }
+    }
+
+    /**
+     * An empty task tuple.
+     */
+    public static final TasksTupleWithEpochs EMPTY = new TasksTupleWithEpochs(
+        Map.of(),
+        Map.of(),
+        Map.of()
+    );
+
+    /**
+     * Returns a map of active tasks (subtopology ID to partition IDs) by extracting just the keys
+     * from the activeTasksWithEpochs map, discarding epoch information.
+     * <p>
+     * This method creates a new map on each call. Consider using {@link #activeTasksWithEpochs()}
+     * directly when possible to avoid the conversion.

Review Comment:
   Could we just get rid of this method if we don't recommend using it?
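
   For context, the conversion that the Javadoc describes (keeping only the partition IDs and dropping the epochs) could look roughly like the sketch below. The method body is not shown in this hunk, so this is only an assumption about its shape; `ActiveTasksViewSketch` and `withoutEpochs` are hypothetical names.

```java
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class ActiveTasksViewSketch {

    /**
     * Builds a subtopology ID -> partition IDs map from a
     * subtopology ID -> (partition ID -> epoch) map. A fresh map and fresh
     * sets are allocated on every call, which is the cost the Javadoc warns about.
     */
    public static Map<String, Set<Integer>> withoutEpochs(
        Map<String, Map<Integer, Integer>> activeTasksWithEpochs
    ) {
        return activeTasksWithEpochs.entrySet().stream()
            .collect(Collectors.toUnmodifiableMap(
                Map.Entry::getKey,
                e -> Set.copyOf(e.getValue().keySet())));
    }
}
```

   Callers that only need membership checks could use `activeTasksWithEpochs.get(subtopologyId).keySet()` directly and skip the copy, which is one argument for dropping the helper.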



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java:
##########
@@ -311,6 +311,33 @@ private static List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> toTaskIds(
         return taskIds;
     }
 
+    private static List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> toTaskIdsWithEpochs(
+        Map<String, Map<Integer, Integer>> tasksWithEpochs
+    ) {
+        List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> taskIds = new ArrayList<>(tasksWithEpochs.size());
+        tasksWithEpochs.forEach((subtopologyId, partitionEpochMap) -> {
+            // Sort by partition for consistent ordering
+            List<Map.Entry<Integer, Integer>> sortedEntries = partitionEpochMap.entrySet().stream()
+                .sorted(Comparator.comparingInt(Map.Entry::getKey))
+                .toList();
+            
+            List<Integer> partitions = new ArrayList<>(sortedEntries.size());
+            List<Integer> epochs = new ArrayList<>(sortedEntries.size());
+            
+            for (Map.Entry<Integer, Integer> entry : sortedEntries) {
+                partitions.add(entry.getKey());
+                epochs.add(entry.getValue());
+            }
+            
+            taskIds.add(new StreamsGroupCurrentMemberAssignmentValue.TaskIds()
+                .setSubtopologyId(subtopologyId)
+                .setPartitions(partitions)
+                .setAssignmentEpochs(epochs));
+        });
+        taskIds.sort(Comparator.comparing(StreamsGroupCurrentMemberAssignmentValue.TaskIds::subtopologyId));

Review Comment:
   I guess this sort is not strictly necessary, but it is convenient for testing?
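
   To illustrate why the ordering helps in tests: with both sorts in place, the output no longer depends on the iteration order of the input maps, so an expected list can be compared with plain `equals`. The sketch below is a simplified stand-in, not the PR's code; the `TaskIds` record here is hypothetical and replaces the generated `StreamsGroupCurrentMemberAssignmentValue.TaskIds` class.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class SortedTaskIdsSketch {

    /** Hypothetical stand-in for the generated TaskIds message. */
    public record TaskIds(String subtopologyId, List<Integer> partitions, List<Integer> assignmentEpochs) { }

    public static List<TaskIds> toTaskIdsWithEpochs(Map<String, Map<Integer, Integer>> tasksWithEpochs) {
        List<TaskIds> result = new ArrayList<>(tasksWithEpochs.size());
        tasksWithEpochs.forEach((subtopologyId, partitionEpochs) -> {
            // A TreeMap iterates in ascending partition order, so the keys and
            // values line up index by index as parallel, sorted lists.
            Map<Integer, Integer> sorted = new TreeMap<>(partitionEpochs);
            result.add(new TaskIds(
                subtopologyId,
                List.copyOf(sorted.keySet()),
                List.copyOf(sorted.values())));
        });
        // Sort the outer list too, so the result is independent of the input map's iteration order.
        result.sort(Comparator.comparing(TaskIds::subtopologyId));
        return result;
    }
}
```

   Without the final sort, a test would have to compare the result as a set or sort it before asserting.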


