C0urante commented on code in PR #11974:
URL: https://github.com/apache/kafka/pull/11974#discussion_r842931708


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##########
@@ -76,23 +83,21 @@
 
     @Parameters
     public static Iterable<?> mode() {
-        return Arrays.asList(new Object[][] {{CONNECT_PROTOCOL_V1, 
CONNECT_PROTOCOL_V2}});
+        return Arrays.asList(new Object[][] {{CONNECT_PROTOCOL_V1}, 
{CONNECT_PROTOCOL_V2}});

Review Comment:
   Blegh, thanks. I'll simplify it anyway; I'd rather leave `trunk` in a healthy 
place and not risk FUD from someone else copy+pasting this style.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##########
@@ -1170,78 +983,128 @@ public void 
testDuplicatedAssignmentHandleWhenTheDuplicatedAssignmentsDeleted()
         
doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
 
         // First assignment with 1 worker and 2 connectors configured but not 
yet assigned
-        assignor.performTaskAssignment(leader, offset, memberConfigs, 
coordinator, protocolVersion);
-        ++rebalanceNum;
-        returnedAssignments = assignmentsCapture.getValue();
-        assertDelay(0, returnedAssignments);
-        expectedMemberConfigs = memberConfigs(leader, offset, 
returnedAssignments);
-        assertNoReassignments(memberConfigs, expectedMemberConfigs);
-        assertAssignment(2, 8, 0, 0, "worker1");
-
-        //delete connector1
+        performStandardRebalance();
+        assertDelay(0);
+        assertWorkers("worker1");
+        assertConnectorAllocations(2);
+        assertTaskAllocations(8);
+        assertBalancedAndCompleteAllocation();
+
+        // Delete connector1
         configState = clusterConfigState(offset, 2, 1, 4);
         when(coordinator.configSnapshot()).thenReturn(configState);
 
         // Second assignment with a second worker with duplicate assignment 
joining and the duplicated assignment is deleted at the same time
-        applyAssignments(returnedAssignments);
-        memberConfigs = memberConfigs(leader, offset, assignments);
-        ExtendedAssignment duplicatedWorkerAssignment = 
newExpandableAssignment();
-        duplicatedWorkerAssignment.connectors().addAll(newConnectors(1, 2));
-        duplicatedWorkerAssignment.tasks().addAll(newTasks("connector1", 0, 
4));
-        memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, 
offset, duplicatedWorkerAssignment));
-        assignor.performTaskAssignment(leader, offset, memberConfigs, 
coordinator, protocolVersion);
-        ++rebalanceNum;
-        returnedAssignments = assignmentsCapture.getValue();
-        assertDelay(0, returnedAssignments);
-        expectedMemberConfigs = memberConfigs(leader, offset, 
returnedAssignments);
-        assertNoReassignments(memberConfigs, expectedMemberConfigs);
-        assertAssignment(0, 0, 2, 8, "worker1", "worker2");
+        addNewWorker("worker2", newConnectors(1, 2), newTasks("connector1", 0, 
4));
+        performStandardRebalance();
+        assertDelay(0);
+        assertWorkers("worker1", "worker2");
+        assertConnectorAllocations(0, 1);
+        assertTaskAllocations(0, 4);
 
         // Third assignment after revocations
-        applyAssignments(returnedAssignments);
-        memberConfigs = memberConfigs(leader, offset, assignments);
-        assignor.performTaskAssignment(leader, offset, memberConfigs, 
coordinator, protocolVersion);
-        ++rebalanceNum;
-        returnedAssignments = assignmentsCapture.getValue();
-        assertDelay(0, returnedAssignments);
-        expectedMemberConfigs = memberConfigs(leader, offset, 
returnedAssignments);
-        assertNoReassignments(memberConfigs, expectedMemberConfigs);
-        assertAssignment(0, 0, 0, 2, "worker1", "worker2");
+        performStandardRebalance();
+        assertDelay(0);
+        assertConnectorAllocations(0, 1);
+        assertTaskAllocations(0, 2);
 
         // fourth rebalance after revocations
-        applyAssignments(returnedAssignments);
-        memberConfigs = memberConfigs(leader, offset, assignments);
-        assignor.performTaskAssignment(leader, offset, memberConfigs, 
coordinator, protocolVersion);
-        ++rebalanceNum;
-        returnedAssignments = assignmentsCapture.getValue();
-        assertDelay(0, returnedAssignments);
-        expectedMemberConfigs = memberConfigs(leader, offset, 
returnedAssignments);
-        assertNoReassignments(memberConfigs, expectedMemberConfigs);
-        assertAssignment(0, 2, 0, 0, "worker1", "worker2");
+        performStandardRebalance();
+        assertDelay(0);
+        assertConnectorAllocations(0, 1);
+        assertTaskAllocations(2, 2);
+        assertBalancedAndCompleteAllocation();
 
         // Fifth rebalance should not change assignments
-        applyAssignments(returnedAssignments);
-        memberConfigs = memberConfigs(leader, offset, assignments);
-        assignor.performTaskAssignment(leader, offset, memberConfigs, 
coordinator, protocolVersion);
+        performStandardRebalance();
+        assertDelay(0);
+        assertEmptyAssignment();
+
+        verifyCoordinatorInteractions();
+    }
+
+    private void performStandardRebalance() {
+        performRebalance(false, false);
+    }
+
+    private void performFailedRebalance() {
+        performRebalance(true, false);
+    }
+
+    private void performRebalanceWithMismatchedGeneration() {
+        performRebalance(false, true);
+    }
+
+    private void performRebalance(boolean assignmentFailure, boolean 
expectGenerationMismatch) {
+        expectGeneration(expectGenerationMismatch);
+        // Member configs are tracked by the assignor; create a deep copy here 
so that modifications to our own memberConfigs field
+        // are not accidentally propagated to the one used by the assignor
+        Map<String, ExtendedWorkerState> memberConfigsCopy = 
memberConfigs.entrySet().stream().collect(Collectors.toMap(
+                Map.Entry::getKey,
+                e -> {
+                    ExtendedWorkerState originalWorkerState = e.getValue();
+                    return new ExtendedWorkerState(
+                            originalWorkerState.url(),
+                            originalWorkerState.offset(),
+                            duplicate(originalWorkerState.assignment())
+                    );
+                }
+        ));
+        try {
+            assignor.performTaskAssignment(leader, offset, memberConfigsCopy, 
coordinator, protocolVersion);
+        } catch (RuntimeException e) {
+            if (assignmentFailure) {
+                RequestFuture.failure(e);
+            } else {
+                throw e;
+            }
+        }
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
-        assertDelay(0, returnedAssignments);
-        expectedMemberConfigs = memberConfigs(leader, offset, 
returnedAssignments);
-        assertNoReassignments(memberConfigs, expectedMemberConfigs);
-        assertAssignment(0, 0, 0, 0, "worker1", "worker2");
+        assertNoRedundantAssignments();
+        if (!assignmentFailure) {
+            applyAssignments(leader, offset, returnedAssignments);
+        }
+    }
 
-        verify(coordinator, times(rebalanceNum)).configSnapshot();
-        verify(coordinator, times(rebalanceNum)).leaderState(any());
-        verify(coordinator, times(2 * rebalanceNum)).generationId();
-        verify(coordinator, times(rebalanceNum)).memberId();
-        verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId();
+    private void expectGeneration(boolean expectMismatch) {
+        when(coordinator.generationId())
+                .thenReturn(assignor.previousGenerationId + 1)
+                .thenReturn(assignor.previousGenerationId + 1);

Review Comment:
   Ah, good call. I figured the [existing 
style](https://github.com/apache/kafka/blob/481cc13a132d33f23e737f88ae28a1aac135afed/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java#L1464-L1466)
 was necessary since `WorkerCoordinator::generationId` would be invoked twice 
for every call to `expectGeneration`, but it looks like Mockito handles this 
just fine with a single `thenReturn`, since the last stubbed value keeps being 
returned on every subsequent invocation.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##########
@@ -769,16 +609,16 @@ public void testLostAssignmentHandlingWhenWorkerBounces() 
{
                 new ArrayList<>(configuredAssignment.values()),
                 memberConfigs);
 
-        assertThat("Wrong set of workers for reassignments",
+        assertEquals("Wrong set of workers for reassignments",

Review Comment:
   I usually prefer `assertEquals` when possible since it shows the difference 
between actual and expected, which in this case would mean the complete set of 
incorrect candidate workers for reassignment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to