YeonCheolGit commented on code in PR #12019:
URL: https://github.com/apache/kafka/pull/12019#discussion_r862493447


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -526,165 +421,174 @@ private List<WorkerLoad> pickCandidateWorkerForReassignment(List<WorkerLoad> com
     }
 
     /**
-     * Task revocation is based on an rough estimation of the lower average number of tasks before
-     * and after new workers join the group. If no new workers join, no revocation takes place.
-     * Based on this estimation, tasks are revoked until the new floor average is reached for
-     * each existing worker. The revoked tasks, once assigned to the new workers will maintain
-     * a balanced load among the group.
-     *
-     * @param activeAssignments
-     * @param completeWorkerAssignment
-     * @return
+     * Revoke connectors and tasks from each worker in the cluster until no worker is running more than it would be if:
+     * <ul>
+     *     <li>The allocation of connectors and tasks across the cluster were as balanced as possible (i.e., the difference in allocation size between any two workers is at most one)</li>
+     *     <li>Any workers that left the group within the scheduled rebalance delay permanently left the group</li>
+     *     <li>All currently-configured connectors and tasks were allocated (including instances that may be revoked in this round because they are duplicated across workers)</li>
+     * </ul>
+     * @param configured the set of configured connectors and tasks across the entire cluster
+     * @param workers the workers in the cluster, whose assignments should not include any deleted or duplicated connectors or tasks
+     *                that are already due to be revoked from the worker in this rebalance
+     * @return which connectors and tasks should be revoked from which workers; never null, but may be empty
+     * if no load-balancing revocations are necessary or possible
      */
-    private Map<String, ConnectorsAndTasks> performTaskRevocation(ConnectorsAndTasks activeAssignments,
-                                                                  Collection<WorkerLoad> completeWorkerAssignment) {
-        int totalActiveConnectorsNum = activeAssignments.connectors().size();
-        int totalActiveTasksNum = activeAssignments.tasks().size();
-        Collection<WorkerLoad> existingWorkers = completeWorkerAssignment.stream()
-                .filter(wl -> wl.size() > 0)
-                .collect(Collectors.toList());
-        int existingWorkersNum = existingWorkers.size();
-        int totalWorkersNum = completeWorkerAssignment.size();
-        int newWorkersNum = totalWorkersNum - existingWorkersNum;
-
-        if (log.isDebugEnabled()) {
-            completeWorkerAssignment.forEach(wl -> log.debug(
+    private Map<String, ConnectorsAndTasks> performLoadBalancingRevocations(
+            final ConnectorsAndTasks configured,
+            final Collection<WorkerLoad> workers
+    ) {
+        if (log.isTraceEnabled()) {
+            workers.forEach(wl -> log.trace(
                     "Per worker current load size; worker: {} connectors: {} tasks: {}",
                     wl.worker(), wl.connectorsSize(), wl.tasksSize()));
         }
 
-        Map<String, ConnectorsAndTasks> revoking = new HashMap<>();
-        // If there are no new workers, or no existing workers to revoke tasks from return early
-        // after logging the status
-        if (!(newWorkersNum > 0 && existingWorkersNum > 0)) {
-            log.debug("No task revocation required; workers with existing load: {} workers with "
-                    + "no load {} total workers {}",
-                    existingWorkersNum, newWorkersNum, totalWorkersNum);
-            // This is intentionally empty but mutable, because the map is used to include deleted
-            // connectors and tasks as well
-            return revoking;
+        if (workers.stream().allMatch(WorkerLoad::isEmpty)) {
+            log.trace("No load-balancing revocations required; all workers are either new "
+                    + "or will have all currently-assigned connectors and tasks revoked during this round"
+            );
+            return Collections.emptyMap();
+        }
+        if (configured.isEmpty()) {
+            log.trace("No load-balancing revocations required; no connectors are currently configured on this cluster");
+            return Collections.emptyMap();
         }
 
-        log.debug("Task revocation is required; workers with existing load: {} workers with "
-                + "no load {} total workers {}",
-                existingWorkersNum, newWorkersNum, totalWorkersNum);
-
-        // We have at least one worker assignment (the leader itself) so totalWorkersNum can't be 0
-        log.debug("Previous rounded down (floor) average number of connectors per worker {}", totalActiveConnectorsNum / existingWorkersNum);
-        int floorConnectors = totalActiveConnectorsNum / totalWorkersNum;
-        int ceilConnectors = floorConnectors + ((totalActiveConnectorsNum % totalWorkersNum == 0) ? 0 : 1);
-        log.debug("New average number of connectors per worker rounded down (floor) {} and rounded up (ceil) {}", floorConnectors, ceilConnectors);
-
-
-        log.debug("Previous rounded down (floor) average number of tasks per worker {}", totalActiveTasksNum / existingWorkersNum);
-        int floorTasks = totalActiveTasksNum / totalWorkersNum;
-        int ceilTasks = floorTasks + ((totalActiveTasksNum % totalWorkersNum == 0) ? 0 : 1);
-        log.debug("New average number of tasks per worker rounded down (floor) {} and rounded up (ceil) {}", floorTasks, ceilTasks);
-        int numToRevoke;
-
-        for (WorkerLoad existing : existingWorkers) {
-            Iterator<String> connectors = existing.connectors().iterator();
-            numToRevoke = existing.connectorsSize() - ceilConnectors;
-            for (int i = existing.connectorsSize(); i > floorConnectors && numToRevoke > 0; --i, --numToRevoke) {
-                ConnectorsAndTasks resources = revoking.computeIfAbsent(
-                    existing.worker(),
-                    w -> new ConnectorsAndTasks.Builder().build());
-                resources.connectors().add(connectors.next());
-            }
+        final Map<String, ConnectorsAndTasks.Builder> result = new HashMap<>();
+
+        Map<String, Set<String>> connectorRevocations = loadBalancingRevocations(
+                "connector",
+                configured.connectors().size(),
+                workers,
+                WorkerLoad::connectors
+        );
+        Map<String, Set<ConnectorTaskId>> taskRevocations = loadBalancingRevocations(
+                "task",
+                configured.tasks().size(),
+                workers,
+                WorkerLoad::tasks
+        );
+
+        connectorRevocations.forEach((worker, revoked) ->
+            result.computeIfAbsent(worker, w -> ConnectorsAndTasks.builder())
+                    .addConnectors(revoked)
+        );
+        taskRevocations.forEach((worker, revoked) ->
+            result.computeIfAbsent(worker, w -> ConnectorsAndTasks.builder())
+                    .addTasks(revoked)
+        );
+
+        return buildAll(result);
+    }
+
+    private <E> Map<String, Set<E>> loadBalancingRevocations(
+            final String allocatedResourceName,
+            final int totalToAllocate,
+            final Collection<WorkerLoad> workers,
+            final Function<WorkerLoad, Collection<E>> workerAllocation
+    ) {
+        final int totalWorkers = workers.size();
+        // The minimum instances of this resource that should be assigned to each worker
+        final int minAllocatedPerWorker = totalToAllocate / totalWorkers;
+        // How many workers are going to have to be allocated exactly one extra instance
+        // (since the total number to allocate may not be a perfect multiple of the number of workers)
+        final int extrasToAllocate = totalToAllocate % totalWorkers;
+        // Useful function to determine exactly how many instances of the resource a given worker is currently allocated
+        final Function<WorkerLoad, Integer> workerAllocationSize = workerAllocation.andThen(Collection::size);
+
+        final long workersAllocatedMinimum = workers.stream()
+                .map(workerAllocationSize)
+                .filter(n -> n == minAllocatedPerWorker)
+                .count();
+        final long workersAllocatedSingleExtra = workers.stream()
+                .map(workerAllocationSize)
+                .filter(n -> n == minAllocatedPerWorker + 1)
+                .count();
+        if (workersAllocatedSingleExtra == extrasToAllocate
+                && workersAllocatedMinimum + workersAllocatedSingleExtra == totalWorkers) {
+            log.trace(
+                    "No load-balancing {} revocations required; the current allocations, when combined with any newly-created {}, should be balanced",
+                    allocatedResourceName,
+                    allocatedResourceName
+            );
+            return Collections.emptyMap();
         }
 
-        for (WorkerLoad existing : existingWorkers) {
-            Iterator<ConnectorTaskId> tasks = existing.tasks().iterator();
-            numToRevoke = existing.tasksSize() - ceilTasks;
-            log.debug("Tasks on worker {} is higher than ceiling, so revoking {} tasks", existing, numToRevoke);
-            for (int i = existing.tasksSize(); i > floorTasks && numToRevoke > 0; --i, --numToRevoke) {
-                ConnectorsAndTasks resources = revoking.computeIfAbsent(
-                    existing.worker(),
-                    w -> new ConnectorsAndTasks.Builder().build());
-                resources.tasks().add(tasks.next());
+        final Map<String, Set<E>> result = new HashMap<>();
+        // How many workers we've allocated a single extra resource instance to
+        int allocatedExtras = 0;
+        for (WorkerLoad worker : workers) {
+            int currentAllocationSizeForWorker = workerAllocationSize.apply(worker);
+            if (currentAllocationSizeForWorker <= minAllocatedPerWorker) {
+                // This worker isn't allocated more than the minimum; no need to revoke anything
+                continue;
+            }
+            int maxAllocationForWorker;
+            if (allocatedExtras < extrasToAllocate) {
+                // We'll allocate one of the extra resource instances to this worker
+                allocatedExtras++;
+                if (currentAllocationSizeForWorker == minAllocatedPerWorker + 1) {
+                    // If the worker's running exactly one more than the minimum, and we're allowed to
+                    // allocate an extra to it, there's no need to revoke anything
+                    continue;
+                }
+                maxAllocationForWorker = minAllocatedPerWorker + 1;
+            } else {
+                maxAllocationForWorker = minAllocatedPerWorker;
+            }
+
+            Set<E> revokedFromWorker = new LinkedHashSet<>();
+            result.put(worker.worker(), revokedFromWorker);
+
+            Iterator<E> currentWorkerAllocation = workerAllocation.apply(worker).iterator();
+            // Revoke resources from the worker until it isn't allocated any more than it should be
+            while (workerAllocationSize.apply(worker) > maxAllocationForWorker) {
+                if (!currentWorkerAllocation.hasNext()) {
+                    // Should never happen, but better to log a warning and move on than die and fail the whole rebalance if it does
+                    log.warn(
+                            "Unexpectedly ran out of {}s to revoke from worker {} while performing load-balancing revocations; " +
+                                    "worker appears to still be allocated {} instances, which is more than the intended allocation of {}",
+                            allocatedResourceName,
+                            worker.worker(),
+                            workerAllocationSize.apply(worker),
+                            maxAllocationForWorker
+                    );
+                    break;
+                }
+                E revocation = currentWorkerAllocation.next();
+                revokedFromWorker.add(revocation);
+                // Make sure to remove the resource from the worker load so that later operations
+                // (such as assigning newly-created connectors and tasks) can take that into account
+                currentWorkerAllocation.remove();
             }
         }
+        return result;
+    }
 
-        return revoking;
+    private int calculateDelay(long now) {

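A quick worked illustration of the allocation arithmetic in the new `loadBalancingRevocations` helper above (hypothetical numbers, not part of the patch):

```java
// Hypothetical example: 8 configured tasks across a 3-worker cluster.
int totalToAllocate = 8;
int totalWorkers = 3;
int minAllocatedPerWorker = totalToAllocate / totalWorkers; // 2
int extrasToAllocate = totalToAllocate % totalWorkers;      // 2
// Balanced target: two workers run 3 tasks each, the third runs 2
// (allocation sizes differ by at most one). A worker currently running
// 5 tasks is revoked down to 3 if it is granted one of the two "extras",
// or down to 2 otherwise.
```
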
Review Comment:
   I can see `final` used on many parameters elsewhere in the Kafka codebase, so this is just for code convention and safety.
   
   ```suggestion
       private int calculateDelay(final long now) {
   ```
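   
   For example (a minimal sketch, not from this PR; the fields are hypothetical), `final` turns an accidental reassignment of the parameter into a compile-time error:
   
    ```java
    // Hypothetical fields, for illustration only.
    private long scheduledRebalanceMs;
    private long maxDelayMs;
    
    private int calculateDelay(final long now) {
        // now = System.currentTimeMillis(); // would not compile: 'now' is final
        long diff = scheduledRebalanceMs - now;
        return diff > 0 ? (int) Math.min(diff, maxDelayMs) : 0;
    }
    ```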



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
