[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing
YeonCheolGit commented on code in PR #12019: URL: https://github.com/apache/kafka/pull/12019#discussion_r862388033 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java: ## @@ -110,14 +114,18 @@ public Map performAssignment(String leaderId, String protoco : CONNECT_PROTOCOL_V1; Long leaderOffset = ensureLeaderConfig(maxOffset, coordinator); +Map assignments; if (leaderOffset == null) { -Map assignments = fillAssignments( +assignments = fillAssignments( memberConfigs.keySet(), Assignment.CONFIG_MISMATCH, -leaderId, memberConfigs.get(leaderId).url(), maxOffset, Collections.emptyMap(), -Collections.emptyMap(), Collections.emptyMap(), 0, protocolVersion); -return serializeAssignments(assignments); +leaderId, memberConfigs.get(leaderId).url(), maxOffset, +ClusterAssignment.EMPTY, 0, protocolVersion); +} else { +assignments = performTaskAssignment(leaderId, leaderOffset, memberConfigs, coordinator, protocolVersion); } -return performTaskAssignment(leaderId, leaderOffset, memberConfigs, coordinator, protocolVersion); +Map result = serializeAssignments(assignments); +log.debug("Finished assignment"); Review Comment: @C0urante This works with `Map assignment's'`. So maybe this? ```suggestion log.debug("Finished assignments"); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing
YeonCheolGit commented on code in PR #12019: URL: https://github.com/apache/kafka/pull/12019#discussion_r862493447 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java: ## @@ -526,165 +421,174 @@ private List pickCandidateWorkerForReassignment(List com } /** - * Task revocation is based on an rough estimation of the lower average number of tasks before - * and after new workers join the group. If no new workers join, no revocation takes place. - * Based on this estimation, tasks are revoked until the new floor average is reached for - * each existing worker. The revoked tasks, once assigned to the new workers will maintain - * a balanced load among the group. - * - * @param activeAssignments - * @param completeWorkerAssignment - * @return + * Revoke connectors and tasks from each worker in the cluster until no worker is running more than it would be if: + * + * The allocation of connectors and tasks across the cluster were as balanced as possible (i.e., the difference in allocation size between any two workers is at most one) + * Any workers that left the group within the scheduled rebalance delay permanently left the group + * All currently-configured connectors and tasks were allocated (including instances that may be revoked in this round because they are duplicated across workers) + * + * @param configured the set of configured connectors and tasks across the entire cluster + * @param workers the workers in the cluster, whose assignments should not include any deleted or duplicated connectors or tasks + *that are already due to be revoked from the worker in this rebalance + * @return which connectors and tasks should be revoked from which workers; never null, but may be empty + * if no load-balancing revocations are necessary or possible */ -private Map performTaskRevocation(ConnectorsAndTasks activeAssignments, - Collection completeWorkerAssignment) { -int totalActiveConnectorsNum = 
activeAssignments.connectors().size(); -int totalActiveTasksNum = activeAssignments.tasks().size(); -Collection existingWorkers = completeWorkerAssignment.stream() -.filter(wl -> wl.size() > 0) -.collect(Collectors.toList()); -int existingWorkersNum = existingWorkers.size(); -int totalWorkersNum = completeWorkerAssignment.size(); -int newWorkersNum = totalWorkersNum - existingWorkersNum; - -if (log.isDebugEnabled()) { -completeWorkerAssignment.forEach(wl -> log.debug( +private Map performLoadBalancingRevocations( +final ConnectorsAndTasks configured, +final Collection workers +) { +if (log.isTraceEnabled()) { +workers.forEach(wl -> log.trace( "Per worker current load size; worker: {} connectors: {} tasks: {}", wl.worker(), wl.connectorsSize(), wl.tasksSize())); } -Map revoking = new HashMap<>(); -// If there are no new workers, or no existing workers to revoke tasks from return early -// after logging the status -if (!(newWorkersNum > 0 && existingWorkersNum > 0)) { -log.debug("No task revocation required; workers with existing load: {} workers with " -+ "no load {} total workers {}", -existingWorkersNum, newWorkersNum, totalWorkersNum); -// This is intentionally empty but mutable, because the map is used to include deleted -// connectors and tasks as well -return revoking; +if (workers.stream().allMatch(WorkerLoad::isEmpty)) { +log.trace("No load-balancing revocations required; all workers are either new " ++ "or will have all currently-assigned connectors and tasks revoked during this round" +); +return Collections.emptyMap(); +} +if (configured.isEmpty()) { +log.trace("No load-balancing revocations required; no connectors are currently configured on this cluster"); +return Collections.emptyMap(); } -log.debug("Task revocation is required; workers with existing load: {} workers with " -+ "no load {} total workers {}", -existingWorkersNum, newWorkersNum, totalWorkersNum); - -// We have at least one worker assignment (the leader itself) so totalWorkersNum can't be 
0 -log.debug("Previous rounded down (floor) average number of connectors per worker {}", totalActiveConnectorsNum / existingWorkersNum); -int floorConnectors = totalActiveConnectorsNum / totalWorkersNum; -int ceilConnectors = floorConnectors + ((totalActiveConn
[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing
YeonCheolGit commented on code in PR #12019: URL: https://github.com/apache/kafka/pull/12019#discussion_r862492477 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java: ## @@ -526,165 +421,174 @@ private List pickCandidateWorkerForReassignment(List com } /** - * Task revocation is based on an rough estimation of the lower average number of tasks before - * and after new workers join the group. If no new workers join, no revocation takes place. - * Based on this estimation, tasks are revoked until the new floor average is reached for - * each existing worker. The revoked tasks, once assigned to the new workers will maintain - * a balanced load among the group. - * - * @param activeAssignments - * @param completeWorkerAssignment - * @return + * Revoke connectors and tasks from each worker in the cluster until no worker is running more than it would be if: + * + * The allocation of connectors and tasks across the cluster were as balanced as possible (i.e., the difference in allocation size between any two workers is at most one) + * Any workers that left the group within the scheduled rebalance delay permanently left the group + * All currently-configured connectors and tasks were allocated (including instances that may be revoked in this round because they are duplicated across workers) + * Review Comment: Maybe this is silly question and i don't know this things. Is there any reason to put HTML tag for comments? ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java: ## @@ -526,165 +421,174 @@ private List pickCandidateWorkerForReassignment(List com } /** - * Task revocation is based on an rough estimation of the lower average number of tasks before - * and after new workers join the group. If no new workers join, no revocation takes place. 
- * Based on this estimation, tasks are revoked until the new floor average is reached for - * each existing worker. The revoked tasks, once assigned to the new workers will maintain - * a balanced load among the group. - * - * @param activeAssignments - * @param completeWorkerAssignment - * @return + * Revoke connectors and tasks from each worker in the cluster until no worker is running more than it would be if: + * + * The allocation of connectors and tasks across the cluster were as balanced as possible (i.e., the difference in allocation size between any two workers is at most one) + * Any workers that left the group within the scheduled rebalance delay permanently left the group + * All currently-configured connectors and tasks were allocated (including instances that may be revoked in this round because they are duplicated across workers) + * Review Comment: Maybe this is silly question and i don't know this things much. Is there any reason to put HTML tag for comments? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing
YeonCheolGit commented on code in PR #12019: URL: https://github.com/apache/kafka/pull/12019#discussion_r862491251 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java: ## @@ -723,44 +610,175 @@ protected void assignConnectors(List workerAssignment, Collection workerAssignment, Collection tasks) { -workerAssignment.sort(WorkerLoad.taskComparator()); -WorkerLoad first = workerAssignment.get(0); +// Visible for testing +void assignTasks(List workerAssignment, Collection tasks) { +assign(workerAssignment, tasks, WorkerLoad::tasks, WorkerLoad::assign); +} -Iterator load = tasks.iterator(); +private void assign( +List workers, +Collection toAssign, +Function> currentAllocation, +BiConsumer assignToWorker +) { +Function allocationSize = currentAllocation.andThen(Collection::size); +workers.sort(Comparator.comparing(allocationSize)); +WorkerLoad first = workers.get(0); + +Iterator load = toAssign.stream().sorted().iterator(); while (load.hasNext()) { -int firstLoad = first.tasksSize(); -int upTo = IntStream.range(0, workerAssignment.size()) -.filter(i -> workerAssignment.get(i).tasksSize() > firstLoad) +int firstLoad = allocationSize.apply(first); +int upTo = IntStream.range(0, workers.size()) Review Comment: This is a minor suggestion and can be ignored. If `workers.size()` is called inside the while loop, it is recomputed on every iteration for as long as the loop runs. What about computing the size once and reusing it? ``` final int workersSize = workers.size(); IntStream.range(0, workersSize) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing
YeonCheolGit commented on code in PR #12019: URL: https://github.com/apache/kafka/pull/12019#discussion_r862491251 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java: ## @@ -723,44 +610,175 @@ protected void assignConnectors(List workerAssignment, Collection workerAssignment, Collection tasks) { -workerAssignment.sort(WorkerLoad.taskComparator()); -WorkerLoad first = workerAssignment.get(0); +// Visible for testing +void assignTasks(List workerAssignment, Collection tasks) { +assign(workerAssignment, tasks, WorkerLoad::tasks, WorkerLoad::assign); +} -Iterator load = tasks.iterator(); +private void assign( +List workers, +Collection toAssign, +Function> currentAllocation, +BiConsumer assignToWorker +) { +Function allocationSize = currentAllocation.andThen(Collection::size); +workers.sort(Comparator.comparing(allocationSize)); +WorkerLoad first = workers.get(0); + +Iterator load = toAssign.stream().sorted().iterator(); while (load.hasNext()) { -int firstLoad = first.tasksSize(); -int upTo = IntStream.range(0, workerAssignment.size()) -.filter(i -> workerAssignment.get(i).tasksSize() > firstLoad) +int firstLoad = allocationSize.apply(first); +int upTo = IntStream.range(0, workers.size()) Review Comment: This is minor suggestion and could be ignored. If calculate `workers.size` in while loop it has to be calculated all the time until it is true. What about calculating size one time then use it many times. 
``` int workersSize = workers.size(); IntStream.range(0, workersSize) ``` ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java: ## @@ -723,44 +610,175 @@ protected void assignConnectors(List workerAssignment, Collection workerAssignment, Collection tasks) { -workerAssignment.sort(WorkerLoad.taskComparator()); -WorkerLoad first = workerAssignment.get(0); +// Visible for testing +void assignTasks(List workerAssignment, Collection tasks) { +assign(workerAssignment, tasks, WorkerLoad::tasks, WorkerLoad::assign); +} -Iterator load = tasks.iterator(); +private void assign( +List workers, +Collection toAssign, +Function> currentAllocation, +BiConsumer assignToWorker +) { +Function allocationSize = currentAllocation.andThen(Collection::size); +workers.sort(Comparator.comparing(allocationSize)); +WorkerLoad first = workers.get(0); + +Iterator load = toAssign.stream().sorted().iterator(); while (load.hasNext()) { -int firstLoad = first.tasksSize(); -int upTo = IntStream.range(0, workerAssignment.size()) -.filter(i -> workerAssignment.get(i).tasksSize() > firstLoad) +int firstLoad = allocationSize.apply(first); +int upTo = IntStream.range(0, workers.size()) Review Comment: This is minor suggestion and could be ignored. If calculate `workers.size` in while loop it has to be calculated all the time while it is true. What about calculating size one time then use it many times. ``` int workersSize = workers.size(); IntStream.range(0, workersSize) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing
YeonCheolGit commented on code in PR #12019: URL: https://github.com/apache/kafka/pull/12019#discussion_r862491251 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java: ## @@ -723,44 +610,175 @@ protected void assignConnectors(List workerAssignment, Collection workerAssignment, Collection tasks) { -workerAssignment.sort(WorkerLoad.taskComparator()); -WorkerLoad first = workerAssignment.get(0); +// Visible for testing +void assignTasks(List workerAssignment, Collection tasks) { +assign(workerAssignment, tasks, WorkerLoad::tasks, WorkerLoad::assign); +} -Iterator load = tasks.iterator(); +private void assign( +List workers, +Collection toAssign, +Function> currentAllocation, +BiConsumer assignToWorker +) { +Function allocationSize = currentAllocation.andThen(Collection::size); +workers.sort(Comparator.comparing(allocationSize)); +WorkerLoad first = workers.get(0); + +Iterator load = toAssign.stream().sorted().iterator(); while (load.hasNext()) { -int firstLoad = first.tasksSize(); -int upTo = IntStream.range(0, workerAssignment.size()) -.filter(i -> workerAssignment.get(i).tasksSize() > firstLoad) +int firstLoad = allocationSize.apply(first); +int upTo = IntStream.range(0, workers.size()) Review Comment: This is minor suggestion and could be ignored. If calculate workers.size in while loop it has to be calculated all the time until it is true. What about calculating size one time then use it many times. ``` int workersSize = workers.size(); IntStream.range(0, workersSize) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing
YeonCheolGit commented on code in PR #12019: URL: https://github.com/apache/kafka/pull/12019#discussion_r862394876 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java: ## @@ -151,19 +159,46 @@ private Long ensureLeaderConfig(long maxOffset, WorkerCoordinator coordinator) { * @param coordinator the worker coordinator instance that provide the configuration snapshot * and get assigned the leader state during this assignment * @param protocolVersion the Connect subprotocol version - * @return the serialized assignment of tasks to the whole group, including assigned or - * revoked tasks + * @return the assignment of tasks to the whole group, including assigned or revoked tasks */ -protected Map performTaskAssignment(String leaderId, long maxOffset, +private Map performTaskAssignment(String leaderId, long maxOffset, Review Comment: Just wondering: why was this changed from `protected` to `private`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing
YeonCheolGit commented on code in PR #12019: URL: https://github.com/apache/kafka/pull/12019#discussion_r862455239 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java: ## @@ -200,283 +229,149 @@ protected Map performTaskAssignment(String leaderId, long ma previousAssignment = activeAssignments; canRevoke = true; } -previousRevocation.connectors().clear(); -previousRevocation.tasks().clear(); +previousRevocation = ConnectorsAndTasks.EMPTY; } -// Derived set: The set of deleted connectors-and-tasks is a derived set from the set -// difference of previous - configured -ConnectorsAndTasks deleted = diff(previousAssignment, configured); -log.debug("Deleted assignments: {}", deleted); - -// Derived set: The set of remaining active connectors-and-tasks is a derived set from the -// set difference of active - deleted -ConnectorsAndTasks remainingActive = diff(activeAssignments, deleted); -log.debug("Remaining (excluding deleted) active assignments: {}", remainingActive); - -// Derived set: The set of lost or unaccounted connectors-and-tasks is a derived set from -// the set difference of previous - active - deleted -ConnectorsAndTasks lostAssignments = diff(previousAssignment, activeAssignments, deleted); -log.debug("Lost assignments: {}", lostAssignments); - -// Derived set: The set of new connectors-and-tasks is a derived set from the set -// difference of configured - previous - active -ConnectorsAndTasks newSubmissions = diff(configured, previousAssignment, activeAssignments); -log.debug("New assignments: {}", newSubmissions); +// The connectors and tasks that have been deleted since the last rebalance +final ConnectorsAndTasks deleted = ConnectorsAndTasks.diff(previousAssignment, configured); +log.trace("Deleted assignments: {}", deleted); -// A collection of the complete assignment -List completeWorkerAssignment = workerAssignment(memberConfigs, ConnectorsAndTasks.EMPTY); -log.debug("Complete (ignoring 
deletions) worker assignments: {}", completeWorkerAssignment); +// The connectors and tasks that are currently running on more than one worker each +final ConnectorsAndTasks duplicated = duplicated(memberAssignments); +log.trace("Duplicated assignments: {}", duplicated); -// Per worker connector assignments without removing deleted connectors yet -Map> connectorAssignments = - completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::connectors)); -log.debug("Complete (ignoring deletions) connector assignments: {}", connectorAssignments); +// The connectors and tasks that should already be running on the cluster, but which are not included +// in the assignment reported by any workers in the cluster +final ConnectorsAndTasks lostAssignments = ConnectorsAndTasks.diff(previousAssignment, activeAssignments, deleted); Review Comment: Could you explain what `lost` assignment meaning is? As far as i know `ConnectorsAndTasks.diff` returns remainder after subtracted assignments. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing
YeonCheolGit commented on code in PR #12019: URL: https://github.com/apache/kafka/pull/12019#discussion_r862396035 ## connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java: ## @@ -160,4 +166,40 @@ public static boolean isSourceConnector(Connector connector) { return SourceConnector.class.isAssignableFrom(connector.getClass()); } +public static Map transformValues(Map map, Function transformation) { +return map.entrySet().stream().collect(Collectors.toMap( +Map.Entry::getKey, +transformation.compose(Map.Entry::getValue) +)); +} + +public static List combineCollections(Collection> collections) { +return combineCollections(collections, Function.identity()); +} + +public static List combineCollections(Collection collection, Function> extractCollection) { +return combineCollections(collection, extractCollection, Collectors.toList()); +} + +public static C combineCollections( Review Comment: Could you explain why one `combineCollections` overload delegates to another `combineCollections` overload, which in turn delegates to a third? To me, it looks like the final result always comes from the `C combineCollections(...)` overload. Why not use only `C combineCollections(...)`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing
YeonCheolGit commented on code in PR #12019: URL: https://github.com/apache/kafka/pull/12019#discussion_r862394876 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java: ## @@ -151,19 +159,46 @@ private Long ensureLeaderConfig(long maxOffset, WorkerCoordinator coordinator) { * @param coordinator the worker coordinator instance that provide the configuration snapshot * and get assigned the leader state during this assignment * @param protocolVersion the Connect subprotocol version - * @return the serialized assignment of tasks to the whole group, including assigned or - * revoked tasks + * @return the assignment of tasks to the whole group, including assigned or revoked tasks */ -protected Map performTaskAssignment(String leaderId, long maxOffset, +private Map performTaskAssignment(String leaderId, long maxOffset, Review Comment: Just wondering: why was this changed from `protected` to `private`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing
YeonCheolGit commented on code in PR #12019: URL: https://github.com/apache/kafka/pull/12019#discussion_r862388033 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java: ## @@ -110,14 +114,18 @@ public Map performAssignment(String leaderId, String protoco : CONNECT_PROTOCOL_V1; Long leaderOffset = ensureLeaderConfig(maxOffset, coordinator); +Map assignments; if (leaderOffset == null) { -Map assignments = fillAssignments( +assignments = fillAssignments( memberConfigs.keySet(), Assignment.CONFIG_MISMATCH, -leaderId, memberConfigs.get(leaderId).url(), maxOffset, Collections.emptyMap(), -Collections.emptyMap(), Collections.emptyMap(), 0, protocolVersion); -return serializeAssignments(assignments); +leaderId, memberConfigs.get(leaderId).url(), maxOffset, +ClusterAssignment.EMPTY, 0, protocolVersion); +} else { +assignments = performTaskAssignment(leaderId, leaderOffset, memberConfigs, coordinator, protocolVersion); } -return performTaskAssignment(leaderId, leaderOffset, memberConfigs, coordinator, protocolVersion); +Map result = serializeAssignments(assignments); +log.debug("Finished assignment"); Review Comment: This would match the plural variable name `Map assignments`, so maybe this? ```suggestion log.debug("Finished assignments"); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing
YeonCheolGit commented on code in PR #12019: URL: https://github.com/apache/kafka/pull/12019#discussion_r862388033 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java: ## @@ -110,14 +114,18 @@ public Map performAssignment(String leaderId, String protoco : CONNECT_PROTOCOL_V1; Long leaderOffset = ensureLeaderConfig(maxOffset, coordinator); +Map assignments; if (leaderOffset == null) { -Map assignments = fillAssignments( +assignments = fillAssignments( memberConfigs.keySet(), Assignment.CONFIG_MISMATCH, -leaderId, memberConfigs.get(leaderId).url(), maxOffset, Collections.emptyMap(), -Collections.emptyMap(), Collections.emptyMap(), 0, protocolVersion); -return serializeAssignments(assignments); +leaderId, memberConfigs.get(leaderId).url(), maxOffset, +ClusterAssignment.EMPTY, 0, protocolVersion); +} else { +assignments = performTaskAssignment(leaderId, leaderOffset, memberConfigs, coordinator, protocolVersion); } -return performTaskAssignment(leaderId, leaderOffset, memberConfigs, coordinator, protocolVersion); +Map result = serializeAssignments(assignments); +log.debug("Finished assignment"); Review Comment: This would match the plural variable name `assignments`, so maybe this? ```suggestion log.debug("Finished assignments"); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing
YeonCheolGit commented on code in PR #12019: URL: https://github.com/apache/kafka/pull/12019#discussion_r847446537 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java: ## @@ -200,283 +229,149 @@ private Long ensureLeaderConfig(long maxOffset, WorkerCoordinator coordinator) { previousAssignment = activeAssignments; canRevoke = true; } -previousRevocation.connectors().clear(); -previousRevocation.tasks().clear(); +previousRevocation = ConnectorsAndTasks.EMPTY; } -// Derived set: The set of deleted connectors-and-tasks is a derived set from the set -// difference of previous - configured -ConnectorsAndTasks deleted = diff(previousAssignment, configured); -log.debug("Deleted assignments: {}", deleted); - -// Derived set: The set of remaining active connectors-and-tasks is a derived set from the -// set difference of active - deleted -ConnectorsAndTasks remainingActive = diff(activeAssignments, deleted); -log.debug("Remaining (excluding deleted) active assignments: {}", remainingActive); - -// Derived set: The set of lost or unaccounted connectors-and-tasks is a derived set from -// the set difference of previous - active - deleted -ConnectorsAndTasks lostAssignments = diff(previousAssignment, activeAssignments, deleted); -log.debug("Lost assignments: {}", lostAssignments); - -// Derived set: The set of new connectors-and-tasks is a derived set from the set -// difference of configured - previous - active -ConnectorsAndTasks newSubmissions = diff(configured, previousAssignment, activeAssignments); -log.debug("New assignments: {}", newSubmissions); +// The connectors and tasks that have been deleted since the last rebalance +final ConnectorsAndTasks deleted = ConnectorsAndTasks.diff(previousAssignment, configured); +log.trace("Deleted assignments: {}", deleted); -// A collection of the complete assignment -List completeWorkerAssignment = workerAssignment(memberConfigs, ConnectorsAndTasks.EMPTY); 
-log.debug("Complete (ignoring deletions) worker assignments: {}", completeWorkerAssignment); +// The connectors and tasks that are currently running on more than one worker each +final ConnectorsAndTasks duplicated = duplicated(memberAssignments); +log.trace("Duplicated assignments: {}", duplicated); -// Per worker connector assignments without removing deleted connectors yet -Map> connectorAssignments = - completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::connectors)); -log.debug("Complete (ignoring deletions) connector assignments: {}", connectorAssignments); +// The connectors and tasks that should already be running on the cluster, but which are not included +// in the assignment reported by any workers in the cluster +final ConnectorsAndTasks lostAssignments = ConnectorsAndTasks.diff(previousAssignment, activeAssignments, deleted); +log.trace("Lost assignments: {}", lostAssignments); -// Per worker task assignments without removing deleted connectors yet -Map> taskAssignments = - completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::tasks)); -log.debug("Complete (ignoring deletions) task assignments: {}", taskAssignments); +// The connectors and tasks that have been created since the last rebalance +final ConnectorsAndTasks created = ConnectorsAndTasks.diff(configured, previousAssignment, activeAssignments); +log.trace("New assignments: {}", created); -// A collection of the current assignment excluding the connectors-and-tasks to be deleted -List currentWorkerAssignment = workerAssignment(memberConfigs, deleted); +final Map toRevoke = new HashMap<>(); -Map toRevoke = computeDeleted(deleted, connectorAssignments, taskAssignments); -log.debug("Connector and task to delete assignments: {}", toRevoke); +final Map deletedAndRevoked = intersection(deleted, memberAssignments); +log.trace("Deleted connectors and tasks to revoke from each worker: {}", deletedAndRevoked); +addAll(toRevoke, 
deletedAndRevoked); // Revoking redundant connectors/tasks if the workers have duplicate assignments -toRevoke.putAll(computeDuplicatedAssignments(memberConfigs, connectorAssignments, taskAssignments)); -log.debug("Connector and task to revoke assignments (include duplicated assignments): {}", toRevoke); +final Map duplicatedAndRevoked = intersection(duplicated, memberAssignments); +log.trace("Duplicated connectors
[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing
YeonCheolGit commented on code in PR #12019: URL: https://github.com/apache/kafka/pull/12019#discussion_r847446537 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java: ## @@ -200,283 +229,149 @@ private Long ensureLeaderConfig(long maxOffset, WorkerCoordinator coordinator) { previousAssignment = activeAssignments; canRevoke = true; } -previousRevocation.connectors().clear(); -previousRevocation.tasks().clear(); +previousRevocation = ConnectorsAndTasks.EMPTY; } -// Derived set: The set of deleted connectors-and-tasks is a derived set from the set -// difference of previous - configured -ConnectorsAndTasks deleted = diff(previousAssignment, configured); -log.debug("Deleted assignments: {}", deleted); - -// Derived set: The set of remaining active connectors-and-tasks is a derived set from the -// set difference of active - deleted -ConnectorsAndTasks remainingActive = diff(activeAssignments, deleted); -log.debug("Remaining (excluding deleted) active assignments: {}", remainingActive); - -// Derived set: The set of lost or unaccounted connectors-and-tasks is a derived set from -// the set difference of previous - active - deleted -ConnectorsAndTasks lostAssignments = diff(previousAssignment, activeAssignments, deleted); -log.debug("Lost assignments: {}", lostAssignments); - -// Derived set: The set of new connectors-and-tasks is a derived set from the set -// difference of configured - previous - active -ConnectorsAndTasks newSubmissions = diff(configured, previousAssignment, activeAssignments); -log.debug("New assignments: {}", newSubmissions); +// The connectors and tasks that have been deleted since the last rebalance +final ConnectorsAndTasks deleted = ConnectorsAndTasks.diff(previousAssignment, configured); +log.trace("Deleted assignments: {}", deleted); -// A collection of the complete assignment -List completeWorkerAssignment = workerAssignment(memberConfigs, ConnectorsAndTasks.EMPTY); 
-log.debug("Complete (ignoring deletions) worker assignments: {}", completeWorkerAssignment); +// The connectors and tasks that are currently running on more than one worker each +final ConnectorsAndTasks duplicated = duplicated(memberAssignments); +log.trace("Duplicated assignments: {}", duplicated); -// Per worker connector assignments without removing deleted connectors yet -Map> connectorAssignments = - completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::connectors)); -log.debug("Complete (ignoring deletions) connector assignments: {}", connectorAssignments); +// The connectors and tasks that should already be running on the cluster, but which are not included +// in the assignment reported by any workers in the cluster +final ConnectorsAndTasks lostAssignments = ConnectorsAndTasks.diff(previousAssignment, activeAssignments, deleted); +log.trace("Lost assignments: {}", lostAssignments); -// Per worker task assignments without removing deleted connectors yet -Map> taskAssignments = - completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::tasks)); -log.debug("Complete (ignoring deletions) task assignments: {}", taskAssignments); +// The connectors and tasks that have been created since the last rebalance +final ConnectorsAndTasks created = ConnectorsAndTasks.diff(configured, previousAssignment, activeAssignments); +log.trace("New assignments: {}", created); -// A collection of the current assignment excluding the connectors-and-tasks to be deleted -List currentWorkerAssignment = workerAssignment(memberConfigs, deleted); +final Map toRevoke = new HashMap<>(); -Map toRevoke = computeDeleted(deleted, connectorAssignments, taskAssignments); -log.debug("Connector and task to delete assignments: {}", toRevoke); +final Map deletedAndRevoked = intersection(deleted, memberAssignments); +log.trace("Deleted connectors and tasks to revoke from each worker: {}", deletedAndRevoked); +addAll(toRevoke, 
deletedAndRevoked); // Revoking redundant connectors/tasks if the workers have duplicate assignments -toRevoke.putAll(computeDuplicatedAssignments(memberConfigs, connectorAssignments, taskAssignments)); -log.debug("Connector and task to revoke assignments (include duplicated assignments): {}", toRevoke); +final Map duplicatedAndRevoked = intersection(duplicated, memberAssignments); +log.trace("Duplicated connectors
[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing
YeonCheolGit commented on code in PR #12019: URL: https://github.com/apache/kafka/pull/12019#discussion_r847446537 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java: ## @@ -200,283 +229,149 @@ private Long ensureLeaderConfig(long maxOffset, WorkerCoordinator coordinator) { previousAssignment = activeAssignments; canRevoke = true; } -previousRevocation.connectors().clear(); -previousRevocation.tasks().clear(); +previousRevocation = ConnectorsAndTasks.EMPTY; } -// Derived set: The set of deleted connectors-and-tasks is a derived set from the set -// difference of previous - configured -ConnectorsAndTasks deleted = diff(previousAssignment, configured); -log.debug("Deleted assignments: {}", deleted); - -// Derived set: The set of remaining active connectors-and-tasks is a derived set from the -// set difference of active - deleted -ConnectorsAndTasks remainingActive = diff(activeAssignments, deleted); -log.debug("Remaining (excluding deleted) active assignments: {}", remainingActive); - -// Derived set: The set of lost or unaccounted connectors-and-tasks is a derived set from -// the set difference of previous - active - deleted -ConnectorsAndTasks lostAssignments = diff(previousAssignment, activeAssignments, deleted); -log.debug("Lost assignments: {}", lostAssignments); - -// Derived set: The set of new connectors-and-tasks is a derived set from the set -// difference of configured - previous - active -ConnectorsAndTasks newSubmissions = diff(configured, previousAssignment, activeAssignments); -log.debug("New assignments: {}", newSubmissions); +// The connectors and tasks that have been deleted since the last rebalance +final ConnectorsAndTasks deleted = ConnectorsAndTasks.diff(previousAssignment, configured); +log.trace("Deleted assignments: {}", deleted); -// A collection of the complete assignment -List completeWorkerAssignment = workerAssignment(memberConfigs, ConnectorsAndTasks.EMPTY); 
-log.debug("Complete (ignoring deletions) worker assignments: {}", completeWorkerAssignment); +// The connectors and tasks that are currently running on more than one worker each +final ConnectorsAndTasks duplicated = duplicated(memberAssignments); +log.trace("Duplicated assignments: {}", duplicated); -// Per worker connector assignments without removing deleted connectors yet -Map> connectorAssignments = - completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::connectors)); -log.debug("Complete (ignoring deletions) connector assignments: {}", connectorAssignments); +// The connectors and tasks that should already be running on the cluster, but which are not included +// in the assignment reported by any workers in the cluster +final ConnectorsAndTasks lostAssignments = ConnectorsAndTasks.diff(previousAssignment, activeAssignments, deleted); +log.trace("Lost assignments: {}", lostAssignments); -// Per worker task assignments without removing deleted connectors yet -Map> taskAssignments = - completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::tasks)); -log.debug("Complete (ignoring deletions) task assignments: {}", taskAssignments); +// The connectors and tasks that have been created since the last rebalance +final ConnectorsAndTasks created = ConnectorsAndTasks.diff(configured, previousAssignment, activeAssignments); +log.trace("New assignments: {}", created); -// A collection of the current assignment excluding the connectors-and-tasks to be deleted -List currentWorkerAssignment = workerAssignment(memberConfigs, deleted); +final Map toRevoke = new HashMap<>(); -Map toRevoke = computeDeleted(deleted, connectorAssignments, taskAssignments); -log.debug("Connector and task to delete assignments: {}", toRevoke); +final Map deletedAndRevoked = intersection(deleted, memberAssignments); +log.trace("Deleted connectors and tasks to revoke from each worker: {}", deletedAndRevoked); +addAll(toRevoke, 
deletedAndRevoked); // Revoking redundant connectors/tasks if the workers have duplicate assignments -toRevoke.putAll(computeDuplicatedAssignments(memberConfigs, connectorAssignments, taskAssignments)); -log.debug("Connector and task to revoke assignments (include duplicated assignments): {}", toRevoke); +final Map duplicatedAndRevoked = intersection(duplicated, memberAssignments); +log.trace("Duplicated connectors
[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing
YeonCheolGit commented on code in PR #12019: URL: https://github.com/apache/kafka/pull/12019#discussion_r847446537 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java: ## @@ -200,283 +229,149 @@ private Long ensureLeaderConfig(long maxOffset, WorkerCoordinator coordinator) { previousAssignment = activeAssignments; canRevoke = true; } -previousRevocation.connectors().clear(); -previousRevocation.tasks().clear(); +previousRevocation = ConnectorsAndTasks.EMPTY; } -// Derived set: The set of deleted connectors-and-tasks is a derived set from the set -// difference of previous - configured -ConnectorsAndTasks deleted = diff(previousAssignment, configured); -log.debug("Deleted assignments: {}", deleted); - -// Derived set: The set of remaining active connectors-and-tasks is a derived set from the -// set difference of active - deleted -ConnectorsAndTasks remainingActive = diff(activeAssignments, deleted); -log.debug("Remaining (excluding deleted) active assignments: {}", remainingActive); - -// Derived set: The set of lost or unaccounted connectors-and-tasks is a derived set from -// the set difference of previous - active - deleted -ConnectorsAndTasks lostAssignments = diff(previousAssignment, activeAssignments, deleted); -log.debug("Lost assignments: {}", lostAssignments); - -// Derived set: The set of new connectors-and-tasks is a derived set from the set -// difference of configured - previous - active -ConnectorsAndTasks newSubmissions = diff(configured, previousAssignment, activeAssignments); -log.debug("New assignments: {}", newSubmissions); +// The connectors and tasks that have been deleted since the last rebalance +final ConnectorsAndTasks deleted = ConnectorsAndTasks.diff(previousAssignment, configured); +log.trace("Deleted assignments: {}", deleted); -// A collection of the complete assignment -List completeWorkerAssignment = workerAssignment(memberConfigs, ConnectorsAndTasks.EMPTY); 
-log.debug("Complete (ignoring deletions) worker assignments: {}", completeWorkerAssignment); +// The connectors and tasks that are currently running on more than one worker each +final ConnectorsAndTasks duplicated = duplicated(memberAssignments); +log.trace("Duplicated assignments: {}", duplicated); -// Per worker connector assignments without removing deleted connectors yet -Map> connectorAssignments = - completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::connectors)); -log.debug("Complete (ignoring deletions) connector assignments: {}", connectorAssignments); +// The connectors and tasks that should already be running on the cluster, but which are not included +// in the assignment reported by any workers in the cluster +final ConnectorsAndTasks lostAssignments = ConnectorsAndTasks.diff(previousAssignment, activeAssignments, deleted); +log.trace("Lost assignments: {}", lostAssignments); -// Per worker task assignments without removing deleted connectors yet -Map> taskAssignments = - completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::tasks)); -log.debug("Complete (ignoring deletions) task assignments: {}", taskAssignments); +// The connectors and tasks that have been created since the last rebalance +final ConnectorsAndTasks created = ConnectorsAndTasks.diff(configured, previousAssignment, activeAssignments); +log.trace("New assignments: {}", created); -// A collection of the current assignment excluding the connectors-and-tasks to be deleted -List currentWorkerAssignment = workerAssignment(memberConfigs, deleted); +final Map toRevoke = new HashMap<>(); -Map toRevoke = computeDeleted(deleted, connectorAssignments, taskAssignments); -log.debug("Connector and task to delete assignments: {}", toRevoke); +final Map deletedAndRevoked = intersection(deleted, memberAssignments); +log.trace("Deleted connectors and tasks to revoke from each worker: {}", deletedAndRevoked); +addAll(toRevoke, 
deletedAndRevoked); // Revoking redundant connectors/tasks if the workers have duplicate assignments -toRevoke.putAll(computeDuplicatedAssignments(memberConfigs, connectorAssignments, taskAssignments)); -log.debug("Connector and task to revoke assignments (include duplicated assignments): {}", toRevoke); +final Map duplicatedAndRevoked = intersection(duplicated, memberAssignments); +log.trace("Duplicated connectors
[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing
YeonCheolGit commented on code in PR #12019: URL: https://github.com/apache/kafka/pull/12019#discussion_r847446537 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java: ## @@ -200,283 +229,149 @@ private Long ensureLeaderConfig(long maxOffset, WorkerCoordinator coordinator) { previousAssignment = activeAssignments; canRevoke = true; } -previousRevocation.connectors().clear(); -previousRevocation.tasks().clear(); +previousRevocation = ConnectorsAndTasks.EMPTY; } -// Derived set: The set of deleted connectors-and-tasks is a derived set from the set -// difference of previous - configured -ConnectorsAndTasks deleted = diff(previousAssignment, configured); -log.debug("Deleted assignments: {}", deleted); - -// Derived set: The set of remaining active connectors-and-tasks is a derived set from the -// set difference of active - deleted -ConnectorsAndTasks remainingActive = diff(activeAssignments, deleted); -log.debug("Remaining (excluding deleted) active assignments: {}", remainingActive); - -// Derived set: The set of lost or unaccounted connectors-and-tasks is a derived set from -// the set difference of previous - active - deleted -ConnectorsAndTasks lostAssignments = diff(previousAssignment, activeAssignments, deleted); -log.debug("Lost assignments: {}", lostAssignments); - -// Derived set: The set of new connectors-and-tasks is a derived set from the set -// difference of configured - previous - active -ConnectorsAndTasks newSubmissions = diff(configured, previousAssignment, activeAssignments); -log.debug("New assignments: {}", newSubmissions); +// The connectors and tasks that have been deleted since the last rebalance +final ConnectorsAndTasks deleted = ConnectorsAndTasks.diff(previousAssignment, configured); +log.trace("Deleted assignments: {}", deleted); -// A collection of the complete assignment -List completeWorkerAssignment = workerAssignment(memberConfigs, ConnectorsAndTasks.EMPTY); 
-log.debug("Complete (ignoring deletions) worker assignments: {}", completeWorkerAssignment); +// The connectors and tasks that are currently running on more than one worker each +final ConnectorsAndTasks duplicated = duplicated(memberAssignments); +log.trace("Duplicated assignments: {}", duplicated); -// Per worker connector assignments without removing deleted connectors yet -Map> connectorAssignments = - completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::connectors)); -log.debug("Complete (ignoring deletions) connector assignments: {}", connectorAssignments); +// The connectors and tasks that should already be running on the cluster, but which are not included +// in the assignment reported by any workers in the cluster +final ConnectorsAndTasks lostAssignments = ConnectorsAndTasks.diff(previousAssignment, activeAssignments, deleted); +log.trace("Lost assignments: {}", lostAssignments); -// Per worker task assignments without removing deleted connectors yet -Map> taskAssignments = - completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::tasks)); -log.debug("Complete (ignoring deletions) task assignments: {}", taskAssignments); +// The connectors and tasks that have been created since the last rebalance +final ConnectorsAndTasks created = ConnectorsAndTasks.diff(configured, previousAssignment, activeAssignments); +log.trace("New assignments: {}", created); -// A collection of the current assignment excluding the connectors-and-tasks to be deleted -List currentWorkerAssignment = workerAssignment(memberConfigs, deleted); +final Map toRevoke = new HashMap<>(); -Map toRevoke = computeDeleted(deleted, connectorAssignments, taskAssignments); -log.debug("Connector and task to delete assignments: {}", toRevoke); +final Map deletedAndRevoked = intersection(deleted, memberAssignments); +log.trace("Deleted connectors and tasks to revoke from each worker: {}", deletedAndRevoked); +addAll(toRevoke, 
deletedAndRevoked); // Revoking redundant connectors/tasks if the workers have duplicate assignments -toRevoke.putAll(computeDuplicatedAssignments(memberConfigs, connectorAssignments, taskAssignments)); -log.debug("Connector and task to revoke assignments (include duplicated assignments): {}", toRevoke); +final Map duplicatedAndRevoked = intersection(duplicated, memberAssignments); +log.trace("Duplicated connectors
[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing
YeonCheolGit commented on code in PR #12019: URL: https://github.com/apache/kafka/pull/12019#discussion_r847446537 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java: ## @@ -200,283 +229,149 @@ private Long ensureLeaderConfig(long maxOffset, WorkerCoordinator coordinator) { previousAssignment = activeAssignments; canRevoke = true; } -previousRevocation.connectors().clear(); -previousRevocation.tasks().clear(); +previousRevocation = ConnectorsAndTasks.EMPTY; } -// Derived set: The set of deleted connectors-and-tasks is a derived set from the set -// difference of previous - configured -ConnectorsAndTasks deleted = diff(previousAssignment, configured); -log.debug("Deleted assignments: {}", deleted); - -// Derived set: The set of remaining active connectors-and-tasks is a derived set from the -// set difference of active - deleted -ConnectorsAndTasks remainingActive = diff(activeAssignments, deleted); -log.debug("Remaining (excluding deleted) active assignments: {}", remainingActive); - -// Derived set: The set of lost or unaccounted connectors-and-tasks is a derived set from -// the set difference of previous - active - deleted -ConnectorsAndTasks lostAssignments = diff(previousAssignment, activeAssignments, deleted); -log.debug("Lost assignments: {}", lostAssignments); - -// Derived set: The set of new connectors-and-tasks is a derived set from the set -// difference of configured - previous - active -ConnectorsAndTasks newSubmissions = diff(configured, previousAssignment, activeAssignments); -log.debug("New assignments: {}", newSubmissions); +// The connectors and tasks that have been deleted since the last rebalance +final ConnectorsAndTasks deleted = ConnectorsAndTasks.diff(previousAssignment, configured); +log.trace("Deleted assignments: {}", deleted); -// A collection of the complete assignment -List completeWorkerAssignment = workerAssignment(memberConfigs, ConnectorsAndTasks.EMPTY); 
-log.debug("Complete (ignoring deletions) worker assignments: {}", completeWorkerAssignment); +// The connectors and tasks that are currently running on more than one worker each +final ConnectorsAndTasks duplicated = duplicated(memberAssignments); +log.trace("Duplicated assignments: {}", duplicated); -// Per worker connector assignments without removing deleted connectors yet -Map> connectorAssignments = - completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::connectors)); -log.debug("Complete (ignoring deletions) connector assignments: {}", connectorAssignments); +// The connectors and tasks that should already be running on the cluster, but which are not included +// in the assignment reported by any workers in the cluster +final ConnectorsAndTasks lostAssignments = ConnectorsAndTasks.diff(previousAssignment, activeAssignments, deleted); +log.trace("Lost assignments: {}", lostAssignments); -// Per worker task assignments without removing deleted connectors yet -Map> taskAssignments = - completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::tasks)); -log.debug("Complete (ignoring deletions) task assignments: {}", taskAssignments); +// The connectors and tasks that have been created since the last rebalance +final ConnectorsAndTasks created = ConnectorsAndTasks.diff(configured, previousAssignment, activeAssignments); +log.trace("New assignments: {}", created); -// A collection of the current assignment excluding the connectors-and-tasks to be deleted -List currentWorkerAssignment = workerAssignment(memberConfigs, deleted); +final Map toRevoke = new HashMap<>(); -Map toRevoke = computeDeleted(deleted, connectorAssignments, taskAssignments); -log.debug("Connector and task to delete assignments: {}", toRevoke); +final Map deletedAndRevoked = intersection(deleted, memberAssignments); +log.trace("Deleted connectors and tasks to revoke from each worker: {}", deletedAndRevoked); +addAll(toRevoke, 
deletedAndRevoked); // Revoking redundant connectors/tasks if the workers have duplicate assignments -toRevoke.putAll(computeDuplicatedAssignments(memberConfigs, connectorAssignments, taskAssignments)); -log.debug("Connector and task to revoke assignments (include duplicated assignments): {}", toRevoke); +final Map duplicatedAndRevoked = intersection(duplicated, memberAssignments); +log.trace("Duplicated connectors
[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing
YeonCheolGit commented on code in PR #12019: URL: https://github.com/apache/kafka/pull/12019#discussion_r846804952 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java: ## @@ -200,283 +229,149 @@ private Long ensureLeaderConfig(long maxOffset, WorkerCoordinator coordinator) { previousAssignment = activeAssignments; canRevoke = true; } -previousRevocation.connectors().clear(); -previousRevocation.tasks().clear(); +previousRevocation = ConnectorsAndTasks.EMPTY; } -// Derived set: The set of deleted connectors-and-tasks is a derived set from the set -// difference of previous - configured -ConnectorsAndTasks deleted = diff(previousAssignment, configured); -log.debug("Deleted assignments: {}", deleted); - -// Derived set: The set of remaining active connectors-and-tasks is a derived set from the -// set difference of active - deleted -ConnectorsAndTasks remainingActive = diff(activeAssignments, deleted); -log.debug("Remaining (excluding deleted) active assignments: {}", remainingActive); - -// Derived set: The set of lost or unaccounted connectors-and-tasks is a derived set from -// the set difference of previous - active - deleted -ConnectorsAndTasks lostAssignments = diff(previousAssignment, activeAssignments, deleted); -log.debug("Lost assignments: {}", lostAssignments); - -// Derived set: The set of new connectors-and-tasks is a derived set from the set -// difference of configured - previous - active -ConnectorsAndTasks newSubmissions = diff(configured, previousAssignment, activeAssignments); -log.debug("New assignments: {}", newSubmissions); +// The connectors and tasks that have been deleted since the last rebalance +final ConnectorsAndTasks deleted = ConnectorsAndTasks.diff(previousAssignment, configured); +log.trace("Deleted assignments: {}", deleted); -// A collection of the complete assignment -List completeWorkerAssignment = workerAssignment(memberConfigs, ConnectorsAndTasks.EMPTY); 
-log.debug("Complete (ignoring deletions) worker assignments: {}", completeWorkerAssignment); +// The connectors and tasks that are currently running on more than one worker each +final ConnectorsAndTasks duplicated = duplicated(memberAssignments); +log.trace("Duplicated assignments: {}", duplicated); -// Per worker connector assignments without removing deleted connectors yet -Map> connectorAssignments = - completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::connectors)); -log.debug("Complete (ignoring deletions) connector assignments: {}", connectorAssignments); +// The connectors and tasks that should already be running on the cluster, but which are not included +// in the assignment reported by any workers in the cluster +final ConnectorsAndTasks lostAssignments = ConnectorsAndTasks.diff(previousAssignment, activeAssignments, deleted); +log.trace("Lost assignments: {}", lostAssignments); -// Per worker task assignments without removing deleted connectors yet -Map> taskAssignments = - completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::tasks)); -log.debug("Complete (ignoring deletions) task assignments: {}", taskAssignments); +// The connectors and tasks that have been created since the last rebalance +final ConnectorsAndTasks created = ConnectorsAndTasks.diff(configured, previousAssignment, activeAssignments); +log.trace("New assignments: {}", created); -// A collection of the current assignment excluding the connectors-and-tasks to be deleted -List currentWorkerAssignment = workerAssignment(memberConfigs, deleted); +final Map toRevoke = new HashMap<>(); -Map toRevoke = computeDeleted(deleted, connectorAssignments, taskAssignments); -log.debug("Connector and task to delete assignments: {}", toRevoke); +final Map deletedAndRevoked = intersection(deleted, memberAssignments); +log.trace("Deleted connectors and tasks to revoke from each worker: {}", deletedAndRevoked); +addAll(toRevoke, 
deletedAndRevoked); // Revoking redundant connectors/tasks if the workers have duplicate assignments -toRevoke.putAll(computeDuplicatedAssignments(memberConfigs, connectorAssignments, taskAssignments)); -log.debug("Connector and task to revoke assignments (include duplicated assignments): {}", toRevoke); +final Map duplicatedAndRevoked = intersection(duplicated, memberAssignments); +log.trace("Duplicated connectors
[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing
YeonCheolGit commented on code in PR #12019: URL: https://github.com/apache/kafka/pull/12019#discussion_r846804952 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java: ## @@ -200,283 +229,149 @@ private Long ensureLeaderConfig(long maxOffset, WorkerCoordinator coordinator) { previousAssignment = activeAssignments; canRevoke = true; } -previousRevocation.connectors().clear(); -previousRevocation.tasks().clear(); +previousRevocation = ConnectorsAndTasks.EMPTY; } -// Derived set: The set of deleted connectors-and-tasks is a derived set from the set -// difference of previous - configured -ConnectorsAndTasks deleted = diff(previousAssignment, configured); -log.debug("Deleted assignments: {}", deleted); - -// Derived set: The set of remaining active connectors-and-tasks is a derived set from the -// set difference of active - deleted -ConnectorsAndTasks remainingActive = diff(activeAssignments, deleted); -log.debug("Remaining (excluding deleted) active assignments: {}", remainingActive); - -// Derived set: The set of lost or unaccounted connectors-and-tasks is a derived set from -// the set difference of previous - active - deleted -ConnectorsAndTasks lostAssignments = diff(previousAssignment, activeAssignments, deleted); -log.debug("Lost assignments: {}", lostAssignments); - -// Derived set: The set of new connectors-and-tasks is a derived set from the set -// difference of configured - previous - active -ConnectorsAndTasks newSubmissions = diff(configured, previousAssignment, activeAssignments); -log.debug("New assignments: {}", newSubmissions); +// The connectors and tasks that have been deleted since the last rebalance +final ConnectorsAndTasks deleted = ConnectorsAndTasks.diff(previousAssignment, configured); +log.trace("Deleted assignments: {}", deleted); -// A collection of the complete assignment -List completeWorkerAssignment = workerAssignment(memberConfigs, ConnectorsAndTasks.EMPTY); 
-log.debug("Complete (ignoring deletions) worker assignments: {}", completeWorkerAssignment); +// The connectors and tasks that are currently running on more than one worker each +final ConnectorsAndTasks duplicated = duplicated(memberAssignments); +log.trace("Duplicated assignments: {}", duplicated); -// Per worker connector assignments without removing deleted connectors yet -Map> connectorAssignments = - completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::connectors)); -log.debug("Complete (ignoring deletions) connector assignments: {}", connectorAssignments); +// The connectors and tasks that should already be running on the cluster, but which are not included +// in the assignment reported by any workers in the cluster +final ConnectorsAndTasks lostAssignments = ConnectorsAndTasks.diff(previousAssignment, activeAssignments, deleted); +log.trace("Lost assignments: {}", lostAssignments); -// Per worker task assignments without removing deleted connectors yet -Map> taskAssignments = - completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::tasks)); -log.debug("Complete (ignoring deletions) task assignments: {}", taskAssignments); +// The connectors and tasks that have been created since the last rebalance +final ConnectorsAndTasks created = ConnectorsAndTasks.diff(configured, previousAssignment, activeAssignments); +log.trace("New assignments: {}", created); -// A collection of the current assignment excluding the connectors-and-tasks to be deleted -List currentWorkerAssignment = workerAssignment(memberConfigs, deleted); +final Map toRevoke = new HashMap<>(); -Map toRevoke = computeDeleted(deleted, connectorAssignments, taskAssignments); -log.debug("Connector and task to delete assignments: {}", toRevoke); +final Map deletedAndRevoked = intersection(deleted, memberAssignments); +log.trace("Deleted connectors and tasks to revoke from each worker: {}", deletedAndRevoked); +addAll(toRevoke, 
deletedAndRevoked); // Revoking redundant connectors/tasks if the workers have duplicate assignments -toRevoke.putAll(computeDuplicatedAssignments(memberConfigs, connectorAssignments, taskAssignments)); -log.debug("Connector and task to revoke assignments (include duplicated assignments): {}", toRevoke); +final Map duplicatedAndRevoked = intersection(duplicated, memberAssignments); +log.trace("Duplicated connectors
[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing
YeonCheolGit commented on code in PR #12019: URL: https://github.com/apache/kafka/pull/12019#discussion_r846804952 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java: ## @@ -200,283 +229,149 @@ private Long ensureLeaderConfig(long maxOffset, WorkerCoordinator coordinator) { previousAssignment = activeAssignments; canRevoke = true; } -previousRevocation.connectors().clear(); -previousRevocation.tasks().clear(); +previousRevocation = ConnectorsAndTasks.EMPTY; } -// Derived set: The set of deleted connectors-and-tasks is a derived set from the set -// difference of previous - configured -ConnectorsAndTasks deleted = diff(previousAssignment, configured); -log.debug("Deleted assignments: {}", deleted); - -// Derived set: The set of remaining active connectors-and-tasks is a derived set from the -// set difference of active - deleted -ConnectorsAndTasks remainingActive = diff(activeAssignments, deleted); -log.debug("Remaining (excluding deleted) active assignments: {}", remainingActive); - -// Derived set: The set of lost or unaccounted connectors-and-tasks is a derived set from -// the set difference of previous - active - deleted -ConnectorsAndTasks lostAssignments = diff(previousAssignment, activeAssignments, deleted); -log.debug("Lost assignments: {}", lostAssignments); - -// Derived set: The set of new connectors-and-tasks is a derived set from the set -// difference of configured - previous - active -ConnectorsAndTasks newSubmissions = diff(configured, previousAssignment, activeAssignments); -log.debug("New assignments: {}", newSubmissions); +// The connectors and tasks that have been deleted since the last rebalance +final ConnectorsAndTasks deleted = ConnectorsAndTasks.diff(previousAssignment, configured); +log.trace("Deleted assignments: {}", deleted); -// A collection of the complete assignment -List completeWorkerAssignment = workerAssignment(memberConfigs, ConnectorsAndTasks.EMPTY); 
-log.debug("Complete (ignoring deletions) worker assignments: {}", completeWorkerAssignment); +// The connectors and tasks that are currently running on more than one worker each +final ConnectorsAndTasks duplicated = duplicated(memberAssignments); +log.trace("Duplicated assignments: {}", duplicated); -// Per worker connector assignments without removing deleted connectors yet -Map> connectorAssignments = - completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::connectors)); -log.debug("Complete (ignoring deletions) connector assignments: {}", connectorAssignments); +// The connectors and tasks that should already be running on the cluster, but which are not included +// in the assignment reported by any workers in the cluster +final ConnectorsAndTasks lostAssignments = ConnectorsAndTasks.diff(previousAssignment, activeAssignments, deleted); +log.trace("Lost assignments: {}", lostAssignments); -// Per worker task assignments without removing deleted connectors yet -Map> taskAssignments = - completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::tasks)); -log.debug("Complete (ignoring deletions) task assignments: {}", taskAssignments); +// The connectors and tasks that have been created since the last rebalance +final ConnectorsAndTasks created = ConnectorsAndTasks.diff(configured, previousAssignment, activeAssignments); +log.trace("New assignments: {}", created); -// A collection of the current assignment excluding the connectors-and-tasks to be deleted -List currentWorkerAssignment = workerAssignment(memberConfigs, deleted); +final Map toRevoke = new HashMap<>(); -Map toRevoke = computeDeleted(deleted, connectorAssignments, taskAssignments); -log.debug("Connector and task to delete assignments: {}", toRevoke); +final Map deletedAndRevoked = intersection(deleted, memberAssignments); +log.trace("Deleted connectors and tasks to revoke from each worker: {}", deletedAndRevoked); +addAll(toRevoke, 
deletedAndRevoked); // Revoking redundant connectors/tasks if the workers have duplicate assignments -toRevoke.putAll(computeDuplicatedAssignments(memberConfigs, connectorAssignments, taskAssignments)); -log.debug("Connector and task to revoke assignments (include duplicated assignments): {}", toRevoke); +final Map duplicatedAndRevoked = intersection(duplicated, memberAssignments); +log.trace("Duplicated connectors
[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing
YeonCheolGit commented on code in PR #12019: URL: https://github.com/apache/kafka/pull/12019#discussion_r846804952 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java: ## @@ -200,283 +229,149 @@ private Long ensureLeaderConfig(long maxOffset, WorkerCoordinator coordinator) { previousAssignment = activeAssignments; canRevoke = true; } -previousRevocation.connectors().clear(); -previousRevocation.tasks().clear(); +previousRevocation = ConnectorsAndTasks.EMPTY; } -// Derived set: The set of deleted connectors-and-tasks is a derived set from the set -// difference of previous - configured -ConnectorsAndTasks deleted = diff(previousAssignment, configured); -log.debug("Deleted assignments: {}", deleted); - -// Derived set: The set of remaining active connectors-and-tasks is a derived set from the -// set difference of active - deleted -ConnectorsAndTasks remainingActive = diff(activeAssignments, deleted); -log.debug("Remaining (excluding deleted) active assignments: {}", remainingActive); - -// Derived set: The set of lost or unaccounted connectors-and-tasks is a derived set from -// the set difference of previous - active - deleted -ConnectorsAndTasks lostAssignments = diff(previousAssignment, activeAssignments, deleted); -log.debug("Lost assignments: {}", lostAssignments); - -// Derived set: The set of new connectors-and-tasks is a derived set from the set -// difference of configured - previous - active -ConnectorsAndTasks newSubmissions = diff(configured, previousAssignment, activeAssignments); -log.debug("New assignments: {}", newSubmissions); +// The connectors and tasks that have been deleted since the last rebalance +final ConnectorsAndTasks deleted = ConnectorsAndTasks.diff(previousAssignment, configured); +log.trace("Deleted assignments: {}", deleted); -// A collection of the complete assignment -List completeWorkerAssignment = workerAssignment(memberConfigs, ConnectorsAndTasks.EMPTY); 
-log.debug("Complete (ignoring deletions) worker assignments: {}", completeWorkerAssignment); +// The connectors and tasks that are currently running on more than one worker each +final ConnectorsAndTasks duplicated = duplicated(memberAssignments); +log.trace("Duplicated assignments: {}", duplicated); -// Per worker connector assignments without removing deleted connectors yet -Map> connectorAssignments = - completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::connectors)); -log.debug("Complete (ignoring deletions) connector assignments: {}", connectorAssignments); +// The connectors and tasks that should already be running on the cluster, but which are not included +// in the assignment reported by any workers in the cluster +final ConnectorsAndTasks lostAssignments = ConnectorsAndTasks.diff(previousAssignment, activeAssignments, deleted); +log.trace("Lost assignments: {}", lostAssignments); -// Per worker task assignments without removing deleted connectors yet -Map> taskAssignments = - completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::tasks)); -log.debug("Complete (ignoring deletions) task assignments: {}", taskAssignments); +// The connectors and tasks that have been created since the last rebalance +final ConnectorsAndTasks created = ConnectorsAndTasks.diff(configured, previousAssignment, activeAssignments); +log.trace("New assignments: {}", created); -// A collection of the current assignment excluding the connectors-and-tasks to be deleted -List currentWorkerAssignment = workerAssignment(memberConfigs, deleted); +final Map toRevoke = new HashMap<>(); -Map toRevoke = computeDeleted(deleted, connectorAssignments, taskAssignments); -log.debug("Connector and task to delete assignments: {}", toRevoke); +final Map deletedAndRevoked = intersection(deleted, memberAssignments); +log.trace("Deleted connectors and tasks to revoke from each worker: {}", deletedAndRevoked); +addAll(toRevoke, 
deletedAndRevoked); // Revoking redundant connectors/tasks if the workers have duplicate assignments -toRevoke.putAll(computeDuplicatedAssignments(memberConfigs, connectorAssignments, taskAssignments)); -log.debug("Connector and task to revoke assignments (include duplicated assignments): {}", toRevoke); +final Map duplicatedAndRevoked = intersection(duplicated, memberAssignments); +log.trace("Duplicated connectors
[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing
YeonCheolGit commented on code in PR #12019: URL: https://github.com/apache/kafka/pull/12019#discussion_r846804952 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java: ## @@ -200,283 +229,149 @@ private Long ensureLeaderConfig(long maxOffset, WorkerCoordinator coordinator) { previousAssignment = activeAssignments; canRevoke = true; } -previousRevocation.connectors().clear(); -previousRevocation.tasks().clear(); +previousRevocation = ConnectorsAndTasks.EMPTY; } -// Derived set: The set of deleted connectors-and-tasks is a derived set from the set -// difference of previous - configured -ConnectorsAndTasks deleted = diff(previousAssignment, configured); -log.debug("Deleted assignments: {}", deleted); - -// Derived set: The set of remaining active connectors-and-tasks is a derived set from the -// set difference of active - deleted -ConnectorsAndTasks remainingActive = diff(activeAssignments, deleted); -log.debug("Remaining (excluding deleted) active assignments: {}", remainingActive); - -// Derived set: The set of lost or unaccounted connectors-and-tasks is a derived set from -// the set difference of previous - active - deleted -ConnectorsAndTasks lostAssignments = diff(previousAssignment, activeAssignments, deleted); -log.debug("Lost assignments: {}", lostAssignments); - -// Derived set: The set of new connectors-and-tasks is a derived set from the set -// difference of configured - previous - active -ConnectorsAndTasks newSubmissions = diff(configured, previousAssignment, activeAssignments); -log.debug("New assignments: {}", newSubmissions); +// The connectors and tasks that have been deleted since the last rebalance +final ConnectorsAndTasks deleted = ConnectorsAndTasks.diff(previousAssignment, configured); +log.trace("Deleted assignments: {}", deleted); -// A collection of the complete assignment -List completeWorkerAssignment = workerAssignment(memberConfigs, ConnectorsAndTasks.EMPTY); 
-log.debug("Complete (ignoring deletions) worker assignments: {}", completeWorkerAssignment); +// The connectors and tasks that are currently running on more than one worker each +final ConnectorsAndTasks duplicated = duplicated(memberAssignments); +log.trace("Duplicated assignments: {}", duplicated); -// Per worker connector assignments without removing deleted connectors yet -Map> connectorAssignments = - completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::connectors)); -log.debug("Complete (ignoring deletions) connector assignments: {}", connectorAssignments); +// The connectors and tasks that should already be running on the cluster, but which are not included +// in the assignment reported by any workers in the cluster +final ConnectorsAndTasks lostAssignments = ConnectorsAndTasks.diff(previousAssignment, activeAssignments, deleted); +log.trace("Lost assignments: {}", lostAssignments); -// Per worker task assignments without removing deleted connectors yet -Map> taskAssignments = - completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::tasks)); -log.debug("Complete (ignoring deletions) task assignments: {}", taskAssignments); +// The connectors and tasks that have been created since the last rebalance +final ConnectorsAndTasks created = ConnectorsAndTasks.diff(configured, previousAssignment, activeAssignments); +log.trace("New assignments: {}", created); -// A collection of the current assignment excluding the connectors-and-tasks to be deleted -List currentWorkerAssignment = workerAssignment(memberConfigs, deleted); +final Map toRevoke = new HashMap<>(); -Map toRevoke = computeDeleted(deleted, connectorAssignments, taskAssignments); -log.debug("Connector and task to delete assignments: {}", toRevoke); +final Map deletedAndRevoked = intersection(deleted, memberAssignments); +log.trace("Deleted connectors and tasks to revoke from each worker: {}", deletedAndRevoked); +addAll(toRevoke, 
deletedAndRevoked); // Revoking redundant connectors/tasks if the workers have duplicate assignments -toRevoke.putAll(computeDuplicatedAssignments(memberConfigs, connectorAssignments, taskAssignments)); -log.debug("Connector and task to revoke assignments (include duplicated assignments): {}", toRevoke); +final Map duplicatedAndRevoked = intersection(duplicated, memberAssignments); +log.trace("Duplicated connectors