C0urante commented on a change in pull request #10367: URL: https://github.com/apache/kafka/pull/10367#discussion_r833730613
########## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java ########## @@ -180,6 +208,155 @@ public void testTaskAssignmentWhenWorkerJoins() { verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId(); } + @Test + public void testTaskAssignmentWhenWorkerJoinAfterRevocation() { + when(coordinator.configSnapshot()).thenReturn(configState); + doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture()); + + // First assignment with 1 worker and 2 connectors configured but not yet assigned + // + // note: the assigned/revoked Connectors/tasks might be different, but the amount should be the same + // assignment after this phase: + // W1: assignedConnectors:[C0, C1], assignedTasks:[T0-0, T0-1, T0-2, T0-3, T1-0, T1-1, T1-2, T1-3], + // revokedConnectors:[] revokedTasks:[] + // + // Final distribution after this phase: + // W1: connectors:[C0, C1], tasks:[T0-0, T0-1, T0-2, T0-3, T1-0, T1-1, T1-2, T1-3] + expectGeneration(); + assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); + ++rebalanceNum; + returnedAssignments = assignmentsCapture.getValue(); + assertDelay(0, returnedAssignments); + expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments); + assertNoReassignments(memberConfigs, expectedMemberConfigs); + assertAssignment(2, 8, 0, 0, "worker1"); + + // Second assignment with a second worker joining and all connectors running on previous worker + // + // assignment after this phase: + // W1: assignedConnectors:[], assignedTasks:[], + // revokedConnectors:[C1], revokedTasks:[T1-0, T1-1, T1-2, T1-3] + // W2: assignedConnectors:[], assignedTasks:[] + // revokedConnectors:[] revokedTasks:[] + // + // Final distribution after this phase: + // W1: connectors:[C0], tasks:[T0-0, T0-1, T0-2, T0-3] + // W2: connectors:[], tasks:[] + applyAssignments(returnedAssignments); + memberConfigs = memberConfigs(leader, offset, assignments); + memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null)); + expectGeneration(); + assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); + ++rebalanceNum; + returnedAssignments = assignmentsCapture.getValue(); + assertDelay(0, returnedAssignments); + expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments); + assertNoReassignments(memberConfigs, expectedMemberConfigs); + assertAssignment(0, 0, 1, 4, "worker1", "worker2"); + + // Third assignment after revocations, and a third worker joining + // + // assignment after this phase: + // W1: assignedTasks:[], assignedTasks:[], + // revokedConnectors:[], revokedTasks:[T0-3] + // W2: assignedTasks:[C1], assignedTasks:[T1-0, T1-1] + // revokedConnectors:[] revokedTasks:[] + // W3: assignedTasks:[], assignedTasks:[T1-2, T1-3] + // revokedConnectors:[] revokedTasks:[] + // + // Final distribution after this phase: + // W1: connectors:[C0], tasks:[T0-0, T0-1, T0-2] + // W2: connectors:[C1], tasks:[T1-0, T1-1] + // W3: connectors:[], tasks:[T1-2, T1-3] + applyAssignments(returnedAssignments); + memberConfigs = memberConfigs(leader, offset, assignments); + memberConfigs.put("worker3", new ExtendedWorkerState(leaderUrl, offset, null)); + expectGeneration(); + assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); + ++rebalanceNum; + returnedAssignments = assignmentsCapture.getValue(); + assertDelay(0, returnedAssignments); + expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments); + assertNoReassignments(memberConfigs, expectedMemberConfigs); + assertAssignment(1, 4, 0, 1, "worker1", "worker2", "worker3"); + + // Forth assignment after revocations, and a forth worker joining + // + // assignment after this phase: + // W1: assignedTasks:[], assignedTasks:[], + // revokedConnectors:[], revokedTasks:[T0-2] + // W2: assignedTasks:[], assignedTasks:[] + // revokedConnectors:[] revokedTasks:[] + // W3: assignedTasks:[], assignedTasks:[] + // revokedConnectors:[] revokedTasks:[] + // W4: assignedTasks:[], assignedTasks:[T0-3] + // revokedConnectors:[] revokedTasks:[] + // + // Final distribution after this phase: + // W1: connectors:[C0], tasks:[T0-0, T0-1] + // W2: connectors:[C1], tasks:[T1-0, T1-1] + // W3: connectors:[], tasks:[T1-2, T1-3] + // W4: connectors:[], tasks:[T0-3] + applyAssignments(returnedAssignments); + memberConfigs = memberConfigs(leader, offset, assignments); + memberConfigs.put("worker4", new ExtendedWorkerState(leaderUrl, offset, null)); + expectGeneration(); + assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); + ++rebalanceNum; + returnedAssignments = assignmentsCapture.getValue(); + assertDelay(0, returnedAssignments); + expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments); + assertNoReassignments(memberConfigs, expectedMemberConfigs); + assertAssignment(0, 1, 0, 1, "worker1", "worker2", "worker3", "worker4"); + + // Fifth assignment after revocations + // + // assignment after this phase: + // W1: assignedTasks:[], assignedTasks:[], + // revokedConnectors:[], revokedTasks:[] + // W2: assignedTasks:[], assignedTasks:[] + // revokedConnectors:[] revokedTasks:[] + // W3: assignedTasks:[], assignedTasks:[] + // revokedConnectors:[] revokedTasks:[] + // W4: assignedTasks:[], assignedTasks:[T0-2] + // revokedConnectors:[] revokedTasks:[] + // + // Final distribution after this phase: + // W1: connectors:[C0], tasks:[T0-0, T0-1] + // W2: connectors:[C1], tasks:[T1-0, T1-1] + // W3: connectors:[], tasks:[T1-2, T1-3] + // W4: connectors:[], tasks:[T0-3, T0-2] + applyAssignments(returnedAssignments); + memberConfigs = memberConfigs(leader, offset, assignments); + expectGeneration(); + assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); + ++rebalanceNum; + returnedAssignments = assignmentsCapture.getValue(); + assertDelay(0, returnedAssignments); + expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments); + assertNoReassignments(memberConfigs, expectedMemberConfigs); + assertAssignment(0, 1, 0, 0, "worker1", "worker2", "worker3", "worker4"); Review comment: Covered by [KAFKA-13763](https://issues.apache.org/jira/browse/KAFKA-13763). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org