Copilot commented on code in PR #20486:
URL: https://github.com/apache/kafka/pull/20486#discussion_r2330521209
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java:
##########
@@ -1091,6 +1091,148 @@ public void shouldHandleEdgeCaseWithMoreStandbyReplicasThanAvailableClients() {
assertEquals(numTasks, allStandbyTasks.size());
}
+    @Test
+    public void shouldReassignTasksWhenNewNodeJoinsWithExistingActiveAndStandbyAssignments() {
+        // Initial setup: Node 1 has active tasks 0,1 and standby tasks 2,3
+        // Node 2 has active tasks 2,3 and standby tasks 0,1
+        final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1",
+            mkMap(mkEntry("test-subtopology", Sets.newSet(0, 1))),
+            mkMap(mkEntry("test-subtopology", Sets.newSet(2, 3))));
+
+        final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2",
+            mkMap(mkEntry("test-subtopology", Sets.newSet(2, 3))),
+            mkMap(mkEntry("test-subtopology", Sets.newSet(0, 1))));
+
+        // Node 3 joins as new client
+        final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3");
+
+        final Map<String, AssignmentMemberSpec> members = mkMap(
+            mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), mkEntry("member3", memberSpec3));
+
+        final GroupAssignment result = assignor.assign(
+            new GroupSpecImpl(members, mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, "1"))),
+            new TopologyDescriberImpl(4, true, List.of("test-subtopology"))
+        );
+
+        // Verify all active tasks are assigned
+        final Set<Integer> allAssignedActiveTasks = new HashSet<>();
+        allAssignedActiveTasks.addAll(getAllActiveTaskIds(result, "member1"));
+        allAssignedActiveTasks.addAll(getAllActiveTaskIds(result, "member2"));
+        allAssignedActiveTasks.addAll(getAllActiveTaskIds(result, "member3"));
+        assertEquals(Sets.newSet(0, 1, 2, 3), allAssignedActiveTasks);
+
+        // Verify all standby tasks are assigned
+        final Set<Integer> allAssignedStandbyTasks = new HashSet<>();
+        allAssignedStandbyTasks.addAll(getAllStandbyTaskIds(result, "member1"));
+        allAssignedStandbyTasks.addAll(getAllStandbyTaskIds(result, "member2"));
+        allAssignedStandbyTasks.addAll(getAllStandbyTaskIds(result, "member3"));
+        assertEquals(Sets.newSet(0, 1, 2, 3), allAssignedStandbyTasks);
+
+        // Verify each member has 1-2 active tasks and at most 3 tasks total
+        assertTrue(getAllActiveTaskIds(result, "member1").size() >= 1 && getAllActiveTaskIds(result, "member1").size() <= 2);
+        assertTrue(getAllActiveTaskIds(result, "member1").size() + getAllStandbyTaskIds(result, "member1").size() <= 3);
+
+        assertTrue(getAllActiveTaskIds(result, "member2").size() >= 1 && getAllActiveTaskIds(result, "member2").size() <= 2);
+        assertTrue(getAllActiveTaskIds(result, "member2").size() + getAllStandbyTaskIds(result, "member2").size() <= 3);
+
+        assertTrue(getAllActiveTaskIds(result, "member3").size() >= 1 && getAllActiveTaskIds(result, "member3").size() <= 2);
+        assertTrue(getAllActiveTaskIds(result, "member3").size() + getAllStandbyTaskIds(result, "member3").size() <= 3);
+    }
+
+    @Test
+    public void shouldRangeAssignTasksWhenScalingUp() {
+        // Two clients, the second one is new
+        final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1",
+            Map.of("test-subtopology1", Set.of(0, 1), "test-subtopology2", Set.of(0, 1)),
+            Map.of());
+        final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2");
+        final Map<String, AssignmentMemberSpec> members = mkMap(
+            mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2));
+
+        // Two subtopologies with 2 tasks each (4 tasks total) with standby replicas enabled
+        final GroupAssignment result = assignor.assign(
+            new GroupSpecImpl(members, mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, String.valueOf(1)))),
+            new TopologyDescriberImpl(2, true, Arrays.asList("test-subtopology1", "test-subtopology2"))
+        );
+
+        // Each client should get one task from each subtopology
+        final MemberAssignment testMember1 = result.members().get("member1");
+        assertNotNull(testMember1);
+        assertEquals(1, testMember1.activeTasks().get("test-subtopology1").size());
+        assertEquals(1, testMember1.activeTasks().get("test-subtopology2").size());
+
+        final MemberAssignment testMember2 = result.members().get("member2");
+        assertNotNull(testMember2);
+        assertEquals(1, testMember2.activeTasks().get("test-subtopology1").size());
+        assertEquals(1, testMember2.activeTasks().get("test-subtopology2").size());
+
+        // Verify all tasks are assigned exactly once
+        final Set<Integer> allSubtopology1Tasks = new HashSet<>();
+        allSubtopology1Tasks.addAll(testMember1.activeTasks().get("test-subtopology1"));
+        allSubtopology1Tasks.addAll(testMember2.activeTasks().get("test-subtopology1"));
+        assertEquals(Sets.newSet(0, 1), allSubtopology1Tasks);
+
+        final Set<Integer> allSubtopology2Tasks = new HashSet<>();
+        allSubtopology2Tasks.addAll(testMember1.activeTasks().get("test-subtopology2"));
+        allSubtopology2Tasks.addAll(testMember2.activeTasks().get("test-subtopology2"));
+        assertEquals(Sets.newSet(0, 1), allSubtopology2Tasks);
+
+        // Each client should get one task from each subtopology
Review Comment:
The comment and assertions are duplicated from lines 1158-1167 for active
tasks. The comment should be updated to reflect that this section is checking
standby tasks, not active tasks.
```suggestion
// Each client should get one standby task from each subtopology
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -296,9 +302,13 @@ private boolean hasUnfulfilledQuota(final Member member) {
        return localState.processIdToState.get(member.processId).memberToTaskCounts().get(member.memberId) < localState.tasksPerMember;
}
-    private void assignStandby(final Set<TaskId> standbyTasks, final int numStandbyReplicas) {
+    private void assignStandby(final LinkedList<TaskId> standbyTasks, int numStandbyReplicas) {
+        final ArrayList<StandbyToAssign> toLeastLoaded = new ArrayList<>(standbyTasks.size() * numStandbyReplicas);
-        for (final TaskId task : standbyTasks) {
+
+        // Assuming our current assignment is range-based, we want to sort by partition first.
+        standbyTasks.sort(Comparator.comparing(TaskId::partition).thenComparing(TaskId::subtopologyId).reversed());
Review Comment:
The `.reversed()` is applied to the entire comparator chain, which reverses
both partition and subtopology ordering. This may not be the intended behavior.
Consider applying `.reversed()` only to the specific comparison that needs
reversing, or clarify the intention with a comment.
```suggestion
        // Sort by partition ascending, then subtopology descending (if that is the intention)
```
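
For illustration of the point above, here is a minimal standalone sketch of the `Comparator` semantics. It uses a hypothetical `Task` record in place of the coordinator's `TaskId`, so names and output are illustrative only, not the PR's actual code:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class ComparatorReversalDemo {
    // Hypothetical stand-in for the coordinator's TaskId, for illustration only.
    record Task(String subtopologyId, int partition) { }

    public static void main(String[] args) {
        List<Task> tasks = new ArrayList<>(List.of(
            new Task("a", 0), new Task("a", 1),
            new Task("b", 0), new Task("b", 1)));

        // .reversed() on the whole chain reverses BOTH keys:
        // partition descending, then subtopology descending.
        List<Task> wholeChainReversed = new ArrayList<>(tasks);
        wholeChainReversed.sort(Comparator.comparing(Task::partition)
            .thenComparing(Task::subtopologyId)
            .reversed());
        System.out.println(wholeChainReversed);
        // [Task[subtopologyId=b, partition=1], Task[subtopologyId=a, partition=1],
        //  Task[subtopologyId=b, partition=0], Task[subtopologyId=a, partition=0]]

        // Reversing only the secondary key keeps partition ascending
        // while making subtopology descending.
        List<Task> secondaryKeyReversed = new ArrayList<>(tasks);
        secondaryKeyReversed.sort(Comparator.comparing(Task::partition)
            .thenComparing(Task::subtopologyId, Comparator.reverseOrder()));
        System.out.println(secondaryKeyReversed);
        // [Task[subtopologyId=b, partition=0], Task[subtopologyId=a, partition=0],
        //  Task[subtopologyId=b, partition=1], Task[subtopologyId=a, partition=1]]
    }
}
```

Whether both keys or only one should be descending is for the PR author to confirm; the sketch only shows how the two forms differ.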
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -329,6 +339,10 @@ private void assignStandby(final Set<TaskId> standbyTasks, final int numStandbyR
}
}
+ // To achieve a range-based assignment, sort by subtopology
+        toLeastLoaded.sort(Comparator.<StandbyToAssign, String>comparing(x -> x.taskId.subtopologyId())
+            .thenComparing(x -> x.taskId.partition()).reversed());
Review Comment:
Similar to the previous sorting, `.reversed()` is applied to the entire
comparator chain. This reverses both subtopology and partition ordering, which
may not be the intended behavior for achieving range-based assignment.
```suggestion
            .thenComparing(x -> x.taskId.partition(), Comparator.reverseOrder()));
```
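
To make the suggested ordering concrete, here is a small sketch using hypothetical stand-in records (not the real `TaskId`/`StandbyToAssign` classes from the group coordinator) showing the order the suggestion would produce: subtopology ascending, with partitions descending inside each subtopology:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class StandbySortSketch {
    // Hypothetical stand-ins for illustration only.
    record TaskId(String subtopologyId, int partition) { }
    record StandbyToAssign(TaskId taskId) { }

    public static void main(String[] args) {
        List<StandbyToAssign> toLeastLoaded = new ArrayList<>(List.of(
            new StandbyToAssign(new TaskId("sub1", 0)),
            new StandbyToAssign(new TaskId("sub2", 1)),
            new StandbyToAssign(new TaskId("sub2", 0)),
            new StandbyToAssign(new TaskId("sub1", 1))));

        // Suggested form: subtopology ascending, partition descending within each subtopology.
        toLeastLoaded.sort(Comparator.<StandbyToAssign, String>comparing(x -> x.taskId().subtopologyId())
            .thenComparing(x -> x.taskId().partition(), Comparator.reverseOrder()));

        toLeastLoaded.forEach(s -> System.out.println(s.taskId()));
        // TaskId[subtopologyId=sub1, partition=1]
        // TaskId[subtopologyId=sub1, partition=0]
        // TaskId[subtopologyId=sub2, partition=1]
        // TaskId[subtopologyId=sub2, partition=0]
    }
}
```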
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]