C0urante commented on code in PR #12019:
URL: https://github.com/apache/kafka/pull/12019#discussion_r849010637


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -200,283 +229,149 @@ private Long ensureLeaderConfig(long maxOffset, WorkerCoordinator coordinator) {
                 previousAssignment = activeAssignments;
                 canRevoke = true;
             }
-            previousRevocation.connectors().clear();
-            previousRevocation.tasks().clear();
+            previousRevocation = ConnectorsAndTasks.EMPTY;
         }
 
-        // Derived set: The set of deleted connectors-and-tasks is a derived 
set from the set
-        // difference of previous - configured
-        ConnectorsAndTasks deleted = diff(previousAssignment, configured);
-        log.debug("Deleted assignments: {}", deleted);
-
-        // Derived set: The set of remaining active connectors-and-tasks is a 
derived set from the
-        // set difference of active - deleted
-        ConnectorsAndTasks remainingActive = diff(activeAssignments, deleted);
-        log.debug("Remaining (excluding deleted) active assignments: {}", 
remainingActive);
-
-        // Derived set: The set of lost or unaccounted connectors-and-tasks is 
a derived set from
-        // the set difference of previous - active - deleted
-        ConnectorsAndTasks lostAssignments = diff(previousAssignment, 
activeAssignments, deleted);
-        log.debug("Lost assignments: {}", lostAssignments);
-
-        // Derived set: The set of new connectors-and-tasks is a derived set 
from the set
-        // difference of configured - previous - active
-        ConnectorsAndTasks newSubmissions = diff(configured, 
previousAssignment, activeAssignments);
-        log.debug("New assignments: {}", newSubmissions);
+        // The connectors and tasks that have been deleted since the last 
rebalance
+        final ConnectorsAndTasks deleted = 
ConnectorsAndTasks.diff(previousAssignment, configured);
+        log.trace("Deleted assignments: {}", deleted);
 
-        // A collection of the complete assignment
-        List<WorkerLoad> completeWorkerAssignment = 
workerAssignment(memberConfigs, ConnectorsAndTasks.EMPTY);
-        log.debug("Complete (ignoring deletions) worker assignments: {}", 
completeWorkerAssignment);
+        // The connectors and tasks that are currently running on more than 
one worker each
+        final ConnectorsAndTasks duplicated = duplicated(memberAssignments);
+        log.trace("Duplicated assignments: {}", duplicated);
 
-        // Per worker connector assignments without removing deleted 
connectors yet
-        Map<String, Collection<String>> connectorAssignments =
-                
completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, 
WorkerLoad::connectors));
-        log.debug("Complete (ignoring deletions) connector assignments: {}", 
connectorAssignments);
+        // The connectors and tasks that should already be running on the 
cluster, but which are not included
+        // in the assignment reported by any workers in the cluster
+        final ConnectorsAndTasks lostAssignments = 
ConnectorsAndTasks.diff(previousAssignment, activeAssignments, deleted);
+        log.trace("Lost assignments: {}", lostAssignments);
 
-        // Per worker task assignments without removing deleted connectors yet
-        Map<String, Collection<ConnectorTaskId>> taskAssignments =
-                
completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, 
WorkerLoad::tasks));
-        log.debug("Complete (ignoring deletions) task assignments: {}", 
taskAssignments);
+        // The connectors and tasks that have been created since the last 
rebalance
+        final ConnectorsAndTasks created = ConnectorsAndTasks.diff(configured, 
previousAssignment, activeAssignments);
+        log.trace("New assignments: {}", created);
 
-        // A collection of the current assignment excluding the 
connectors-and-tasks to be deleted
-        List<WorkerLoad> currentWorkerAssignment = 
workerAssignment(memberConfigs, deleted);
+        final Map<String, ConnectorsAndTasks.Builder> toRevoke = new 
HashMap<>();
 
-        Map<String, ConnectorsAndTasks> toRevoke = computeDeleted(deleted, 
connectorAssignments, taskAssignments);
-        log.debug("Connector and task to delete assignments: {}", toRevoke);
+        final Map<String, ConnectorsAndTasks> deletedAndRevoked = 
intersection(deleted, memberAssignments);
+        log.trace("Deleted connectors and tasks to revoke from each worker: 
{}", deletedAndRevoked);
+        addAll(toRevoke, deletedAndRevoked);
 
         // Revoking redundant connectors/tasks if the workers have duplicate 
assignments
-        toRevoke.putAll(computeDuplicatedAssignments(memberConfigs, 
connectorAssignments, taskAssignments));
-        log.debug("Connector and task to revoke assignments (include 
duplicated assignments): {}", toRevoke);
+        final Map<String, ConnectorsAndTasks> duplicatedAndRevoked = 
intersection(duplicated, memberAssignments);
+        log.trace("Duplicated connectors and tasks to revoke from each worker: 
{}", duplicatedAndRevoked);
+        addAll(toRevoke, duplicatedAndRevoked);
 
-        // Recompute the complete assignment excluding the deleted 
connectors-and-tasks
-        completeWorkerAssignment = workerAssignment(memberConfigs, deleted);
-        connectorAssignments =
-                
completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, 
WorkerLoad::connectors));
-        taskAssignments =
-                
completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, 
WorkerLoad::tasks));
+        // Compute the assignment that will be applied across the cluster 
after this round of rebalance
+        // Later on, new submissions and lost-and-reassigned connectors and 
tasks will be added to these assignments,
+        // and load-balancing revocations will be removed from them.
+        final List<WorkerLoad> nextWorkerAssignment = 
workerLoads(memberAssignments, ConnectorsAndTasks.combine(deleted, duplicated));
 
-        handleLostAssignments(lostAssignments, newSubmissions, 
completeWorkerAssignment, memberConfigs);
+        final ConnectorsAndTasks.Builder lostAssignmentsToReassign = 
ConnectorsAndTasks.builder();
+        handleLostAssignments(lostAssignments, lostAssignmentsToReassign, 
nextWorkerAssignment);
 
         // Do not revoke resources for re-assignment while a delayed rebalance 
is active
-        // Also we do not revoke in two consecutive rebalances by the same 
leader
-        canRevoke = delay == 0 && canRevoke;
+        // Also we do not revoke in two consecutive rebalances by the same 
leader, which
+        // should be fine since workers immediately rejoin the group after a 
rebalance
+        // if connectors and/or tasks were revoked from them
+        canRevoke = canRevoke && delay == 0;
 
         // Compute the connectors-and-tasks to be revoked for load balancing 
without taking into
         // account the deleted ones.
-        log.debug("Can leader revoke tasks in this assignment? {} (delay: 
{})", canRevoke, delay);
+        log.trace("Can leader revoke tasks in this assignment? {} (delay: 
{})", canRevoke, delay);
         if (canRevoke) {
-            Map<String, ConnectorsAndTasks> toExplicitlyRevoke =
-                    performTaskRevocation(activeAssignments, 
currentWorkerAssignment);
-
-            log.debug("Connector and task to revoke assignments: {}", 
toRevoke);
-
-            toExplicitlyRevoke.forEach(
-                (worker, assignment) -> {
-                    ConnectorsAndTasks existing = toRevoke.computeIfAbsent(
-                        worker,
-                        v -> new ConnectorsAndTasks.Builder().build());
-                    existing.connectors().addAll(assignment.connectors());
-                    existing.tasks().addAll(assignment.tasks());
-                }
-            );
-            canRevoke = toExplicitlyRevoke.size() == 0;
+            Map<String, ConnectorsAndTasks> loadBalancingRevocations =
+                    performLoadBalancingRevocations(configured, 
nextWorkerAssignment);
+
+            log.trace("Load-balancing revocations for each worker: {}", 
loadBalancingRevocations);
+            addAll(toRevoke, loadBalancingRevocations);
+            canRevoke = 
ConnectorsAndTasks.combine(loadBalancingRevocations.values()).isEmpty();
         } else {
             canRevoke = delay == 0;
         }
 
-        assignConnectors(completeWorkerAssignment, 
newSubmissions.connectors());
-        assignTasks(completeWorkerAssignment, newSubmissions.tasks());
-        log.debug("Current complete assignments: {}", currentWorkerAssignment);
-        log.debug("New complete assignments: {}", completeWorkerAssignment);
+        final ConnectorsAndTasks toAssign = 
ConnectorsAndTasks.combine(created, lostAssignmentsToReassign.build());
+        assignConnectors(nextWorkerAssignment, toAssign.connectors());
+        assignTasks(nextWorkerAssignment, toAssign.tasks());
 
-        Map<String, Collection<String>> currentConnectorAssignments =
-                
currentWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, 
WorkerLoad::connectors));
-        Map<String, Collection<ConnectorTaskId>> currentTaskAssignments =
-                
currentWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, 
WorkerLoad::tasks));
-        Map<String, Collection<String>> incrementalConnectorAssignments =
-                diff(connectorAssignments, currentConnectorAssignments);
-        Map<String, Collection<ConnectorTaskId>> incrementalTaskAssignments =
-                diff(taskAssignments, currentTaskAssignments);
+        // The complete set of connectors and tasks that will be running on 
each worker after this rebalance
+        final Map<String, ConnectorsAndTasks> nextAssignments = 
assignments(nextWorkerAssignment);
 
-        log.debug("Incremental connector assignments: {}", 
incrementalConnectorAssignments);
-        log.debug("Incremental task assignments: {}", 
incrementalTaskAssignments);
+        log.debug("Current complete assignments: {}", memberAssignments);
+        log.debug("New complete assignments: {}", nextAssignments);

Review Comment:
   Fair enough! Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to