C0urante commented on code in PR #11974: URL: https://github.com/apache/kafka/pull/11974#discussion_r842931708
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java: ########## @@ -76,23 +83,21 @@ @Parameters public static Iterable<?> mode() { - return Arrays.asList(new Object[][] {{CONNECT_PROTOCOL_V1, CONNECT_PROTOCOL_V2}}); + return Arrays.asList(new Object[][] {{CONNECT_PROTOCOL_V1}, {CONNECT_PROTOCOL_V2}}); Review Comment: Blegh, thanks. I'll simplify it anyways; rather leave `trunk` in a healthy place and not risk FUD from someone else copy+pasting this style. ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java: ########## @@ -1170,78 +983,128 @@ public void testDuplicatedAssignmentHandleWhenTheDuplicatedAssignmentsDeleted() doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture()); // First assignment with 1 worker and 2 connectors configured but not yet assigned - assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); - ++rebalanceNum; - returnedAssignments = assignmentsCapture.getValue(); - assertDelay(0, returnedAssignments); - expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments); - assertNoReassignments(memberConfigs, expectedMemberConfigs); - assertAssignment(2, 8, 0, 0, "worker1"); - - //delete connector1 + performStandardRebalance(); + assertDelay(0); + assertWorkers("worker1"); + assertConnectorAllocations(2); + assertTaskAllocations(8); + assertBalancedAndCompleteAllocation(); + + // Delete connector1 configState = clusterConfigState(offset, 2, 1, 4); when(coordinator.configSnapshot()).thenReturn(configState); // Second assignment with a second worker with duplicate assignment joining and the duplicated assignment is deleted at the same time - applyAssignments(returnedAssignments); - memberConfigs = memberConfigs(leader, offset, assignments); - ExtendedAssignment duplicatedWorkerAssignment = 
newExpandableAssignment(); - duplicatedWorkerAssignment.connectors().addAll(newConnectors(1, 2)); - duplicatedWorkerAssignment.tasks().addAll(newTasks("connector1", 0, 4)); - memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, duplicatedWorkerAssignment)); - assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); - ++rebalanceNum; - returnedAssignments = assignmentsCapture.getValue(); - assertDelay(0, returnedAssignments); - expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments); - assertNoReassignments(memberConfigs, expectedMemberConfigs); - assertAssignment(0, 0, 2, 8, "worker1", "worker2"); + addNewWorker("worker2", newConnectors(1, 2), newTasks("connector1", 0, 4)); + performStandardRebalance(); + assertDelay(0); + assertWorkers("worker1", "worker2"); + assertConnectorAllocations(0, 1); + assertTaskAllocations(0, 4); // Third assignment after revocations - applyAssignments(returnedAssignments); - memberConfigs = memberConfigs(leader, offset, assignments); - assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); - ++rebalanceNum; - returnedAssignments = assignmentsCapture.getValue(); - assertDelay(0, returnedAssignments); - expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments); - assertNoReassignments(memberConfigs, expectedMemberConfigs); - assertAssignment(0, 0, 0, 2, "worker1", "worker2"); + performStandardRebalance(); + assertDelay(0); + assertConnectorAllocations(0, 1); + assertTaskAllocations(0, 2); // fourth rebalance after revocations - applyAssignments(returnedAssignments); - memberConfigs = memberConfigs(leader, offset, assignments); - assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); - ++rebalanceNum; - returnedAssignments = assignmentsCapture.getValue(); - assertDelay(0, returnedAssignments); - expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments); - 
assertNoReassignments(memberConfigs, expectedMemberConfigs); - assertAssignment(0, 2, 0, 0, "worker1", "worker2"); + performStandardRebalance(); + assertDelay(0); + assertConnectorAllocations(0, 1); + assertTaskAllocations(2, 2); + assertBalancedAndCompleteAllocation(); // Fifth rebalance should not change assignments - applyAssignments(returnedAssignments); - memberConfigs = memberConfigs(leader, offset, assignments); - assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); + performStandardRebalance(); + assertDelay(0); + assertEmptyAssignment(); + + verifyCoordinatorInteractions(); + } + + private void performStandardRebalance() { + performRebalance(false, false); + } + + private void performFailedRebalance() { + performRebalance(true, false); + } + + private void performRebalanceWithMismatchedGeneration() { + performRebalance(false, true); + } + + private void performRebalance(boolean assignmentFailure, boolean expectGenerationMismatch) { + expectGeneration(expectGenerationMismatch); + // Member configs are tracked by the assignor; create a deep copy here so that modifications to our own memberConfigs field + // are not accidentally propagated to the one used by the assignor + Map<String, ExtendedWorkerState> memberConfigsCopy = memberConfigs.entrySet().stream().collect(Collectors.toMap( + Map.Entry::getKey, + e -> { + ExtendedWorkerState originalWorkerState = e.getValue(); + return new ExtendedWorkerState( + originalWorkerState.url(), + originalWorkerState.offset(), + duplicate(originalWorkerState.assignment()) + ); + } + )); + try { + assignor.performTaskAssignment(leader, offset, memberConfigsCopy, coordinator, protocolVersion); + } catch (RuntimeException e) { + if (assignmentFailure) { + RequestFuture.failure(e); + } else { + throw e; + } + } ++rebalanceNum; returnedAssignments = assignmentsCapture.getValue(); - assertDelay(0, returnedAssignments); - expectedMemberConfigs = memberConfigs(leader, offset, 
returnedAssignments); - assertNoReassignments(memberConfigs, expectedMemberConfigs); - assertAssignment(0, 0, 0, 0, "worker1", "worker2"); + assertNoRedundantAssignments(); + if (!assignmentFailure) { + applyAssignments(leader, offset, returnedAssignments); + } + } - verify(coordinator, times(rebalanceNum)).configSnapshot(); - verify(coordinator, times(rebalanceNum)).leaderState(any()); - verify(coordinator, times(2 * rebalanceNum)).generationId(); - verify(coordinator, times(rebalanceNum)).memberId(); - verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId(); + private void expectGeneration(boolean expectMismatch) { + when(coordinator.generationId()) + .thenReturn(assignor.previousGenerationId + 1) + .thenReturn(assignor.previousGenerationId + 1); Review Comment: Ah, good call. I figured the [existing style](https://github.com/apache/kafka/blob/481cc13a132d33f23e737f88ae28a1aac135afed/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java#L1464-L1466) was necessary since `WorkerCoordinator::generationId` would be invoked twice for every call to `expectGeneration` but it looks like Mockito handles this just fine with a single `thenReturn`. ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java: ########## @@ -769,16 +609,16 @@ public void testLostAssignmentHandlingWhenWorkerBounces() { new ArrayList<>(configuredAssignment.values()), memberConfigs); - assertThat("Wrong set of workers for reassignments", + assertEquals("Wrong set of workers for reassignments", Review Comment: I usually prefer `assertEquals` when possible since it shows the difference between actual and expected, which in this case would mean the complete set of incorrect candidate workers for reassignment. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org. For queries about this service, please contact Infrastructure at: users@infra.apache.org