YeonCheolGit commented on code in PR #12019:
URL: https://github.com/apache/kafka/pull/12019#discussion_r862455239


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -200,283 +229,149 @@ protected Map<String, ByteBuffer> 
performTaskAssignment(String leaderId, long ma
                 previousAssignment = activeAssignments;
                 canRevoke = true;
             }
-            previousRevocation.connectors().clear();
-            previousRevocation.tasks().clear();
+            previousRevocation = ConnectorsAndTasks.EMPTY;
         }
 
-        // Derived set: The set of deleted connectors-and-tasks is a derived 
set from the set
-        // difference of previous - configured
-        ConnectorsAndTasks deleted = diff(previousAssignment, configured);
-        log.debug("Deleted assignments: {}", deleted);
-
-        // Derived set: The set of remaining active connectors-and-tasks is a 
derived set from the
-        // set difference of active - deleted
-        ConnectorsAndTasks remainingActive = diff(activeAssignments, deleted);
-        log.debug("Remaining (excluding deleted) active assignments: {}", 
remainingActive);
-
-        // Derived set: The set of lost or unaccounted connectors-and-tasks is 
a derived set from
-        // the set difference of previous - active - deleted
-        ConnectorsAndTasks lostAssignments = diff(previousAssignment, 
activeAssignments, deleted);
-        log.debug("Lost assignments: {}", lostAssignments);
-
-        // Derived set: The set of new connectors-and-tasks is a derived set 
from the set
-        // difference of configured - previous - active
-        ConnectorsAndTasks newSubmissions = diff(configured, 
previousAssignment, activeAssignments);
-        log.debug("New assignments: {}", newSubmissions);
+        // The connectors and tasks that have been deleted since the last 
rebalance
+        final ConnectorsAndTasks deleted = 
ConnectorsAndTasks.diff(previousAssignment, configured);
+        log.trace("Deleted assignments: {}", deleted);
 
-        // A collection of the complete assignment
-        List<WorkerLoad> completeWorkerAssignment = 
workerAssignment(memberConfigs, ConnectorsAndTasks.EMPTY);
-        log.debug("Complete (ignoring deletions) worker assignments: {}", 
completeWorkerAssignment);
+        // The connectors and tasks that are currently running on more than 
one worker each
+        final ConnectorsAndTasks duplicated = duplicated(memberAssignments);
+        log.trace("Duplicated assignments: {}", duplicated);
 
-        // Per worker connector assignments without removing deleted 
connectors yet
-        Map<String, Collection<String>> connectorAssignments =
-                
completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, 
WorkerLoad::connectors));
-        log.debug("Complete (ignoring deletions) connector assignments: {}", 
connectorAssignments);
+        // The connectors and tasks that should already be running on the 
cluster, but which are not included
+        // in the assignment reported by any workers in the cluster
+        final ConnectorsAndTasks lostAssignments = 
ConnectorsAndTasks.diff(previousAssignment, activeAssignments, deleted);

Review Comment:
   Could you explain what `lost` assignment meaning is? 
   As far as i know `ConnectorsAndTasks.diff` returns remainder after 
subtracted assignments.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to