Copilot commented on code in PR #20486:
URL: https://github.com/apache/kafka/pull/20486#discussion_r2330521209


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java:
##########
@@ -1091,6 +1091,148 @@ public void 
shouldHandleEdgeCaseWithMoreStandbyReplicasThanAvailableClients() {
         assertEquals(numTasks, allStandbyTasks.size());
     }
 
+    @Test
+    public void 
shouldReassignTasksWhenNewNodeJoinsWithExistingActiveAndStandbyAssignments() {
+        // Initial setup: Node 1 has active tasks 0,1 and standby tasks 2,3
+        // Node 2 has active tasks 2,3 and standby tasks 0,1
+        final AssignmentMemberSpec memberSpec1 = 
createAssignmentMemberSpec("process1",
+            mkMap(mkEntry("test-subtopology", Sets.newSet(0, 1))),
+            mkMap(mkEntry("test-subtopology", Sets.newSet(2, 3))));
+
+        final AssignmentMemberSpec memberSpec2 = 
createAssignmentMemberSpec("process2",
+            mkMap(mkEntry("test-subtopology", Sets.newSet(2, 3))),
+            mkMap(mkEntry("test-subtopology", Sets.newSet(0, 1))));
+
+        // Node 3 joins as new client
+        final AssignmentMemberSpec memberSpec3 = 
createAssignmentMemberSpec("process3");
+
+        final Map<String, AssignmentMemberSpec> members = mkMap(
+            mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), 
mkEntry("member3", memberSpec3));
+
+        final GroupAssignment result = assignor.assign(
+            new GroupSpecImpl(members, 
mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, "1"))),
+            new TopologyDescriberImpl(4, true, List.of("test-subtopology"))
+        );
+
+        // Verify all active tasks are assigned
+        final Set<Integer> allAssignedActiveTasks = new HashSet<>();
+        allAssignedActiveTasks.addAll(getAllActiveTaskIds(result, "member1"));
+        allAssignedActiveTasks.addAll(getAllActiveTaskIds(result, "member2"));
+        allAssignedActiveTasks.addAll(getAllActiveTaskIds(result, "member3"));
+        assertEquals(Sets.newSet(0, 1, 2, 3), allAssignedActiveTasks);
+
+        // Verify all standby tasks are assigned
+        final Set<Integer> allAssignedStandbyTasks = new HashSet<>();
+        allAssignedStandbyTasks.addAll(getAllStandbyTaskIds(result, 
"member1"));
+        allAssignedStandbyTasks.addAll(getAllStandbyTaskIds(result, 
"member2"));
+        allAssignedStandbyTasks.addAll(getAllStandbyTaskIds(result, 
"member3"));
+        assertEquals(Sets.newSet(0, 1, 2, 3), allAssignedStandbyTasks);
+
+        // Verify each member has 1-2 active tasks and at most 3 tasks total
+        assertTrue(getAllActiveTaskIds(result, "member1").size() >= 1 && 
getAllActiveTaskIds(result, "member1").size() <= 2);
+        assertTrue(getAllActiveTaskIds(result, "member1").size() + 
getAllStandbyTaskIds(result, "member1").size() <= 3);
+
+        assertTrue(getAllActiveTaskIds(result, "member2").size() >= 1 && 
getAllActiveTaskIds(result, "member2").size() <= 2);
+        assertTrue(getAllActiveTaskIds(result, "member2").size() + 
getAllStandbyTaskIds(result, "member2").size() <= 3);
+
+        assertTrue(getAllActiveTaskIds(result, "member3").size() >= 1 && 
getAllActiveTaskIds(result, "member3").size() <= 2);
+        assertTrue(getAllActiveTaskIds(result, "member3").size() + 
getAllStandbyTaskIds(result, "member3").size() <= 3);
+    }
+
+    @Test
+    public void shouldRangeAssignTasksWhenScalingUp() {
+        // Two clients, the second one is new
+        final AssignmentMemberSpec memberSpec1 = 
createAssignmentMemberSpec("process1",
+            Map.of("test-subtopology1", Set.of(0, 1), "test-subtopology2", 
Set.of(0, 1)),
+            Map.of());
+        final AssignmentMemberSpec memberSpec2 = 
createAssignmentMemberSpec("process2");
+        final Map<String, AssignmentMemberSpec> members = mkMap(
+            mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2));
+
+        // Two subtopologies with 2 tasks each (4 tasks total) with standby 
replicas enabled
+        final GroupAssignment result = assignor.assign(
+            new GroupSpecImpl(members, 
mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, String.valueOf(1)))),
+            new TopologyDescriberImpl(2, true, 
Arrays.asList("test-subtopology1", "test-subtopology2"))
+        );
+
+        // Each client should get one task from each subtopology
+        final MemberAssignment testMember1 = result.members().get("member1");
+        assertNotNull(testMember1);
+        assertEquals(1, 
testMember1.activeTasks().get("test-subtopology1").size());
+        assertEquals(1, 
testMember1.activeTasks().get("test-subtopology2").size());
+        
+        final MemberAssignment testMember2 = result.members().get("member2");
+        assertNotNull(testMember2);
+        assertEquals(1, 
testMember2.activeTasks().get("test-subtopology1").size());
+        assertEquals(1, 
testMember2.activeTasks().get("test-subtopology2").size());
+        
+        // Verify all tasks are assigned exactly once
+        final Set<Integer> allSubtopology1Tasks = new HashSet<>();
+        
allSubtopology1Tasks.addAll(testMember1.activeTasks().get("test-subtopology1"));
+        
allSubtopology1Tasks.addAll(testMember2.activeTasks().get("test-subtopology1"));
+        assertEquals(Sets.newSet(0, 1), allSubtopology1Tasks);
+        
+        final Set<Integer> allSubtopology2Tasks = new HashSet<>();
+        
allSubtopology2Tasks.addAll(testMember1.activeTasks().get("test-subtopology2"));
+        
allSubtopology2Tasks.addAll(testMember2.activeTasks().get("test-subtopology2"));
+        assertEquals(Sets.newSet(0, 1), allSubtopology2Tasks);
+
+        // Each client should get one task from each subtopology

Review Comment:
   This comment (and the assertion layout below it) is copied from the 
active-task checks at lines 1158-1167. Since this section verifies standby 
tasks, not active tasks, the comment should say so.
   ```suggestion
           // Each client should get one standby task from each subtopology
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -296,9 +302,13 @@ private boolean hasUnfulfilledQuota(final Member member) {
         return 
localState.processIdToState.get(member.processId).memberToTaskCounts().get(member.memberId)
 < localState.tasksPerMember;
     }
 
-    private void assignStandby(final Set<TaskId> standbyTasks, final int 
numStandbyReplicas) {
+    private void assignStandby(final LinkedList<TaskId> standbyTasks, int 
numStandbyReplicas) {
         final ArrayList<StandbyToAssign> toLeastLoaded = new 
ArrayList<>(standbyTasks.size() * numStandbyReplicas);
-        for (final TaskId task : standbyTasks) {
+
+        // Assuming our current assignment is range-based, we want to sort by 
partition first.
+        
standbyTasks.sort(Comparator.comparing(TaskId::partition).thenComparing(TaskId::subtopologyId).reversed());

Review Comment:
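   The `.reversed()` call applies to the entire comparator chain, so the list 
is sorted by partition descending and then by subtopology descending. If only 
one key should be reversed, pass `Comparator.reverseOrder()` to that key's 
`comparing`/`thenComparing` call instead. Otherwise, document the intended order 
with a comment. A GitHub suggestion replaces the commented line, so the 
suggestion must keep the `sort` call:
   ```suggestion
           // Sort by partition descending, then by subtopology descending
           // (.reversed() applies to the whole comparator chain).
           standbyTasks.sort(Comparator.comparing(TaskId::partition).thenComparing(TaskId::subtopologyId).reversed());
   ```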



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -329,6 +339,10 @@ private void assignStandby(final Set<TaskId> standbyTasks, 
final int numStandbyR
             }
         }
 
+        // To achieve a range-based assignment, sort by subtopology
+        toLeastLoaded.sort(Comparator.<StandbyToAssign, String>comparing(x -> 
x.taskId.subtopologyId())
+            .thenComparing(x -> x.taskId.partition()).reversed());

Review Comment:
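   As with the previous sort, `.reversed()` applies to the entire comparator 
chain, so entries are ordered by subtopology descending and then by partition 
descending. That may not be the intended order for a range-based assignment. 
The suggestion below also changes behavior: subtopologies become ascending and 
only partitions stay descending. Confirm which order the assignment relies on 
before applying it.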
   ```suggestion
               .thenComparing(x -> x.taskId.partition(), Comparator.reverseOrder()));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to