This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 351203873db KAFKA-19661 [5/N]: Use below-quota as a condition for standby task assignment (#20458)
351203873db is described below
commit 351203873dbe44048ed3c42a66416be2fa985116
Author: Lucas Brutschy <[email protected]>
AuthorDate: Wed Sep 10 17:55:51 2025 +0200
KAFKA-19661 [5/N]: Use below-quota as a condition for standby task assignment (#20458)
In the original algorithm, a standby task is assigned to the process
that previously owned it only if that process is “load-balanced”,
meaning it has fewer tasks than members or it is the least loaded
process. This is a strong requirement and often prevents standby tasks
from being assigned to the process that previously owned them.
Furthermore, the condition “is the least loaded process” is hard to
evaluate efficiently in this context.
We propose to instead use the same “below-quota” condition as in active
task assignment.
We compute a quota for active and standby tasks by defining numOfTasks
= numberOfActiveTasks + numberOfStandbyTasks and defining the quota as
numOfTasks / numberOfMembers, rounded up. Whenever a member becomes
“full” (its assigned number of tasks equals the quota), we deduct its
tasks from numOfTasks, decrement numberOfMembers, and recompute the
quota. This means the quota may be reduced by one during the
assignment process, which avoids uneven assignments.
A standby task can be assigned to a process that previously owned it
whenever the process has fewer than numOfMembersOfProcess * quota
tasks assigned. This condition again prioritizes standby stickiness
and can be evaluated in constant time.
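
To make the computation concrete, here is a minimal, self-contained
Java sketch of the quota and the below-quota check. The class name
QuotaExample and the hard-coded numbers are purely illustrative and
not part of this patch; only the rounding-up helper mirrors
computeTasksPerMember in the assignor:

    // Illustrative sketch (not part of the patch): compute the quota and
    // evaluate the below-quota condition for a previous owner.
    public class QuotaExample {

        // quota = numOfTasks / numberOfMembers, rounded up
        static int computeTasksPerMember(final int numberOfTasks, final int numberOfMembers) {
            if (numberOfMembers == 0) {
                return 0;
            }
            return (numberOfTasks + numberOfMembers - 1) / numberOfMembers;
        }

        public static void main(final String[] args) {
            // Example: 2 active tasks, both stateful, 1 standby replica each,
            // spread over 3 members.
            final int numberOfActiveTasks = 2;
            final int numberOfStandbyTasks = 2;
            final int numberOfMembers = 3;

            final int numOfTasks = numberOfActiveTasks + numberOfStandbyTasks;
            final int quota = computeTasksPerMember(numOfTasks, numberOfMembers); // ceil(4 / 3) = 2

            // A member that previously owned a standby task keeps it as long as
            // its current task count is below the quota.
            final int tasksAssignedToPrevOwner = 1; // e.g. one active task already assigned
            final boolean belowQuota = tasksAssignedToPrevOwner < quota;
            System.out.println("quota=" + quota + ", belowQuota=" + belowQuota); // quota=2, belowQuota=true
        }
    }

Once a member reaches the quota, the assignor removes its tasks from
numOfTasks and the member from numberOfMembers before recomputing, so
members considered later may see a quota that is one lower.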
In our worst-performing benchmark, this improves the runtime by 2.5x
on top of the previous optimizations, and by 5x in the more important
incremental assignment case.
Reviewers: Bill Bejeck <[email protected]>
---
.../group/streams/assignor/ProcessState.java | 6 -
.../group/streams/assignor/StickyTaskAssignor.java | 130 ++++++++++++---------
.../streams/assignor/StickyTaskAssignorTest.java | 62 ++++++++++
3 files changed, 138 insertions(+), 60 deletions(-)
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/ProcessState.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/ProcessState.java
index d4dd2d4ba49..84c1b8a8207 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/ProcessState.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/ProcessState.java
@@ -32,7 +32,6 @@ public class ProcessState {
     private int capacity;
     private double load;
     private int taskCount;
-    private int activeTaskCount;
     private final Map<String, Integer> memberToTaskCounts;
     private final Map<String, Set<TaskId>> assignedActiveTasks;
     private final Map<String, Set<TaskId>> assignedStandbyTasks;
@@ -65,10 +64,6 @@ public class ProcessState {
         return memberToTaskCounts;
     }
 
-    public int activeTaskCount() {
-        return activeTaskCount;
-    }
-
     public Set<TaskId> assignedActiveTasks() {
         return assignedActiveTasks.values().stream()
             .flatMap(Set::stream)
@@ -93,7 +88,6 @@ public class ProcessState {
         taskCount += 1;
         assignedTasks.add(taskId);
         if (isActive) {
-            activeTaskCount += 1;
             assignedActiveTasks.putIfAbsent(memberId, new HashSet<>());
             assignedActiveTasks.get(memberId).add(taskId);
         } else {
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java
index fc29f93b883..1b3f08a7382 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java
@@ -57,13 +57,9 @@ public class StickyTaskAssignor implements TaskAssignor {
         final LinkedList<TaskId> activeTasks = taskIds(topologyDescriber, true);
         assignActive(activeTasks);
 
-        //standby
-        final int numStandbyReplicas =
-            groupSpec.assignmentConfigs().isEmpty() ? 0
-                : Integer.parseInt(groupSpec.assignmentConfigs().get("num.standby.replicas"));
-        if (numStandbyReplicas > 0) {
+        if (localState.numStandbyReplicas > 0) {
             final LinkedList<TaskId> statefulTasks = taskIds(topologyDescriber, false);
-            assignStandby(statefulTasks, numStandbyReplicas);
+            assignStandby(statefulTasks);
         }
 
         return buildGroupAssignment(groupSpec.members().keySet());
@@ -84,13 +80,24 @@ public class StickyTaskAssignor implements TaskAssignor {
     private void initialize(final GroupSpec groupSpec, final TopologyDescriber topologyDescriber) {
         localState = new LocalState();
-        localState.allTasks = 0;
+        localState.numStandbyReplicas =
+            groupSpec.assignmentConfigs().isEmpty() ? 0
+                : Integer.parseInt(groupSpec.assignmentConfigs().get("num.standby.replicas"));
+
+        // Helpers for computing active tasks per member, and tasks per member
+        localState.totalActiveTasks = 0;
+        localState.totalTasks = 0;
         for (final String subtopology : topologyDescriber.subtopologies()) {
             final int numberOfPartitions = topologyDescriber.maxNumInputPartitions(subtopology);
-            localState.allTasks += numberOfPartitions;
+            localState.totalTasks += numberOfPartitions;
+            localState.totalActiveTasks += numberOfPartitions;
+            if (topologyDescriber.isStateful(subtopology))
+                localState.totalTasks += numberOfPartitions * localState.numStandbyReplicas;
         }
-        localState.totalCapacity = groupSpec.members().size();
-        localState.tasksPerMember = computeTasksPerMember(localState.allTasks, localState.totalCapacity);
+        localState.totalMembersWithActiveTaskCapacity = groupSpec.members().size();
+        localState.totalMembersWithTaskCapacity = groupSpec.members().size();
+        localState.activeTasksPerMember = computeTasksPerMember(localState.totalActiveTasks, localState.totalMembersWithActiveTaskCapacity);
+        localState.tasksPerMember = computeTasksPerMember(localState.totalTasks, localState.totalMembersWithTaskCapacity);
 
         localState.processIdToState = new HashMap<>();
         localState.activeTaskToPrevMember = new HashMap<>();
@@ -175,11 +182,13 @@ public class StickyTaskAssignor implements TaskAssignor {
         for (final Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) {
             final TaskId task = it.next();
             final Member prevMember = localState.activeTaskToPrevMember.get(task);
-            if (prevMember != null && hasUnfulfilledQuota(prevMember)) {
+            if (prevMember != null) {
                 final ProcessState processState = localState.processIdToState.get(prevMember.processId);
-                processState.addTask(prevMember.memberId, task, true);
-                maybeUpdateTasksPerMember(processState.activeTaskCount());
-                it.remove();
+                if (hasUnfulfilledActiveTaskQuota(processState, prevMember)) {
+                    processState.addTask(prevMember.memberId, task, true);
+                    maybeUpdateActiveTasksPerMember(processState.memberToTaskCounts().get(prevMember.memberId));
+                    it.remove();
+                }
             }
         }
@@ -188,11 +197,13 @@ public class StickyTaskAssignor implements TaskAssignor {
             final TaskId task = it.next();
             final ArrayList<Member> prevMembers = localState.standbyTaskToPrevMember.get(task);
             final Member prevMember = findPrevMemberWithLeastLoad(prevMembers, null);
-            if (prevMember != null && hasUnfulfilledQuota(prevMember)) {
+            if (prevMember != null) {
                 final ProcessState processState = localState.processIdToState.get(prevMember.processId);
-                processState.addTask(prevMember.memberId, task, true);
-                maybeUpdateTasksPerMember(processState.activeTaskCount());
-                it.remove();
+                if (hasUnfulfilledActiveTaskQuota(processState, prevMember)) {
+                    processState.addTask(prevMember.memberId, task, true);
+                    maybeUpdateActiveTasksPerMember(processState.memberToTaskCounts().get(prevMember.memberId));
+                    it.remove();
+                }
             }
         }
@@ -206,24 +217,32 @@ public class StickyTaskAssignor implements TaskAssignor {
             final TaskId task = it.next();
             final ProcessState processWithLeastLoad = processByLoad.poll();
             if (processWithLeastLoad == null) {
-                throw new TaskAssignorException("No process available to assign active task {}." + task);
+                throw new TaskAssignorException(String.format("No process available to assign active task %s.", task));
             }
             final String member = memberWithLeastLoad(processWithLeastLoad);
             if (member == null) {
-                throw new TaskAssignorException("No member available to assign active task {}." + task);
+                throw new TaskAssignorException(String.format("No member available to assign active task %s.", task));
             }
             processWithLeastLoad.addTask(member, task, true);
             it.remove();
-            maybeUpdateTasksPerMember(processWithLeastLoad.activeTaskCount());
+            maybeUpdateActiveTasksPerMember(processWithLeastLoad.memberToTaskCounts().get(member));
             processByLoad.add(processWithLeastLoad); // Add it back to the queue after updating its state
         }
     }
 
-    private void maybeUpdateTasksPerMember(final int activeTasksNo) {
-        if (activeTasksNo == localState.tasksPerMember) {
-            localState.totalCapacity--;
-            localState.allTasks -= activeTasksNo;
-            localState.tasksPerMember = computeTasksPerMember(localState.allTasks, localState.totalCapacity);
+    private void maybeUpdateActiveTasksPerMember(final int activeTasksNo) {
+        if (activeTasksNo == localState.activeTasksPerMember) {
+            localState.totalMembersWithActiveTaskCapacity--;
+            localState.totalActiveTasks -= activeTasksNo;
+            localState.activeTasksPerMember = computeTasksPerMember(localState.totalActiveTasks, localState.totalMembersWithActiveTaskCapacity);
+        }
+    }
+
+    private void maybeUpdateTasksPerMember(final int taskNo) {
+        if (taskNo == localState.tasksPerMember) {
+            localState.totalMembersWithTaskCapacity--;
+            localState.totalTasks -= taskNo;
+            localState.tasksPerMember = computeTasksPerMember(localState.totalTasks, localState.totalMembersWithTaskCapacity);
         }
     }
@@ -298,43 +317,49 @@ public class StickyTaskAssignor implements TaskAssignor {
         return memberWithLeastLoad.orElse(null);
     }
 
-    private boolean hasUnfulfilledQuota(final Member member) {
-        return localState.processIdToState.get(member.processId).memberToTaskCounts().get(member.memberId) < localState.tasksPerMember;
+    private boolean hasUnfulfilledActiveTaskQuota(final ProcessState process, final Member member) {
+        return process.memberToTaskCounts().get(member.memberId) < localState.activeTasksPerMember;
     }
 
-    private void assignStandby(final LinkedList<TaskId> standbyTasks, int numStandbyReplicas) {
-        final ArrayList<StandbyToAssign> toLeastLoaded = new ArrayList<>(standbyTasks.size() * numStandbyReplicas);
+    private boolean hasUnfulfilledTaskQuota(final ProcessState process, final Member member) {
+        return process.memberToTaskCounts().get(member.memberId) < localState.tasksPerMember;
+    }
+    private void assignStandby(final LinkedList<TaskId> standbyTasks) {
+        final ArrayList<StandbyToAssign> toLeastLoaded = new ArrayList<>(standbyTasks.size() * localState.numStandbyReplicas);
+
         // Assuming our current assignment is range-based, we want to sort by partition first.
         standbyTasks.sort(Comparator.comparing(TaskId::partition).thenComparing(TaskId::subtopologyId).reversed());
 
         for (TaskId task : standbyTasks) {
-            for (int i = 0; i < numStandbyReplicas; i++) {
+            for (int i = 0; i < localState.numStandbyReplicas; i++) {
 
                 // prev active task
-                final Member prevMember = localState.activeTaskToPrevMember.get(task);
-                if (prevMember != null) {
-                    final ProcessState prevMemberProcessState = localState.processIdToState.get(prevMember.processId);
-                    if (!prevMemberProcessState.hasTask(task) && isLoadBalanced(prevMemberProcessState)) {
-                        prevMemberProcessState.addTask(prevMember.memberId, task, false);
+                final Member prevActiveMember = localState.activeTaskToPrevMember.get(task);
+                if (prevActiveMember != null) {
+                    final ProcessState prevActiveMemberProcessState = localState.processIdToState.get(prevActiveMember.processId);
+                    if (!prevActiveMemberProcessState.hasTask(task) && hasUnfulfilledTaskQuota(prevActiveMemberProcessState, prevActiveMember)) {
+                        prevActiveMemberProcessState.addTask(prevActiveMember.memberId, task, false);
+                        maybeUpdateTasksPerMember(prevActiveMemberProcessState.memberToTaskCounts().get(prevActiveMember.memberId));
                         continue;
                     }
                 }
 
                 // prev standby tasks
-                final ArrayList<Member> prevMembers = localState.standbyTaskToPrevMember.get(task);
-                if (prevMembers != null && !prevMembers.isEmpty()) {
-                    final Member prevMember2 = findPrevMemberWithLeastLoad(prevMembers, task);
-                    if (prevMember2 != null) {
-                        final ProcessState prevMemberProcessState = localState.processIdToState.get(prevMember2.processId);
-                        if (isLoadBalanced(prevMemberProcessState)) {
-                            prevMemberProcessState.addTask(prevMember2.memberId, task, false);
+                final ArrayList<Member> prevStandbyMembers = localState.standbyTaskToPrevMember.get(task);
+                if (prevStandbyMembers != null && !prevStandbyMembers.isEmpty()) {
+                    final Member prevStandbyMember = findPrevMemberWithLeastLoad(prevStandbyMembers, task);
+                    if (prevStandbyMember != null) {
+                        final ProcessState prevStandbyMemberProcessState = localState.processIdToState.get(prevStandbyMember.processId);
+                        if (hasUnfulfilledTaskQuota(prevStandbyMemberProcessState, prevStandbyMember)) {
+                            prevStandbyMemberProcessState.addTask(prevStandbyMember.memberId, task, false);
+                            maybeUpdateTasksPerMember(prevStandbyMemberProcessState.memberToTaskCounts().get(prevStandbyMember.memberId));
                             continue;
                         }
                     }
                 }
 
-                toLeastLoaded.add(new StandbyToAssign(task, numStandbyReplicas - i));
+                toLeastLoaded.add(new StandbyToAssign(task, localState.numStandbyReplicas - i));
                 break;
             }
         }
@@ -350,7 +375,7 @@ public class StickyTaskAssignor implements TaskAssignor {
             if (!assignStandbyToMemberWithLeastLoad(processByLoad, toAssign.taskId)) {
                 log.warn("{} There is not enough available capacity. " +
                         "You should increase the number of threads and/or application instances to maintain the requested number of standby replicas.",
-                    errorMessage(numStandbyReplicas, i, toAssign.taskId));
+                    errorMessage(localState.numStandbyReplicas, i, toAssign.taskId));
                 break;
             }
         }
@@ -362,13 +387,6 @@ public class StickyTaskAssignor implements TaskAssignor {
             " of " + numStandbyReplicas + " standby tasks for task [" + task + "].";
     }
 
-    private boolean isLoadBalanced(final ProcessState process) {
-        final double load = process.load();
-        final boolean isLeastLoadedProcess = localState.processIdToState.values().stream()
-            .allMatch(p -> p.load() >= load);
-        return process.hasCapacity() || isLeastLoadedProcess;
-    }
-
     private static int computeTasksPerMember(final int numberOfTasks, final int numberOfMembers) {
         if (numberOfMembers == 0) {
             return 0;
@@ -406,8 +424,12 @@ public class StickyTaskAssignor implements TaskAssignor {
         Map<TaskId, ArrayList<Member>> standbyTaskToPrevMember;
         Map<String, ProcessState> processIdToState;
 
-        int allTasks;
-        int totalCapacity;
+        int numStandbyReplicas;
+        int totalActiveTasks;
+        int totalTasks;
+        int totalMembersWithActiveTaskCapacity;
+        int totalMembersWithTaskCapacity;
+        int activeTasksPerMember;
         int tasksPerMember;
     }
 }
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java
index 1e9d4115cb2..53318578432 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java
@@ -1233,6 +1233,68 @@ public class StickyTaskAssignorTest {
         assertEquals(1, testMember2.standbyTasks().get("test-subtopology2").size());
     }
 
+    @Test
+    public void shouldAssignStandbyTaskToPreviousOwnerBasedOnBelowQuotaCondition() {
+        // This test demonstrates the change from "load-balanced" to "below-quota" condition for standby assignment.
+        // We create a scenario where:
+        // - Process1 previously owned standby task 1 and currently has 1 active task (task 0)
+        // - Process2 currently has 1 active task (task 1)
+        // - Process3 has no previous tasks (least loaded after assignment)
+        //
+        // Under the old "load-balanced" algorithm: Process1 might not get standby task 1 because
+        // Process3 could be considered least loaded.
+        //
+        // Under the new "below-quota" algorithm: Process1 gets standby task 1 because
+        // it previously owned it AND is below quota.
+
+        // Set up previous task assignments:
+        // Process1: active=[0], standby=[1] (previously had both active and standby tasks)
+        // Process2: active=[1] (had the active task that process1 had as standby)
+        // Process3: no previous tasks
+        final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1",
+            mkMap(mkEntry("test-subtopology", Sets.newSet(0))),
+            mkMap(mkEntry("test-subtopology", Sets.newSet(1))));
+        final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2",
+            mkMap(mkEntry("test-subtopology", Sets.newSet(1))),
+            Map.of());
+        final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3");
+
+        final Map<String, AssignmentMemberSpec> members = mkMap(
+            mkEntry("member1", memberSpec1),
+            mkEntry("member2", memberSpec2),
+            mkEntry("member3", memberSpec3));
+
+        // We have 2 active tasks + 1 standby replica = 4 total tasks
+        // Quota per process = 4 tasks / 3 processes = 1.33 -> 2 tasks per process
+        final GroupAssignment result = assignor.assign(
+            new GroupSpecImpl(members, mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, "1"))),
+            new TopologyDescriberImpl(2, true, List.of("test-subtopology"))
+        );
+
+        // Verify that process1 gets the standby task 1 that it previously owned
+        // This should work under the new "below-quota" algorithm since process1 has only 1 active task
+        // which is below the quota of 2 tasks per process
+        final MemberAssignment member1 = result.members().get("member1");
+        assertNotNull(member1);
+
+        // Member1 should retain its active task 0
+        assertTrue(member1.activeTasks().get("test-subtopology").contains(0));
+
+        // Member1 should get standby task 1 because it previously owned it and is below quota
+        assertNotNull(member1.standbyTasks().get("test-subtopology"), "Member1 should have standby tasks assigned");
+        assertTrue(member1.standbyTasks().get("test-subtopology").contains(1),
+            "Member1 should have standby task 1, but has: " + member1.standbyTasks().get("test-subtopology"));
+
+        // Verify that member1 doesn't have active task 1 (standby can't be same as active)
+        assertFalse(member1.activeTasks().get("test-subtopology").contains(1));
+
+        // Verify the process1's total task count is at or below quota
+        int member1ActiveCount = member1.activeTasks().get("test-subtopology").size();
+        int member1StandbyCount = member1.standbyTasks().get("test-subtopology").size();
+        int member1TotalTasks = member1ActiveCount + member1StandbyCount;
+        assertTrue(member1TotalTasks <= 2, "Member1 should have <= 2 total tasks (quota), but has " + member1TotalTasks);
+    }
+
     private int getAllActiveTaskCount(GroupAssignment result, String... memberIds) {
         int size = 0;