C0urante commented on code in PR #11983: URL: https://github.com/apache/kafka/pull/11983#discussion_r857610529
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java: ########## @@ -1375,24 +1171,6 @@ private void assertCompleteAllocation() { }); } - private void verifyCoordinatorInteractions() { - verify(coordinator, times(rebalanceNum)).configSnapshot(); - verify(coordinator, times(rebalanceNum)).leaderState(any()); - verify(coordinator, times(2 * rebalanceNum)).generationId(); - verify(coordinator, times(rebalanceNum)).memberId(); - verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId(); - } Review Comment: Not at the moment, no. The existing tests don't give us that much anyways: - It doesn't matter at all how many times we request the current config snapshot, generation ID, member ID, or last completed generation ID; these methods all have trivial implementations - Although it does matter that we invoke `Coordinator::leaderState` during the rebalance, the existing tests only verify that we invoke it an expected number of times, but without any assertions about what the actual state given to the coordinator is ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java: ########## @@ -44,93 +35,49 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; -import java.util.stream.Stream; -import static org.apache.kafka.connect.runtime.distributed.ExtendedAssignment.duplicate; -import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1; -import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2; +import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor.ClusterAssignment; import static 
org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.WorkerLoad; +import static org.apache.kafka.connect.util.ConnectUtils.transformValues; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.junit.runners.Parameterized.Parameter; -import static org.junit.runners.Parameterized.Parameters; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; - -@RunWith(Parameterized.class) -public class IncrementalCooperativeAssignorTest { - @Rule - public MockitoRule rule = MockitoJUnit.rule(); - - @Mock - private WorkerCoordinator coordinator; - - @Captor - ArgumentCaptor<Map<String, ExtendedAssignment>> assignmentsCapture; - - @Parameters - public static Iterable<?> mode() { - return Arrays.asList(CONNECT_PROTOCOL_V1, CONNECT_PROTOCOL_V2); - } Review Comment: Yeah, the existing tests provide basically no guarantees for these cases, and the actual allocation logic (which is 95% of what should be tested for this class) doesn't vary at all based on the protocol version, which can be verified by the lack of a `protocolVersion` argument in the visible-for-testing `performTaskAssignment` method. 
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java: ########## @@ -745,22 +779,108 @@ protected void assignTasks(List<WorkerLoad> workerAssignment, Collection<Connect } } - private static List<WorkerLoad> workerAssignment(Map<String, ExtendedWorkerState> memberConfigs, + private static List<WorkerLoad> workerAssignment(Map<String, ConnectorsAndTasks> memberAssignments, ConnectorsAndTasks toExclude) { ConnectorsAndTasks ignore = new ConnectorsAndTasks.Builder() .with(new HashSet<>(toExclude.connectors()), new HashSet<>(toExclude.tasks())) .build(); - return memberConfigs.entrySet().stream() + return memberAssignments.entrySet().stream() .map(e -> new WorkerLoad.Builder(e.getKey()).with( - e.getValue().assignment().connectors().stream() + e.getValue().connectors().stream() .filter(v -> !ignore.connectors().contains(v)) .collect(Collectors.toList()), - e.getValue().assignment().tasks().stream() + e.getValue().tasks().stream() .filter(v -> !ignore.tasks().contains(v)) .collect(Collectors.toList()) ).build() ).collect(Collectors.toList()); } + static class ClusterAssignment { Review Comment: 👍 SGTM, if nothing else it can make debugging easier. 
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java: ########## @@ -298,20 +329,21 @@ protected Map<String, ByteBuffer> performTaskAssignment(String leaderId, long ma Map<String, Collection<ConnectorTaskId>> incrementalTaskAssignments = diff(taskAssignments, currentTaskAssignments); + previousAssignment = computePreviousAssignment(toRevoke, connectorAssignments, taskAssignments, lostAssignments); + previousGenerationId = currentGenerationId; + previousMembers = memberAssignments.keySet(); + log.debug("Incremental connector assignments: {}", incrementalConnectorAssignments); log.debug("Incremental task assignments: {}", incrementalTaskAssignments); - coordinator.leaderState(new LeaderState(memberConfigs, connectorAssignments, taskAssignments)); Review Comment: Sure! Working backwards from the existing code base at this line, we see the call to `coordinator::leaderState` with the arguments `connectorAssignments` and `taskAssignments`: https://github.com/apache/kafka/blob/2b64f1a57160884754f0a3652c0bd34de21e9ad1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L304 These two arguments are defined earlier on in `performTaskAssignment`: https://github.com/apache/kafka/blob/2b64f1a57160884754f0a3652c0bd34de21e9ad1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L251-L256 In this PR's branch, we see that the `ClusterAssignment` returned from `performTaskAssignment` uses the same `connectorAssignments` and `taskAssignments` as its `allConnectorAssignments` and `allTaskAssignments` fields: https://github.com/C0urante/kafka/blob/b45f73f572ef65b2dbd7c2b73a53a4fb1927f6a4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L344-L345 And these two arguments are defined in the same way and the same place in 
`performTaskAssignment`: https://github.com/C0urante/kafka/blob/b45f73f572ef65b2dbd7c2b73a53a4fb1927f6a4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L282-L287 Does that help? ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java: ########## @@ -1142,107 +982,65 @@ private void removeConnector(String connector) { ); } - private void updateConfigSnapshot() { - when(coordinator.configSnapshot()).thenReturn(configState()); - } - private ClusterConfigState configState() { Map<String, Integer> taskCounts = new HashMap<>(connectors); - Map<String, Map<String, String>> connectorConfigs = taskCounts.keySet().stream().collect(Collectors.toMap( - Function.identity(), - connector -> Collections.emptyMap() - )); - Map<String, TargetState> targetStates = taskCounts.keySet().stream().collect(Collectors.toMap( - Function.identity(), - connector -> TargetState.STARTED - )); + Map<String, Map<String, String>> connectorConfigs = transformValues(taskCounts, c -> Collections.emptyMap()); + Map<String, TargetState> targetStates = transformValues(taskCounts, c -> TargetState.STARTED); Map<ConnectorTaskId, Map<String, String>> taskConfigs = taskCounts.entrySet().stream() .flatMap(e -> IntStream.range(0, e.getValue()).mapToObj(i -> new ConnectorTaskId(e.getKey(), i))) .collect(Collectors.toMap( Function.identity(), connectorTaskId -> Collections.emptyMap() )); return new ClusterConfigState( - offset, + 16, Review Comment: It's just an arbitrary number. Could isolate it in a class constant named something like `CONFIG_OFFSET`, or add a comment stating that the offset isn't relevant in the existing tests. Thoughts? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org