C0urante commented on a change in pull request #10367:
URL: https://github.com/apache/kafka/pull/10367#discussion_r833730613



##########
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
##########
@@ -180,6 +208,155 @@ public void testTaskAssignmentWhenWorkerJoins() {
         verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId();
     }
 
+    @Test
+    public void testTaskAssignmentWhenWorkerJoinAfterRevocation() {
+        when(coordinator.configSnapshot()).thenReturn(configState);
+        
doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
+
+        // First assignment with 1 worker and 2 connectors configured but not 
yet assigned
+        //
+        // note: the assigned/revoked Connectors/tasks might be different, but 
the amount should be the same
+        // assignment after this phase:
+        // W1: assignedConnectors:[C0, C1], assignedTasks:[T0-0, T0-1, T0-2, 
T0-3, T1-0, T1-1, T1-2, T1-3],
+        //     revokedConnectors:[] revokedTasks:[]
+        //
+        // Final distribution after this phase:
+        // W1: connectors:[C0, C1], tasks:[T0-0, T0-1, T0-2, T0-3, T1-0, T1-1, 
T1-2, T1-3]
+        expectGeneration();
+        assignor.performTaskAssignment(leader, offset, memberConfigs, 
coordinator, protocolVersion);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, 
returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(2, 8, 0, 0, "worker1");
+
+        // Second assignment with a second worker joining and all connectors 
running on previous worker
+        //
+        // assignment after this phase:
+        // W1: assignedConnectors:[], assignedTasks:[],
+        //     revokedConnectors:[C1], revokedTasks:[T1-0, T1-1, T1-2, T1-3]
+        // W2: assignedConnectors:[], assignedTasks:[]
+        //     revokedConnectors:[] revokedTasks:[]
+        //
+        // Final distribution after this phase:
+        // W1: connectors:[C0], tasks:[T0-0, T0-1, T0-2, T0-3]
+        // W2: connectors:[], tasks:[]
+        applyAssignments(returnedAssignments);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, 
offset, null));
+        expectGeneration();
+        assignor.performTaskAssignment(leader, offset, memberConfigs, 
coordinator, protocolVersion);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, 
returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(0, 0, 1, 4, "worker1", "worker2");
+
+        // Third assignment after revocations, and a third worker joining
+        //
+        // assignment after this phase:
+        // W1: assignedConnectors:[], assignedTasks:[],
+        //     revokedConnectors:[], revokedTasks:[T0-3]
+        // W2: assignedConnectors:[C1], assignedTasks:[T1-0, T1-1]
+        //     revokedConnectors:[] revokedTasks:[]
+        // W3: assignedConnectors:[], assignedTasks:[T1-2, T1-3]
+        //     revokedConnectors:[] revokedTasks:[]
+        //
+        // Final distribution after this phase:
+        // W1: connectors:[C0], tasks:[T0-0, T0-1, T0-2]
+        // W2: connectors:[C1], tasks:[T1-0, T1-1]
+        // W3: connectors:[], tasks:[T1-2, T1-3]
+        applyAssignments(returnedAssignments);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        memberConfigs.put("worker3", new ExtendedWorkerState(leaderUrl, 
offset, null));
+        expectGeneration();
+        assignor.performTaskAssignment(leader, offset, memberConfigs, 
coordinator, protocolVersion);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, 
returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(1, 4, 0, 1, "worker1", "worker2", "worker3");
+
+        // Fourth assignment after revocations, and a fourth worker joining
+        //
+        // assignment after this phase:
+        // W1: assignedConnectors:[], assignedTasks:[],
+        //     revokedConnectors:[], revokedTasks:[T0-2]
+        // W2: assignedConnectors:[], assignedTasks:[]
+        //     revokedConnectors:[] revokedTasks:[]
+        // W3: assignedConnectors:[], assignedTasks:[]
+        //     revokedConnectors:[] revokedTasks:[]
+        // W4: assignedConnectors:[], assignedTasks:[T0-3]
+        //     revokedConnectors:[] revokedTasks:[]
+        //
+        // Final distribution after this phase:
+        // W1: connectors:[C0], tasks:[T0-0, T0-1]
+        // W2: connectors:[C1], tasks:[T1-0, T1-1]
+        // W3: connectors:[], tasks:[T1-2, T1-3]
+        // W4: connectors:[], tasks:[T0-3]
+        applyAssignments(returnedAssignments);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        memberConfigs.put("worker4", new ExtendedWorkerState(leaderUrl, 
offset, null));
+        expectGeneration();
+        assignor.performTaskAssignment(leader, offset, memberConfigs, 
coordinator, protocolVersion);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, 
returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(0, 1, 0, 1, "worker1", "worker2", "worker3", 
"worker4");
+
+        // Fifth assignment after revocations
+        //
+        // assignment after this phase:
+        // W1: assignedConnectors:[], assignedTasks:[],
+        //     revokedConnectors:[], revokedTasks:[]
+        // W2: assignedConnectors:[], assignedTasks:[]
+        //     revokedConnectors:[] revokedTasks:[]
+        // W3: assignedConnectors:[], assignedTasks:[]
+        //     revokedConnectors:[] revokedTasks:[]
+        // W4: assignedConnectors:[], assignedTasks:[T0-2]
+        //     revokedConnectors:[] revokedTasks:[]
+        //
+        // Final distribution after this phase:
+        // W1: connectors:[C0], tasks:[T0-0, T0-1]
+        // W2: connectors:[C1], tasks:[T1-0, T1-1]
+        // W3: connectors:[], tasks:[T1-2, T1-3]
+        // W4: connectors:[], tasks:[T0-3, T0-2]
+        applyAssignments(returnedAssignments);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        expectGeneration();
+        assignor.performTaskAssignment(leader, offset, memberConfigs, 
coordinator, protocolVersion);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, 
returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(0, 1, 0, 0, "worker1", "worker2", "worker3", 
"worker4");

Review comment:
       Covered by 
[KAFKA-13763](https://issues.apache.org/jira/browse/KAFKA-13763).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to