Copilot commented on code in PR #20172: URL: https://github.com/apache/kafka/pull/20172#discussion_r2219167350
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java: ########## @@ -171,38 +173,43 @@ private void assignActive(final Set<TaskId> activeTasks) { final TaskId task = it.next(); final Member prevMember = localState.activeTaskToPrevMember.get(task); if (prevMember != null && hasUnfulfilledQuota(prevMember)) { - localState.processIdToState.get(prevMember.processId).addTask(prevMember.memberId, task, true); - updateHelpers(prevMember, true); + ProcessState processState = localState.processIdToState.get(prevMember.processId); + processState.addTask(prevMember.memberId, task, true); + maybeUpdateTasksPerMember(processState.activeTaskCount()); it.remove(); } } // 2. re-assigning tasks to clients that previously have seen the same task (as standby task) for (Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) { final TaskId task = it.next(); - final Set<Member> prevMembers = localState.standbyTaskToPrevMember.get(task); - final Member prevMember = findMemberWithLeastLoad(prevMembers, task, true); + final ArrayList<Member> prevMembers = localState.standbyTaskToPrevMember.get(task); + final Member prevMember = findPrevMemberWithLeastLoad(prevMembers, null); if (prevMember != null && hasUnfulfilledQuota(prevMember)) { - localState.processIdToState.get(prevMember.processId).addTask(prevMember.memberId, task, true); - updateHelpers(prevMember, true); + ProcessState processState = localState.processIdToState.get(prevMember.processId); + processState.addTask(prevMember.memberId, task, true); + maybeUpdateTasksPerMember(processState.activeTaskCount()); it.remove(); } } // 3. 
assign any remaining unassigned tasks + PriorityQueue<ProcessState> processByLoad = new PriorityQueue<>(Comparator.comparingDouble(ProcessState::load)); + processByLoad.addAll(localState.processIdToState.values()); for (Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) { final TaskId task = it.next(); - final Set<Member> allMembers = localState.processIdToState.entrySet().stream().flatMap(entry -> entry.getValue().memberToTaskCounts().keySet().stream() - .map(memberId -> new Member(entry.getKey(), memberId))).collect(Collectors.toSet()); - final Member member = findMemberWithLeastLoad(allMembers, task, false); + ProcessState processWithLeastLoad = processByLoad.poll(); + if (processWithLeastLoad == null) { + throw new TaskAssignorException("No process available to assign active task {}." + task); + } + String member = memberWithLeastLoad(processWithLeastLoad); if (member == null) { - log.error("Unable to assign active task {} to any member.", task); throw new TaskAssignorException("No member available to assign active task {}." + task); Review Comment: The error message format is incorrect. The placeholder '{}' is not being used properly with string concatenation. Should be either 'No member available to assign active task ' + task + '.' or use proper string formatting. 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java: ########## @@ -171,38 +173,43 @@ private void assignActive(final Set<TaskId> activeTasks) { final TaskId task = it.next(); final Member prevMember = localState.activeTaskToPrevMember.get(task); if (prevMember != null && hasUnfulfilledQuota(prevMember)) { - localState.processIdToState.get(prevMember.processId).addTask(prevMember.memberId, task, true); - updateHelpers(prevMember, true); + ProcessState processState = localState.processIdToState.get(prevMember.processId); + processState.addTask(prevMember.memberId, task, true); + maybeUpdateTasksPerMember(processState.activeTaskCount()); it.remove(); } } // 2. re-assigning tasks to clients that previously have seen the same task (as standby task) for (Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) { final TaskId task = it.next(); - final Set<Member> prevMembers = localState.standbyTaskToPrevMember.get(task); - final Member prevMember = findMemberWithLeastLoad(prevMembers, task, true); + final ArrayList<Member> prevMembers = localState.standbyTaskToPrevMember.get(task); + final Member prevMember = findPrevMemberWithLeastLoad(prevMembers, null); if (prevMember != null && hasUnfulfilledQuota(prevMember)) { - localState.processIdToState.get(prevMember.processId).addTask(prevMember.memberId, task, true); - updateHelpers(prevMember, true); + ProcessState processState = localState.processIdToState.get(prevMember.processId); + processState.addTask(prevMember.memberId, task, true); + maybeUpdateTasksPerMember(processState.activeTaskCount()); it.remove(); } } // 3. 
assign any remaining unassigned tasks + PriorityQueue<ProcessState> processByLoad = new PriorityQueue<>(Comparator.comparingDouble(ProcessState::load)); + processByLoad.addAll(localState.processIdToState.values()); for (Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) { final TaskId task = it.next(); - final Set<Member> allMembers = localState.processIdToState.entrySet().stream().flatMap(entry -> entry.getValue().memberToTaskCounts().keySet().stream() - .map(memberId -> new Member(entry.getKey(), memberId))).collect(Collectors.toSet()); - final Member member = findMemberWithLeastLoad(allMembers, task, false); + ProcessState processWithLeastLoad = processByLoad.poll(); + if (processWithLeastLoad == null) { + throw new TaskAssignorException("No process available to assign active task {}." + task); + } + String member = memberWithLeastLoad(processWithLeastLoad); if (member == null) { - log.error("Unable to assign active task {} to any member.", task); throw new TaskAssignorException("No member available to assign active task {}." + task); Review Comment: The error message format is incorrect. The placeholder '{}' is not being used properly with string concatenation. Should be either 'No process available to assign active task ' + task + '.' or use proper string formatting. 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java: ########## @@ -214,85 +221,125 @@ private void maybeUpdateTasksPerMember(final int activeTasksNo) { } } - private Member findMemberWithLeastLoad(final Set<Member> members, TaskId taskId, final boolean returnSameMember) { + private boolean assignStandbyToMemberWithLeastLoad(PriorityQueue<ProcessState> queue, TaskId taskId) { + ProcessState processWithLeastLoad = queue.poll(); + if (processWithLeastLoad == null) { + return false; + } + boolean found = false; + if (!processWithLeastLoad.hasTask(taskId)) { + String memberId = memberWithLeastLoad(processWithLeastLoad); + if (memberId != null) { + processWithLeastLoad.addTask(memberId, taskId, false); + found = true; + } + } else if (!queue.isEmpty()) { + found = assignStandbyToMemberWithLeastLoad(queue, taskId); + } + queue.add(processWithLeastLoad); // Add it back to the queue after updating its state + return found; + } + + /** + * Finds the previous member with the least load for a given task. + * + * @param members The list of previous members owning the task. + * @param taskId The taskId, to check if the previous member already has the task. Can be null, if we assign it + * for the first time (e.g., during active task assignment). + * + * @return Previous member with the least load that deoes not have the task, or null if no such member exists. Review Comment: Spelling error: 'deoes' should be 'does'. ```suggestion * @return Previous member with the least load that does not have the task, or null if no such member exists. 
``` ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java: ########## @@ -214,85 +221,125 @@ private void maybeUpdateTasksPerMember(final int activeTasksNo) { } } - private Member findMemberWithLeastLoad(final Set<Member> members, TaskId taskId, final boolean returnSameMember) { + private boolean assignStandbyToMemberWithLeastLoad(PriorityQueue<ProcessState> queue, TaskId taskId) { + ProcessState processWithLeastLoad = queue.poll(); + if (processWithLeastLoad == null) { + return false; + } + boolean found = false; + if (!processWithLeastLoad.hasTask(taskId)) { + String memberId = memberWithLeastLoad(processWithLeastLoad); + if (memberId != null) { + processWithLeastLoad.addTask(memberId, taskId, false); + found = true; + } + } else if (!queue.isEmpty()) { + found = assignStandbyToMemberWithLeastLoad(queue, taskId); + } + queue.add(processWithLeastLoad); // Add it back to the queue after updating its state + return found; + } + + /** + * Finds the previous member with the least load for a given task. + * + * @param members The list of previous members owning the task. + * @param taskId The taskId, to check if the previous member already has the task. Can be null, if we assign it + * for the first time (e.g., during active task assignment). + * + * @return Previous member with the least load that deoes not have the task, or null if no such member exists. 
+ */ + private Member findPrevMemberWithLeastLoad(final ArrayList<Member> members, final TaskId taskId) { if (members == null || members.isEmpty()) { return null; } - Optional<ProcessState> processWithLeastLoad = members.stream() - .map(member -> localState.processIdToState.get(member.processId)) - .min(Comparator.comparingDouble(ProcessState::load)); - - // if the same exact former member is needed - if (returnSameMember) { - return localState.standbyTaskToPrevMember.get(taskId).stream() - .filter(standby -> standby.processId.equals(processWithLeastLoad.get().processId())) - .findFirst() - .orElseGet(() -> memberWithLeastLoad(processWithLeastLoad.get())); + + Member candidate = members.get(0); + ProcessState candidateProcessState = localState.processIdToState.get(candidate.processId); + double candidateProcessLoad = candidateProcessState.load(); + double candidateMemberLoad = candidateProcessState.memberToTaskCounts().get(candidate.memberId); + for (int i = 1; i < members.size(); i++) { + Member member = members.get(i); + ProcessState processState = localState.processIdToState.get(member.processId); + double newProcessLoad = processState.load(); + if (newProcessLoad < candidateProcessLoad && (taskId == null || !processState.hasTask(taskId))) { + double newMemberLoad = processState.memberToTaskCounts().get(member.memberId); + if (newMemberLoad < candidateMemberLoad) { Review Comment: The logic for finding the member with least load is incorrect. The condition should check if the member can be used (doesn't have the task) before comparing loads, and the member load comparison should happen regardless of process load comparison. ```suggestion if (taskId == null || !processState.hasTask(taskId)) { double newMemberLoad = processState.memberToTaskCounts().get(member.memberId); if (newMemberLoad < candidateMemberLoad || (newMemberLoad == candidateMemberLoad && newProcessLoad < candidateProcessLoad)) { ``` -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org