rhauch commented on a change in pull request #9319: URL: https://github.com/apache/kafka/pull/9319#discussion_r496296182
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java ########## @@ -260,7 +259,7 @@ private Long ensureLeaderConfig(long maxOffset, WorkerCoordinator coordinator) { // Do not revoke resources for re-assignment while a delayed rebalance is active // Also we do not revoke in two consecutive rebalances by the same leader canRevoke = delay == 0 && canRevoke; - + log.debug("Connector and task to revoke assgn post lb calculation: {}", toRevoke); Review comment: Can you please make this log message more readable? ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java ########## @@ -577,15 +596,14 @@ private void resetDelay() { numToRevoke = floorTasks; for (WorkerLoad existing : existingWorkers) { Iterator<ConnectorTaskId> tasks = existing.tasks().iterator(); + numToRevoke = existing.tasksSize() - ceilTasks; Review comment: Isn't it possible that `numToRevoke` might be negative? ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java ########## @@ -445,16 +444,34 @@ protected void handleLostAssignments(ConnectorsAndTasks lostAssignments, if (scheduledRebalance > 0 && now >= scheduledRebalance) { // delayed rebalance expired and it's time to assign resources log.debug("Delayed rebalance expired. 
Reassigning lost tasks"); - Optional<WorkerLoad> candidateWorkerLoad = Optional.empty(); + List<WorkerLoad> candidateWorkerLoad = Collections.emptyList(); if (!candidateWorkersForReassignment.isEmpty()) { candidateWorkerLoad = pickCandidateWorkerForReassignment(completeWorkerAssignment); } - if (candidateWorkerLoad.isPresent()) { - WorkerLoad workerLoad = candidateWorkerLoad.get(); - log.debug("A candidate worker has been found to assign lost tasks: {}", workerLoad.worker()); - lostAssignments.connectors().forEach(workerLoad::assign); - lostAssignments.tasks().forEach(workerLoad::assign); + if (!candidateWorkerLoad.isEmpty()) { + log.debug("A list of candidate workers has been found to assign lost tasks: {}", candidateWorkerLoad.stream().map(WorkerLoad::worker).collect(Collectors.joining(","))); + Iterator<WorkerLoad> candidateWorkerIterator = candidateWorkerLoad.iterator(); + for (String connector : lostAssignments.connectors()) { + // Loop over the the candidate workers as many times as it takes + if (!candidateWorkerIterator.hasNext()) { + candidateWorkerIterator = candidateWorkerLoad.iterator(); + } + WorkerLoad worker = candidateWorkerIterator.next(); + log.debug("Assigning connector id {} to member {}", connector, worker.worker()); + worker.assign(connector); + log.debug("Assigned connector id {} to member {}", connector, worker.worker()); + } + candidateWorkerIterator = candidateWorkerLoad.iterator(); + for (ConnectorTaskId task : lostAssignments.tasks()) { + if (!candidateWorkerIterator.hasNext()) { + candidateWorkerIterator = candidateWorkerLoad.iterator(); + } + WorkerLoad worker = candidateWorkerIterator.next(); + log.debug("Assigning task id {} to member {}", task, worker.worker()); + worker.assign(task); + log.debug("Assigned task id {} to member {}", task, worker.worker()); Review comment: ```suggestion ``` Do we need both of these debug messages? After all, `worker.assign(...)` is just adding a `ConnectorTaskId` to a collection. 
How about keeping only the first one? At this point the assignment is still in progress, and nothing has actually been assigned to the worker node yet. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java ########## @@ -559,6 +576,8 @@ private void resetDelay() { log.debug("Previous rounded down (floor) average number of tasks per worker {}", totalActiveTasksNum / existingWorkersNum); int floorTasks = totalActiveTasksNum / totalWorkersNum; log.debug("New rounded down (floor) average number of tasks per worker {}", floorTasks); + int ceilTasks = (int) Math.ceil((float) totalActiveTasksNum / totalWorkersNum); + log.debug("New rounded down (ceil) average number of tasks per worker {}", ceilTasks); Review comment: What do you think about combining these log messages? ```suggestion int ceilTasks = (int) Math.ceil((float) totalActiveTasksNum / totalWorkersNum); log.debug("New average number of tasks per worker: floor={}, ceiling={}", floorTasks, ceilTasks); ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java ########## @@ -445,16 +444,34 @@ protected void handleLostAssignments(ConnectorsAndTasks lostAssignments, if (scheduledRebalance > 0 && now >= scheduledRebalance) { // delayed rebalance expired and it's time to assign resources log.debug("Delayed rebalance expired. 
Reassigning lost tasks"); - Optional<WorkerLoad> candidateWorkerLoad = Optional.empty(); + List<WorkerLoad> candidateWorkerLoad = Collections.emptyList(); if (!candidateWorkersForReassignment.isEmpty()) { candidateWorkerLoad = pickCandidateWorkerForReassignment(completeWorkerAssignment); } - if (candidateWorkerLoad.isPresent()) { - WorkerLoad workerLoad = candidateWorkerLoad.get(); - log.debug("A candidate worker has been found to assign lost tasks: {}", workerLoad.worker()); - lostAssignments.connectors().forEach(workerLoad::assign); - lostAssignments.tasks().forEach(workerLoad::assign); + if (!candidateWorkerLoad.isEmpty()) { + log.debug("A list of candidate workers has been found to assign lost tasks: {}", candidateWorkerLoad.stream().map(WorkerLoad::worker).collect(Collectors.joining(","))); + Iterator<WorkerLoad> candidateWorkerIterator = candidateWorkerLoad.iterator(); + for (String connector : lostAssignments.connectors()) { + // Loop over the the candidate workers as many times as it takes + if (!candidateWorkerIterator.hasNext()) { + candidateWorkerIterator = candidateWorkerLoad.iterator(); + } + WorkerLoad worker = candidateWorkerIterator.next(); + log.debug("Assigning connector id {} to member {}", connector, worker.worker()); + worker.assign(connector); + log.debug("Assigned connector id {} to member {}", connector, worker.worker()); Review comment: ```suggestion ``` Do we need both of these debug messages? After all, `worker.assign(...)` is just adding a string to a collection. How about keeping only the first one? At this point the assignment is still in progress, and nothing has actually been assigned to the worker node yet. 
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java ########## @@ -577,15 +596,14 @@ private void resetDelay() { numToRevoke = floorTasks; for (WorkerLoad existing : existingWorkers) { Iterator<ConnectorTaskId> tasks = existing.tasks().iterator(); + numToRevoke = existing.tasksSize() - ceilTasks; + log.debug("revoke number of tasks per worker {}", numToRevoke); Review comment: ```suggestion log.debug("Tasks on worker {} is higher than ceiling, so revoking {} tasks", existing, numToRevoke); ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java ########## @@ -445,16 +444,34 @@ protected void handleLostAssignments(ConnectorsAndTasks lostAssignments, if (scheduledRebalance > 0 && now >= scheduledRebalance) { // delayed rebalance expired and it's time to assign resources log.debug("Delayed rebalance expired. Reassigning lost tasks"); - Optional<WorkerLoad> candidateWorkerLoad = Optional.empty(); + List<WorkerLoad> candidateWorkerLoad = Collections.emptyList(); if (!candidateWorkersForReassignment.isEmpty()) { candidateWorkerLoad = pickCandidateWorkerForReassignment(completeWorkerAssignment); } - if (candidateWorkerLoad.isPresent()) { - WorkerLoad workerLoad = candidateWorkerLoad.get(); - log.debug("A candidate worker has been found to assign lost tasks: {}", workerLoad.worker()); - lostAssignments.connectors().forEach(workerLoad::assign); - lostAssignments.tasks().forEach(workerLoad::assign); + if (!candidateWorkerLoad.isEmpty()) { + log.debug("A list of candidate workers has been found to assign lost tasks: {}", candidateWorkerLoad.stream().map(WorkerLoad::worker).collect(Collectors.joining(","))); Review comment: ```suggestion log.debug("Assigning lost tasks to {} candidate workers: {}", candidateWorkerLoad.size(), candidateWorkerLoad.stream().map(WorkerLoad::worker).collect(Collectors.joining(","))); ``` 
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org