This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 29cf97b9ad7 KAFKA-19478 [2/N]: Remove task pairs (#20127)
29cf97b9ad7 is described below

commit 29cf97b9ad75f01108e338b78a315d6019ad1953
Author: Lucas Brutschy <[email protected]>
AuthorDate: Mon Jul 14 09:13:51 2025 +0200

    KAFKA-19478 [2/N]: Remove task pairs (#20127)
    
    Task pairs is an optimization that is enabled in the current sticky task
    assignor.
    
    The basic idea is that every time we add a task A to a client that has
    tasks B, C, we add pairs (A, B) and (A, C) to a global collection of
    pairs. When adding a standby task, we then prioritize placements that
    create new task pairs. If no such placement exists, we fall back to the
    original behavior.
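    
    For illustration, here is the core of the bookkeeping, excerpted from
    the TaskPairs class that this patch removes and lightly simplified
    (the maxPairs bound and empty-set shortcut are omitted; Pair and
    pair() are defined in the full class, visible in the diff below):
    
        // Global collection of unordered task pairs co-located so far.
        private final Set<Pair> pairs;
    
        // Record one pair per task already assigned to the client that
        // receives taskId.
        void addPairs(final TaskId taskId, final Set<TaskId> assigned) {
            for (final TaskId id : assigned) {
                if (!id.equals(taskId))
                    pairs.add(pair(id, taskId));
            }
        }
    
        // A client is preferred for a standby task if placing the task
        // there would create at least one pair not seen before.
        boolean hasNewPair(final TaskId task1, final Set<TaskId> taskIds) {
            for (final TaskId taskId : taskIds) {
                if (!pairs.contains(pair(task1, taskId))) {
                    return true;
                }
            }
            return false;
        }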
    
    The complexity of this optimization is significant, and its usefulness
    is questionable: the HighAvailabilityAssignor does not seem to have
    such an optimization, and its absence does not seem to have caused any
    problems that I know of. I could not find any explanation of what this
    optimization is actually trying to achieve.
    
    A side effect of it is that we will sometimes avoid “small loops”, such
    as

        Node A: ActiveTask1, StandbyTask2
        Node B: ActiveTask2, StandbyTask1
        Node C: ActiveTask3, StandbyTask4
        Node D: ActiveTask4, StandbyTask3
    
    In a small loop like this, losing two nodes will in the worst case
    cause 2 tasks to go down completely, so the assignor prefers
    
        Node A: ActiveTask1, StandbyTask4
        Node B: ActiveTask2, StandbyTask1
        Node C: ActiveTask3, StandbyTask2
        Node D: ActiveTask4, StandbyTask3
    
    which is a “big loop” assignment, where losing two nodes will in the
    worst case make at most 1 task unavailable. However, this optimization
    seems fairly niche, and the current implementation does not enforce it
    directly, but only as a more relaxed constraint that usually, but not
    always, avoids small loops. So it remains unclear whether this is
    really the intention behind the optimization. The current unit tests of
    the StickyTaskAssignor pass even after removing the optimization.
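    
    To see why the constraint is relaxed: hasNewPair (see the sketch above)
    accepts a candidate client as soon as a single new pair would be
    created. In a hypothetical run where Node B already holds ActiveTask2
    and some unrelated StandbyTask5, placing StandbyTask1 on Node B is
    accepted because the pair (1, 5) is new, even though the pair (1, 2)
    already exists and the placement closes a small loop between A and B.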
    
    The pairs optimization has worst-case quadratic space and time
    complexity in the number of tasks and makes a lot of other
    optimizations impossible, so I’d suggest we remove it. I don’t think,
    in its current form, it is suitable to be implemented in a broker-side
    assignor. Note, however, that if we identify a useful effect of the
    code in the future, we can work on finding an efficient algorithm that
    brings the optimization to our broker-side assignor.
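    
    To put the quadratic growth in numbers: the pair set is sized as
    allTasks * (allTasks - 1) / 2 (see the removed TaskPairs construction
    below), so 1,000 tasks allow up to 499,500 pairs and 10,000 tasks up
    to 49,995,000 pairs, each a heap-allocated Pair object in a HashSet.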
    
    This reduces the runtime of our worst case benchmark by 10x.
    
    Reviewers: Lucas Brutschy <[email protected]>
---
 .../group/streams/assignor/StickyTaskAssignor.java | 101 ++-------------------
 .../streams/assignor/StickyTaskAssignorTest.java   |  89 ------------------
 2 files changed, 7 insertions(+), 183 deletions(-)

diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java
index b1f1d9b1a11..f455bb577eb 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java
@@ -25,7 +25,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -91,8 +90,6 @@ public class StickyTaskAssignor implements TaskAssignor {
         localState.totalCapacity = groupSpec.members().size();
         localState.tasksPerMember = computeTasksPerMember(localState.allTasks, localState.totalCapacity);
 
-        localState.taskPairs = new TaskPairs(localState.allTasks * (localState.allTasks - 1) / 2);
-
         localState.processIdToState = new HashMap<>();
         localState.activeTaskToPrevMember = new HashMap<>();
         localState.standbyTaskToPrevMember = new HashMap<>();
@@ -175,7 +172,7 @@ public class StickyTaskAssignor implements TaskAssignor {
             final Member prevMember = localState.activeTaskToPrevMember.get(task);
             if (prevMember != null && hasUnfulfilledQuota(prevMember)) {
                 localState.processIdToState.get(prevMember.processId).addTask(prevMember.memberId, task, true);
-                updateHelpers(prevMember, task, true);
+                updateHelpers(prevMember, true);
                 it.remove();
             }
         }
@@ -187,7 +184,7 @@ public class StickyTaskAssignor implements TaskAssignor {
             final Member prevMember = findMemberWithLeastLoad(prevMembers, task, true);
             if (prevMember != null && hasUnfulfilledQuota(prevMember)) {
                 localState.processIdToState.get(prevMember.processId).addTask(prevMember.memberId, task, true);
-                updateHelpers(prevMember, task, true);
+                updateHelpers(prevMember, true);
                 it.remove();
             }
         }
@@ -204,7 +201,7 @@ public class StickyTaskAssignor implements TaskAssignor {
             }
             localState.processIdToState.get(member.processId).addTask(member.memberId, task, true);
             it.remove();
-            updateHelpers(member, task, true);
+            updateHelpers(member, true);
 
         }
     }
@@ -221,20 +218,10 @@ public class StickyTaskAssignor implements TaskAssignor {
         if (members == null || members.isEmpty()) {
             return null;
         }
-        Set<Member> rightPairs = members.stream()
-            .filter(member  -> localState.taskPairs.hasNewPair(taskId, localState.processIdToState.get(member.processId).assignedTasks()))
-            .collect(Collectors.toSet());
-        if (rightPairs.isEmpty()) {
-            rightPairs = members;
-        }
-        Optional<ProcessState> processWithLeastLoad = rightPairs.stream()
+        Optional<ProcessState> processWithLeastLoad = members.stream()
             .map(member  -> localState.processIdToState.get(member.processId))
             .min(Comparator.comparingDouble(ProcessState::load));
 
-        // processWithLeastLoad must be present at this point, but we do a double check
-        if (processWithLeastLoad.isEmpty()) {
-            return null;
-        }
         // if the same exact former member is needed
         if (returnSameMember) {
             return localState.standbyTaskToPrevMember.get(taskId).stream()
@@ -275,8 +262,7 @@ public class StickyTaskAssignor implements TaskAssignor {
 
                 // prev active task
                 Member prevMember = localState.activeTaskToPrevMember.get(task);
-                if (prevMember != null && availableProcesses.contains(prevMember.processId) && isLoadBalanced(prevMember.processId)
-                    && localState.taskPairs.hasNewPair(task, localState.processIdToState.get(prevMember.processId).assignedTasks())) {
+                if (prevMember != null && availableProcesses.contains(prevMember.processId) && isLoadBalanced(prevMember.processId)) {
                     standby = prevMember;
                 }
 
@@ -304,7 +290,7 @@ public class StickyTaskAssignor implements TaskAssignor {
                     }
                 }
                 localState.processIdToState.get(standby.processId).addTask(standby.memberId, task, false);
-                updateHelpers(standby, task, false);
+                updateHelpers(standby, false);
             }
 
         }
@@ -323,10 +309,7 @@ public class StickyTaskAssignor implements TaskAssignor {
         return process.hasCapacity() || isLeastLoadedProcess;
     }
 
-    private void updateHelpers(final Member member, final TaskId taskId, final boolean isActive) {
-        // add all pair combinations: update taskPairs
-        localState.taskPairs.addPairs(taskId, localState.processIdToState.get(member.processId).assignedTasks());
-
+    private void updateHelpers(final Member member, final boolean isActive) {
         if (isActive) {
             // update task per process
             maybeUpdateTasksPerMember(localState.processIdToState.get(member.processId).activeTaskCount());
@@ -344,75 +327,6 @@ public class StickyTaskAssignor implements TaskAssignor {
         return tasksPerMember;
     }
 
-    private static class TaskPairs {
-        private final Set<Pair> pairs;
-        private final int maxPairs;
-
-        TaskPairs(final int maxPairs) {
-            this.maxPairs = maxPairs;
-            this.pairs = new HashSet<>(maxPairs);
-        }
-
-        boolean hasNewPair(final TaskId task1,
-                           final Set<TaskId> taskIds) {
-            if (pairs.size() == maxPairs) {
-                return false;
-            }
-            if (taskIds.size() == 0) {
-                return true;
-            }
-            for (final TaskId taskId : taskIds) {
-                if (!pairs.contains(pair(task1, taskId))) {
-                    return true;
-                }
-            }
-            return false;
-        }
-
-        void addPairs(final TaskId taskId, final Set<TaskId> assigned) {
-            for (final TaskId id : assigned) {
-                if (!id.equals(taskId))
-                    pairs.add(pair(id, taskId));
-            }
-        }
-
-        Pair pair(final TaskId task1, final TaskId task2) {
-            if (task1.compareTo(task2) < 0) {
-                return new Pair(task1, task2);
-            }
-            return new Pair(task2, task1);
-        }
-
-
-        private static class Pair {
-            private final TaskId task1;
-            private final TaskId task2;
-
-            Pair(final TaskId task1, final TaskId task2) {
-                this.task1 = task1;
-                this.task2 = task2;
-            }
-
-            @Override
-            public boolean equals(final Object o) {
-                if (this == o) {
-                    return true;
-                }
-                if (o == null || getClass() != o.getClass()) {
-                    return false;
-                }
-                final Pair pair = (Pair) o;
-                return Objects.equals(task1, pair.task1) &&
-                    Objects.equals(task2, pair.task2);
-            }
-
-            @Override
-            public int hashCode() {
-                return Objects.hash(task1, task2);
-            }
-        }
-    }
-
     static class Member {
         private final String processId;
         private final String memberId;
@@ -425,7 +339,6 @@ public class StickyTaskAssignor implements TaskAssignor {
 
     private static class LocalState {
         // helper data structures:
-        private TaskPairs taskPairs;
         Map<TaskId, Member> activeTaskToPrevMember;
         Map<TaskId, Set<Member>> standbyTaskToPrevMember;
         Map<String, ProcessState> processIdToState;
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java
index c0493887266..ee6cc5e2ae9 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java
@@ -32,12 +32,10 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static java.util.Arrays.asList;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -581,86 +579,6 @@ public class StickyTaskAssignorTest {
         assertEquals(4, getAllActiveTaskCount(result, "member1"));
     }
 
-    @Test
-    public void shouldNotHaveSameAssignmentOnAnyTwoHosts() {
-        final AssignmentMemberSpec memberSpec1 = 
createAssignmentMemberSpec("process1");
-        final AssignmentMemberSpec memberSpec2 = 
createAssignmentMemberSpec("process2");
-        final AssignmentMemberSpec memberSpec3 = 
createAssignmentMemberSpec("process3");
-        final AssignmentMemberSpec memberSpec4 = 
createAssignmentMemberSpec("process4");
-        final List<String> allMemberIds = asList("member1", "member2", 
"member3", "member4");
-        Map<String, AssignmentMemberSpec> members = mkMap(
-            mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), 
mkEntry("member3", memberSpec3), mkEntry("member4", memberSpec4));
-
-        final GroupAssignment result = assignor.assign(
-            new GroupSpecImpl(members,
-                mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, "1"))),
-            new TopologyDescriberImpl(4, true, List.of("test-subtopology"))
-        );
-
-        for (final String memberId : allMemberIds) {
-            final List<Integer> taskIds = getAllTaskIds(result, memberId);
-            for (final String otherMemberId : allMemberIds) {
-                if (!memberId.equals(otherMemberId)) {
-                    assertNotEquals(taskIds, getAllTaskIds(result, otherMemberId));
-                }
-            }
-        }
-    }
-
-    @Test
-    public void shouldNotHaveSameAssignmentOnAnyTwoHostsWhenThereArePreviousActiveTasks() {
-        final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", Sets.newSet(1, 2))), Map.of());
-        final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", Sets.newSet(3))), Map.of());
-        final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3", mkMap(mkEntry("test-subtopology", Sets.newSet(0))), Map.of());
-        final AssignmentMemberSpec memberSpec4 = createAssignmentMemberSpec("process4");
-        final List<String> allMemberIds = asList("member1", "member2", "member3", "member4");
-        Map<String, AssignmentMemberSpec> members = mkMap(
-            mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), mkEntry("member3", memberSpec3), mkEntry("member4", memberSpec4));
-
-        final GroupAssignment result = assignor.assign(
-            new GroupSpecImpl(members,
-                mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, "1"))),
-            new TopologyDescriberImpl(4, true, List.of("test-subtopology"))
-        );
-
-        for (final String memberId : allMemberIds) {
-            final List<Integer> taskIds = getAllTaskIds(result, memberId);
-            for (final String otherMemberId : allMemberIds) {
-                if (!memberId.equals(otherMemberId)) {
-                    assertNotEquals(taskIds, getAllTaskIds(result, otherMemberId));
-                }
-            }
-        }
-    }
-
-    @Test
-    public void shouldNotHaveSameAssignmentOnAnyTwoHostsWhenThereArePreviousStandbyTasks() {
-        final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1",
-            mkMap(mkEntry("test-subtopology", Sets.newSet(1, 2))), mkMap(mkEntry("test-subtopology", Sets.newSet(3, 0))));
-        final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2",
-            mkMap(mkEntry("test-subtopology", Sets.newSet(3, 0))), mkMap(mkEntry("test-subtopology", Sets.newSet(1, 2))));
-        final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3");
-        final AssignmentMemberSpec memberSpec4 = createAssignmentMemberSpec("process4");
-        final List<String> allMemberIds = asList("member1", "member2", "member3", "member4");
-        Map<String, AssignmentMemberSpec> members = mkMap(
-            mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), mkEntry("member3", memberSpec3), mkEntry("member4", memberSpec4));
-
-        final GroupAssignment result = assignor.assign(
-            new GroupSpecImpl(members,
-                mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, "1"))),
-            new TopologyDescriberImpl(4, true, List.of("test-subtopology"))
-        );
-
-        for (final String memberId : allMemberIds) {
-            final List<Integer> taskIds = getAllTaskIds(result, memberId);
-            for (final String otherMemberId : allMemberIds) {
-                if (!memberId.equals(otherMemberId)) {
-                    assertNotEquals(taskIds, getAllTaskIds(result, otherMemberId));
-                }
-            }
-        }
-    }
-
     @Test
     public void shouldReBalanceTasksAcrossAllClientsWhenCapacityAndTaskCountTheSame() {
         final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3", mkMap(mkEntry("test-subtopology", Sets.newSet(0, 1, 2, 3))), Map.of());
@@ -1020,13 +938,6 @@ public class StickyTaskAssignorTest {
         return res;
     }
 
-    private List<Integer> getAllTaskIds(GroupAssignment result, String... memberIds) {
-        List<Integer> res = new ArrayList<>();
-        res.addAll(getAllActiveTaskIds(result, memberIds));
-        res.addAll(getAllStandbyTaskIds(result, memberIds));
-        return res;
-    }
-
     private Map<String, Set<Integer>> mergeAllStandbyTasks(GroupAssignment result, String... memberIds) {
         Map<String, Set<Integer>> res = new HashMap<>();
         for (String memberId : memberIds) {
