YeonCheolGit commented on code in PR #12019: URL: https://github.com/apache/kafka/pull/12019#discussion_r862455239
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -200,283 +229,149 @@ protected Map<String, ByteBuffer> performTaskAssignment(String leaderId, long ma
                 previousAssignment = activeAssignments;
                 canRevoke = true;
             }
-            previousRevocation.connectors().clear();
-            previousRevocation.tasks().clear();
+            previousRevocation = ConnectorsAndTasks.EMPTY;
         }
 
-        // Derived set: The set of deleted connectors-and-tasks is a derived set from the set
-        // difference of previous - configured
-        ConnectorsAndTasks deleted = diff(previousAssignment, configured);
-        log.debug("Deleted assignments: {}", deleted);
-
-        // Derived set: The set of remaining active connectors-and-tasks is a derived set from the
-        // set difference of active - deleted
-        ConnectorsAndTasks remainingActive = diff(activeAssignments, deleted);
-        log.debug("Remaining (excluding deleted) active assignments: {}", remainingActive);
-
-        // Derived set: The set of lost or unaccounted connectors-and-tasks is a derived set from
-        // the set difference of previous - active - deleted
-        ConnectorsAndTasks lostAssignments = diff(previousAssignment, activeAssignments, deleted);
-        log.debug("Lost assignments: {}", lostAssignments);
-
-        // Derived set: The set of new connectors-and-tasks is a derived set from the set
-        // difference of configured - previous - active
-        ConnectorsAndTasks newSubmissions = diff(configured, previousAssignment, activeAssignments);
-        log.debug("New assignments: {}", newSubmissions);
+        // The connectors and tasks that have been deleted since the last rebalance
+        final ConnectorsAndTasks deleted = ConnectorsAndTasks.diff(previousAssignment, configured);
+        log.trace("Deleted assignments: {}", deleted);
 
-        // A collection of the complete assignment
-        List<WorkerLoad> completeWorkerAssignment = workerAssignment(memberConfigs, ConnectorsAndTasks.EMPTY);
-        log.debug("Complete (ignoring deletions) worker assignments: {}", completeWorkerAssignment);
+        // The connectors and tasks that are currently running on more than one worker each
+        final ConnectorsAndTasks duplicated = duplicated(memberAssignments);
+        log.trace("Duplicated assignments: {}", duplicated);
 
-        // Per worker connector assignments without removing deleted connectors yet
-        Map<String, Collection<String>> connectorAssignments =
-                completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::connectors));
-        log.debug("Complete (ignoring deletions) connector assignments: {}", connectorAssignments);
+        // The connectors and tasks that should already be running on the cluster, but which are not included
+        // in the assignment reported by any workers in the cluster
+        final ConnectorsAndTasks lostAssignments = ConnectorsAndTasks.diff(previousAssignment, activeAssignments, deleted);

Review Comment:
Could you explain what a `lost` assignment means here? As far as I know, `ConnectorsAndTasks.diff` returns whatever remains of the first argument after the other assignments have been subtracted from it.
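A minimal sketch of that reading of `diff`, assuming plain sets of strings stand in for `ConnectorsAndTasks`. The class `LostAssignmentsSketch`, the helpers `diff` and `setOf`, and the sample connector names are all hypothetical and are not taken from the Kafka code base:

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

public class LostAssignmentsSketch {

    // Hypothetical stand-in for ConnectorsAndTasks.diff: returns a copy of
    // `base` with every element of each set in `toSubtract` removed.
    @SafeVarargs
    static Set<String> diff(Set<String> base, Set<String>... toSubtract) {
        Set<String> result = new LinkedHashSet<>(base);
        for (Set<String> s : toSubtract) {
            result.removeAll(s);
        }
        return result;
    }

    // Small helper to build an insertion-ordered set from literals.
    static Set<String> setOf(String... items) {
        return new LinkedHashSet<>(Arrays.asList(items));
    }

    public static void main(String[] args) {
        // Assumed sample state, for illustration only.
        Set<String> previous = setOf("connA", "connB", "connC");   // assigned in the last round
        Set<String> configured = setOf("connA", "connB");          // connC was deleted since
        Set<String> active = setOf("connA");                       // what workers currently report

        // deleted = previous - configured
        Set<String> deleted = diff(previous, configured);
        // lost = previous - active - deleted
        Set<String> lost = diff(previous, active, deleted);

        System.out.println("deleted = " + deleted); // prints "deleted = [connC]"
        System.out.println("lost = " + lost);       // prints "lost = [connB]"
    }
}
```

Under this reading, `lost` would be whatever was previously assigned and is still configured, yet is not reported by any worker (here `connB`), which seems to match the new code comment in the diff.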