lucasbru commented on code in PR #20172:
URL: https://github.com/apache/kafka/pull/20172#discussion_r2219107453


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -115,7 +117,7 @@ private void initialize(final GroupSpec groupSpec, final TopologyDescriber topol
                 Set<Integer> partitionNoSet = entry.getValue();
                 for (int partitionNo : partitionNoSet) {
                     TaskId taskId = new TaskId(entry.getKey(), partitionNo);
-                    localState.standbyTaskToPrevMember.putIfAbsent(taskId, new HashSet<>());
+                    localState.standbyTaskToPrevMember.putIfAbsent(taskId, new ArrayList<>());

Review Comment:
   minor: no need to deduplicate here, so I'd rather just use arrays



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -171,38 +173,43 @@ private void assignActive(final Set<TaskId> activeTasks) {
             final TaskId task = it.next();
             final Member prevMember = localState.activeTaskToPrevMember.get(task);
             if (prevMember != null && hasUnfulfilledQuota(prevMember)) {
-                localState.processIdToState.get(prevMember.processId).addTask(prevMember.memberId, task, true);
-                updateHelpers(prevMember, true);
+                ProcessState processState = localState.processIdToState.get(prevMember.processId);
+                processState.addTask(prevMember.memberId, task, true);
+                maybeUpdateTasksPerMember(processState.activeTaskCount());

Review Comment:
   Just inlining `updateHelpers`



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -171,38 +173,43 @@ private void assignActive(final Set<TaskId> activeTasks) {
             final TaskId task = it.next();
             final Member prevMember = localState.activeTaskToPrevMember.get(task);
             if (prevMember != null && hasUnfulfilledQuota(prevMember)) {
-                localState.processIdToState.get(prevMember.processId).addTask(prevMember.memberId, task, true);
-                updateHelpers(prevMember, true);
+                ProcessState processState = localState.processIdToState.get(prevMember.processId);
+                processState.addTask(prevMember.memberId, task, true);
+                maybeUpdateTasksPerMember(processState.activeTaskCount());
                 it.remove();
             }
         }
 
         // 2. re-assigning tasks to clients that previously have seen the same task (as standby task)
         for (Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) {
             final TaskId task = it.next();
-            final Set<Member> prevMembers = localState.standbyTaskToPrevMember.get(task);
-            final Member prevMember = findMemberWithLeastLoad(prevMembers, task, true);
+            final ArrayList<Member> prevMembers = localState.standbyTaskToPrevMember.get(task);
+            final Member prevMember = findPrevMemberWithLeastLoad(prevMembers, null);
             if (prevMember != null && hasUnfulfilledQuota(prevMember)) {
-                localState.processIdToState.get(prevMember.processId).addTask(prevMember.memberId, task, true);
-                updateHelpers(prevMember, true);
+                ProcessState processState = localState.processIdToState.get(prevMember.processId);
+                processState.addTask(prevMember.memberId, task, true);
+                maybeUpdateTasksPerMember(processState.activeTaskCount());
                 it.remove();
             }
         }
 
         // 3. assign any remaining unassigned tasks
+        PriorityQueue<ProcessState> processByLoad = new PriorityQueue<>(Comparator.comparingDouble(ProcessState::load));
+        processByLoad.addAll(localState.processIdToState.values());
         for (Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) {
             final TaskId task = it.next();
-            final Set<Member> allMembers = localState.processIdToState.entrySet().stream().flatMap(entry -> entry.getValue().memberToTaskCounts().keySet().stream()
-                .map(memberId -> new Member(entry.getKey(), memberId))).collect(Collectors.toSet());
-            final Member member = findMemberWithLeastLoad(allMembers, task, false);
+            ProcessState processWithLeastLoad = processByLoad.poll();
+            if (processWithLeastLoad == null) {
+                throw new TaskAssignorException("No process available to assign active task {}." + task);
+            }
+            String member = memberWithLeastLoad(processWithLeastLoad);

Review Comment:
   `memberWithLeastLoad` still uses linear search within the process, as before.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -301,21 +348,13 @@ private String errorMessage(final int numStandbyReplicas, final int i, final Tas
             " of " + numStandbyReplicas + " standby tasks for task [" + task + "].";
     }
 
-    private boolean isLoadBalanced(final String processId) {
-        final ProcessState process = localState.processIdToState.get(processId);
+    private boolean isLoadBalanced(final ProcessState process) {

Review Comment:
   minor: passing the process in here saves us from looking it up in the 
hashmap again.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -171,38 +173,43 @@ private void assignActive(final Set<TaskId> activeTasks) {
             final TaskId task = it.next();
             final Member prevMember = localState.activeTaskToPrevMember.get(task);
             if (prevMember != null && hasUnfulfilledQuota(prevMember)) {
-                localState.processIdToState.get(prevMember.processId).addTask(prevMember.memberId, task, true);
-                updateHelpers(prevMember, true);
+                ProcessState processState = localState.processIdToState.get(prevMember.processId);
+                processState.addTask(prevMember.memberId, task, true);
+                maybeUpdateTasksPerMember(processState.activeTaskCount());
                 it.remove();
             }
         }
 
         // 2. re-assigning tasks to clients that previously have seen the same task (as standby task)
         for (Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) {
             final TaskId task = it.next();
-            final Set<Member> prevMembers = localState.standbyTaskToPrevMember.get(task);
-            final Member prevMember = findMemberWithLeastLoad(prevMembers, task, true);
+            final ArrayList<Member> prevMembers = localState.standbyTaskToPrevMember.get(task);
+            final Member prevMember = findPrevMemberWithLeastLoad(prevMembers, null);
             if (prevMember != null && hasUnfulfilledQuota(prevMember)) {
-                localState.processIdToState.get(prevMember.processId).addTask(prevMember.memberId, task, true);
-                updateHelpers(prevMember, true);
+                ProcessState processState = localState.processIdToState.get(prevMember.processId);
+                processState.addTask(prevMember.memberId, task, true);
+                maybeUpdateTasksPerMember(processState.activeTaskCount());
                 it.remove();
             }
         }
 
         // 3. assign any remaining unassigned tasks
+        PriorityQueue<ProcessState> processByLoad = new PriorityQueue<>(Comparator.comparingDouble(ProcessState::load));
+        processByLoad.addAll(localState.processIdToState.values());

Review Comment:
   Initial build of the priority queue, ordered by process load.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -214,85 +221,125 @@ private void maybeUpdateTasksPerMember(final int activeTasksNo) {
         }
     }
 
-    private Member findMemberWithLeastLoad(final Set<Member> members, TaskId taskId, final boolean returnSameMember) {
+    private boolean assignStandbyToMemberWithLeastLoad(PriorityQueue<ProcessState> queue, TaskId taskId) {
+        ProcessState processWithLeastLoad = queue.poll();
+        if (processWithLeastLoad == null) {
+            return false;
+        }
+        boolean found = false;
+        if (!processWithLeastLoad.hasTask(taskId)) {
+            String memberId = memberWithLeastLoad(processWithLeastLoad);
+            if (memberId != null) {
+                processWithLeastLoad.addTask(memberId, taskId, false);
+                found = true;
+            }
+        } else if (!queue.isEmpty()) {
+            found = assignStandbyToMemberWithLeastLoad(queue, taskId);

Review Comment:
   We are using recursion here. If the least-loaded process already has the task, we recurse to find the next least-loaded process. The point is that the least-loaded process is not added back to the queue until the recursion returns.
   
   Recursion is fine here, because we get at most `numStandbyReplicas + 1` recursive calls, since only `numStandbyReplicas` processes can already have the task. By default, we only allow 2 standby replicas, so we'd get at most 3 recursive calls here.
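   
   For illustration only - a minimal, self-contained sketch of the poll / recurse / re-add pattern (generic code, not the PR's method; the helper name `pollFirstMatching` is made up):
   
   ```java
   import java.util.PriorityQueue;
   import java.util.function.Predicate;
   
   public class BoundedRecursionSketch {
   
       // Polls entries until one matches `accept`. Every polled entry is pushed back on the
       // way out of the recursion, so the queue still contains all elements afterwards, and
       // the recursion depth is bounded by the number of non-matching entries at the head.
       static <T> T pollFirstMatching(PriorityQueue<T> queue, Predicate<T> accept) {
           T head = queue.poll();
           if (head == null) {
               return null;
           }
           T result = accept.test(head) ? head : pollFirstMatching(queue, accept);
           queue.add(head); // re-insert after the recursive call, mirroring the PR's approach
           return result;
       }
   
       public static void main(String[] args) {
           PriorityQueue<Integer> byLoad = new PriorityQueue<>();
           byLoad.add(1);
           byLoad.add(2);
           byLoad.add(3);
           // Skip the entries that "already have the task" (here: odd numbers).
           System.out.println(pollFirstMatching(byLoad, n -> n % 2 == 0)); // prints 2
           System.out.println(byLoad.size());                              // prints 3: nothing was lost
       }
   }
   ```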



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java:
##########
@@ -140,15 +140,19 @@ public void shouldWorkWithRebalance(
 
 
         final Properties props = new Properties();
+        final String appId = safeUniqueTestName(testInfo);
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-        props.put(StreamsConfig.APPLICATION_ID_CONFIG, safeUniqueTestName(testInfo));
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
         props.put(InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled);
         props.put(InternalConfig.PROCESSING_THREADS_ENABLED, processingThreadsEnabled);
-        // decrease the session timeout so that we can trigger the rebalance soon after old client left closed
-        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
-        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500);
         if (streamsProtocolEnabled) {
             props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
+            // decrease the session timeout so that we can trigger the rebalance soon after old client left closed
+            CLUSTER.setGroupSessionTimeout(appId, 10000);
+            CLUSTER.setGroupHeartbeatTimeout(appId, 1000);

Review Comment:
   The integration test previously set the session timeout and the heartbeat interval incorrectly for the new protocol. We need to set them on the group level.
   
   This sometimes made the test flaky with the new assignment algorithm, since we iteratively cycle out the "oldest" member and tend to assign its tasks to the next-oldest member. But, due to the high session timeout and heartbeat interval, it could sometimes take too long for the new member to get the new tasks assigned before being cycled out as well.
   
   I don't see a problem in the assignment logic here - actually, it seems useful to assign tasks to "old" members, since they are stable. We are just cycling out the members too quickly in this integration test.
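   
   Purely as an illustration of what "setting it on the group level" can look like: a sketch using the Admin client. This is only an assumption about what the `CLUSTER.setGroupSessionTimeout` / `setGroupHeartbeatTimeout` helpers might do; the group-level config names `streams.session.timeout.ms` and `streams.heartbeat.interval.ms`, the group id, and the bootstrap address are assumed, not taken from the PR:
   
   ```java
   import java.util.Collection;
   import java.util.List;
   import java.util.Map;
   import java.util.Properties;
   
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.AdminClientConfig;
   import org.apache.kafka.clients.admin.AlterConfigOp;
   import org.apache.kafka.clients.admin.ConfigEntry;
   import org.apache.kafka.common.config.ConfigResource;
   
   public class GroupConfigSketch {
       public static void main(String[] args) throws Exception {
           Properties adminProps = new Properties();
           adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
           try (Admin admin = Admin.create(adminProps)) {
               // GROUP-typed config resource: the overrides apply to the group, not to individual clients.
               ConfigResource group = new ConfigResource(ConfigResource.Type.GROUP, "my-app"); // assumed group id
               Collection<AlterConfigOp> ops = List.of(
                   new AlterConfigOp(new ConfigEntry("streams.session.timeout.ms", "10000"), AlterConfigOp.OpType.SET),
                   new AlterConfigOp(new ConfigEntry("streams.heartbeat.interval.ms", "1000"), AlterConfigOp.OpType.SET)
               );
               admin.incrementalAlterConfigs(Map.of(group, ops)).all().get();
           }
       }
   }
   ```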



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java:
##########
@@ -401,6 +401,8 @@ public KafkaConsumer<byte[], byte[]> createConsumerAndSubscribeTo(final Map<Stri
 
     private void addDefaultBrokerPropsIfAbsent(final Properties brokerConfig) {
         brokerConfig.putIfAbsent(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, 2 * 1024 * 1024L);
+        brokerConfig.putIfAbsent(GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, "100");

Review Comment:
   For integration tests, we want to allow rather extreme values so that the tests run quickly.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -171,38 +173,43 @@ private void assignActive(final Set<TaskId> activeTasks) {
             final TaskId task = it.next();
             final Member prevMember = localState.activeTaskToPrevMember.get(task);
             if (prevMember != null && hasUnfulfilledQuota(prevMember)) {
-                localState.processIdToState.get(prevMember.processId).addTask(prevMember.memberId, task, true);
-                updateHelpers(prevMember, true);
+                ProcessState processState = localState.processIdToState.get(prevMember.processId);
+                processState.addTask(prevMember.memberId, task, true);
+                maybeUpdateTasksPerMember(processState.activeTaskCount());
                 it.remove();
             }
         }
 
         // 2. re-assigning tasks to clients that previously have seen the same task (as standby task)
         for (Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) {
             final TaskId task = it.next();
-            final Set<Member> prevMembers = localState.standbyTaskToPrevMember.get(task);
-            final Member prevMember = findMemberWithLeastLoad(prevMembers, task, true);
+            final ArrayList<Member> prevMembers = localState.standbyTaskToPrevMember.get(task);
+            final Member prevMember = findPrevMemberWithLeastLoad(prevMembers, null);
             if (prevMember != null && hasUnfulfilledQuota(prevMember)) {
-                localState.processIdToState.get(prevMember.processId).addTask(prevMember.memberId, task, true);
-                updateHelpers(prevMember, true);
+                ProcessState processState = localState.processIdToState.get(prevMember.processId);
+                processState.addTask(prevMember.memberId, task, true);
+                maybeUpdateTasksPerMember(processState.activeTaskCount());
                 it.remove();
             }
         }
 
         // 3. assign any remaining unassigned tasks
+        PriorityQueue<ProcessState> processByLoad = new PriorityQueue<>(Comparator.comparingDouble(ProcessState::load));
+        processByLoad.addAll(localState.processIdToState.values());
         for (Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) {
             final TaskId task = it.next();
-            final Set<Member> allMembers = localState.processIdToState.entrySet().stream().flatMap(entry -> entry.getValue().memberToTaskCounts().keySet().stream()
-                .map(memberId -> new Member(entry.getKey(), memberId))).collect(Collectors.toSet());
-            final Member member = findMemberWithLeastLoad(allMembers, task, false);
+            ProcessState processWithLeastLoad = processByLoad.poll();

Review Comment:
   Replace the iteration over all members in `findMemberWithLeastLoad` with polling the priority queue to find the least-loaded member.



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java:
##########
@@ -99,7 +99,7 @@ private Properties streamsConfiguration(final boolean streamsProtocolEnabled) {
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class);
         if (streamsProtocolEnabled) {
             streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
-            CLUSTER.setStandbyReplicas("app-" + safeTestName, 1);
+            CLUSTER.setGroupStandbyReplicas("app-" + safeTestName, 1);

Review Comment:
   Just renamed



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -171,38 +173,43 @@ private void assignActive(final Set<TaskId> activeTasks) {
             final TaskId task = it.next();
             final Member prevMember = localState.activeTaskToPrevMember.get(task);
             if (prevMember != null && hasUnfulfilledQuota(prevMember)) {
-                localState.processIdToState.get(prevMember.processId).addTask(prevMember.memberId, task, true);
-                updateHelpers(prevMember, true);
+                ProcessState processState = localState.processIdToState.get(prevMember.processId);
+                processState.addTask(prevMember.memberId, task, true);
+                maybeUpdateTasksPerMember(processState.activeTaskCount());
                 it.remove();
             }
         }
 
         // 2. re-assigning tasks to clients that previously have seen the same task (as standby task)
         for (Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) {
             final TaskId task = it.next();
-            final Set<Member> prevMembers = localState.standbyTaskToPrevMember.get(task);
-            final Member prevMember = findMemberWithLeastLoad(prevMembers, task, true);
+            final ArrayList<Member> prevMembers = localState.standbyTaskToPrevMember.get(task);
+            final Member prevMember = findPrevMemberWithLeastLoad(prevMembers, null);
             if (prevMember != null && hasUnfulfilledQuota(prevMember)) {
-                localState.processIdToState.get(prevMember.processId).addTask(prevMember.memberId, task, true);
-                updateHelpers(prevMember, true);
+                ProcessState processState = localState.processIdToState.get(prevMember.processId);
+                processState.addTask(prevMember.memberId, task, true);
+                maybeUpdateTasksPerMember(processState.activeTaskCount());
                 it.remove();
             }
         }
 
         // 3. assign any remaining unassigned tasks
+        PriorityQueue<ProcessState> processByLoad = new PriorityQueue<>(Comparator.comparingDouble(ProcessState::load));
+        processByLoad.addAll(localState.processIdToState.values());
         for (Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) {
             final TaskId task = it.next();
-            final Set<Member> allMembers = localState.processIdToState.entrySet().stream().flatMap(entry -> entry.getValue().memberToTaskCounts().keySet().stream()
-                .map(memberId -> new Member(entry.getKey(), memberId))).collect(Collectors.toSet());
-            final Member member = findMemberWithLeastLoad(allMembers, task, false);
+            ProcessState processWithLeastLoad = processByLoad.poll();
+            if (processWithLeastLoad == null) {
+                throw new TaskAssignorException("No process available to assign active task {}." + task);
+            }
+            String member = memberWithLeastLoad(processWithLeastLoad);
             if (member == null) {
-                log.error("Unable to assign active task {} to any member.", task);
                 throw new TaskAssignorException("No member available to assign active task {}." + task);
             }
-            localState.processIdToState.get(member.processId).addTask(member.memberId, task, true);
+            processWithLeastLoad.addTask(member, task, true);
             it.remove();
-            updateHelpers(member, true);
-
+            maybeUpdateTasksPerMember(processWithLeastLoad.activeTaskCount());
+            processByLoad.add(processWithLeastLoad); // Add it back to the queue after updating its state

Review Comment:
   After we have changed the load, we need to add the process back to the priority queue, so that it is inserted at the correct position.
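   
   A small generic illustration of why the re-add matters (not the PR code): `java.util.PriorityQueue` only orders elements when they are inserted or removed, so mutating a queued element's load in place does not move it. Polling the element first and adding it back after the update keeps the heap consistent. The `Proc` type below is a made-up stand-in for `ProcessState`:
   
   ```java
   import java.util.Comparator;
   import java.util.PriorityQueue;
   
   public class RequeueAfterUpdateSketch {
   
       // Minimal stand-in for a process with a mutable load.
       static final class Proc {
           final String id;
           double load;
           Proc(String id, double load) { this.id = id; this.load = load; }
       }
   
       public static void main(String[] args) {
           PriorityQueue<Proc> byLoad = new PriorityQueue<>(Comparator.comparingDouble((Proc p) -> p.load));
           Proc a = new Proc("a", 0.0);
           Proc b = new Proc("b", 1.0);
           byLoad.add(a);
           byLoad.add(b);
   
           // Poll the least-loaded process, update its load, then re-add it so the heap re-sorts it.
           Proc least = byLoad.poll();   // "a"
           least.load = 5.0;             // simulate assigning a task, which increases the load
           byLoad.add(least);            // re-insert at the correct position
   
           System.out.println(byLoad.peek().id); // prints "b", reflecting the updated load
       }
   }
   ```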



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -214,85 +221,125 @@ private void maybeUpdateTasksPerMember(final int activeTasksNo) {
         }
     }
 
-    private Member findMemberWithLeastLoad(final Set<Member> members, TaskId taskId, final boolean returnSameMember) {
+    private boolean assignStandbyToMemberWithLeastLoad(PriorityQueue<ProcessState> queue, TaskId taskId) {
+        ProcessState processWithLeastLoad = queue.poll();
+        if (processWithLeastLoad == null) {
+            return false;
+        }
+        boolean found = false;
+        if (!processWithLeastLoad.hasTask(taskId)) {
+            String memberId = memberWithLeastLoad(processWithLeastLoad);
+            if (memberId != null) {
+                processWithLeastLoad.addTask(memberId, taskId, false);
+                found = true;
+            }
+        } else if (!queue.isEmpty()) {
+            found = assignStandbyToMemberWithLeastLoad(queue, taskId);
+        }
+        queue.add(processWithLeastLoad); // Add it back to the queue after updating its state
+        return found;
+    }
+
+    /**
+     * Finds the previous member with the least load for a given task.
+     *
+     * @param members The list of previous members owning the task.
+     * @param taskId  The taskId, to check if the previous member already has the task. Can be null, if we assign it
+     *                for the first time (e.g., during active task assignment).
+     *
+     * @return Previous member with the least load that does not have the task, or null if no such member exists.
+     */
+    private Member findPrevMemberWithLeastLoad(final ArrayList<Member> members, final TaskId taskId) {
         if (members == null || members.isEmpty()) {
             return null;
         }
-        Optional<ProcessState> processWithLeastLoad = members.stream()
-            .map(member  -> localState.processIdToState.get(member.processId))
-            .min(Comparator.comparingDouble(ProcessState::load));
-
-        // if the same exact former member is needed
-        if (returnSameMember) {
-            return localState.standbyTaskToPrevMember.get(taskId).stream()
-                .filter(standby -> standby.processId.equals(processWithLeastLoad.get().processId()))
-                .findFirst()
-                .orElseGet(() -> memberWithLeastLoad(processWithLeastLoad.get()));
+
+        Member candidate = members.get(0);
+        ProcessState candidateProcessState = localState.processIdToState.get(candidate.processId);
+        double candidateProcessLoad = candidateProcessState.load();
+        double candidateMemberLoad = candidateProcessState.memberToTaskCounts().get(candidate.memberId);
+        for (int i = 1; i < members.size(); i++) {
+            Member member = members.get(i);
+            ProcessState processState = localState.processIdToState.get(member.processId);
+            double newProcessLoad = processState.load();
+            if (newProcessLoad < candidateProcessLoad && (taskId == null || !processState.hasTask(taskId))) {
+                double newMemberLoad = processState.memberToTaskCounts().get(member.memberId);
+                if (newMemberLoad < candidateMemberLoad) {
+                    candidateProcessLoad = newProcessLoad;
+                    candidateMemberLoad = newMemberLoad;
+                    candidate = member;
+                }
+            }
         }
-        return memberWithLeastLoad(processWithLeastLoad.get());
+
+        if (taskId == null || !candidateProcessState.hasTask(taskId)) {
+            return candidate;
+        }
+        return null;
     }
 
-    private Member memberWithLeastLoad(final ProcessState processWithLeastLoad) {
+    private String memberWithLeastLoad(final ProcessState processWithLeastLoad) {
+        Map<String, Integer> members = processWithLeastLoad.memberToTaskCounts();
+        if (members.isEmpty()) {
+            return null;
+        }
+        if (members.size() == 1) {
+            return members.keySet().iterator().next();
+        }
         Optional<String> memberWithLeastLoad = processWithLeastLoad.memberToTaskCounts().entrySet().stream()
             .min(Map.Entry.comparingByValue())
             .map(Map.Entry::getKey);
-        return memberWithLeastLoad.map(memberId -> new Member(processWithLeastLoad.processId(), memberId)).orElse(null);
+        return memberWithLeastLoad.orElse(null);
     }
 
     private boolean hasUnfulfilledQuota(final Member member) {
         return localState.processIdToState.get(member.processId).memberToTaskCounts().get(member.memberId) < localState.tasksPerMember;
     }
 
     private void assignStandby(final Set<TaskId> standbyTasks, final int numStandbyReplicas) {
+        ArrayList<StandbyToAssign> toLeastLoaded = new ArrayList<>(standbyTasks.size() * numStandbyReplicas);

Review Comment:
   This ArrayList is used to store all standby tasks that we couldn't assign to a member that previously owned that task, and that therefore need to be assigned to the "least loaded" node.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -301,21 +348,13 @@ private String errorMessage(final int numStandbyReplicas, final int i, final Tas
             " of " + numStandbyReplicas + " standby tasks for task [" + task + "].";
     }
 
-    private boolean isLoadBalanced(final String processId) {
-        final ProcessState process = localState.processIdToState.get(processId);
+    private boolean isLoadBalanced(final ProcessState process) {
         final double load = process.load();
         boolean isLeastLoadedProcess = localState.processIdToState.values().stream()
             .allMatch(p -> p.load() >= load);
         return process.hasCapacity() || isLeastLoadedProcess;
     }
 
-    private void updateHelpers(final Member member, final boolean isActive) {

Review Comment:
   I just inlined this function



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -214,85 +221,125 @@ private void maybeUpdateTasksPerMember(final int activeTasksNo) {
         }
     }
 
-    private Member findMemberWithLeastLoad(final Set<Member> members, TaskId taskId, final boolean returnSameMember) {
+    private boolean assignStandbyToMemberWithLeastLoad(PriorityQueue<ProcessState> queue, TaskId taskId) {
+        ProcessState processWithLeastLoad = queue.poll();
+        if (processWithLeastLoad == null) {
+            return false;
+        }
+        boolean found = false;
+        if (!processWithLeastLoad.hasTask(taskId)) {
+            String memberId = memberWithLeastLoad(processWithLeastLoad);
+            if (memberId != null) {
+                processWithLeastLoad.addTask(memberId, taskId, false);
+                found = true;
+            }
+        } else if (!queue.isEmpty()) {
+            found = assignStandbyToMemberWithLeastLoad(queue, taskId);
+        }
+        queue.add(processWithLeastLoad); // Add it back to the queue after updating its state
+        return found;
+    }
+
+    /**
+     * Finds the previous member with the least load for a given task.
+     *
+     * @param members The list of previous members owning the task.
+     * @param taskId  The taskId, to check if the previous member already has the task. Can be null, if we assign it
+     *                for the first time (e.g., during active task assignment).
+     *
+     * @return Previous member with the least load that does not have the task, or null if no such member exists.
+     */
+    private Member findPrevMemberWithLeastLoad(final ArrayList<Member> members, final TaskId taskId) {

Review Comment:
   `findPrevMemberWithLeastLoad` works very similarly to the old `findMemberWithLeastLoad` - that is, it does a linear search among a collection of candidates.
   
   However, we no longer use it to find the least-loaded node among all members - we use a priority queue for that.
   
   It is only used to select the least-loaded node among the members that previously owned the task. I replaced the Java Streams based iteration with a plain loop, since it's more efficient.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -214,85 +221,125 @@ private void maybeUpdateTasksPerMember(final int activeTasksNo) {
         }
     }
 
-    private Member findMemberWithLeastLoad(final Set<Member> members, TaskId taskId, final boolean returnSameMember) {
+    private boolean assignStandbyToMemberWithLeastLoad(PriorityQueue<ProcessState> queue, TaskId taskId) {

Review Comment:
   This is the same as the active task assignment above. The difference is that there may be processes that already have the task, so we consider more than one candidate process here (see the recursion below).


