C0urante commented on code in PR #12019: URL: https://github.com/apache/kafka/pull/12019#discussion_r849010637
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java: ########## @@ -200,283 +229,149 @@ private Long ensureLeaderConfig(long maxOffset, WorkerCoordinator coordinator) { previousAssignment = activeAssignments; canRevoke = true; } - previousRevocation.connectors().clear(); - previousRevocation.tasks().clear(); + previousRevocation = ConnectorsAndTasks.EMPTY; } - // Derived set: The set of deleted connectors-and-tasks is a derived set from the set - // difference of previous - configured - ConnectorsAndTasks deleted = diff(previousAssignment, configured); - log.debug("Deleted assignments: {}", deleted); - - // Derived set: The set of remaining active connectors-and-tasks is a derived set from the - // set difference of active - deleted - ConnectorsAndTasks remainingActive = diff(activeAssignments, deleted); - log.debug("Remaining (excluding deleted) active assignments: {}", remainingActive); - - // Derived set: The set of lost or unaccounted connectors-and-tasks is a derived set from - // the set difference of previous - active - deleted - ConnectorsAndTasks lostAssignments = diff(previousAssignment, activeAssignments, deleted); - log.debug("Lost assignments: {}", lostAssignments); - - // Derived set: The set of new connectors-and-tasks is a derived set from the set - // difference of configured - previous - active - ConnectorsAndTasks newSubmissions = diff(configured, previousAssignment, activeAssignments); - log.debug("New assignments: {}", newSubmissions); + // The connectors and tasks that have been deleted since the last rebalance + final ConnectorsAndTasks deleted = ConnectorsAndTasks.diff(previousAssignment, configured); + log.trace("Deleted assignments: {}", deleted); - // A collection of the complete assignment - List<WorkerLoad> completeWorkerAssignment = workerAssignment(memberConfigs, ConnectorsAndTasks.EMPTY); - log.debug("Complete (ignoring deletions) worker assignments: {}", completeWorkerAssignment); + // The connectors and tasks that are currently running on more than one worker each + final ConnectorsAndTasks duplicated = duplicated(memberAssignments); + log.trace("Duplicated assignments: {}", duplicated); - // Per worker connector assignments without removing deleted connectors yet - Map<String, Collection<String>> connectorAssignments = - completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::connectors)); - log.debug("Complete (ignoring deletions) connector assignments: {}", connectorAssignments); + // The connectors and tasks that should already be running on the cluster, but which are not included + // in the assignment reported by any workers in the cluster + final ConnectorsAndTasks lostAssignments = ConnectorsAndTasks.diff(previousAssignment, activeAssignments, deleted); + log.trace("Lost assignments: {}", lostAssignments); - // Per worker task assignments without removing deleted connectors yet - Map<String, Collection<ConnectorTaskId>> taskAssignments = - completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::tasks)); - log.debug("Complete (ignoring deletions) task assignments: {}", taskAssignments); + // The connectors and tasks that have been created since the last rebalance + final ConnectorsAndTasks created = ConnectorsAndTasks.diff(configured, previousAssignment, activeAssignments); + log.trace("New assignments: {}", created); - // A collection of the current assignment excluding the connectors-and-tasks to be deleted - List<WorkerLoad> currentWorkerAssignment = workerAssignment(memberConfigs, deleted); + final Map<String, ConnectorsAndTasks.Builder> toRevoke = new HashMap<>(); - Map<String, ConnectorsAndTasks> toRevoke = computeDeleted(deleted, connectorAssignments, taskAssignments); - log.debug("Connector and task to delete assignments: {}", toRevoke); + final Map<String, ConnectorsAndTasks> deletedAndRevoked = intersection(deleted, memberAssignments); + log.trace("Deleted connectors and tasks to revoke from each worker: {}", deletedAndRevoked); + addAll(toRevoke, deletedAndRevoked); // Revoking redundant connectors/tasks if the workers have duplicate assignments - toRevoke.putAll(computeDuplicatedAssignments(memberConfigs, connectorAssignments, taskAssignments)); - log.debug("Connector and task to revoke assignments (include duplicated assignments): {}", toRevoke); + final Map<String, ConnectorsAndTasks> duplicatedAndRevoked = intersection(duplicated, memberAssignments); + log.trace("Duplicated connectors and tasks to revoke from each worker: {}", duplicatedAndRevoked); + addAll(toRevoke, duplicatedAndRevoked); - // Recompute the complete assignment excluding the deleted connectors-and-tasks - completeWorkerAssignment = workerAssignment(memberConfigs, deleted); - connectorAssignments = - completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::connectors)); - taskAssignments = - completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::tasks)); + // Compute the assignment that will be applied across the cluster after this round of rebalance + // Later on, new submissions and lost-and-reassigned connectors and tasks will be added to these assignments, + // and load-balancing revocations will be removed from them. + final List<WorkerLoad> nextWorkerAssignment = workerLoads(memberAssignments, ConnectorsAndTasks.combine(deleted, duplicated)); - handleLostAssignments(lostAssignments, newSubmissions, completeWorkerAssignment, memberConfigs); + final ConnectorsAndTasks.Builder lostAssignmentsToReassign = ConnectorsAndTasks.builder(); + handleLostAssignments(lostAssignments, lostAssignmentsToReassign, nextWorkerAssignment); // Do not revoke resources for re-assignment while a delayed rebalance is active - // Also we do not revoke in two consecutive rebalances by the same leader - canRevoke = delay == 0 && canRevoke; + // Also we do not revoke in two consecutive rebalances by the same leader, which + // should be fine since workers immediately rejoin the group after a rebalance + // if connectors and/or tasks were revoked from them + canRevoke = canRevoke && delay == 0; // Compute the connectors-and-tasks to be revoked for load balancing without taking into // account the deleted ones. - log.debug("Can leader revoke tasks in this assignment? {} (delay: {})", canRevoke, delay); + log.trace("Can leader revoke tasks in this assignment? {} (delay: {})", canRevoke, delay); if (canRevoke) { - Map<String, ConnectorsAndTasks> toExplicitlyRevoke = - performTaskRevocation(activeAssignments, currentWorkerAssignment); - - log.debug("Connector and task to revoke assignments: {}", toRevoke); - - toExplicitlyRevoke.forEach( - (worker, assignment) -> { - ConnectorsAndTasks existing = toRevoke.computeIfAbsent( - worker, - v -> new ConnectorsAndTasks.Builder().build()); - existing.connectors().addAll(assignment.connectors()); - existing.tasks().addAll(assignment.tasks()); - } - ); - canRevoke = toExplicitlyRevoke.size() == 0; + Map<String, ConnectorsAndTasks> loadBalancingRevocations = + performLoadBalancingRevocations(configured, nextWorkerAssignment); + + log.trace("Load-balancing revocations for each worker: {}", loadBalancingRevocations); + addAll(toRevoke, loadBalancingRevocations); + canRevoke = ConnectorsAndTasks.combine(loadBalancingRevocations.values()).isEmpty(); } else { canRevoke = delay == 0; } - assignConnectors(completeWorkerAssignment, newSubmissions.connectors()); - assignTasks(completeWorkerAssignment, newSubmissions.tasks()); - log.debug("Current complete assignments: {}", currentWorkerAssignment); - log.debug("New complete assignments: {}", completeWorkerAssignment); + final ConnectorsAndTasks toAssign = ConnectorsAndTasks.combine(created, lostAssignmentsToReassign.build()); + assignConnectors(nextWorkerAssignment, toAssign.connectors()); + assignTasks(nextWorkerAssignment, toAssign.tasks()); - Map<String, Collection<String>> currentConnectorAssignments = - currentWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::connectors)); - Map<String, Collection<ConnectorTaskId>> currentTaskAssignments = - currentWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::tasks)); - Map<String, Collection<String>> incrementalConnectorAssignments = - diff(connectorAssignments, currentConnectorAssignments); - Map<String, Collection<ConnectorTaskId>> incrementalTaskAssignments = - diff(taskAssignments, currentTaskAssignments); + // The complete set of connectors and tasks that will be running on each worker after this rebalance + final Map<String, ConnectorsAndTasks> nextAssignments = assignments(nextWorkerAssignment); - log.debug("Incremental connector assignments: {}", incrementalConnectorAssignments); - log.debug("Incremental task assignments: {}", incrementalTaskAssignments); + log.debug("Current complete assignments: {}", memberAssignments); + log.debug("New complete assignments: {}", nextAssignments); Review Comment: Fair enough! Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org