This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 351203873db KAFKA-19661 [5/N]: Use below-quota as a condition for standby task assignment (#20458)
351203873db is described below

commit 351203873dbe44048ed3c42a66416be2fa985116
Author: Lucas Brutschy <[email protected]>
AuthorDate: Wed Sep 10 17:55:51 2025 +0200

    KAFKA-19661 [5/N]: Use below-quota as a condition for standby task assignment (#20458)
    
    In the original algorithm, a standby task is assigned to a process
    that previously owned the task only if that process is
    “load-balanced”, meaning the process has fewer tasks than members, or
    it is the least-loaded process. This is a strong requirement and will
    often cause standby tasks not to be assigned to the process that
    previously owned them. Furthermore, the condition “is the least-loaded
    process” is hard to evaluate efficiently in this context.
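    
    For reference, the check being removed looks like this (excerpted from
    the diff below); the least-loaded test streams over every process, so
    it is linear in the number of processes for each standby decision:
    
        private boolean isLoadBalanced(final ProcessState process) {
            final double load = process.load();
            final boolean isLeastLoadedProcess = localState.processIdToState.values().stream()
                .allMatch(p -> p.load() >= load);
            return process.hasCapacity() || isLeastLoadedProcess;
        }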
    
    We propose to instead use the same “below-quota” condition as in
    active task assignment.
    
    We compute a quota covering both active and standby tasks by defining
    numOfTasks = numberOfActiveTasks + numberOfStandbyTasks and defining
    the quota as numOfTasks / numberOfMembers, rounded up. Whenever a
    member becomes “full” (its assigned number of tasks equals the quota),
    we deduct its tasks from numOfTasks, decrement numberOfMembers, and
    recompute the quota. This means the quota may be reduced by one during
    the assignment process, which avoids uneven assignments.
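    
    As a rough sketch of this bookkeeping (class and field names here are
    illustrative, not the exact ones used in the patch):
    
        // Illustrative only: simplified version of the quota bookkeeping.
        class QuotaTracker {
            private int remainingTasks;
            private int remainingMembers;
            private int tasksPerMember;
    
            QuotaTracker(final int numOfTasks, final int numberOfMembers) {
                this.remainingTasks = numOfTasks;
                this.remainingMembers = numberOfMembers;
                this.tasksPerMember = computeTasksPerMember(numOfTasks, numberOfMembers);
            }
    
            // quota = numOfTasks / numberOfMembers, rounded up; 0 once no members remain
            static int computeTasksPerMember(final int numberOfTasks, final int numberOfMembers) {
                if (numberOfMembers == 0) {
                    return 0;
                }
                return (numberOfTasks + numberOfMembers - 1) / numberOfMembers;
            }
    
            // Called with a member's task count after each assignment. When the member
            // becomes "full" (its count reaches the quota), deduct its tasks, drop it
            // from the member count and recompute, so the quota can shrink by one later.
            void maybeUpdateTasksPerMember(final int memberTaskCount) {
                if (memberTaskCount == tasksPerMember) {
                    remainingMembers--;
                    remainingTasks -= memberTaskCount;
                    tasksPerMember = computeTasksPerMember(remainingTasks, remainingMembers);
                }
            }
    
            int tasksPerMember() {
                return tasksPerMember;
            }
        }
    
    For example, with 2 active tasks, 1 standby replica each, and 3
    members, the initial quota is ceil(4 / 3) = 2 tasks per member.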
    
    A standby task can be assigned to a process that previously owned it
    whenever the process has fewer than numOfMembersOfProcess * quota
    tasks assigned.
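    
    In the patch this is checked per member, which gives the same
    process-level bound (excerpted from the change below):
    
        private boolean hasUnfulfilledTaskQuota(final ProcessState process, final Member member) {
            return process.memberToTaskCounts().get(member.memberId) < localState.tasksPerMember;
        }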
    
    This condition again prioritizes standby stickiness, and it can be
    evaluated in constant time.
    
    In our worst-performing benchmark, this improves the runtime by 2.5x
    on top of the previous optimizations, and by 5x in the more important
    incremental assignment case.
    
    Reviewers: Bill Bejeck <[email protected]>
---
 .../group/streams/assignor/ProcessState.java       |   6 -
 .../group/streams/assignor/StickyTaskAssignor.java | 130 ++++++++++++---------
 .../streams/assignor/StickyTaskAssignorTest.java   |  62 ++++++++++
 3 files changed, 138 insertions(+), 60 deletions(-)

diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/ProcessState.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/ProcessState.java
index d4dd2d4ba49..84c1b8a8207 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/ProcessState.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/ProcessState.java
@@ -32,7 +32,6 @@ public class ProcessState {
     private int capacity;
     private double load;
     private int taskCount;
-    private int activeTaskCount;
     private final Map<String, Integer> memberToTaskCounts;
     private final Map<String, Set<TaskId>> assignedActiveTasks;
     private final Map<String, Set<TaskId>> assignedStandbyTasks;
@@ -65,10 +64,6 @@ public class ProcessState {
         return memberToTaskCounts;
     }
 
-    public int activeTaskCount() {
-        return activeTaskCount;
-    }
-
     public Set<TaskId> assignedActiveTasks() {
         return assignedActiveTasks.values().stream()
             .flatMap(Set::stream)
@@ -93,7 +88,6 @@ public class ProcessState {
         taskCount += 1;
         assignedTasks.add(taskId);
         if (isActive) {
-            activeTaskCount += 1;
             assignedActiveTasks.putIfAbsent(memberId, new HashSet<>());
             assignedActiveTasks.get(memberId).add(taskId);
         } else {
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java
index fc29f93b883..1b3f08a7382 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java
@@ -57,13 +57,9 @@ public class StickyTaskAssignor implements TaskAssignor {
         final LinkedList<TaskId> activeTasks = taskIds(topologyDescriber, true);
         assignActive(activeTasks);
 
-        //standby
-        final int numStandbyReplicas =
-            groupSpec.assignmentConfigs().isEmpty() ? 0
-                : Integer.parseInt(groupSpec.assignmentConfigs().get("num.standby.replicas"));
-        if (numStandbyReplicas > 0) {
+        if (localState.numStandbyReplicas > 0) {
             final LinkedList<TaskId> statefulTasks = taskIds(topologyDescriber, false);
-            assignStandby(statefulTasks, numStandbyReplicas);
+            assignStandby(statefulTasks);
         }
 
         return buildGroupAssignment(groupSpec.members().keySet());
@@ -84,13 +80,24 @@ public class StickyTaskAssignor implements TaskAssignor {
 
     private void initialize(final GroupSpec groupSpec, final TopologyDescriber topologyDescriber) {
         localState = new LocalState();
-        localState.allTasks = 0;
+        localState.numStandbyReplicas =
+            groupSpec.assignmentConfigs().isEmpty() ? 0
+                : Integer.parseInt(groupSpec.assignmentConfigs().get("num.standby.replicas"));
+
+        // Helpers for computing active tasks per member, and tasks per member
+        localState.totalActiveTasks = 0;
+        localState.totalTasks = 0;
         for (final String subtopology : topologyDescriber.subtopologies()) {
             final int numberOfPartitions = topologyDescriber.maxNumInputPartitions(subtopology);
-            localState.allTasks += numberOfPartitions;
+            localState.totalTasks += numberOfPartitions;
+            localState.totalActiveTasks += numberOfPartitions;
+            if (topologyDescriber.isStateful(subtopology))
+                localState.totalTasks += numberOfPartitions * localState.numStandbyReplicas;
         }
-        localState.totalCapacity = groupSpec.members().size();
-        localState.tasksPerMember = computeTasksPerMember(localState.allTasks, localState.totalCapacity);
+        localState.totalMembersWithActiveTaskCapacity = groupSpec.members().size();
+        localState.totalMembersWithTaskCapacity = groupSpec.members().size();
+        localState.activeTasksPerMember = computeTasksPerMember(localState.totalActiveTasks, localState.totalMembersWithActiveTaskCapacity);
+        localState.tasksPerMember = computeTasksPerMember(localState.totalTasks, localState.totalMembersWithTaskCapacity);
 
         localState.processIdToState = new HashMap<>();
         localState.activeTaskToPrevMember = new HashMap<>();
@@ -175,11 +182,13 @@ public class StickyTaskAssignor implements TaskAssignor {
         for (final Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) {
             final TaskId task = it.next();
             final Member prevMember = localState.activeTaskToPrevMember.get(task);
-            if (prevMember != null && hasUnfulfilledQuota(prevMember)) {
+            if (prevMember != null) {
                 final ProcessState processState = localState.processIdToState.get(prevMember.processId);
-                processState.addTask(prevMember.memberId, task, true);
-                maybeUpdateTasksPerMember(processState.activeTaskCount());
-                it.remove();
+                if (hasUnfulfilledActiveTaskQuota(processState, prevMember)) {
+                    processState.addTask(prevMember.memberId, task, true);
+                    maybeUpdateActiveTasksPerMember(processState.memberToTaskCounts().get(prevMember.memberId));
+                    it.remove();
+                }
             }
         }
 
@@ -188,11 +197,13 @@ public class StickyTaskAssignor implements TaskAssignor {
             final TaskId task = it.next();
             final ArrayList<Member> prevMembers = localState.standbyTaskToPrevMember.get(task);
             final Member prevMember = findPrevMemberWithLeastLoad(prevMembers, null);
-            if (prevMember != null && hasUnfulfilledQuota(prevMember)) {
+            if (prevMember != null) {
                 final ProcessState processState = localState.processIdToState.get(prevMember.processId);
-                processState.addTask(prevMember.memberId, task, true);
-                maybeUpdateTasksPerMember(processState.activeTaskCount());
-                it.remove();
+                if (hasUnfulfilledActiveTaskQuota(processState, prevMember)) {
+                    processState.addTask(prevMember.memberId, task, true);
+                    maybeUpdateActiveTasksPerMember(processState.memberToTaskCounts().get(prevMember.memberId));
+                    it.remove();
+                }
             }
         }
 
@@ -206,24 +217,32 @@ public class StickyTaskAssignor implements TaskAssignor {
             final TaskId task = it.next();
             final ProcessState processWithLeastLoad = processByLoad.poll();
             if (processWithLeastLoad == null) {
-                throw new TaskAssignorException("No process available to assign active task {}." + task);
+                throw new TaskAssignorException(String.format("No process available to assign active task %s.", task));
             }
             final String member = memberWithLeastLoad(processWithLeastLoad);
             if (member == null) {
-                throw new TaskAssignorException("No member available to assign active task {}." + task);
+                throw new TaskAssignorException(String.format("No member available to assign active task %s.", task));
             }
             processWithLeastLoad.addTask(member, task, true);
             it.remove();
-            maybeUpdateTasksPerMember(processWithLeastLoad.activeTaskCount());
+            maybeUpdateActiveTasksPerMember(processWithLeastLoad.memberToTaskCounts().get(member));
             processByLoad.add(processWithLeastLoad); // Add it back to the queue after updating its state
         }
     }
 
-    private void maybeUpdateTasksPerMember(final int activeTasksNo) {
-        if (activeTasksNo == localState.tasksPerMember) {
-            localState.totalCapacity--;
-            localState.allTasks -= activeTasksNo;
-            localState.tasksPerMember = computeTasksPerMember(localState.allTasks, localState.totalCapacity);
+    private void maybeUpdateActiveTasksPerMember(final int activeTasksNo) {
+        if (activeTasksNo == localState.activeTasksPerMember) {
+            localState.totalMembersWithActiveTaskCapacity--;
+            localState.totalActiveTasks -= activeTasksNo;
+            localState.activeTasksPerMember = computeTasksPerMember(localState.totalActiveTasks, localState.totalMembersWithActiveTaskCapacity);
+        }
+    }
+
+    private void maybeUpdateTasksPerMember(final int taskNo) {
+        if (taskNo == localState.tasksPerMember) {
+            localState.totalMembersWithTaskCapacity--;
+            localState.totalTasks -= taskNo;
+            localState.tasksPerMember = computeTasksPerMember(localState.totalTasks, localState.totalMembersWithTaskCapacity);
         }
     }
 
@@ -298,43 +317,49 @@ public class StickyTaskAssignor implements TaskAssignor {
         return memberWithLeastLoad.orElse(null);
     }
 
-    private boolean hasUnfulfilledQuota(final Member member) {
-        return localState.processIdToState.get(member.processId).memberToTaskCounts().get(member.memberId) < localState.tasksPerMember;
+    private boolean hasUnfulfilledActiveTaskQuota(final ProcessState process, final Member member) {
+        return process.memberToTaskCounts().get(member.memberId) < localState.activeTasksPerMember;
     }
 
-    private void assignStandby(final LinkedList<TaskId> standbyTasks, int numStandbyReplicas) {
-        final ArrayList<StandbyToAssign> toLeastLoaded = new ArrayList<>(standbyTasks.size() * numStandbyReplicas);
+    private boolean hasUnfulfilledTaskQuota(final ProcessState process, final Member member) {
+        return process.memberToTaskCounts().get(member.memberId) < localState.tasksPerMember;
+    }
 
+    private void assignStandby(final LinkedList<TaskId> standbyTasks) {
+        final ArrayList<StandbyToAssign> toLeastLoaded = new ArrayList<>(standbyTasks.size() * localState.numStandbyReplicas);
+
         // Assuming our current assignment is range-based, we want to sort by partition first.
         standbyTasks.sort(Comparator.comparing(TaskId::partition).thenComparing(TaskId::subtopologyId).reversed());
 
         for (TaskId task : standbyTasks) {
-            for (int i = 0; i < numStandbyReplicas; i++) {
+            for (int i = 0; i < localState.numStandbyReplicas; i++) {
 
                 // prev active task
-                final Member prevMember = localState.activeTaskToPrevMember.get(task);
-                if (prevMember != null) {
-                    final ProcessState prevMemberProcessState = localState.processIdToState.get(prevMember.processId);
-                    if (!prevMemberProcessState.hasTask(task) && isLoadBalanced(prevMemberProcessState)) {
-                        prevMemberProcessState.addTask(prevMember.memberId, task, false);
+                final Member prevActiveMember = localState.activeTaskToPrevMember.get(task);
+                if (prevActiveMember != null) {
+                    final ProcessState prevActiveMemberProcessState = localState.processIdToState.get(prevActiveMember.processId);
+                    if (!prevActiveMemberProcessState.hasTask(task) && hasUnfulfilledTaskQuota(prevActiveMemberProcessState, prevActiveMember)) {
+                        prevActiveMemberProcessState.addTask(prevActiveMember.memberId, task, false);
+                        maybeUpdateTasksPerMember(prevActiveMemberProcessState.memberToTaskCounts().get(prevActiveMember.memberId));
                         continue;
                     }
                 }
 
                 // prev standby tasks
-                final ArrayList<Member> prevMembers = localState.standbyTaskToPrevMember.get(task);
-                if (prevMembers != null && !prevMembers.isEmpty()) {
-                    final Member prevMember2 = findPrevMemberWithLeastLoad(prevMembers, task);
-                    if (prevMember2 != null) {
-                        final ProcessState prevMemberProcessState = localState.processIdToState.get(prevMember2.processId);
-                        if (isLoadBalanced(prevMemberProcessState)) {
-                            prevMemberProcessState.addTask(prevMember2.memberId, task, false);
+                final ArrayList<Member> prevStandbyMembers = localState.standbyTaskToPrevMember.get(task);
+                if (prevStandbyMembers != null && !prevStandbyMembers.isEmpty()) {
+                    final Member prevStandbyMember = findPrevMemberWithLeastLoad(prevStandbyMembers, task);
+                    if (prevStandbyMember != null) {
+                        final ProcessState prevStandbyMemberProcessState = localState.processIdToState.get(prevStandbyMember.processId);
+                        if (hasUnfulfilledTaskQuota(prevStandbyMemberProcessState, prevStandbyMember)) {
+                            prevStandbyMemberProcessState.addTask(prevStandbyMember.memberId, task, false);
+                            maybeUpdateTasksPerMember(prevStandbyMemberProcessState.memberToTaskCounts().get(prevStandbyMember.memberId));
                             continue;
                         }
                     }
                 }
 
-                toLeastLoaded.add(new StandbyToAssign(task, numStandbyReplicas - i));
+                toLeastLoaded.add(new StandbyToAssign(task, localState.numStandbyReplicas - i));
                 break;
             }
         }
@@ -350,7 +375,7 @@ public class StickyTaskAssignor implements TaskAssignor {
                 if (!assignStandbyToMemberWithLeastLoad(processByLoad, toAssign.taskId)) {
                     log.warn("{} There is not enough available capacity. " +
                             "You should increase the number of threads and/or application instances to maintain the requested number of standby replicas.",
-                        errorMessage(numStandbyReplicas, i, toAssign.taskId));
+                        errorMessage(localState.numStandbyReplicas, i, toAssign.taskId));
                     break;
                 }
             }
@@ -362,13 +387,6 @@ public class StickyTaskAssignor implements TaskAssignor {
             " of " + numStandbyReplicas + " standby tasks for task [" + task + "].";
     }
 
-    private boolean isLoadBalanced(final ProcessState process) {
-        final double load = process.load();
-        final boolean isLeastLoadedProcess = localState.processIdToState.values().stream()
-            .allMatch(p -> p.load() >= load);
-        return process.hasCapacity() || isLeastLoadedProcess;
-    }
-
     private static int computeTasksPerMember(final int numberOfTasks, final int numberOfMembers) {
         if (numberOfMembers == 0) {
             return 0;
@@ -406,8 +424,12 @@ public class StickyTaskAssignor implements TaskAssignor {
         Map<TaskId, ArrayList<Member>> standbyTaskToPrevMember;
         Map<String, ProcessState> processIdToState;
 
-        int allTasks;
-        int totalCapacity;
+        int numStandbyReplicas;
+        int totalActiveTasks;
+        int totalTasks;
+        int totalMembersWithActiveTaskCapacity;
+        int totalMembersWithTaskCapacity;
+        int activeTasksPerMember;
         int tasksPerMember;
     }
 }
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java
index 1e9d4115cb2..53318578432 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java
@@ -1233,6 +1233,68 @@ public class StickyTaskAssignorTest {
         assertEquals(1, testMember2.standbyTasks().get("test-subtopology2").size());
     }
 
+    @Test
+    public void shouldAssignStandbyTaskToPreviousOwnerBasedOnBelowQuotaCondition() {
+        // This test demonstrates the change from "load-balanced" to "below-quota" condition for standby assignment.
+        // We create a scenario where:
+        // - Process1 previously owned standby task 1 and currently has 1 active task (task 0)
+        // - Process2 currently has 1 active task (task 1)
+        // - Process3 has no previous tasks (least loaded after assignment)
+        //
+        // Under the old "load-balanced" algorithm: Process1 might not get standby task 1 because
+        // Process3 could be considered least loaded.
+        //
+        // Under the new "below-quota" algorithm: Process1 gets standby task 1 because
+        // it previously owned it AND is below quota.
+
+        // Set up previous task assignments:
+        // Process1: active=[0], standby=[1] (previously had both active and standby tasks)
+        // Process2: active=[1] (had the active task that process1 had as standby)
+        // Process3: no previous tasks
+        final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1",
+            mkMap(mkEntry("test-subtopology", Sets.newSet(0))),
+            mkMap(mkEntry("test-subtopology", Sets.newSet(1))));
+        final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2",
+            mkMap(mkEntry("test-subtopology", Sets.newSet(1))),
+            Map.of());
+        final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3");
+
+        final Map<String, AssignmentMemberSpec> members = mkMap(
+            mkEntry("member1", memberSpec1),
+            mkEntry("member2", memberSpec2),
+            mkEntry("member3", memberSpec3));
+
+        // We have 2 active tasks + 1 standby replica = 4 total tasks
+        // Quota per process = 4 tasks / 3 processes = 1.33 -> 2 tasks per process
+        final GroupAssignment result = assignor.assign(
+            new GroupSpecImpl(members, mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, "1"))),
+            new TopologyDescriberImpl(2, true, List.of("test-subtopology"))
+        );
+
+        // Verify that process1 gets the standby task 1 that it previously owned
+        // This should work under the new "below-quota" algorithm since process1 has only 1 active task
+        // which is below the quota of 2 tasks per process
+        final MemberAssignment member1 = result.members().get("member1");
+        assertNotNull(member1);
+
+        // Member1 should retain its active task 0
+        assertTrue(member1.activeTasks().get("test-subtopology").contains(0));
+
+        // Member1 should get standby task 1 because it previously owned it and is below quota
+        assertNotNull(member1.standbyTasks().get("test-subtopology"), "Member1 should have standby tasks assigned");
+        assertTrue(member1.standbyTasks().get("test-subtopology").contains(1),
+            "Member1 should have standby task 1, but has: " + member1.standbyTasks().get("test-subtopology"));
+
+        // Verify that member1 doesn't have active task 1 (standby can't be same as active)
+        assertFalse(member1.activeTasks().get("test-subtopology").contains(1));
+
+        // Verify the process1's total task count is at or below quota
+        int member1ActiveCount = member1.activeTasks().get("test-subtopology").size();
+        int member1StandbyCount = member1.standbyTasks().get("test-subtopology").size();
+        int member1TotalTasks = member1ActiveCount + member1StandbyCount;
+        assertTrue(member1TotalTasks <= 2, "Member1 should have <= 2 total tasks (quota), but has " + member1TotalTasks);
+    }
+
 
     private int getAllActiveTaskCount(GroupAssignment result, String... memberIds) {
         int size = 0;
