rhauch commented on a change in pull request #9319:
URL: https://github.com/apache/kafka/pull/9319#discussion_r496296182



##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -260,7 +259,7 @@ private Long ensureLeaderConfig(long maxOffset, 
WorkerCoordinator coordinator) {
         // Do not revoke resources for re-assignment while a delayed rebalance 
is active
         // Also we do not revoke in two consecutive rebalances by the same 
leader
         canRevoke = delay == 0 && canRevoke;
-
+        log.debug("Connector and task to revoke assgn post lb calculation: 
{}", toRevoke);

Review comment:
       Can you please make this log message more readable?

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -577,15 +596,14 @@ private void resetDelay() {
         numToRevoke = floorTasks;
         for (WorkerLoad existing : existingWorkers) {
             Iterator<ConnectorTaskId> tasks = existing.tasks().iterator();
+            numToRevoke = existing.tasksSize() - ceilTasks;

Review comment:
       Isn't it possible that `numToRevoke` might be negative?

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -445,16 +444,34 @@ protected void handleLostAssignments(ConnectorsAndTasks 
lostAssignments,
         if (scheduledRebalance > 0 && now >= scheduledRebalance) {
             // delayed rebalance expired and it's time to assign resources
             log.debug("Delayed rebalance expired. Reassigning lost tasks");
-            Optional<WorkerLoad> candidateWorkerLoad = Optional.empty();
+            List<WorkerLoad> candidateWorkerLoad = Collections.emptyList();
             if (!candidateWorkersForReassignment.isEmpty()) {
                 candidateWorkerLoad = 
pickCandidateWorkerForReassignment(completeWorkerAssignment);
             }
 
-            if (candidateWorkerLoad.isPresent()) {
-                WorkerLoad workerLoad = candidateWorkerLoad.get();
-                log.debug("A candidate worker has been found to assign lost 
tasks: {}", workerLoad.worker());
-                lostAssignments.connectors().forEach(workerLoad::assign);
-                lostAssignments.tasks().forEach(workerLoad::assign);
+            if (!candidateWorkerLoad.isEmpty()) {
+                log.debug("A list of candidate workers has been found to 
assign lost tasks: {}", 
candidateWorkerLoad.stream().map(WorkerLoad::worker).collect(Collectors.joining(",")));
+                Iterator<WorkerLoad> candidateWorkerIterator = 
candidateWorkerLoad.iterator();
+                for (String connector : lostAssignments.connectors()) {
+                    // Loop over the the candidate workers as many times as it 
takes
+                    if (!candidateWorkerIterator.hasNext()) {
+                        candidateWorkerIterator = 
candidateWorkerLoad.iterator();
+                    }
+                    WorkerLoad worker = candidateWorkerIterator.next();
+                    log.debug("Assigning connector id {} to member {}", 
connector, worker.worker());
+                    worker.assign(connector);
+                    log.debug("Assigned connector id {} to member {}", 
connector, worker.worker());
+                }
+                candidateWorkerIterator = candidateWorkerLoad.iterator();
+                for (ConnectorTaskId task : lostAssignments.tasks()) {
+                    if (!candidateWorkerIterator.hasNext()) {
+                        candidateWorkerIterator = 
candidateWorkerLoad.iterator();
+                    }
+                    WorkerLoad worker = candidateWorkerIterator.next();
+                    log.debug("Assigning task id {} to member {}", task, 
worker.worker());
+                    worker.assign(task);
+                    log.debug("Assigned task id {} to member {}", task, 
worker.worker());

Review comment:
       ```suggestion
   ```
   Do we need both of these debug messages? After all, `worker.assign(...)` is 
just adding a `ConnectorTaskId` to a collection. How about keeping the first 
one since this is at this point an on-going process and we've not actually 
assigned anything to the actual worker node.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -559,6 +576,8 @@ private void resetDelay() {
         log.debug("Previous rounded down (floor) average number of tasks per 
worker {}", totalActiveTasksNum / existingWorkersNum);
         int floorTasks = totalActiveTasksNum / totalWorkersNum;
         log.debug("New rounded down (floor) average number of tasks per worker 
{}", floorTasks);
+        int ceilTasks = (int) Math.ceil((float) totalActiveTasksNum / 
totalWorkersNum);
+        log.debug("New rounded down (ceil) average number of tasks per worker 
{}", ceilTasks);

Review comment:
       What do you think about combining these log messages?
   ```suggestion
           int ceilTasks = (int) Math.ceil((float) totalActiveTasksNum / 
totalWorkersNum);
           log.debug("New average number of tasks per worker: floor={}, 
ceiling={}", floorTasks, ceilTasks);
   ```
   

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -445,16 +444,34 @@ protected void handleLostAssignments(ConnectorsAndTasks 
lostAssignments,
         if (scheduledRebalance > 0 && now >= scheduledRebalance) {
             // delayed rebalance expired and it's time to assign resources
             log.debug("Delayed rebalance expired. Reassigning lost tasks");
-            Optional<WorkerLoad> candidateWorkerLoad = Optional.empty();
+            List<WorkerLoad> candidateWorkerLoad = Collections.emptyList();
             if (!candidateWorkersForReassignment.isEmpty()) {
                 candidateWorkerLoad = 
pickCandidateWorkerForReassignment(completeWorkerAssignment);
             }
 
-            if (candidateWorkerLoad.isPresent()) {
-                WorkerLoad workerLoad = candidateWorkerLoad.get();
-                log.debug("A candidate worker has been found to assign lost 
tasks: {}", workerLoad.worker());
-                lostAssignments.connectors().forEach(workerLoad::assign);
-                lostAssignments.tasks().forEach(workerLoad::assign);
+            if (!candidateWorkerLoad.isEmpty()) {
+                log.debug("A list of candidate workers has been found to 
assign lost tasks: {}", 
candidateWorkerLoad.stream().map(WorkerLoad::worker).collect(Collectors.joining(",")));
+                Iterator<WorkerLoad> candidateWorkerIterator = 
candidateWorkerLoad.iterator();
+                for (String connector : lostAssignments.connectors()) {
+                    // Loop over the the candidate workers as many times as it 
takes
+                    if (!candidateWorkerIterator.hasNext()) {
+                        candidateWorkerIterator = 
candidateWorkerLoad.iterator();
+                    }
+                    WorkerLoad worker = candidateWorkerIterator.next();
+                    log.debug("Assigning connector id {} to member {}", 
connector, worker.worker());
+                    worker.assign(connector);
+                    log.debug("Assigned connector id {} to member {}", 
connector, worker.worker());

Review comment:
       ```suggestion
   ```
   Do we need both of these debug messages? After all, `worker.assign(...)` is 
just adding a string to a collection. How about keeping the first one since 
this is at this point an on-going process and we've not actually assigned 
anything to the actual worker node.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -577,15 +596,14 @@ private void resetDelay() {
         numToRevoke = floorTasks;
         for (WorkerLoad existing : existingWorkers) {
             Iterator<ConnectorTaskId> tasks = existing.tasks().iterator();
+            numToRevoke = existing.tasksSize() - ceilTasks;
+            log.debug("revoke number of tasks per worker {}", numToRevoke);

Review comment:
       ```suggestion
               log.debug("Tasks on worker {} is higher than ceiling, so 
revoking {} tasks", existing, numToRevoke);
   ```

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -445,16 +444,34 @@ protected void handleLostAssignments(ConnectorsAndTasks 
lostAssignments,
         if (scheduledRebalance > 0 && now >= scheduledRebalance) {
             // delayed rebalance expired and it's time to assign resources
             log.debug("Delayed rebalance expired. Reassigning lost tasks");
-            Optional<WorkerLoad> candidateWorkerLoad = Optional.empty();
+            List<WorkerLoad> candidateWorkerLoad = Collections.emptyList();
             if (!candidateWorkersForReassignment.isEmpty()) {
                 candidateWorkerLoad = 
pickCandidateWorkerForReassignment(completeWorkerAssignment);
             }
 
-            if (candidateWorkerLoad.isPresent()) {
-                WorkerLoad workerLoad = candidateWorkerLoad.get();
-                log.debug("A candidate worker has been found to assign lost 
tasks: {}", workerLoad.worker());
-                lostAssignments.connectors().forEach(workerLoad::assign);
-                lostAssignments.tasks().forEach(workerLoad::assign);
+            if (!candidateWorkerLoad.isEmpty()) {
+                log.debug("A list of candidate workers has been found to 
assign lost tasks: {}", 
candidateWorkerLoad.stream().map(WorkerLoad::worker).collect(Collectors.joining(",")));

Review comment:
       ```suggestion
                   log.debug("Assigning lost tasks to {} candidate workers: 
{}", 
                           candidateWorkerLoad.size(),
                           
candidateWorkerLoad.stream().map(WorkerLoad::worker).collect(Collectors.joining(",")));
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to