YeonCheolGit commented on code in PR #12019: URL: https://github.com/apache/kafka/pull/12019#discussion_r862491251
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -723,44 +610,175 @@ protected void assignConnectors(List<WorkerLoad> workerAssignment, Collection<St
      * @param workerAssignment the current worker assignment; assigned tasks are added to this list
      * @param tasks the tasks to be assigned
      */
-    protected void assignTasks(List<WorkerLoad> workerAssignment, Collection<ConnectorTaskId> tasks) {
-        workerAssignment.sort(WorkerLoad.taskComparator());
-        WorkerLoad first = workerAssignment.get(0);
+    // Visible for testing
+    void assignTasks(List<WorkerLoad> workerAssignment, Collection<ConnectorTaskId> tasks) {
+        assign(workerAssignment, tasks, WorkerLoad::tasks, WorkerLoad::assign);
+    }
 
-        Iterator<ConnectorTaskId> load = tasks.iterator();
+    private <E> void assign(
+            List<WorkerLoad> workers,
+            Collection<E> toAssign,
+            Function<WorkerLoad, Collection<E>> currentAllocation,
+            BiConsumer<WorkerLoad, E> assignToWorker
+    ) {
+        Function<WorkerLoad, Integer> allocationSize = currentAllocation.andThen(Collection::size);
+        workers.sort(Comparator.comparing(allocationSize));
+        WorkerLoad first = workers.get(0);
+
+        Iterator<E> load = toAssign.stream().sorted().iterator();
         while (load.hasNext()) {
-            int firstLoad = first.tasksSize();
-            int upTo = IntStream.range(0, workerAssignment.size())
-                    .filter(i -> workerAssignment.get(i).tasksSize() > firstLoad)
+            int firstLoad = allocationSize.apply(first);
+            int upTo = IntStream.range(0, workers.size())

Review Comment:
   This is a minor suggestion and can be ignored.
   Because `workers.size()` is called inside the `while` loop, it is re-evaluated on every iteration until the loop exits.
   What about computing the size once, before the loop, and reusing it?
   
   ```
   int workersSize = workers.size();
   IntStream.range(0, workersSize)
   ```

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
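For reference, the suggestion in the review comment can be sketched as a standalone comparison. This is only an illustration under stated assumptions: `HoistedSizeSketch`, `upToInline`, and `upToHoisted` are hypothetical names, a plain `List<Integer>` of per-worker loads stands in for `List<WorkerLoad>`, and the `.findFirst().orElse(...)` tail is assumed because the quoted hunk cuts off before the end of the stream pipeline.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;

// Hypothetical stand-in for the loop body quoted in the diff; not the Kafka code itself.
class HoistedSizeSketch {

    // Variant matching the quoted hunk: size() is read each time this expression runs.
    static int upToInline(List<Integer> loads, int firstLoad) {
        return IntStream.range(0, loads.size())
                .filter(i -> loads.get(i) > firstLoad)
                .findFirst()              // assumed continuation of the truncated hunk
                .orElse(loads.size());
    }

    // Variant from the review comment: read the size once and reuse the local.
    static int upToHoisted(List<Integer> loads, int firstLoad) {
        int size = loads.size();
        return IntStream.range(0, size)
                .filter(i -> loads.get(i) > firstLoad)
                .findFirst()
                .orElse(size);
    }

    public static void main(String[] args) {
        // Loads sorted ascending, as the assignor sorts workers before the loop.
        List<Integer> loads = Arrays.asList(1, 1, 2, 3);
        System.out.println(upToInline(loads, 1));
        System.out.println(upToHoisted(loads, 1));
    }
}
```

Both variants return the same index as long as the list is not resized between the reads. For an `ArrayList`, `size()` is a constant-time field read, so the change is mostly about readability rather than measurable cost; whether that holds here depends on the concrete list type passed in as `workers`.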