YeonCheolGit commented on code in PR #12019: URL: https://github.com/apache/kafka/pull/12019#discussion_r862493447
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java: ########## @@ -526,165 +421,174 @@ private List<WorkerLoad> pickCandidateWorkerForReassignment(List<WorkerLoad> com } /** - * Task revocation is based on an rough estimation of the lower average number of tasks before - * and after new workers join the group. If no new workers join, no revocation takes place. - * Based on this estimation, tasks are revoked until the new floor average is reached for - * each existing worker. The revoked tasks, once assigned to the new workers will maintain - * a balanced load among the group. - * - * @param activeAssignments - * @param completeWorkerAssignment - * @return + * Revoke connectors and tasks from each worker in the cluster until no worker is running more than it would be if: + * <ul> + * <li>The allocation of connectors and tasks across the cluster were as balanced as possible (i.e., the difference in allocation size between any two workers is at most one)</li> + * <li>Any workers that left the group within the scheduled rebalance delay permanently left the group</li> + * <li>All currently-configured connectors and tasks were allocated (including instances that may be revoked in this round because they are duplicated across workers)</li> + * </ul> + * @param configured the set of configured connectors and tasks across the entire cluster + * @param workers the workers in the cluster, whose assignments should not include any deleted or duplicated connectors or tasks + * that are already due to be revoked from the worker in this rebalance + * @return which connectors and tasks should be revoked from which workers; never null, but may be empty + * if no load-balancing revocations are necessary or possible */ - private Map<String, ConnectorsAndTasks> performTaskRevocation(ConnectorsAndTasks activeAssignments, - Collection<WorkerLoad> completeWorkerAssignment) { - int totalActiveConnectorsNum = 
activeAssignments.connectors().size(); - int totalActiveTasksNum = activeAssignments.tasks().size(); - Collection<WorkerLoad> existingWorkers = completeWorkerAssignment.stream() - .filter(wl -> wl.size() > 0) - .collect(Collectors.toList()); - int existingWorkersNum = existingWorkers.size(); - int totalWorkersNum = completeWorkerAssignment.size(); - int newWorkersNum = totalWorkersNum - existingWorkersNum; - - if (log.isDebugEnabled()) { - completeWorkerAssignment.forEach(wl -> log.debug( + private Map<String, ConnectorsAndTasks> performLoadBalancingRevocations( + final ConnectorsAndTasks configured, + final Collection<WorkerLoad> workers + ) { + if (log.isTraceEnabled()) { + workers.forEach(wl -> log.trace( "Per worker current load size; worker: {} connectors: {} tasks: {}", wl.worker(), wl.connectorsSize(), wl.tasksSize())); } - Map<String, ConnectorsAndTasks> revoking = new HashMap<>(); - // If there are no new workers, or no existing workers to revoke tasks from return early - // after logging the status - if (!(newWorkersNum > 0 && existingWorkersNum > 0)) { - log.debug("No task revocation required; workers with existing load: {} workers with " - + "no load {} total workers {}", - existingWorkersNum, newWorkersNum, totalWorkersNum); - // This is intentionally empty but mutable, because the map is used to include deleted - // connectors and tasks as well - return revoking; + if (workers.stream().allMatch(WorkerLoad::isEmpty)) { + log.trace("No load-balancing revocations required; all workers are either new " + + "or will have all currently-assigned connectors and tasks revoked during this round" + ); + return Collections.emptyMap(); + } + if (configured.isEmpty()) { + log.trace("No load-balancing revocations required; no connectors are currently configured on this cluster"); + return Collections.emptyMap(); } - log.debug("Task revocation is required; workers with existing load: {} workers with " - + "no load {} total workers {}", - existingWorkersNum, 
newWorkersNum, totalWorkersNum); - - // We have at least one worker assignment (the leader itself) so totalWorkersNum can't be 0 - log.debug("Previous rounded down (floor) average number of connectors per worker {}", totalActiveConnectorsNum / existingWorkersNum); - int floorConnectors = totalActiveConnectorsNum / totalWorkersNum; - int ceilConnectors = floorConnectors + ((totalActiveConnectorsNum % totalWorkersNum == 0) ? 0 : 1); - log.debug("New average number of connectors per worker rounded down (floor) {} and rounded up (ceil) {}", floorConnectors, ceilConnectors); - - - log.debug("Previous rounded down (floor) average number of tasks per worker {}", totalActiveTasksNum / existingWorkersNum); - int floorTasks = totalActiveTasksNum / totalWorkersNum; - int ceilTasks = floorTasks + ((totalActiveTasksNum % totalWorkersNum == 0) ? 0 : 1); - log.debug("New average number of tasks per worker rounded down (floor) {} and rounded up (ceil) {}", floorTasks, ceilTasks); - int numToRevoke; - - for (WorkerLoad existing : existingWorkers) { - Iterator<String> connectors = existing.connectors().iterator(); - numToRevoke = existing.connectorsSize() - ceilConnectors; - for (int i = existing.connectorsSize(); i > floorConnectors && numToRevoke > 0; --i, --numToRevoke) { - ConnectorsAndTasks resources = revoking.computeIfAbsent( - existing.worker(), - w -> new ConnectorsAndTasks.Builder().build()); - resources.connectors().add(connectors.next()); - } + final Map<String, ConnectorsAndTasks.Builder> result = new HashMap<>(); + + Map<String, Set<String>> connectorRevocations = loadBalancingRevocations( + "connector", + configured.connectors().size(), + workers, + WorkerLoad::connectors + ); + Map<String, Set<ConnectorTaskId>> taskRevocations = loadBalancingRevocations( + "task", + configured.tasks().size(), + workers, + WorkerLoad::tasks + ); + + connectorRevocations.forEach((worker, revoked) -> + result.computeIfAbsent(worker, w -> ConnectorsAndTasks.builder()) + 
.addConnectors(revoked) + ); + taskRevocations.forEach((worker, revoked) -> + result.computeIfAbsent(worker, w -> ConnectorsAndTasks.builder()) + .addTasks(revoked) + ); + + return buildAll(result); + } + + private <E> Map<String, Set<E>> loadBalancingRevocations( + final String allocatedResourceName, + final int totalToAllocate, + final Collection<WorkerLoad> workers, + final Function<WorkerLoad, Collection<E>> workerAllocation + ) { + final int totalWorkers = workers.size(); + // The minimum instances of this resource that should be assigned to each worker + final int minAllocatedPerWorker = totalToAllocate / totalWorkers; + // How many workers are going to have to be allocated exactly one extra instance + // (since the total number to allocate may not be a perfect multiple of the number of workers) + final int extrasToAllocate = totalToAllocate % totalWorkers; + // Useful function to determine exactly how many instances of the resource a given worker is currently allocated + final Function<WorkerLoad, Integer> workerAllocationSize = workerAllocation.andThen(Collection::size); + + final long workersAllocatedMinimum = workers.stream() + .map(workerAllocationSize) + .filter(n -> n == minAllocatedPerWorker) + .count(); + final long workersAllocatedSingleExtra = workers.stream() + .map(workerAllocationSize) + .filter(n -> n == minAllocatedPerWorker + 1) + .count(); + if (workersAllocatedSingleExtra == extrasToAllocate + && workersAllocatedMinimum + workersAllocatedSingleExtra == totalWorkers) { + log.trace( + "No load-balancing {} revocations required; the current allocations, when combined with any newly-created {}, should be balanced", + allocatedResourceName, + allocatedResourceName + ); + return Collections.emptyMap(); } - for (WorkerLoad existing : existingWorkers) { - Iterator<ConnectorTaskId> tasks = existing.tasks().iterator(); - numToRevoke = existing.tasksSize() - ceilTasks; - log.debug("Tasks on worker {} is higher than ceiling, so revoking {} tasks", 
existing, numToRevoke); - for (int i = existing.tasksSize(); i > floorTasks && numToRevoke > 0; --i, --numToRevoke) { - ConnectorsAndTasks resources = revoking.computeIfAbsent( - existing.worker(), - w -> new ConnectorsAndTasks.Builder().build()); - resources.tasks().add(tasks.next()); + final Map<String, Set<E>> result = new HashMap<>(); + // How many workers we've allocated a single extra resource instance to + int allocatedExtras = 0; + for (WorkerLoad worker : workers) { + int currentAllocationSizeForWorker = workerAllocationSize.apply(worker); + if (currentAllocationSizeForWorker <= minAllocatedPerWorker) { + // This worker isn't allocated more than the minimum; no need to revoke anything + continue; + } + int maxAllocationForWorker; + if (allocatedExtras < extrasToAllocate) { + // We'll allocate one of the extra resource instances to this worker + allocatedExtras++; + if (currentAllocationSizeForWorker == minAllocatedPerWorker + 1) { + // If the worker's running exactly one more than the minimum, and we're allowed to + // allocate an extra to it, there's no need to revoke anything + continue; + } + maxAllocationForWorker = minAllocatedPerWorker + 1; + } else { + maxAllocationForWorker = minAllocatedPerWorker; + } + + Set<E> revokedFromWorker = new LinkedHashSet<>(); + result.put(worker.worker(), revokedFromWorker); + + Iterator<E> currentWorkerAllocation = workerAllocation.apply(worker).iterator(); + // Revoke resources from the worker until it isn't allocated any more than it should be + while (workerAllocationSize.apply(worker) > maxAllocationForWorker) { + if (!currentWorkerAllocation.hasNext()) { + // Should never happen, but better to log a warning and move on than die and fail the whole rebalance if it does + log.warn( + "Unexpectedly ran out of {}s to revoke from worker {} while performing load-balancing revocations; " + + "worker appears to still be allocated {} instances, which is more than the intended allocation of {}", + allocatedResourceName, + 
worker.worker(), + workerAllocationSize.apply(worker), + maxAllocationForWorker + ); + break; + } + E revocation = currentWorkerAllocation.next(); + revokedFromWorker.add(revocation); + // Make sure to remove the resource from the worker load so that later operations + // (such as assigning newly-created connectors and tasks) can take that into account + currentWorkerAllocation.remove(); } } + return result; + } - return revoking; + private int calculateDelay(long now) { Review Comment: I've seen final parameters used in many places in the Kafka codebase, so this suggestion is just for consistency with that code convention and for safety. ```suggestion private int calculateDelay(final long now) { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org