Re: [PR] [ KAFKA-17049 ] fix Incremental rebalances assign too many tasks for the same connector together [kafka]
yazgoo commented on code in PR #16486: URL: https://github.com/apache/kafka/pull/16486#discussion_r1673685382

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java: ##

```
@@ -793,6 +796,46 @@ protected void assignConnectors(List workerAssignment, Collection implements Iterator {
+
+private final Map> grouped;
+private final List keys;
+
+private int k;
+
+public BalancedIterator(Collection collection, Function allocationGrouper) {
+this.k = 0;
+this.grouped = collection.stream().collect(Collectors.groupingBy(
+allocationGrouper,
+Collectors.collectingAndThen(
+Collectors.toList(),
+List::iterator
+)
+));
+this.keys = collection.stream()
+.map(allocationGrouper)
+.distinct()
+.collect(Collectors.toList());
```

Review Comment: Thanks for the clarification! The BalancedIterator is only used to group:
- `Collection` by `connector`
- `Collection` (the strings being `Connector` names) by `identity`

So the structure is not aware of workers.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
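The idea behind the BalancedIterator, as described in this thread, is to group a collection by a key (such as the connector) and then draw elements round-robin across the groups, so that consecutive elements come from different connectors. The following is a minimal, hypothetical sketch of that idea, not the class from PR #16486 (whose full source is not quoted here). It assumes task ids of the form `C1-0` and uses a `LinkedHashMap` so groups are visited in first-seen order.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.function.Function;

// Hypothetical, simplified round-robin iterator; NOT the BalancedIterator from PR #16486.
public class BalancedIteratorSketch {

    static final class BalancedIterator<E, K> implements Iterator<E> {
        private final Map<K, Iterator<E>> grouped = new LinkedHashMap<>(); // one iterator per group
        private final List<K> keys;                                        // groups in first-seen order
        private int k = 0;                                                 // next group to draw from

        BalancedIterator(Collection<E> collection, Function<E, K> allocationGrouper) {
            // Group elements by key, preserving the order in which each key first appears.
            Map<K, List<E>> lists = new LinkedHashMap<>();
            for (E element : collection) {
                lists.computeIfAbsent(allocationGrouper.apply(element), key -> new ArrayList<>()).add(element);
            }
            lists.forEach((key, list) -> grouped.put(key, list.iterator()));
            this.keys = new ArrayList<>(lists.keySet());
        }

        @Override
        public boolean hasNext() {
            for (Iterator<E> it : grouped.values()) {
                if (it.hasNext()) {
                    return true;
                }
            }
            return false;
        }

        @Override
        public E next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            // Visit groups in turn, skipping any group that is already exhausted.
            while (true) {
                Iterator<E> it = grouped.get(keys.get(k));
                k = (k + 1) % keys.size();
                if (it.hasNext()) {
                    return it.next();
                }
            }
        }
    }

    // Interleaves hypothetical task ids "<connector>-<n>" so neighbours belong to different connectors.
    static List<String> interleave(List<String> taskIds) {
        List<String> out = new ArrayList<>();
        Iterator<String> it = new BalancedIterator<String, String>(taskIds, id -> id.substring(0, id.indexOf('-')));
        it.forEachRemaining(out::add);
        return out;
    }

    public static void main(String[] args) {
        List<String> tasks = List.of("C1-0", "C1-1", "C1-2", "C2-0", "C2-1", "C3-0");
        System.out.println(interleave(tasks)); // [C1-0, C2-0, C3-0, C1-1, C2-1, C1-2]
    }
}
```

Because the grouping key is just a function, the same structure works for task ids grouped by connector and for connector names grouped by identity; nothing in it knows about workers.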
Re: [PR] [ KAFKA-17049 ] fix Incremental rebalances assign too many tasks for the same connector together [kafka]
gharris1727 commented on code in PR #16486: URL: https://github.com/apache/kafka/pull/16486#discussion_r1672817593

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java: ##

```
@@ -793,6 +796,46 @@ protected void assignConnectors(List workerAssignment, Collection implements Iterator {
+
+private final Map> grouped;
+private final List keys;
+
+private int k;
+
+public BalancedIterator(Collection collection, Function allocationGrouper) {
+this.k = 0;
+this.grouped = collection.stream().collect(Collectors.groupingBy(
+allocationGrouper,
+Collectors.collectingAndThen(
+Collectors.toList(),
+List::iterator
+)
+));
+this.keys = collection.stream()
+.map(allocationGrouper)
+.distinct()
+.collect(Collectors.toList());
```

Review Comment: Maybe I don't understand, but I don't think this changed anything. The incoming Collection may still have an over-representation of a single connector first, leading that connector to be preferentially revoked. For example, consider this situation:

```
W1: C1 C2 C3 C4
W2: C1 C5 C6 C7
W3: C1 C8 C9 C10
```

If a new worker joins, C1 could be revoked because, on each worker, it appears the same number of times as each of the other connectors, but that would violate local balance later:

```
W1: C2 C3 C4
W2: C5 C6 C7
W3: C8 C9 C10
W4: C1 C1 C1
```

The BalancedIterator isn't fairly tie-breaking when two connectors have the same number of jobs assigned to the current worker. Picking a single job to revoke depends on the entire rest of the state, and some degree of predicting how the jobs will be distributed afterwards.

This is what I think the "ideal" state should be after that initial state:

```
W1: C1 C2 C3
W2: C1 C5 C6
W3: C1 C8 C9
W4: C4 C7 C10
```

At most one C1 should be revoked overall, because revoking two to put on W4 would break local balance.
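The worker states in the comment above can be checked mechanically. The sketch below is hypothetical and assumes that "local balance" means: for every connector, the number of its jobs on any two workers differs by at most one. Under that reading, 3 C1 jobs over 4 workers allow each worker 0 or 1 of them, which is why at most one C1 may move to W4.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical checker for the assumed "local balance" property:
// for every connector, its job counts on any two workers differ by at most one.
public class LocalBalanceSketch {

    static boolean isLocallyBalanced(Map<String, List<String>> assignment) {
        TreeSet<String> connectors = new TreeSet<>();
        assignment.values().forEach(connectors::addAll);
        for (String connector : connectors) {
            int min = Integer.MAX_VALUE;
            int max = Integer.MIN_VALUE;
            for (List<String> jobs : assignment.values()) {
                int count = Collections.frequency(jobs, connector);
                min = Math.min(min, count);
                max = Math.max(max, count);
            }
            if (max - min > 1) {
                return false;
            }
        }
        return true;
    }

    // Parses rows such as "W1: C1 C2 C3" into an ordered worker -> jobs map.
    static Map<String, List<String>> parse(String... rows) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String row : rows) {
            String[] parts = row.split(":");
            result.put(parts[0].trim(), new ArrayList<>(Arrays.asList(parts[1].trim().split("\\s+"))));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> initial = parse("W1: C1 C2 C3 C4", "W2: C1 C5 C6 C7", "W3: C1 C8 C9 C10");
        Map<String, List<String>> c1Revoked = parse("W1: C2 C3 C4", "W2: C5 C6 C7", "W3: C8 C9 C10", "W4: C1 C1 C1");
        Map<String, List<String>> ideal = parse("W1: C1 C2 C3", "W2: C1 C5 C6", "W3: C1 C8 C9", "W4: C4 C7 C10");
        System.out.println(isLocallyBalanced(initial));   // true
        System.out.println(isLocallyBalanced(c1Revoked)); // false: C1 counts are 0, 0, 0, 3
        System.out.println(isLocallyBalanced(ideal));     // true
    }
}
```

A check like this only validates an end state; choosing which single job to revoke still requires looking at the whole assignment, as the comment points out.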
Re: [PR] [ KAFKA-17049 ] fix Incremental rebalances assign too many tasks for the same connector together [kafka]
yazgoo commented on code in PR #16486: URL: https://github.com/apache/kafka/pull/16486#discussion_r1672249822

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java: ##

```
@@ -793,6 +796,46 @@ protected void assignConnectors(List workerAssignment, Collection implements Iterator {
+
+private final Map> grouped;
+private final List keys;
+
+private int k;
+
+public BalancedIterator(Collection collection, Function allocationGrouper) {
+this.k = 0;
+this.grouped = collection.stream().collect(Collectors.groupingBy(
+allocationGrouper,
+Collectors.collectingAndThen(
+Collectors.toList(),
+List::iterator
+)
+));
+this.keys = collection.stream()
+.map(allocationGrouper)
+.distinct()
+.collect(Collectors.toList());
```

Review Comment: @gharris1727 does this look better to you?
Re: [PR] [ KAFKA-17049 ] fix Incremental rebalances assign too many tasks for the same connector together [kafka]
gharris1727 commented on code in PR #16486: URL: https://github.com/apache/kafka/pull/16486#discussion_r1669039009

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java: ##

```
@@ -793,6 +796,43 @@ protected void assignConnectors(List workerAssignment, Collection implements Iterator {
+
+private final Map> grouped;
+private final List keys;
+
+private int k;
+
+public BalancedIterator(Collection collection, Function allocationGrouper) {
+this.k = 0;
+this.grouped = collection.stream().collect(Collectors.groupingBy(
+allocationGrouper,
+Collectors.collectingAndThen(
+Collectors.toList(),
+List::iterator
+)
+));
+this.keys = new ArrayList<>(grouped.keySet());
```

Review Comment: Bump on this, as I think it's still an issue.
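The revision quoted in this message builds `keys` from `grouped.keySet()`, whereas the newer revision (quoted in discussion_r1672249822 and later) builds it with `collection.stream().map(allocationGrouper).distinct()`. The hypothetical sketch below shows the practical difference, assuming task ids of the form `C1-0`: `distinct()` on an ordered stream keeps connectors in the order they first appear, while `Collectors.groupingBy` without a map factory documents no guarantee about the returned `Map` type, so its key order need not follow the input. As discussion_r1672817593 argues, preserving input order alone does not make revocation fair, because the input may already list an over-represented connector first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical comparison of the two ways of building the group-key list seen in the quoted revisions.
public class KeyOrderSketch {

    // Earlier revision: keys copied from the grouping map; the map type (and thus key order) is unspecified.
    static List<String> keysFromGroupingMap(List<String> taskIds, Function<String, String> grouper) {
        Map<String, List<String>> grouped = taskIds.stream().collect(Collectors.groupingBy(grouper));
        return new ArrayList<>(grouped.keySet());
    }

    // Newer revision: distinct() on an ordered stream keeps each key's first occurrence, in encounter order.
    static List<String> keysInEncounterOrder(List<String> taskIds, Function<String, String> grouper) {
        return taskIds.stream().map(grouper).distinct().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Function<String, String> connectorOf = id -> id.substring(0, id.indexOf('-'));
        List<String> taskIds = List.of("C3-0", "C1-0", "C2-0", "C1-1");
        System.out.println(keysInEncounterOrder(taskIds, connectorOf)); // [C3, C1, C2]
        System.out.println(keysFromGroupingMap(taskIds, connectorOf));  // same keys, order not guaranteed
    }
}
```

Passing `LinkedHashMap::new` as the map factory to `groupingBy` would be another way to get encounter-ordered keys from the grouping map itself.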
Re: [PR] [ KAFKA-17049 ] fix Incremental rebalances assign too many tasks for the same connector together [kafka]
C0urante commented on PR #16486: URL: https://github.com/apache/kafka/pull/16486#issuecomment-2214494017

Most of our CI failures are flaky tests that can be ignored, but a few are caused directly by this PR. You can find those in the `RebalanceSourceConnectorsIntegrationTest` and `IncrementalCooperativeAssignorTest` test suites, and I'd strongly suggest running those test suites locally before pushing another commit, as it's much faster to run tests locally than to wait for CI to do them.

This PR might be acceptable as-is (once the test failures are addressed), but there's an open question (that I raised [here](https://github.com/apache/kafka/pull/16486#discussion_r1661291768)) that would have to be settled first. Essentially, we might have to do an additional round of revocations if we want to fully address the goal of this PR.
Re: [PR] [ KAFKA-17049 ] fix Incremental rebalances assign too many tasks for the same connector together [kafka]
yazgoo commented on PR #16486: URL: https://github.com/apache/kafka/pull/16486#issuecomment-2213209130

- Any idea why the tests fail?
- Is this PR OK as a first improvement?
Re: [PR] [ KAFKA-17049 ] fix Incremental rebalances assign too many tasks for the same connector together [kafka]
yazgoo commented on code in PR #16486: URL: https://github.com/apache/kafka/pull/16486#discussion_r1661056552

## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java: ##

```
@@ -131,6 +131,27 @@ public void testTaskAssignmentWhenWorkerJoins() {
 assertEmptyAssignment();
 }
+@Test
+public void checkIndividualConnectorBalance() {
+connectors.clear();
+addNewConnector("connector1", 12);
+performStandardRebalance();
+addNewConnector("connector2", 12);
+performStandardRebalance();
+addNewEmptyWorkers("worker2");
+performStandardRebalance();
+performStandardRebalance();
+addNewEmptyWorkers("worker3");
+performStandardRebalance();
+performStandardRebalance();
+assertEquals(3, memberAssignments.size());
+memberAssignments.forEach((k, v) -> {
```

Review Comment: I refactored everything in `BalancedIterator` and added it to `assignTasks`.