Re: [PR] [ KAFKA-17049 ] fix Incremental rebalances assign too many tasks for the same connector together [kafka]

2024-07-11 Thread via GitHub


yazgoo commented on code in PR #16486:
URL: https://github.com/apache/kafka/pull/16486#discussion_r1673685382


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##
@@ -793,6 +796,46 @@ protected void assignConnectors(List workerAssignment, Collection implements Iterator {
+
+private final Map> grouped;
+private final List keys;
+
+private int k;
+
+public BalancedIterator(Collection collection, Function allocationGrouper) {
+this.k = 0;
+this.grouped = collection.stream().collect(Collectors.groupingBy(
+allocationGrouper,
+Collectors.collectingAndThen(
+Collectors.toList(),
+List::iterator
+)
+));
+this.keys = collection.stream()
+.map(allocationGrouper)
+.distinct()
+.collect(Collectors.toList());

Review Comment:
   Thanks for the clarification!
   The BalancedIterator is only used to group:
   
   - `Collection` by `connector`
   - `Collection` (the strings being connector names) by `identity`
   
   So the structure is not aware of workers.

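The hunk above only shows the constructor. A minimal sketch of how such a grouping iterator could behave, assuming `next()` draws from the groups round-robin in order of each key's first appearance. The `hasNext()`/`next()` bodies, the type parameters `E`/`K`, and the string task IDs are assumptions for illustration, not the PR's code.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical sketch: E is the element type (e.g. a task ID), K the group key (e.g. a connector name).
class BalancedIterator<E, K> implements Iterator<E> {
    // One iterator per group; LinkedHashMap keeps keys in first-appearance order.
    private final Map<K, Iterator<E>> grouped;
    private final List<K> keys;
    private int k = 0; // index of the group to draw from next

    BalancedIterator(Collection<E> collection, Function<E, K> allocationGrouper) {
        this.grouped = collection.stream().collect(Collectors.groupingBy(
                allocationGrouper,
                LinkedHashMap::new,
                Collectors.collectingAndThen(Collectors.toList(), List::iterator)));
        this.keys = new ArrayList<>(grouped.keySet());
    }

    @Override
    public boolean hasNext() {
        return grouped.values().stream().anyMatch(Iterator::hasNext);
    }

    @Override
    public E next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        // Round-robin over the groups, skipping any that are already exhausted.
        while (true) {
            Iterator<E> group = grouped.get(keys.get(k));
            k = (k + 1) % keys.size();
            if (group.hasNext()) {
                return group.next();
            }
        }
    }

    public static void main(String[] args) {
        // Task IDs modelled as "connector-index" strings, grouped by connector.
        List<String> tasks = List.of("c1-0", "c1-1", "c1-2", "c2-0", "c2-1", "c3-0");
        Iterator<String> it = new BalancedIterator<String, String>(
                tasks, t -> t.substring(0, t.indexOf('-')));
        List<String> order = new ArrayList<>();
        it.forEachRemaining(order::add);
        System.out.println(order); // [c1-0, c2-0, c3-0, c1-1, c2-1, c1-2]
    }
}
```

With `Function.identity()` over distinct connector names, every group holds a single element, so the iterator simply reproduces the input order; nothing in it knows which worker a task or connector lives on.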


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [ KAFKA-17049 ] fix Incremental rebalances assign too many tasks for the same connector together [kafka]

2024-07-10 Thread via GitHub


gharris1727 commented on code in PR #16486:
URL: https://github.com/apache/kafka/pull/16486#discussion_r1672817593


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##
@@ -793,6 +796,46 @@ protected void assignConnectors(List workerAssignment, Collection implements Iterator {
+
+private final Map> grouped;
+private final List keys;
+
+private int k;
+
+public BalancedIterator(Collection collection, Function allocationGrouper) {
+this.k = 0;
+this.grouped = collection.stream().collect(Collectors.groupingBy(
+allocationGrouper,
+Collectors.collectingAndThen(
+Collectors.toList(),
+List::iterator
+)
+));
+this.keys = collection.stream()
+.map(allocationGrouper)
+.distinct()
+.collect(Collectors.toList());

Review Comment:
   Maybe I don't understand, but I don't think this changed anything. The 
incoming Collection may still have an over-representation of a single connector 
first, leading that connector to be preferentially revoked.
   
   For example, consider this situation
   
   ```
   W1: C1 C2 C3 C4
   W2: C1 C5 C6 C7
   W3: C1 C8 C9 C10
   ```
   
   If a new worker joins, C1 could be revoked from every existing worker, 
because on each worker it appears the same number of times as each of the other 
connectors, but that would violate local balance later:
   
   ```
   W1: C2 C3 C4
   W2: C5 C6 C7
   W3: C8 C9 C10
   W4: C1 C1 C1
   ```
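
To make "local balance" concrete, a small check, assuming it means that each connector's job count differs by at most one between any two workers (counting workers that run none of its jobs). A state is modelled as worker name to a list of connector names, one entry per job; the class and method names are illustrative, not from the PR.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper: a state maps worker -> list of connector names, one entry per job.
class LocalBalance {

    // True if, for every connector, (max - min) jobs across all workers is at most 1.
    static boolean isLocallyBalanced(Map<String, List<String>> state) {
        Set<String> connectors = new TreeSet<>();
        state.values().forEach(connectors::addAll);
        for (String connector : connectors) {
            int min = Integer.MAX_VALUE;
            int max = 0;
            for (List<String> jobs : state.values()) {
                int n = Collections.frequency(jobs, connector);
                min = Math.min(min, n);
                max = Math.max(max, n);
            }
            if (max - min > 1) {
                return false;
            }
        }
        return true;
    }

    // Parses rows like "W1 C1 C2 C3" into worker -> jobs.
    static Map<String, List<String>> state(String... rows) {
        Map<String, List<String>> state = new LinkedHashMap<>();
        for (String row : rows) {
            String[] parts = row.split(" ");
            state.put(parts[0], Arrays.asList(parts).subList(1, parts.length));
        }
        return state;
    }

    public static void main(String[] args) {
        Map<String, List<String>> initial = state(
                "W1 C1 C2 C3 C4", "W2 C1 C5 C6 C7", "W3 C1 C8 C9 C10");
        Map<String, List<String>> allC1Revoked = state(
                "W1 C2 C3 C4", "W2 C5 C6 C7", "W3 C8 C9 C10", "W4 C1 C1 C1");
        System.out.println(isLocallyBalanced(initial));      // true: C1 runs once per worker
        System.out.println(isLocallyBalanced(allC1Revoked)); // false: C1 counts are 0, 0, 0, 3
    }
}
```

Both states are globally balanced (four and three jobs per worker respectively); only the per-connector check tells them apart.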
   
   The BalancedIterator doesn't break ties fairly when two connectors have the 
same number of jobs assigned to the current worker. Picking a single job to 
revoke depends on the entire rest of the state, and on some degree of predicting 
how the jobs will be distributed afterwards.
   
   This is what I think the "ideal" state should be after that initial state:
   ```
   W1: C1 C2 C3
   W2: C1 C5 C6
   W3: C1 C8 C9
   W4: C4 C7 C10
   ```
   At most one C1 should be revoked overall, because revoking two to put on W4 
would break local balance.
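
One way to take the rest of the state into account, shown as a minimal greedy sketch (a swapped-in illustration, not the PR's algorithm). It assumes the new worker should receive exactly one job from each existing worker, and on each worker moves the job whose connector already has the fewest jobs on the new worker. The tie-break (last listed job wins) is an arbitrary assumption: on the example above it reproduces the "ideal" state, while breaking ties toward the first job would move C1, C5 and C8 instead, which is also locally balanced.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical greedy sketch. A state maps worker -> connector names, one entry per job.
class DestinationAwareRevocation {

    // Moves one job from every existing worker onto newWorker. On each worker it picks the
    // connector that already has the fewest jobs on newWorker; ties go to the job listed last.
    static Map<String, List<String>> revokeOneEach(Map<String, List<String>> before, String newWorker) {
        Map<String, List<String>> after = new LinkedHashMap<>();
        before.forEach((worker, jobs) -> after.put(worker, new ArrayList<>(jobs)));
        List<String> target = after.computeIfAbsent(newWorker, w -> new ArrayList<>());
        for (Map.Entry<String, List<String>> entry : after.entrySet()) {
            List<String> jobs = entry.getValue();
            if (entry.getKey().equals(newWorker) || jobs.isEmpty()) {
                continue;
            }
            int bestIndex = 0;
            int bestCount = Integer.MAX_VALUE;
            for (int i = 0; i < jobs.size(); i++) {
                int onTarget = Collections.frequency(target, jobs.get(i));
                if (onTarget <= bestCount) {
                    bestIndex = i;
                    bestCount = onTarget;
                }
            }
            target.add(jobs.remove(bestIndex));
        }
        return after;
    }

    public static void main(String[] args) {
        Map<String, List<String>> before = new LinkedHashMap<>();
        before.put("W1", List.of("C1", "C2", "C3", "C4"));
        before.put("W2", List.of("C1", "C5", "C6", "C7"));
        before.put("W3", List.of("C1", "C8", "C9", "C10"));
        System.out.println(revokeOneEach(before, "W4"));
        // {W1=[C1, C2, C3], W2=[C1, C5, C6], W3=[C1, C8, C9], W4=[C4, C7, C10]}
    }
}
```

Because the choice looks at where the job is going, a C1 listed last on every worker is still moved only once.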






Re: [PR] [ KAFKA-17049 ] fix Incremental rebalances assign too many tasks for the same connector together [kafka]

2024-07-10 Thread via GitHub


yazgoo commented on code in PR #16486:
URL: https://github.com/apache/kafka/pull/16486#discussion_r1672249822


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##
@@ -793,6 +796,46 @@ protected void assignConnectors(List workerAssignment, Collection implements Iterator {
+
+private final Map> grouped;
+private final List keys;
+
+private int k;
+
+public BalancedIterator(Collection collection, Function allocationGrouper) {
+this.k = 0;
+this.grouped = collection.stream().collect(Collectors.groupingBy(
+allocationGrouper,
+Collectors.collectingAndThen(
+Collectors.toList(),
+List::iterator
+)
+));
+this.keys = collection.stream()
+.map(allocationGrouper)
+.distinct()
+.collect(Collectors.toList());

Review Comment:
   @gharris1727, does this look better to you?






Re: [PR] [ KAFKA-17049 ] fix Incremental rebalances assign too many tasks for the same connector together [kafka]

2024-07-08 Thread via GitHub


gharris1727 commented on code in PR #16486:
URL: https://github.com/apache/kafka/pull/16486#discussion_r1669039009


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##
@@ -793,6 +796,43 @@ protected void assignConnectors(List workerAssignment, Collection implements Iterator {
+
+private final Map> grouped;
+private final List keys;
+
+private int k;
+
+public BalancedIterator(Collection collection, Function allocationGrouper) {
+this.k = 0;
+this.grouped = collection.stream().collect(Collectors.groupingBy(
+allocationGrouper,
+Collectors.collectingAndThen(
+Collectors.toList(),
+List::iterator
+)
+));
+this.keys = new ArrayList<>(grouped.keySet());

Review Comment:
   Bump on this, as I think it's still an issue.






Re: [PR] [ KAFKA-17049 ] fix Incremental rebalances assign too many tasks for the same connector together [kafka]

2024-07-08 Thread via GitHub


C0urante commented on PR #16486:
URL: https://github.com/apache/kafka/pull/16486#issuecomment-2214494017

   Most of our CI failures are flaky tests that can be ignored, but there are a 
few failures that are happening because this PR directly causes them. You can 
find those in the `RebalanceSourceConnectorsIntegrationTest` and 
`IncrementalCooperativeAssignorTest` test suites, and I'd strongly suggest 
running those test suites locally before pushing another commit, as it's much 
faster to run tests locally than to wait for CI to do them.
   
   This PR might be acceptable (once the test failures are addressed) as-is, 
but there's an open question (that I raised 
[here](https://github.com/apache/kafka/pull/16486#discussion_r1661291768)) that 
would have to be settled on first. Essentially, we might have to do an 
additional round of revocations if we want to fully address the goal of this PR.





Re: [PR] [ KAFKA-17049 ] fix Incremental rebalances assign too many tasks for the same connector together [kafka]

2024-07-08 Thread via GitHub


yazgoo commented on PR #16486:
URL: https://github.com/apache/kafka/pull/16486#issuecomment-2213209130

   - Any idea why the tests fail?
   - Is this PR OK as a first improvement?





Re: [PR] [ KAFKA-17049 ] fix Incremental rebalances assign too many tasks for the same connector together [kafka]

2024-07-02 Thread via GitHub


yazgoo commented on code in PR #16486:
URL: https://github.com/apache/kafka/pull/16486#discussion_r1661056552


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##
@@ -131,6 +131,27 @@ public void testTaskAssignmentWhenWorkerJoins() {
 assertEmptyAssignment();
 }
 
+@Test
+public void checkIndividualConnectorBalance() {
+connectors.clear();
+addNewConnector("connector1", 12);
+performStandardRebalance();
+addNewConnector("connector2", 12);
+performStandardRebalance();
+addNewEmptyWorkers("worker2");
+performStandardRebalance();
+performStandardRebalance();
+addNewEmptyWorkers("worker3");
+performStandardRebalance();
+performStandardRebalance();
+assertEquals(3, memberAssignments.size());
+memberAssignments.forEach((k, v) -> {

Review Comment:
   I refactored everything into `BalancedIterator` and used it in `assignTasks`.
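
The test's assertions are cut off in the excerpt above. As a hypothetical illustration (not the PR's test code), one way to express the balance it targets: with 2 connectors of 12 tasks spread over 3 workers, an even spread gives every worker 12 / 3 = 4 tasks of each connector. Task IDs are modelled as `connector-index` strings, and `worker1` plus the helper names are assumptions.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical helpers: task IDs are "connector-index" strings; an assignment maps worker -> task IDs.
class PerWorkerConnectorCounts {

    // Counts a worker's tasks per connector name.
    static Map<String, Long> countByConnector(List<String> taskIds) {
        return taskIds.stream().collect(Collectors.groupingBy(
                id -> id.substring(0, id.lastIndexOf('-')),
                TreeMap::new,
                Collectors.counting()));
    }

    // True if every worker runs exactly expectedPerConnector tasks of every connector.
    static boolean everyWorkerHas(Map<String, List<String>> assignment,
                                  Set<String> connectors, long expectedPerConnector) {
        for (List<String> tasks : assignment.values()) {
            Map<String, Long> counts = countByConnector(tasks);
            for (String connector : connectors) {
                if (counts.getOrDefault(connector, 0L) != expectedPerConnector) {
                    return false;
                }
            }
        }
        return true;
    }

    // Deals task IDs to workers round-robin, interleaving connectors (c1-0, c2-0, c1-1, ...).
    static Map<String, List<String>> dealInterleaved(List<String> connectors, int tasksPerConnector,
                                                     List<String> workers) {
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        workers.forEach(w -> assignment.put(w, new ArrayList<>()));
        int slot = 0;
        for (int i = 0; i < tasksPerConnector; i++) {
            for (String connector : connectors) {
                assignment.get(workers.get(slot++ % workers.size())).add(connector + "-" + i);
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, List<String>> assignment = dealInterleaved(
                List.of("connector1", "connector2"), 12, List.of("worker1", "worker2", "worker3"));
        System.out.println(everyWorkerHas(assignment, Set.of("connector1", "connector2"), 4)); // true
    }
}
```

A per-connector count map like `countByConnector` could back the per-worker checks inside a loop such as `memberAssignments.forEach(...)`.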


