[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing

2022-05-02 Thread GitBox


YeonCheolGit commented on code in PR #12019:
URL: https://github.com/apache/kafka/pull/12019#discussion_r862388033


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##
@@ -110,14 +114,18 @@ public Map performAssignment(String 
leaderId, String protoco
 : CONNECT_PROTOCOL_V1;
 
 Long leaderOffset = ensureLeaderConfig(maxOffset, coordinator);
+Map assignments;
 if (leaderOffset == null) {
-Map assignments = fillAssignments(
+assignments = fillAssignments(
 memberConfigs.keySet(), Assignment.CONFIG_MISMATCH,
-leaderId, memberConfigs.get(leaderId).url(), maxOffset, 
Collections.emptyMap(),
-Collections.emptyMap(), Collections.emptyMap(), 0, 
protocolVersion);
-return serializeAssignments(assignments);
+leaderId, memberConfigs.get(leaderId).url(), maxOffset,
+ClusterAssignment.EMPTY, 0, protocolVersion);
+} else {
+assignments = performTaskAssignment(leaderId, leaderOffset, 
memberConfigs, coordinator, protocolVersion);
 }
-return performTaskAssignment(leaderId, leaderOffset, memberConfigs, 
coordinator, protocolVersion);
+Map result = serializeAssignments(assignments);
+log.debug("Finished assignment");

Review Comment:
   @C0urante
   
   The variable here is `Map assignments` (plural),
   so maybe this?
   
   ```suggestion
   log.debug("Finished assignments");
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing

2022-05-01 Thread GitBox


YeonCheolGit commented on code in PR #12019:
URL: https://github.com/apache/kafka/pull/12019#discussion_r862493447


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##
@@ -526,165 +421,174 @@ private List 
pickCandidateWorkerForReassignment(List com
 }
 
 /**
- * Task revocation is based on an rough estimation of the lower average 
number of tasks before
- * and after new workers join the group. If no new workers join, no 
revocation takes place.
- * Based on this estimation, tasks are revoked until the new floor average 
is reached for
- * each existing worker. The revoked tasks, once assigned to the new 
workers will maintain
- * a balanced load among the group.
- *
- * @param activeAssignments
- * @param completeWorkerAssignment
- * @return
+ * Revoke connectors and tasks from each worker in the cluster until no 
worker is running more than it would be if:
+ * 
+ * The allocation of connectors and tasks across the cluster were 
as balanced as possible (i.e., the difference in allocation size between any 
two workers is at most one)
+ * Any workers that left the group within the scheduled rebalance 
delay permanently left the group
+ * All currently-configured connectors and tasks were allocated 
(including instances that may be revoked in this round because they are 
duplicated across workers)
+ * 
+ * @param configured the set of configured connectors and tasks across the 
entire cluster
+ * @param workers the workers in the cluster, whose assignments should not 
include any deleted or duplicated connectors or tasks
+ *that are already due to be revoked from the worker in 
this rebalance
+ * @return which connectors and tasks should be revoked from which 
workers; never null, but may be empty
+ * if no load-balancing revocations are necessary or possible
  */
-private Map 
performTaskRevocation(ConnectorsAndTasks activeAssignments,
-  
Collection completeWorkerAssignment) {
-int totalActiveConnectorsNum = activeAssignments.connectors().size();
-int totalActiveTasksNum = activeAssignments.tasks().size();
-Collection existingWorkers = 
completeWorkerAssignment.stream()
-.filter(wl -> wl.size() > 0)
-.collect(Collectors.toList());
-int existingWorkersNum = existingWorkers.size();
-int totalWorkersNum = completeWorkerAssignment.size();
-int newWorkersNum = totalWorkersNum - existingWorkersNum;
-
-if (log.isDebugEnabled()) {
-completeWorkerAssignment.forEach(wl -> log.debug(
+private Map performLoadBalancingRevocations(
+final ConnectorsAndTasks configured,
+final Collection workers
+) {
+if (log.isTraceEnabled()) {
+workers.forEach(wl -> log.trace(
 "Per worker current load size; worker: {} connectors: {} 
tasks: {}",
 wl.worker(), wl.connectorsSize(), wl.tasksSize()));
 }
 
-Map revoking = new HashMap<>();
-// If there are no new workers, or no existing workers to revoke tasks 
from return early
-// after logging the status
-if (!(newWorkersNum > 0 && existingWorkersNum > 0)) {
-log.debug("No task revocation required; workers with existing 
load: {} workers with "
-+ "no load {} total workers {}",
-existingWorkersNum, newWorkersNum, totalWorkersNum);
-// This is intentionally empty but mutable, because the map is 
used to include deleted
-// connectors and tasks as well
-return revoking;
+if (workers.stream().allMatch(WorkerLoad::isEmpty)) {
+log.trace("No load-balancing revocations required; all workers are 
either new "
++ "or will have all currently-assigned connectors and 
tasks revoked during this round"
+);
+return Collections.emptyMap();
+}
+if (configured.isEmpty()) {
+log.trace("No load-balancing revocations required; no connectors 
are currently configured on this cluster");
+return Collections.emptyMap();
 }
 
-log.debug("Task revocation is required; workers with existing load: {} 
workers with "
-+ "no load {} total workers {}",
-existingWorkersNum, newWorkersNum, totalWorkersNum);
-
-// We have at least one worker assignment (the leader itself) so 
totalWorkersNum can't be 0
-log.debug("Previous rounded down (floor) average number of connectors 
per worker {}", totalActiveConnectorsNum / existingWorkersNum);
-int floorConnectors = totalActiveConnectorsNum / totalWorkersNum;
-int ceilConnectors = floorConnectors + ((totalActiveConn

[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing

2022-05-01 Thread GitBox


YeonCheolGit commented on code in PR #12019:
URL: https://github.com/apache/kafka/pull/12019#discussion_r862492477


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##
@@ -526,165 +421,174 @@ private List 
pickCandidateWorkerForReassignment(List com
 }
 
 /**
- * Task revocation is based on an rough estimation of the lower average 
number of tasks before
- * and after new workers join the group. If no new workers join, no 
revocation takes place.
- * Based on this estimation, tasks are revoked until the new floor average 
is reached for
- * each existing worker. The revoked tasks, once assigned to the new 
workers will maintain
- * a balanced load among the group.
- *
- * @param activeAssignments
- * @param completeWorkerAssignment
- * @return
+ * Revoke connectors and tasks from each worker in the cluster until no 
worker is running more than it would be if:
+ * 
+ * The allocation of connectors and tasks across the cluster were 
as balanced as possible (i.e., the difference in allocation size between any 
two workers is at most one)
+ * Any workers that left the group within the scheduled rebalance 
delay permanently left the group
+ * All currently-configured connectors and tasks were allocated 
(including instances that may be revoked in this round because they are 
duplicated across workers)
+ * 

Review Comment:
   Maybe this is a silly question, and I don't know much about this.
   Is there any reason to put HTML tags in the comments?



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##
@@ -526,165 +421,174 @@ private List 
pickCandidateWorkerForReassignment(List com
 }
 
 /**
- * Task revocation is based on an rough estimation of the lower average 
number of tasks before
- * and after new workers join the group. If no new workers join, no 
revocation takes place.
- * Based on this estimation, tasks are revoked until the new floor average 
is reached for
- * each existing worker. The revoked tasks, once assigned to the new 
workers will maintain
- * a balanced load among the group.
- *
- * @param activeAssignments
- * @param completeWorkerAssignment
- * @return
+ * Revoke connectors and tasks from each worker in the cluster until no 
worker is running more than it would be if:
+ * 
+ * The allocation of connectors and tasks across the cluster were 
as balanced as possible (i.e., the difference in allocation size between any 
two workers is at most one)
+ * Any workers that left the group within the scheduled rebalance 
delay permanently left the group
+ * All currently-configured connectors and tasks were allocated 
(including instances that may be revoked in this round because they are 
duplicated across workers)
+ * 

Review Comment:
   Maybe this is a silly question, and I don't know much about this.
   Is there any reason to put HTML tags in the comments?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing

2022-05-01 Thread GitBox


YeonCheolGit commented on code in PR #12019:
URL: https://github.com/apache/kafka/pull/12019#discussion_r862491251


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##
@@ -723,44 +610,175 @@ protected void assignConnectors(List 
workerAssignment, Collection workerAssignment, 
Collection tasks) {
-workerAssignment.sort(WorkerLoad.taskComparator());
-WorkerLoad first = workerAssignment.get(0);
+// Visible for testing
+void assignTasks(List workerAssignment, 
Collection tasks) {
+assign(workerAssignment, tasks, WorkerLoad::tasks, WorkerLoad::assign);
+}
 
-Iterator load = tasks.iterator();
+private  void assign(
+List workers,
+Collection toAssign,
+Function> currentAllocation,
+BiConsumer assignToWorker
+) {
+Function allocationSize = 
currentAllocation.andThen(Collection::size);
+workers.sort(Comparator.comparing(allocationSize));
+WorkerLoad first = workers.get(0);
+
+Iterator load = toAssign.stream().sorted().iterator();
 while (load.hasNext()) {
-int firstLoad = first.tasksSize();
-int upTo = IntStream.range(0, workerAssignment.size())
-.filter(i -> workerAssignment.get(i).tasksSize() > 
firstLoad)
+int firstLoad = allocationSize.apply(first);
+int upTo = IntStream.range(0, workers.size())

Review Comment:
   This is a minor suggestion and could be ignored.
   If `workers.size()` is called inside the while loop, it is recomputed on
every iteration for as long as the loop condition is true.
   What about computing the size once and then reusing it?
   
   ```
   final int workersSize = workers.size();
   IntStream.range(0, workersSize)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing

2022-05-01 Thread GitBox


YeonCheolGit commented on code in PR #12019:
URL: https://github.com/apache/kafka/pull/12019#discussion_r862491251


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##
@@ -723,44 +610,175 @@ protected void assignConnectors(List 
workerAssignment, Collection workerAssignment, 
Collection tasks) {
-workerAssignment.sort(WorkerLoad.taskComparator());
-WorkerLoad first = workerAssignment.get(0);
+// Visible for testing
+void assignTasks(List workerAssignment, 
Collection tasks) {
+assign(workerAssignment, tasks, WorkerLoad::tasks, WorkerLoad::assign);
+}
 
-Iterator load = tasks.iterator();
+private  void assign(
+List workers,
+Collection toAssign,
+Function> currentAllocation,
+BiConsumer assignToWorker
+) {
+Function allocationSize = 
currentAllocation.andThen(Collection::size);
+workers.sort(Comparator.comparing(allocationSize));
+WorkerLoad first = workers.get(0);
+
+Iterator load = toAssign.stream().sorted().iterator();
 while (load.hasNext()) {
-int firstLoad = first.tasksSize();
-int upTo = IntStream.range(0, workerAssignment.size())
-.filter(i -> workerAssignment.get(i).tasksSize() > 
firstLoad)
+int firstLoad = allocationSize.apply(first);
+int upTo = IntStream.range(0, workers.size())

Review Comment:
   This is a minor suggestion and could be ignored.
   If `workers.size()` is called inside the while loop, it is recomputed on
every iteration until the loop ends.
   What about computing the size once and then reusing it?
   
   ```
   int workersSize = workers.size();
   IntStream.range(0, workersSize)
   ```



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##
@@ -723,44 +610,175 @@ protected void assignConnectors(List 
workerAssignment, Collection workerAssignment, 
Collection tasks) {
-workerAssignment.sort(WorkerLoad.taskComparator());
-WorkerLoad first = workerAssignment.get(0);
+// Visible for testing
+void assignTasks(List workerAssignment, 
Collection tasks) {
+assign(workerAssignment, tasks, WorkerLoad::tasks, WorkerLoad::assign);
+}
 
-Iterator load = tasks.iterator();
+private  void assign(
+List workers,
+Collection toAssign,
+Function> currentAllocation,
+BiConsumer assignToWorker
+) {
+Function allocationSize = 
currentAllocation.andThen(Collection::size);
+workers.sort(Comparator.comparing(allocationSize));
+WorkerLoad first = workers.get(0);
+
+Iterator load = toAssign.stream().sorted().iterator();
 while (load.hasNext()) {
-int firstLoad = first.tasksSize();
-int upTo = IntStream.range(0, workerAssignment.size())
-.filter(i -> workerAssignment.get(i).tasksSize() > 
firstLoad)
+int firstLoad = allocationSize.apply(first);
+int upTo = IntStream.range(0, workers.size())

Review Comment:
   This is a minor suggestion and could be ignored.
   If `workers.size()` is called inside the while loop, it is recomputed on
every iteration for as long as the loop condition is true.
   What about computing the size once and then reusing it?
   
   ```
   int workersSize = workers.size();
   IntStream.range(0, workersSize)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing

2022-05-01 Thread GitBox


YeonCheolGit commented on code in PR #12019:
URL: https://github.com/apache/kafka/pull/12019#discussion_r862491251


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##
@@ -723,44 +610,175 @@ protected void assignConnectors(List 
workerAssignment, Collection workerAssignment, 
Collection tasks) {
-workerAssignment.sort(WorkerLoad.taskComparator());
-WorkerLoad first = workerAssignment.get(0);
+// Visible for testing
+void assignTasks(List workerAssignment, 
Collection tasks) {
+assign(workerAssignment, tasks, WorkerLoad::tasks, WorkerLoad::assign);
+}
 
-Iterator load = tasks.iterator();
+private  void assign(
+List workers,
+Collection toAssign,
+Function> currentAllocation,
+BiConsumer assignToWorker
+) {
+Function allocationSize = 
currentAllocation.andThen(Collection::size);
+workers.sort(Comparator.comparing(allocationSize));
+WorkerLoad first = workers.get(0);
+
+Iterator load = toAssign.stream().sorted().iterator();
 while (load.hasNext()) {
-int firstLoad = first.tasksSize();
-int upTo = IntStream.range(0, workerAssignment.size())
-.filter(i -> workerAssignment.get(i).tasksSize() > 
firstLoad)
+int firstLoad = allocationSize.apply(first);
+int upTo = IntStream.range(0, workers.size())

Review Comment:
   This is a minor suggestion and could be ignored.
   If workers.size() is called inside the while loop, it is recomputed on every
iteration until the loop ends.
   What about computing the size once and then reusing it?
   
   ```
   int workersSize = workers.size();
   IntStream.range(0, workersSize)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing

2022-05-01 Thread GitBox


YeonCheolGit commented on code in PR #12019:
URL: https://github.com/apache/kafka/pull/12019#discussion_r862394876


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##
@@ -151,19 +159,46 @@ private Long ensureLeaderConfig(long maxOffset, 
WorkerCoordinator coordinator) {
  * @param coordinator the worker coordinator instance that provide the 
configuration snapshot
  * and get assigned the leader state during this assignment
  * @param protocolVersion the Connect subprotocol version
- * @return the serialized assignment of tasks to the whole group, 
including assigned or
- * revoked tasks
+ * @return the assignment of tasks to the whole group, including assigned 
or revoked tasks
  */
-protected Map performTaskAssignment(String leaderId, 
long maxOffset,
+private Map performTaskAssignment(String 
leaderId, long maxOffset,

Review Comment:
   Just wondering: why change `protected` to `private` here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing

2022-05-01 Thread GitBox


YeonCheolGit commented on code in PR #12019:
URL: https://github.com/apache/kafka/pull/12019#discussion_r862455239


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##
@@ -200,283 +229,149 @@ protected Map 
performTaskAssignment(String leaderId, long ma
 previousAssignment = activeAssignments;
 canRevoke = true;
 }
-previousRevocation.connectors().clear();
-previousRevocation.tasks().clear();
+previousRevocation = ConnectorsAndTasks.EMPTY;
 }
 
-// Derived set: The set of deleted connectors-and-tasks is a derived 
set from the set
-// difference of previous - configured
-ConnectorsAndTasks deleted = diff(previousAssignment, configured);
-log.debug("Deleted assignments: {}", deleted);
-
-// Derived set: The set of remaining active connectors-and-tasks is a 
derived set from the
-// set difference of active - deleted
-ConnectorsAndTasks remainingActive = diff(activeAssignments, deleted);
-log.debug("Remaining (excluding deleted) active assignments: {}", 
remainingActive);
-
-// Derived set: The set of lost or unaccounted connectors-and-tasks is 
a derived set from
-// the set difference of previous - active - deleted
-ConnectorsAndTasks lostAssignments = diff(previousAssignment, 
activeAssignments, deleted);
-log.debug("Lost assignments: {}", lostAssignments);
-
-// Derived set: The set of new connectors-and-tasks is a derived set 
from the set
-// difference of configured - previous - active
-ConnectorsAndTasks newSubmissions = diff(configured, 
previousAssignment, activeAssignments);
-log.debug("New assignments: {}", newSubmissions);
+// The connectors and tasks that have been deleted since the last 
rebalance
+final ConnectorsAndTasks deleted = 
ConnectorsAndTasks.diff(previousAssignment, configured);
+log.trace("Deleted assignments: {}", deleted);
 
-// A collection of the complete assignment
-List completeWorkerAssignment = 
workerAssignment(memberConfigs, ConnectorsAndTasks.EMPTY);
-log.debug("Complete (ignoring deletions) worker assignments: {}", 
completeWorkerAssignment);
+// The connectors and tasks that are currently running on more than 
one worker each
+final ConnectorsAndTasks duplicated = duplicated(memberAssignments);
+log.trace("Duplicated assignments: {}", duplicated);
 
-// Per worker connector assignments without removing deleted 
connectors yet
-Map> connectorAssignments =
-
completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, 
WorkerLoad::connectors));
-log.debug("Complete (ignoring deletions) connector assignments: {}", 
connectorAssignments);
+// The connectors and tasks that should already be running on the 
cluster, but which are not included
+// in the assignment reported by any workers in the cluster
+final ConnectorsAndTasks lostAssignments = 
ConnectorsAndTasks.diff(previousAssignment, activeAssignments, deleted);

Review Comment:
   Could you explain what a `lost` assignment means?
   As far as I know, `ConnectorsAndTasks.diff` returns what remains after the
later assignments are subtracted from the first one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing

2022-04-30 Thread GitBox


YeonCheolGit commented on code in PR #12019:
URL: https://github.com/apache/kafka/pull/12019#discussion_r862396035


##
connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java:
##
@@ -160,4 +166,40 @@ public static boolean isSourceConnector(Connector 
connector) {
 return SourceConnector.class.isAssignableFrom(connector.getClass());
 }
 
+public static  Map transformValues(Map map, 
Function transformation) {
+return map.entrySet().stream().collect(Collectors.toMap(
+Map.Entry::getKey,
+transformation.compose(Map.Entry::getValue)
+));
+}
+
+public static  List combineCollections(Collection> collections) {
+return combineCollections(collections, Function.identity());
+}
+
+public static  List combineCollections(Collection 
collection, Function> extractCollection) {
+return combineCollections(collection, extractCollection, 
Collectors.toList());
+}
+
+public static  C combineCollections(

Review Comment:
   Could you explain why one `combineCollections` overload returns another
`combineCollections` overload, which in turn returns a third?
   
   To me, it looks like the final result always comes from the overload that
returns `C`. Why not use only that `C combineCollections` overload?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing

2022-04-30 Thread GitBox


YeonCheolGit commented on code in PR #12019:
URL: https://github.com/apache/kafka/pull/12019#discussion_r862394876


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##
@@ -151,19 +159,46 @@ private Long ensureLeaderConfig(long maxOffset, 
WorkerCoordinator coordinator) {
  * @param coordinator the worker coordinator instance that provide the 
configuration snapshot
  * and get assigned the leader state during this assignment
  * @param protocolVersion the Connect subprotocol version
- * @return the serialized assignment of tasks to the whole group, 
including assigned or
- * revoked tasks
+ * @return the assignment of tasks to the whole group, including assigned 
or revoked tasks
  */
-protected Map performTaskAssignment(String leaderId, 
long maxOffset,
+private Map performTaskAssignment(String 
leaderId, long maxOffset,

Review Comment:
   Just wondering: why change `protected` to `private` here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing

2022-04-30 Thread GitBox


YeonCheolGit commented on code in PR #12019:
URL: https://github.com/apache/kafka/pull/12019#discussion_r862388033


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##
@@ -110,14 +114,18 @@ public Map performAssignment(String 
leaderId, String protoco
 : CONNECT_PROTOCOL_V1;
 
 Long leaderOffset = ensureLeaderConfig(maxOffset, coordinator);
+Map assignments;
 if (leaderOffset == null) {
-Map assignments = fillAssignments(
+assignments = fillAssignments(
 memberConfigs.keySet(), Assignment.CONFIG_MISMATCH,
-leaderId, memberConfigs.get(leaderId).url(), maxOffset, 
Collections.emptyMap(),
-Collections.emptyMap(), Collections.emptyMap(), 0, 
protocolVersion);
-return serializeAssignments(assignments);
+leaderId, memberConfigs.get(leaderId).url(), maxOffset,
+ClusterAssignment.EMPTY, 0, protocolVersion);
+} else {
+assignments = performTaskAssignment(leaderId, leaderOffset, 
memberConfigs, coordinator, protocolVersion);
 }
-return performTaskAssignment(leaderId, leaderOffset, memberConfigs, 
coordinator, protocolVersion);
+Map result = serializeAssignments(assignments);
+log.debug("Finished assignment");

Review Comment:
   The variable here is `Map assignments` (plural),
   so maybe this?
   
   ```suggestion
   log.debug("Finished assignments");
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing

2022-04-30 Thread GitBox


YeonCheolGit commented on code in PR #12019:
URL: https://github.com/apache/kafka/pull/12019#discussion_r862388033


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##
@@ -110,14 +114,18 @@ public Map performAssignment(String 
leaderId, String protoco
 : CONNECT_PROTOCOL_V1;
 
 Long leaderOffset = ensureLeaderConfig(maxOffset, coordinator);
+Map assignments;
 if (leaderOffset == null) {
-Map assignments = fillAssignments(
+assignments = fillAssignments(
 memberConfigs.keySet(), Assignment.CONFIG_MISMATCH,
-leaderId, memberConfigs.get(leaderId).url(), maxOffset, 
Collections.emptyMap(),
-Collections.emptyMap(), Collections.emptyMap(), 0, 
protocolVersion);
-return serializeAssignments(assignments);
+leaderId, memberConfigs.get(leaderId).url(), maxOffset,
+ClusterAssignment.EMPTY, 0, protocolVersion);
+} else {
+assignments = performTaskAssignment(leaderId, leaderOffset, 
memberConfigs, coordinator, protocolVersion);
 }
-return performTaskAssignment(leaderId, leaderOffset, memberConfigs, 
coordinator, protocolVersion);
+Map result = serializeAssignments(assignments);
+log.debug("Finished assignment");

Review Comment:
   The variable here is `assignments` (plural), so maybe this?
   
   ```suggestion
   log.debug("Finished assignments");
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing

2022-04-12 Thread GitBox


YeonCheolGit commented on code in PR #12019:
URL: https://github.com/apache/kafka/pull/12019#discussion_r847446537


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##
@@ -200,283 +229,149 @@ private Long ensureLeaderConfig(long maxOffset, 
WorkerCoordinator coordinator) {
 previousAssignment = activeAssignments;
 canRevoke = true;
 }
-previousRevocation.connectors().clear();
-previousRevocation.tasks().clear();
+previousRevocation = ConnectorsAndTasks.EMPTY;
 }
 
-// Derived set: The set of deleted connectors-and-tasks is a derived 
set from the set
-// difference of previous - configured
-ConnectorsAndTasks deleted = diff(previousAssignment, configured);
-log.debug("Deleted assignments: {}", deleted);
-
-// Derived set: The set of remaining active connectors-and-tasks is a 
derived set from the
-// set difference of active - deleted
-ConnectorsAndTasks remainingActive = diff(activeAssignments, deleted);
-log.debug("Remaining (excluding deleted) active assignments: {}", 
remainingActive);
-
-// Derived set: The set of lost or unaccounted connectors-and-tasks is 
a derived set from
-// the set difference of previous - active - deleted
-ConnectorsAndTasks lostAssignments = diff(previousAssignment, 
activeAssignments, deleted);
-log.debug("Lost assignments: {}", lostAssignments);
-
-// Derived set: The set of new connectors-and-tasks is a derived set 
from the set
-// difference of configured - previous - active
-ConnectorsAndTasks newSubmissions = diff(configured, 
previousAssignment, activeAssignments);
-log.debug("New assignments: {}", newSubmissions);
+// The connectors and tasks that have been deleted since the last 
rebalance
+final ConnectorsAndTasks deleted = 
ConnectorsAndTasks.diff(previousAssignment, configured);
+log.trace("Deleted assignments: {}", deleted);
 
-// A collection of the complete assignment
-List completeWorkerAssignment = 
workerAssignment(memberConfigs, ConnectorsAndTasks.EMPTY);
-log.debug("Complete (ignoring deletions) worker assignments: {}", 
completeWorkerAssignment);
+// The connectors and tasks that are currently running on more than 
one worker each
+final ConnectorsAndTasks duplicated = duplicated(memberAssignments);
+log.trace("Duplicated assignments: {}", duplicated);
 
-// Per worker connector assignments without removing deleted 
connectors yet
-Map> connectorAssignments =
-
completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, 
WorkerLoad::connectors));
-log.debug("Complete (ignoring deletions) connector assignments: {}", 
connectorAssignments);
+// The connectors and tasks that should already be running on the 
cluster, but which are not included
+// in the assignment reported by any workers in the cluster
+final ConnectorsAndTasks lostAssignments = 
ConnectorsAndTasks.diff(previousAssignment, activeAssignments, deleted);
+log.trace("Lost assignments: {}", lostAssignments);
 
-// Per worker task assignments without removing deleted connectors yet
-Map> taskAssignments =
-
completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, 
WorkerLoad::tasks));
-log.debug("Complete (ignoring deletions) task assignments: {}", 
taskAssignments);
+// The connectors and tasks that have been created since the last 
rebalance
+final ConnectorsAndTasks created = ConnectorsAndTasks.diff(configured, 
previousAssignment, activeAssignments);
+log.trace("New assignments: {}", created);
 
-// A collection of the current assignment excluding the 
connectors-and-tasks to be deleted
-List currentWorkerAssignment = 
workerAssignment(memberConfigs, deleted);
+final Map toRevoke = new 
HashMap<>();
 
-Map toRevoke = computeDeleted(deleted, 
connectorAssignments, taskAssignments);
-log.debug("Connector and task to delete assignments: {}", toRevoke);
+final Map deletedAndRevoked = 
intersection(deleted, memberAssignments);
+log.trace("Deleted connectors and tasks to revoke from each worker: 
{}", deletedAndRevoked);
+addAll(toRevoke, deletedAndRevoked);
 
 // Revoking redundant connectors/tasks if the workers have duplicate 
assignments
-toRevoke.putAll(computeDuplicatedAssignments(memberConfigs, 
connectorAssignments, taskAssignments));
-log.debug("Connector and task to revoke assignments (include 
duplicated assignments): {}", toRevoke);
+final Map duplicatedAndRevoked = 
intersection(duplicated, memberAssignments);
+log.trace("Duplicated connectors

[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing

2022-04-11 Thread GitBox


YeonCheolGit commented on code in PR #12019:
URL: https://github.com/apache/kafka/pull/12019#discussion_r847446537


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##
@@ -200,283 +229,149 @@ private Long ensureLeaderConfig(long maxOffset, WorkerCoordinator coordinator) {
 previousAssignment = activeAssignments;
 canRevoke = true;
 }
-previousRevocation.connectors().clear();
-previousRevocation.tasks().clear();
+previousRevocation = ConnectorsAndTasks.EMPTY;
 }
 
-// Derived set: The set of deleted connectors-and-tasks is a derived 
set from the set
-// difference of previous - configured
-ConnectorsAndTasks deleted = diff(previousAssignment, configured);
-log.debug("Deleted assignments: {}", deleted);
-
-// Derived set: The set of remaining active connectors-and-tasks is a 
derived set from the
-// set difference of active - deleted
-ConnectorsAndTasks remainingActive = diff(activeAssignments, deleted);
-log.debug("Remaining (excluding deleted) active assignments: {}", 
remainingActive);
-
-// Derived set: The set of lost or unaccounted connectors-and-tasks is 
a derived set from
-// the set difference of previous - active - deleted
-ConnectorsAndTasks lostAssignments = diff(previousAssignment, 
activeAssignments, deleted);
-log.debug("Lost assignments: {}", lostAssignments);
-
-// Derived set: The set of new connectors-and-tasks is a derived set 
from the set
-// difference of configured - previous - active
-ConnectorsAndTasks newSubmissions = diff(configured, 
previousAssignment, activeAssignments);
-log.debug("New assignments: {}", newSubmissions);
+// The connectors and tasks that have been deleted since the last 
rebalance
+final ConnectorsAndTasks deleted = 
ConnectorsAndTasks.diff(previousAssignment, configured);
+log.trace("Deleted assignments: {}", deleted);
 
-// A collection of the complete assignment
-List completeWorkerAssignment = 
workerAssignment(memberConfigs, ConnectorsAndTasks.EMPTY);
-log.debug("Complete (ignoring deletions) worker assignments: {}", 
completeWorkerAssignment);
+// The connectors and tasks that are currently running on more than 
one worker each
+final ConnectorsAndTasks duplicated = duplicated(memberAssignments);
+log.trace("Duplicated assignments: {}", duplicated);
 
-// Per worker connector assignments without removing deleted 
connectors yet
-Map> connectorAssignments =
-
completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, 
WorkerLoad::connectors));
-log.debug("Complete (ignoring deletions) connector assignments: {}", 
connectorAssignments);
+// The connectors and tasks that should already be running on the 
cluster, but which are not included
+// in the assignment reported by any workers in the cluster
+final ConnectorsAndTasks lostAssignments = 
ConnectorsAndTasks.diff(previousAssignment, activeAssignments, deleted);
+log.trace("Lost assignments: {}", lostAssignments);
 
-// Per worker task assignments without removing deleted connectors yet
-Map> taskAssignments =
-
completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, 
WorkerLoad::tasks));
-log.debug("Complete (ignoring deletions) task assignments: {}", 
taskAssignments);
+// The connectors and tasks that have been created since the last 
rebalance
+final ConnectorsAndTasks created = ConnectorsAndTasks.diff(configured, 
previousAssignment, activeAssignments);
+log.trace("New assignments: {}", created);
 
-// A collection of the current assignment excluding the 
connectors-and-tasks to be deleted
-List currentWorkerAssignment = 
workerAssignment(memberConfigs, deleted);
+final Map toRevoke = new 
HashMap<>();
 
-Map toRevoke = computeDeleted(deleted, 
connectorAssignments, taskAssignments);
-log.debug("Connector and task to delete assignments: {}", toRevoke);
+final Map deletedAndRevoked = 
intersection(deleted, memberAssignments);
+log.trace("Deleted connectors and tasks to revoke from each worker: 
{}", deletedAndRevoked);
+addAll(toRevoke, deletedAndRevoked);
 
 // Revoking redundant connectors/tasks if the workers have duplicate 
assignments
-toRevoke.putAll(computeDuplicatedAssignments(memberConfigs, 
connectorAssignments, taskAssignments));
-log.debug("Connector and task to revoke assignments (include 
duplicated assignments): {}", toRevoke);
+final Map duplicatedAndRevoked = 
intersection(duplicated, memberAssignments);
+log.trace("Duplicated connectors

[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing

2022-04-11 Thread GitBox


YeonCheolGit commented on code in PR #12019:
URL: https://github.com/apache/kafka/pull/12019#discussion_r847446537


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##
@@ -200,283 +229,149 @@ private Long ensureLeaderConfig(long maxOffset, WorkerCoordinator coordinator) {
 previousAssignment = activeAssignments;
 canRevoke = true;
 }
-previousRevocation.connectors().clear();
-previousRevocation.tasks().clear();
+previousRevocation = ConnectorsAndTasks.EMPTY;
 }
 
-// Derived set: The set of deleted connectors-and-tasks is a derived 
set from the set
-// difference of previous - configured
-ConnectorsAndTasks deleted = diff(previousAssignment, configured);
-log.debug("Deleted assignments: {}", deleted);
-
-// Derived set: The set of remaining active connectors-and-tasks is a 
derived set from the
-// set difference of active - deleted
-ConnectorsAndTasks remainingActive = diff(activeAssignments, deleted);
-log.debug("Remaining (excluding deleted) active assignments: {}", 
remainingActive);
-
-// Derived set: The set of lost or unaccounted connectors-and-tasks is 
a derived set from
-// the set difference of previous - active - deleted
-ConnectorsAndTasks lostAssignments = diff(previousAssignment, 
activeAssignments, deleted);
-log.debug("Lost assignments: {}", lostAssignments);
-
-// Derived set: The set of new connectors-and-tasks is a derived set 
from the set
-// difference of configured - previous - active
-ConnectorsAndTasks newSubmissions = diff(configured, 
previousAssignment, activeAssignments);
-log.debug("New assignments: {}", newSubmissions);
+// The connectors and tasks that have been deleted since the last 
rebalance
+final ConnectorsAndTasks deleted = 
ConnectorsAndTasks.diff(previousAssignment, configured);
+log.trace("Deleted assignments: {}", deleted);
 
-// A collection of the complete assignment
-List completeWorkerAssignment = 
workerAssignment(memberConfigs, ConnectorsAndTasks.EMPTY);
-log.debug("Complete (ignoring deletions) worker assignments: {}", 
completeWorkerAssignment);
+// The connectors and tasks that are currently running on more than 
one worker each
+final ConnectorsAndTasks duplicated = duplicated(memberAssignments);
+log.trace("Duplicated assignments: {}", duplicated);
 
-// Per worker connector assignments without removing deleted 
connectors yet
-Map> connectorAssignments =
-
completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, 
WorkerLoad::connectors));
-log.debug("Complete (ignoring deletions) connector assignments: {}", 
connectorAssignments);
+// The connectors and tasks that should already be running on the 
cluster, but which are not included
+// in the assignment reported by any workers in the cluster
+final ConnectorsAndTasks lostAssignments = 
ConnectorsAndTasks.diff(previousAssignment, activeAssignments, deleted);
+log.trace("Lost assignments: {}", lostAssignments);
 
-// Per worker task assignments without removing deleted connectors yet
-Map> taskAssignments =
-
completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, 
WorkerLoad::tasks));
-log.debug("Complete (ignoring deletions) task assignments: {}", 
taskAssignments);
+// The connectors and tasks that have been created since the last 
rebalance
+final ConnectorsAndTasks created = ConnectorsAndTasks.diff(configured, 
previousAssignment, activeAssignments);
+log.trace("New assignments: {}", created);
 
-// A collection of the current assignment excluding the 
connectors-and-tasks to be deleted
-List currentWorkerAssignment = 
workerAssignment(memberConfigs, deleted);
+final Map toRevoke = new 
HashMap<>();
 
-Map toRevoke = computeDeleted(deleted, 
connectorAssignments, taskAssignments);
-log.debug("Connector and task to delete assignments: {}", toRevoke);
+final Map deletedAndRevoked = 
intersection(deleted, memberAssignments);
+log.trace("Deleted connectors and tasks to revoke from each worker: 
{}", deletedAndRevoked);
+addAll(toRevoke, deletedAndRevoked);
 
 // Revoking redundant connectors/tasks if the workers have duplicate 
assignments
-toRevoke.putAll(computeDuplicatedAssignments(memberConfigs, 
connectorAssignments, taskAssignments));
-log.debug("Connector and task to revoke assignments (include 
duplicated assignments): {}", toRevoke);
+final Map duplicatedAndRevoked = 
intersection(duplicated, memberAssignments);
+log.trace("Duplicated connectors

[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing

2022-04-11 Thread GitBox


YeonCheolGit commented on code in PR #12019:
URL: https://github.com/apache/kafka/pull/12019#discussion_r847446537


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##
@@ -200,283 +229,149 @@ private Long ensureLeaderConfig(long maxOffset, WorkerCoordinator coordinator) {
 previousAssignment = activeAssignments;
 canRevoke = true;
 }
-previousRevocation.connectors().clear();
-previousRevocation.tasks().clear();
+previousRevocation = ConnectorsAndTasks.EMPTY;
 }
 
-// Derived set: The set of deleted connectors-and-tasks is a derived 
set from the set
-// difference of previous - configured
-ConnectorsAndTasks deleted = diff(previousAssignment, configured);
-log.debug("Deleted assignments: {}", deleted);
-
-// Derived set: The set of remaining active connectors-and-tasks is a 
derived set from the
-// set difference of active - deleted
-ConnectorsAndTasks remainingActive = diff(activeAssignments, deleted);
-log.debug("Remaining (excluding deleted) active assignments: {}", 
remainingActive);
-
-// Derived set: The set of lost or unaccounted connectors-and-tasks is 
a derived set from
-// the set difference of previous - active - deleted
-ConnectorsAndTasks lostAssignments = diff(previousAssignment, 
activeAssignments, deleted);
-log.debug("Lost assignments: {}", lostAssignments);
-
-// Derived set: The set of new connectors-and-tasks is a derived set 
from the set
-// difference of configured - previous - active
-ConnectorsAndTasks newSubmissions = diff(configured, 
previousAssignment, activeAssignments);
-log.debug("New assignments: {}", newSubmissions);
+// The connectors and tasks that have been deleted since the last 
rebalance
+final ConnectorsAndTasks deleted = 
ConnectorsAndTasks.diff(previousAssignment, configured);
+log.trace("Deleted assignments: {}", deleted);
 
-// A collection of the complete assignment
-List completeWorkerAssignment = 
workerAssignment(memberConfigs, ConnectorsAndTasks.EMPTY);
-log.debug("Complete (ignoring deletions) worker assignments: {}", 
completeWorkerAssignment);
+// The connectors and tasks that are currently running on more than 
one worker each
+final ConnectorsAndTasks duplicated = duplicated(memberAssignments);
+log.trace("Duplicated assignments: {}", duplicated);
 
-// Per worker connector assignments without removing deleted 
connectors yet
-Map> connectorAssignments =
-
completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, 
WorkerLoad::connectors));
-log.debug("Complete (ignoring deletions) connector assignments: {}", 
connectorAssignments);
+// The connectors and tasks that should already be running on the 
cluster, but which are not included
+// in the assignment reported by any workers in the cluster
+final ConnectorsAndTasks lostAssignments = 
ConnectorsAndTasks.diff(previousAssignment, activeAssignments, deleted);
+log.trace("Lost assignments: {}", lostAssignments);
 
-// Per worker task assignments without removing deleted connectors yet
-Map> taskAssignments =
-
completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, 
WorkerLoad::tasks));
-log.debug("Complete (ignoring deletions) task assignments: {}", 
taskAssignments);
+// The connectors and tasks that have been created since the last 
rebalance
+final ConnectorsAndTasks created = ConnectorsAndTasks.diff(configured, 
previousAssignment, activeAssignments);
+log.trace("New assignments: {}", created);
 
-// A collection of the current assignment excluding the 
connectors-and-tasks to be deleted
-List currentWorkerAssignment = 
workerAssignment(memberConfigs, deleted);
+final Map toRevoke = new 
HashMap<>();
 
-Map toRevoke = computeDeleted(deleted, 
connectorAssignments, taskAssignments);
-log.debug("Connector and task to delete assignments: {}", toRevoke);
+final Map deletedAndRevoked = 
intersection(deleted, memberAssignments);
+log.trace("Deleted connectors and tasks to revoke from each worker: 
{}", deletedAndRevoked);
+addAll(toRevoke, deletedAndRevoked);
 
 // Revoking redundant connectors/tasks if the workers have duplicate 
assignments
-toRevoke.putAll(computeDuplicatedAssignments(memberConfigs, 
connectorAssignments, taskAssignments));
-log.debug("Connector and task to revoke assignments (include 
duplicated assignments): {}", toRevoke);
+final Map duplicatedAndRevoked = 
intersection(duplicated, memberAssignments);
+log.trace("Duplicated connectors

[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing

2022-04-11 Thread GitBox


YeonCheolGit commented on code in PR #12019:
URL: https://github.com/apache/kafka/pull/12019#discussion_r847446537


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##
@@ -200,283 +229,149 @@ private Long ensureLeaderConfig(long maxOffset, WorkerCoordinator coordinator) {
 previousAssignment = activeAssignments;
 canRevoke = true;
 }
-previousRevocation.connectors().clear();
-previousRevocation.tasks().clear();
+previousRevocation = ConnectorsAndTasks.EMPTY;
 }
 
-// Derived set: The set of deleted connectors-and-tasks is a derived 
set from the set
-// difference of previous - configured
-ConnectorsAndTasks deleted = diff(previousAssignment, configured);
-log.debug("Deleted assignments: {}", deleted);
-
-// Derived set: The set of remaining active connectors-and-tasks is a 
derived set from the
-// set difference of active - deleted
-ConnectorsAndTasks remainingActive = diff(activeAssignments, deleted);
-log.debug("Remaining (excluding deleted) active assignments: {}", 
remainingActive);
-
-// Derived set: The set of lost or unaccounted connectors-and-tasks is 
a derived set from
-// the set difference of previous - active - deleted
-ConnectorsAndTasks lostAssignments = diff(previousAssignment, 
activeAssignments, deleted);
-log.debug("Lost assignments: {}", lostAssignments);
-
-// Derived set: The set of new connectors-and-tasks is a derived set 
from the set
-// difference of configured - previous - active
-ConnectorsAndTasks newSubmissions = diff(configured, 
previousAssignment, activeAssignments);
-log.debug("New assignments: {}", newSubmissions);
+// The connectors and tasks that have been deleted since the last 
rebalance
+final ConnectorsAndTasks deleted = 
ConnectorsAndTasks.diff(previousAssignment, configured);
+log.trace("Deleted assignments: {}", deleted);
 
-// A collection of the complete assignment
-List completeWorkerAssignment = 
workerAssignment(memberConfigs, ConnectorsAndTasks.EMPTY);
-log.debug("Complete (ignoring deletions) worker assignments: {}", 
completeWorkerAssignment);
+// The connectors and tasks that are currently running on more than 
one worker each
+final ConnectorsAndTasks duplicated = duplicated(memberAssignments);
+log.trace("Duplicated assignments: {}", duplicated);
 
-// Per worker connector assignments without removing deleted 
connectors yet
-Map> connectorAssignments =
-
completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, 
WorkerLoad::connectors));
-log.debug("Complete (ignoring deletions) connector assignments: {}", 
connectorAssignments);
+// The connectors and tasks that should already be running on the 
cluster, but which are not included
+// in the assignment reported by any workers in the cluster
+final ConnectorsAndTasks lostAssignments = 
ConnectorsAndTasks.diff(previousAssignment, activeAssignments, deleted);
+log.trace("Lost assignments: {}", lostAssignments);
 
-// Per worker task assignments without removing deleted connectors yet
-Map> taskAssignments =
-
completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, 
WorkerLoad::tasks));
-log.debug("Complete (ignoring deletions) task assignments: {}", 
taskAssignments);
+// The connectors and tasks that have been created since the last 
rebalance
+final ConnectorsAndTasks created = ConnectorsAndTasks.diff(configured, 
previousAssignment, activeAssignments);
+log.trace("New assignments: {}", created);
 
-// A collection of the current assignment excluding the 
connectors-and-tasks to be deleted
-List currentWorkerAssignment = 
workerAssignment(memberConfigs, deleted);
+final Map toRevoke = new 
HashMap<>();
 
-Map toRevoke = computeDeleted(deleted, 
connectorAssignments, taskAssignments);
-log.debug("Connector and task to delete assignments: {}", toRevoke);
+final Map deletedAndRevoked = 
intersection(deleted, memberAssignments);
+log.trace("Deleted connectors and tasks to revoke from each worker: 
{}", deletedAndRevoked);
+addAll(toRevoke, deletedAndRevoked);
 
 // Revoking redundant connectors/tasks if the workers have duplicate 
assignments
-toRevoke.putAll(computeDuplicatedAssignments(memberConfigs, 
connectorAssignments, taskAssignments));
-log.debug("Connector and task to revoke assignments (include 
duplicated assignments): {}", toRevoke);
+final Map duplicatedAndRevoked = 
intersection(duplicated, memberAssignments);
+log.trace("Duplicated connectors

[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing

2022-04-11 Thread GitBox


YeonCheolGit commented on code in PR #12019:
URL: https://github.com/apache/kafka/pull/12019#discussion_r847446537


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##
@@ -200,283 +229,149 @@ private Long ensureLeaderConfig(long maxOffset, WorkerCoordinator coordinator) {
 previousAssignment = activeAssignments;
 canRevoke = true;
 }
-previousRevocation.connectors().clear();
-previousRevocation.tasks().clear();
+previousRevocation = ConnectorsAndTasks.EMPTY;
 }
 
-// Derived set: The set of deleted connectors-and-tasks is a derived 
set from the set
-// difference of previous - configured
-ConnectorsAndTasks deleted = diff(previousAssignment, configured);
-log.debug("Deleted assignments: {}", deleted);
-
-// Derived set: The set of remaining active connectors-and-tasks is a 
derived set from the
-// set difference of active - deleted
-ConnectorsAndTasks remainingActive = diff(activeAssignments, deleted);
-log.debug("Remaining (excluding deleted) active assignments: {}", 
remainingActive);
-
-// Derived set: The set of lost or unaccounted connectors-and-tasks is 
a derived set from
-// the set difference of previous - active - deleted
-ConnectorsAndTasks lostAssignments = diff(previousAssignment, 
activeAssignments, deleted);
-log.debug("Lost assignments: {}", lostAssignments);
-
-// Derived set: The set of new connectors-and-tasks is a derived set 
from the set
-// difference of configured - previous - active
-ConnectorsAndTasks newSubmissions = diff(configured, 
previousAssignment, activeAssignments);
-log.debug("New assignments: {}", newSubmissions);
+// The connectors and tasks that have been deleted since the last 
rebalance
+final ConnectorsAndTasks deleted = 
ConnectorsAndTasks.diff(previousAssignment, configured);
+log.trace("Deleted assignments: {}", deleted);
 
-// A collection of the complete assignment
-List completeWorkerAssignment = 
workerAssignment(memberConfigs, ConnectorsAndTasks.EMPTY);
-log.debug("Complete (ignoring deletions) worker assignments: {}", 
completeWorkerAssignment);
+// The connectors and tasks that are currently running on more than 
one worker each
+final ConnectorsAndTasks duplicated = duplicated(memberAssignments);
+log.trace("Duplicated assignments: {}", duplicated);
 
-// Per worker connector assignments without removing deleted 
connectors yet
-Map> connectorAssignments =
-
completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, 
WorkerLoad::connectors));
-log.debug("Complete (ignoring deletions) connector assignments: {}", 
connectorAssignments);
+// The connectors and tasks that should already be running on the 
cluster, but which are not included
+// in the assignment reported by any workers in the cluster
+final ConnectorsAndTasks lostAssignments = 
ConnectorsAndTasks.diff(previousAssignment, activeAssignments, deleted);
+log.trace("Lost assignments: {}", lostAssignments);
 
-// Per worker task assignments without removing deleted connectors yet
-Map> taskAssignments =
-
completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, 
WorkerLoad::tasks));
-log.debug("Complete (ignoring deletions) task assignments: {}", 
taskAssignments);
+// The connectors and tasks that have been created since the last 
rebalance
+final ConnectorsAndTasks created = ConnectorsAndTasks.diff(configured, 
previousAssignment, activeAssignments);
+log.trace("New assignments: {}", created);
 
-// A collection of the current assignment excluding the 
connectors-and-tasks to be deleted
-List currentWorkerAssignment = 
workerAssignment(memberConfigs, deleted);
+final Map toRevoke = new 
HashMap<>();
 
-Map toRevoke = computeDeleted(deleted, 
connectorAssignments, taskAssignments);
-log.debug("Connector and task to delete assignments: {}", toRevoke);
+final Map deletedAndRevoked = 
intersection(deleted, memberAssignments);
+log.trace("Deleted connectors and tasks to revoke from each worker: 
{}", deletedAndRevoked);
+addAll(toRevoke, deletedAndRevoked);
 
 // Revoking redundant connectors/tasks if the workers have duplicate 
assignments
-toRevoke.putAll(computeDuplicatedAssignments(memberConfigs, 
connectorAssignments, taskAssignments));
-log.debug("Connector and task to revoke assignments (include 
duplicated assignments): {}", toRevoke);
+final Map duplicatedAndRevoked = 
intersection(duplicated, memberAssignments);
+log.trace("Duplicated connectors

[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing

2022-04-10 Thread GitBox


YeonCheolGit commented on code in PR #12019:
URL: https://github.com/apache/kafka/pull/12019#discussion_r846804952


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##
@@ -200,283 +229,149 @@ private Long ensureLeaderConfig(long maxOffset, WorkerCoordinator coordinator) {
 previousAssignment = activeAssignments;
 canRevoke = true;
 }
-previousRevocation.connectors().clear();
-previousRevocation.tasks().clear();
+previousRevocation = ConnectorsAndTasks.EMPTY;
 }
 
-// Derived set: The set of deleted connectors-and-tasks is a derived 
set from the set
-// difference of previous - configured
-ConnectorsAndTasks deleted = diff(previousAssignment, configured);
-log.debug("Deleted assignments: {}", deleted);
-
-// Derived set: The set of remaining active connectors-and-tasks is a 
derived set from the
-// set difference of active - deleted
-ConnectorsAndTasks remainingActive = diff(activeAssignments, deleted);
-log.debug("Remaining (excluding deleted) active assignments: {}", 
remainingActive);
-
-// Derived set: The set of lost or unaccounted connectors-and-tasks is 
a derived set from
-// the set difference of previous - active - deleted
-ConnectorsAndTasks lostAssignments = diff(previousAssignment, 
activeAssignments, deleted);
-log.debug("Lost assignments: {}", lostAssignments);
-
-// Derived set: The set of new connectors-and-tasks is a derived set 
from the set
-// difference of configured - previous - active
-ConnectorsAndTasks newSubmissions = diff(configured, 
previousAssignment, activeAssignments);
-log.debug("New assignments: {}", newSubmissions);
+// The connectors and tasks that have been deleted since the last 
rebalance
+final ConnectorsAndTasks deleted = 
ConnectorsAndTasks.diff(previousAssignment, configured);
+log.trace("Deleted assignments: {}", deleted);
 
-// A collection of the complete assignment
-List completeWorkerAssignment = 
workerAssignment(memberConfigs, ConnectorsAndTasks.EMPTY);
-log.debug("Complete (ignoring deletions) worker assignments: {}", 
completeWorkerAssignment);
+// The connectors and tasks that are currently running on more than 
one worker each
+final ConnectorsAndTasks duplicated = duplicated(memberAssignments);
+log.trace("Duplicated assignments: {}", duplicated);
 
-// Per worker connector assignments without removing deleted 
connectors yet
-Map> connectorAssignments =
-
completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, 
WorkerLoad::connectors));
-log.debug("Complete (ignoring deletions) connector assignments: {}", 
connectorAssignments);
+// The connectors and tasks that should already be running on the 
cluster, but which are not included
+// in the assignment reported by any workers in the cluster
+final ConnectorsAndTasks lostAssignments = 
ConnectorsAndTasks.diff(previousAssignment, activeAssignments, deleted);
+log.trace("Lost assignments: {}", lostAssignments);
 
-// Per worker task assignments without removing deleted connectors yet
-Map> taskAssignments =
-
completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, 
WorkerLoad::tasks));
-log.debug("Complete (ignoring deletions) task assignments: {}", 
taskAssignments);
+// The connectors and tasks that have been created since the last 
rebalance
+final ConnectorsAndTasks created = ConnectorsAndTasks.diff(configured, 
previousAssignment, activeAssignments);
+log.trace("New assignments: {}", created);
 
-// A collection of the current assignment excluding the 
connectors-and-tasks to be deleted
-List currentWorkerAssignment = 
workerAssignment(memberConfigs, deleted);
+final Map toRevoke = new 
HashMap<>();
 
-Map toRevoke = computeDeleted(deleted, 
connectorAssignments, taskAssignments);
-log.debug("Connector and task to delete assignments: {}", toRevoke);
+final Map deletedAndRevoked = 
intersection(deleted, memberAssignments);
+log.trace("Deleted connectors and tasks to revoke from each worker: 
{}", deletedAndRevoked);
+addAll(toRevoke, deletedAndRevoked);
 
 // Revoking redundant connectors/tasks if the workers have duplicate 
assignments
-toRevoke.putAll(computeDuplicatedAssignments(memberConfigs, 
connectorAssignments, taskAssignments));
-log.debug("Connector and task to revoke assignments (include 
duplicated assignments): {}", toRevoke);
+final Map duplicatedAndRevoked = 
intersection(duplicated, memberAssignments);
+log.trace("Duplicated connectors

[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing

2022-04-10 Thread GitBox


YeonCheolGit commented on code in PR #12019:
URL: https://github.com/apache/kafka/pull/12019#discussion_r846804952


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##
@@ -200,283 +229,149 @@ private Long ensureLeaderConfig(long maxOffset, WorkerCoordinator coordinator) {
 previousAssignment = activeAssignments;
 canRevoke = true;
 }
-previousRevocation.connectors().clear();
-previousRevocation.tasks().clear();
+previousRevocation = ConnectorsAndTasks.EMPTY;
 }
 
-// Derived set: The set of deleted connectors-and-tasks is a derived 
set from the set
-// difference of previous - configured
-ConnectorsAndTasks deleted = diff(previousAssignment, configured);
-log.debug("Deleted assignments: {}", deleted);
-
-// Derived set: The set of remaining active connectors-and-tasks is a 
derived set from the
-// set difference of active - deleted
-ConnectorsAndTasks remainingActive = diff(activeAssignments, deleted);
-log.debug("Remaining (excluding deleted) active assignments: {}", 
remainingActive);
-
-// Derived set: The set of lost or unaccounted connectors-and-tasks is 
a derived set from
-// the set difference of previous - active - deleted
-ConnectorsAndTasks lostAssignments = diff(previousAssignment, 
activeAssignments, deleted);
-log.debug("Lost assignments: {}", lostAssignments);
-
-// Derived set: The set of new connectors-and-tasks is a derived set 
from the set
-// difference of configured - previous - active
-ConnectorsAndTasks newSubmissions = diff(configured, 
previousAssignment, activeAssignments);
-log.debug("New assignments: {}", newSubmissions);
+// The connectors and tasks that have been deleted since the last 
rebalance
+final ConnectorsAndTasks deleted = 
ConnectorsAndTasks.diff(previousAssignment, configured);
+log.trace("Deleted assignments: {}", deleted);
 
-// A collection of the complete assignment
-List completeWorkerAssignment = 
workerAssignment(memberConfigs, ConnectorsAndTasks.EMPTY);
-log.debug("Complete (ignoring deletions) worker assignments: {}", 
completeWorkerAssignment);
+// The connectors and tasks that are currently running on more than 
one worker each
+final ConnectorsAndTasks duplicated = duplicated(memberAssignments);
+log.trace("Duplicated assignments: {}", duplicated);
 
-// Per worker connector assignments without removing deleted 
connectors yet
-Map> connectorAssignments =
-
completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, 
WorkerLoad::connectors));
-log.debug("Complete (ignoring deletions) connector assignments: {}", 
connectorAssignments);
+// The connectors and tasks that should already be running on the 
cluster, but which are not included
+// in the assignment reported by any workers in the cluster
+final ConnectorsAndTasks lostAssignments = 
ConnectorsAndTasks.diff(previousAssignment, activeAssignments, deleted);
+log.trace("Lost assignments: {}", lostAssignments);
 
-// Per worker task assignments without removing deleted connectors yet
-Map> taskAssignments =
-
completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, 
WorkerLoad::tasks));
-log.debug("Complete (ignoring deletions) task assignments: {}", 
taskAssignments);
+// The connectors and tasks that have been created since the last 
rebalance
+final ConnectorsAndTasks created = ConnectorsAndTasks.diff(configured, 
previousAssignment, activeAssignments);
+log.trace("New assignments: {}", created);
 
-// A collection of the current assignment excluding the 
connectors-and-tasks to be deleted
-List currentWorkerAssignment = 
workerAssignment(memberConfigs, deleted);
+final Map toRevoke = new 
HashMap<>();
 
-Map toRevoke = computeDeleted(deleted, 
connectorAssignments, taskAssignments);
-log.debug("Connector and task to delete assignments: {}", toRevoke);
+final Map deletedAndRevoked = 
intersection(deleted, memberAssignments);
+log.trace("Deleted connectors and tasks to revoke from each worker: 
{}", deletedAndRevoked);
+addAll(toRevoke, deletedAndRevoked);
 
 // Revoking redundant connectors/tasks if the workers have duplicate 
assignments
-toRevoke.putAll(computeDuplicatedAssignments(memberConfigs, 
connectorAssignments, taskAssignments));
-log.debug("Connector and task to revoke assignments (include 
duplicated assignments): {}", toRevoke);
+final Map duplicatedAndRevoked = 
intersection(duplicated, memberAssignments);
+log.trace("Duplicated connectors

[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing

2022-04-10 Thread GitBox


YeonCheolGit commented on code in PR #12019:
URL: https://github.com/apache/kafka/pull/12019#discussion_r846804952


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##
@@ -200,283 +229,149 @@ private Long ensureLeaderConfig(long maxOffset, 
WorkerCoordinator coordinator) {
 previousAssignment = activeAssignments;
 canRevoke = true;
 }
-previousRevocation.connectors().clear();
-previousRevocation.tasks().clear();
+previousRevocation = ConnectorsAndTasks.EMPTY;
 }
 
-// Derived set: The set of deleted connectors-and-tasks is a derived 
set from the set
-// difference of previous - configured
-ConnectorsAndTasks deleted = diff(previousAssignment, configured);
-log.debug("Deleted assignments: {}", deleted);
-
-// Derived set: The set of remaining active connectors-and-tasks is a 
derived set from the
-// set difference of active - deleted
-ConnectorsAndTasks remainingActive = diff(activeAssignments, deleted);
-log.debug("Remaining (excluding deleted) active assignments: {}", 
remainingActive);
-
-// Derived set: The set of lost or unaccounted connectors-and-tasks is 
a derived set from
-// the set difference of previous - active - deleted
-ConnectorsAndTasks lostAssignments = diff(previousAssignment, 
activeAssignments, deleted);
-log.debug("Lost assignments: {}", lostAssignments);
-
-// Derived set: The set of new connectors-and-tasks is a derived set 
from the set
-// difference of configured - previous - active
-ConnectorsAndTasks newSubmissions = diff(configured, 
previousAssignment, activeAssignments);
-log.debug("New assignments: {}", newSubmissions);
+// The connectors and tasks that have been deleted since the last 
rebalance
+final ConnectorsAndTasks deleted = 
ConnectorsAndTasks.diff(previousAssignment, configured);
+log.trace("Deleted assignments: {}", deleted);
 
-// A collection of the complete assignment
-List completeWorkerAssignment = 
workerAssignment(memberConfigs, ConnectorsAndTasks.EMPTY);
-log.debug("Complete (ignoring deletions) worker assignments: {}", 
completeWorkerAssignment);
+// The connectors and tasks that are currently running on more than 
one worker each
+final ConnectorsAndTasks duplicated = duplicated(memberAssignments);
+log.trace("Duplicated assignments: {}", duplicated);
 
-// Per worker connector assignments without removing deleted 
connectors yet
-Map> connectorAssignments =
-
completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, 
WorkerLoad::connectors));
-log.debug("Complete (ignoring deletions) connector assignments: {}", 
connectorAssignments);
+// The connectors and tasks that should already be running on the 
cluster, but which are not included
+// in the assignment reported by any workers in the cluster
+final ConnectorsAndTasks lostAssignments = 
ConnectorsAndTasks.diff(previousAssignment, activeAssignments, deleted);
+log.trace("Lost assignments: {}", lostAssignments);
 
-// Per worker task assignments without removing deleted connectors yet
-Map> taskAssignments =
-
completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, 
WorkerLoad::tasks));
-log.debug("Complete (ignoring deletions) task assignments: {}", 
taskAssignments);
+// The connectors and tasks that have been created since the last 
rebalance
+final ConnectorsAndTasks created = ConnectorsAndTasks.diff(configured, 
previousAssignment, activeAssignments);
+log.trace("New assignments: {}", created);
 
-// A collection of the current assignment excluding the 
connectors-and-tasks to be deleted
-List currentWorkerAssignment = 
workerAssignment(memberConfigs, deleted);
+final Map toRevoke = new 
HashMap<>();
 
-Map toRevoke = computeDeleted(deleted, 
connectorAssignments, taskAssignments);
-log.debug("Connector and task to delete assignments: {}", toRevoke);
+final Map deletedAndRevoked = 
intersection(deleted, memberAssignments);
+log.trace("Deleted connectors and tasks to revoke from each worker: 
{}", deletedAndRevoked);
+addAll(toRevoke, deletedAndRevoked);
 
 // Revoking redundant connectors/tasks if the workers have duplicate 
assignments
-toRevoke.putAll(computeDuplicatedAssignments(memberConfigs, 
connectorAssignments, taskAssignments));
-log.debug("Connector and task to revoke assignments (include 
duplicated assignments): {}", toRevoke);
+final Map duplicatedAndRevoked = 
intersection(duplicated, memberAssignments);
+log.trace("Duplicated connectors

[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing

2022-04-10 Thread GitBox


YeonCheolGit commented on code in PR #12019:
URL: https://github.com/apache/kafka/pull/12019#discussion_r846804952


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##
@@ -200,283 +229,149 @@ private Long ensureLeaderConfig(long maxOffset, 
WorkerCoordinator coordinator) {
 previousAssignment = activeAssignments;
 canRevoke = true;
 }
-previousRevocation.connectors().clear();
-previousRevocation.tasks().clear();
+previousRevocation = ConnectorsAndTasks.EMPTY;
 }
 
-// Derived set: The set of deleted connectors-and-tasks is a derived 
set from the set
-// difference of previous - configured
-ConnectorsAndTasks deleted = diff(previousAssignment, configured);
-log.debug("Deleted assignments: {}", deleted);
-
-// Derived set: The set of remaining active connectors-and-tasks is a 
derived set from the
-// set difference of active - deleted
-ConnectorsAndTasks remainingActive = diff(activeAssignments, deleted);
-log.debug("Remaining (excluding deleted) active assignments: {}", 
remainingActive);
-
-// Derived set: The set of lost or unaccounted connectors-and-tasks is 
a derived set from
-// the set difference of previous - active - deleted
-ConnectorsAndTasks lostAssignments = diff(previousAssignment, 
activeAssignments, deleted);
-log.debug("Lost assignments: {}", lostAssignments);
-
-// Derived set: The set of new connectors-and-tasks is a derived set 
from the set
-// difference of configured - previous - active
-ConnectorsAndTasks newSubmissions = diff(configured, 
previousAssignment, activeAssignments);
-log.debug("New assignments: {}", newSubmissions);
+// The connectors and tasks that have been deleted since the last 
rebalance
+final ConnectorsAndTasks deleted = 
ConnectorsAndTasks.diff(previousAssignment, configured);
+log.trace("Deleted assignments: {}", deleted);
 
-// A collection of the complete assignment
-List completeWorkerAssignment = 
workerAssignment(memberConfigs, ConnectorsAndTasks.EMPTY);
-log.debug("Complete (ignoring deletions) worker assignments: {}", 
completeWorkerAssignment);
+// The connectors and tasks that are currently running on more than 
one worker each
+final ConnectorsAndTasks duplicated = duplicated(memberAssignments);
+log.trace("Duplicated assignments: {}", duplicated);
 
-// Per worker connector assignments without removing deleted 
connectors yet
-Map> connectorAssignments =
-
completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, 
WorkerLoad::connectors));
-log.debug("Complete (ignoring deletions) connector assignments: {}", 
connectorAssignments);
+// The connectors and tasks that should already be running on the 
cluster, but which are not included
+// in the assignment reported by any workers in the cluster
+final ConnectorsAndTasks lostAssignments = 
ConnectorsAndTasks.diff(previousAssignment, activeAssignments, deleted);
+log.trace("Lost assignments: {}", lostAssignments);
 
-// Per worker task assignments without removing deleted connectors yet
-Map> taskAssignments =
-
completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, 
WorkerLoad::tasks));
-log.debug("Complete (ignoring deletions) task assignments: {}", 
taskAssignments);
+// The connectors and tasks that have been created since the last 
rebalance
+final ConnectorsAndTasks created = ConnectorsAndTasks.diff(configured, 
previousAssignment, activeAssignments);
+log.trace("New assignments: {}", created);
 
-// A collection of the current assignment excluding the 
connectors-and-tasks to be deleted
-List currentWorkerAssignment = 
workerAssignment(memberConfigs, deleted);
+final Map toRevoke = new 
HashMap<>();
 
-Map toRevoke = computeDeleted(deleted, 
connectorAssignments, taskAssignments);
-log.debug("Connector and task to delete assignments: {}", toRevoke);
+final Map deletedAndRevoked = 
intersection(deleted, memberAssignments);
+log.trace("Deleted connectors and tasks to revoke from each worker: 
{}", deletedAndRevoked);
+addAll(toRevoke, deletedAndRevoked);
 
 // Revoking redundant connectors/tasks if the workers have duplicate 
assignments
-toRevoke.putAll(computeDuplicatedAssignments(memberConfigs, 
connectorAssignments, taskAssignments));
-log.debug("Connector and task to revoke assignments (include 
duplicated assignments): {}", toRevoke);
+final Map duplicatedAndRevoked = 
intersection(duplicated, memberAssignments);
+log.trace("Duplicated connectors

[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing

2022-04-10 Thread GitBox


YeonCheolGit commented on code in PR #12019:
URL: https://github.com/apache/kafka/pull/12019#discussion_r846804952


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##
@@ -200,283 +229,149 @@ private Long ensureLeaderConfig(long maxOffset, 
WorkerCoordinator coordinator) {
 previousAssignment = activeAssignments;
 canRevoke = true;
 }
-previousRevocation.connectors().clear();
-previousRevocation.tasks().clear();
+previousRevocation = ConnectorsAndTasks.EMPTY;
 }
 
-// Derived set: The set of deleted connectors-and-tasks is a derived 
set from the set
-// difference of previous - configured
-ConnectorsAndTasks deleted = diff(previousAssignment, configured);
-log.debug("Deleted assignments: {}", deleted);
-
-// Derived set: The set of remaining active connectors-and-tasks is a 
derived set from the
-// set difference of active - deleted
-ConnectorsAndTasks remainingActive = diff(activeAssignments, deleted);
-log.debug("Remaining (excluding deleted) active assignments: {}", 
remainingActive);
-
-// Derived set: The set of lost or unaccounted connectors-and-tasks is 
a derived set from
-// the set difference of previous - active - deleted
-ConnectorsAndTasks lostAssignments = diff(previousAssignment, 
activeAssignments, deleted);
-log.debug("Lost assignments: {}", lostAssignments);
-
-// Derived set: The set of new connectors-and-tasks is a derived set 
from the set
-// difference of configured - previous - active
-ConnectorsAndTasks newSubmissions = diff(configured, 
previousAssignment, activeAssignments);
-log.debug("New assignments: {}", newSubmissions);
+// The connectors and tasks that have been deleted since the last 
rebalance
+final ConnectorsAndTasks deleted = 
ConnectorsAndTasks.diff(previousAssignment, configured);
+log.trace("Deleted assignments: {}", deleted);
 
-// A collection of the complete assignment
-List completeWorkerAssignment = 
workerAssignment(memberConfigs, ConnectorsAndTasks.EMPTY);
-log.debug("Complete (ignoring deletions) worker assignments: {}", 
completeWorkerAssignment);
+// The connectors and tasks that are currently running on more than 
one worker each
+final ConnectorsAndTasks duplicated = duplicated(memberAssignments);
+log.trace("Duplicated assignments: {}", duplicated);
 
-// Per worker connector assignments without removing deleted 
connectors yet
-Map> connectorAssignments =
-
completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, 
WorkerLoad::connectors));
-log.debug("Complete (ignoring deletions) connector assignments: {}", 
connectorAssignments);
+// The connectors and tasks that should already be running on the 
cluster, but which are not included
+// in the assignment reported by any workers in the cluster
+final ConnectorsAndTasks lostAssignments = 
ConnectorsAndTasks.diff(previousAssignment, activeAssignments, deleted);
+log.trace("Lost assignments: {}", lostAssignments);
 
-// Per worker task assignments without removing deleted connectors yet
-Map> taskAssignments =
-
completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, 
WorkerLoad::tasks));
-log.debug("Complete (ignoring deletions) task assignments: {}", 
taskAssignments);
+// The connectors and tasks that have been created since the last 
rebalance
+final ConnectorsAndTasks created = ConnectorsAndTasks.diff(configured, 
previousAssignment, activeAssignments);
+log.trace("New assignments: {}", created);
 
-// A collection of the current assignment excluding the 
connectors-and-tasks to be deleted
-List currentWorkerAssignment = 
workerAssignment(memberConfigs, deleted);
+final Map toRevoke = new 
HashMap<>();
 
-Map toRevoke = computeDeleted(deleted, 
connectorAssignments, taskAssignments);
-log.debug("Connector and task to delete assignments: {}", toRevoke);
+final Map deletedAndRevoked = 
intersection(deleted, memberAssignments);
+log.trace("Deleted connectors and tasks to revoke from each worker: 
{}", deletedAndRevoked);
+addAll(toRevoke, deletedAndRevoked);
 
 // Revoking redundant connectors/tasks if the workers have duplicate 
assignments
-toRevoke.putAll(computeDuplicatedAssignments(memberConfigs, 
connectorAssignments, taskAssignments));
-log.debug("Connector and task to revoke assignments (include 
duplicated assignments): {}", toRevoke);
+final Map duplicatedAndRevoked = 
intersection(duplicated, memberAssignments);
+log.trace("Duplicated connectors