YeonCheolGit commented on code in PR #12019:
URL: https://github.com/apache/kafka/pull/12019#discussion_r862491251


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -723,44 +610,175 @@ protected void assignConnectors(List<WorkerLoad> 
workerAssignment, Collection<St
      * @param workerAssignment the current worker assignment; assigned tasks 
are added to this list
      * @param tasks the tasks to be assigned
      */
-    protected void assignTasks(List<WorkerLoad> workerAssignment, 
Collection<ConnectorTaskId> tasks) {
-        workerAssignment.sort(WorkerLoad.taskComparator());
-        WorkerLoad first = workerAssignment.get(0);
+    // Visible for testing
+    void assignTasks(List<WorkerLoad> workerAssignment, 
Collection<ConnectorTaskId> tasks) {
+        assign(workerAssignment, tasks, WorkerLoad::tasks, WorkerLoad::assign);
+    }
 
-        Iterator<ConnectorTaskId> load = tasks.iterator();
+    private <E> void assign(
+            List<WorkerLoad> workers,
+            Collection<E> toAssign,
+            Function<WorkerLoad, Collection<E>> currentAllocation,
+            BiConsumer<WorkerLoad, E> assignToWorker
+    ) {
+        Function<WorkerLoad, Integer> allocationSize = 
currentAllocation.andThen(Collection::size);
+        workers.sort(Comparator.comparing(allocationSize));
+        WorkerLoad first = workers.get(0);
+
+        Iterator<E> load = toAssign.stream().sorted().iterator();
         while (load.hasNext()) {
-            int firstLoad = first.tasksSize();
-            int upTo = IntStream.range(0, workerAssignment.size())
-                    .filter(i -> workerAssignment.get(i).tasksSize() > 
firstLoad)
+            int firstLoad = allocationSize.apply(first);
+            int upTo = IntStream.range(0, workers.size())

Review Comment:
   This is minor suggestion and could be ignored. 
   If calculate `workers.size` in while loop it has to be calculated all the 
time while it is true.
   What about calculating size one time then use it many times.
   
   ```
   final int workersSize = workers.size();
   IntStream.range(0, workersSize)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to