C0urante commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r977936338


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java:
##########
@@ -517,13 +517,13 @@ public void testTaskAssignmentWhenWorkerBounces() {
         leaderAssignment = deserializeAssignment(result, leaderId);
         assertAssignment(leaderId, offset,
                 Collections.emptyList(), 0,
-                Collections.emptyList(), 0,
+                Collections.emptyList(), 1,

Review Comment:
   Yep, I was able to produce a test case that causes an imbalanced assignment 
to be sent out with no revocations or delay by adding a new worker in the 
middle of a delayed rebalance:
   
   ```java
   @Test
   public void testWorkerJoiningDuringDelayedRebalance() {
       time = new MockTime();
       initAssignor();
   
       addNewConnector("connector3", 4);
       // First assignment with 1 worker and 3 connectors configured but not yet assigned
       performStandardRebalance();
       assertDelay(0);
       assertWorkers("worker1");
       assertConnectorAllocations(3);
       assertTaskAllocations(12);
       assertBalancedAndCompleteAllocation();
   
       // Second assignment with a second worker joining and all connectors running on previous worker
       // We should revoke.
       addNewEmptyWorkers("worker2");
       performStandardRebalance();
       assertWorkers("worker1", "worker2");
       assertConnectorAllocations(0, 2);
       assertTaskAllocations(0, 6);
   
       // Third assignment immediately after revocations, and a third worker joining.
       // This is a successive revoking rebalance. We should not perform any revocations
       // in this round
       addNewEmptyWorkers("worker3");
       performStandardRebalance();
       assertTrue(assignor.delay > 0);
       assertWorkers("worker1", "worker2", "worker3");
       assertConnectorAllocations(0, 1, 2);
       assertTaskAllocations(3, 3, 6);
   
       // Fourth assignment and a fourth worker joining
       // while delayed rebalance is active. We should not revoke
       time.sleep(assignor.delay / 2);
       addNewEmptyWorkers("worker4");
       performStandardRebalance();
       assertWorkers("worker1", "worker2", "worker3", "worker4");
       assertConnectorAllocations(0, 0, 1, 2);
       assertTaskAllocations(0, 3, 3, 6);
   
       // Fifth assignment and a fifth worker joining
       // after the delay has expired. We should revoke, but we don't
       time.sleep(assignor.delay);
       addNewEmptyWorkers("worker5");
       performStandardRebalance();
       assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5");
       assertNoRevocations();
       assertDelay(0);
       assertBalancedAndCompleteAllocation();
   }
   ```
   
   It's also worth noting that many of the test cases in this PR need to be 
updated with this bit at the beginning in order to mock the `Time` instance 
used by the assignor:
   ```java
   time = new MockTime();
   initAssignor();
   ```
   This should be added in any test that invokes `time::sleep`.
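   
   To illustrate why this matters, here's a rough sketch of the general pattern (not Kafka's actual `MockTime` or assignor code; `ManualClock` and `DelayTracker` are hypothetical names made up for this example). A delay computed against an injected clock only counts down when the test advances that same clock instance:
   
   ```java
   // Hypothetical stand-in for a mockable clock; this is NOT Kafka's MockTime.
   class ManualClock {
       private long nowMs = 0L;
   
       long milliseconds() {
           return nowMs;
       }
   
       // Advances the clock without actually blocking the thread.
       void sleep(long ms) {
           nowMs += ms;
       }
   }
   
   // Hypothetical component that tracks a delay against an injected clock,
   // loosely analogous to an assignor tracking a scheduled rebalance delay.
   class DelayTracker {
       private final ManualClock clock;
       private long deadlineMs = 0L;
   
       DelayTracker(ManualClock clock) {
           this.clock = clock;
       }
   
       void startDelay(long delayMs) {
           deadlineMs = clock.milliseconds() + delayMs;
       }
   
       // Remaining delay as seen through the injected clock, never negative.
       long remainingMs() {
           return Math.max(0L, deadlineMs - clock.milliseconds());
       }
   }
   ```
   
   If the tracker were built with a different clock than the one the test sleeps on, `remainingMs()` would never change, which is presumably the kind of mismatch that re-running `initAssignor()` after replacing `time` avoids.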
   
   At this point, I think we may want to split this into two separate PRs that 
get merged together. We can revert the `canRevoke` flag from this one, and then 
add a downstream PR that fixes how we calculate task-balancing revocations in 
tricky situations like when lost or newly-created tasks have just been 
assigned. That should fully address cases like the one tested 
[here](https://github.com/apache/kafka/blob/cda5da9b65f78b51cdfe5371f712a0d392dbdb4d/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java#L427).
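   
   For reference, here's a minimal sketch of what "balanced" implies for task-balancing revocations. It is not the assignor's actual logic, and `BalanceSketch` and `revocationsForBalance` are hypothetical names. It assumes tasks are interchangeable and ignores connectors: with `total` tasks over `n` workers, only `total % n` workers may hold `floor + 1` tasks and the rest hold `floor`, so anything above those caps would need to be revoked.
   
   ```java
   import java.util.Arrays;
   
   // Hypothetical helper, for illustration only; not part of Kafka Connect.
   class BalanceSketch {
   
       // Returns, per worker, how many tasks would have to be revoked so that
       // the remaining tasks fit under a balanced per-worker cap.
       static int[] revocationsForBalance(int[] taskCounts) {
           int workers = taskCounts.length;
           if (workers == 0) {
               return new int[0];
           }
           int total = 0;
           for (int count : taskCounts) {
               total += count;
           }
           int floor = total / workers;
           int extra = total % workers; // number of workers allowed floor + 1 tasks
   
           // Order worker indices by current load, heaviest first, so the
           // larger caps go to the workers that already hold the most tasks.
           Integer[] byLoadDesc = new Integer[workers];
           for (int i = 0; i < workers; i++) {
               byLoadDesc[i] = i;
           }
           Arrays.sort(byLoadDesc, (a, b) -> Integer.compare(taskCounts[b], taskCounts[a]));
   
           int[] revoke = new int[workers];
           for (int rank = 0; rank < workers; rank++) {
               int worker = byLoadDesc[rank];
               int cap = rank < extra ? floor + 1 : floor;
               revoke[worker] = Math.max(0, taskCounts[worker] - cap);
           }
           return revoke;
       }
   }
   ```
   
   Applied to the allocation from the test above just before the fifth rebalance, (0, 3, 3, 6) plus an empty fifth worker, this sketch would revoke four tasks in total, whereas that round currently goes out with none.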
   
   @showuon WDYT? I could extract some of the changes from 
https://github.com/apache/kafka/pull/12019 that would fix the load-balancing 
revocation logic and rebase them onto this change if it helps.



