C0urante commented on code in PR #11983: URL: https://github.com/apache/kafka/pull/11983#discussion_r861366880
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##########
@@ -44,93 +35,49 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
-import java.util.stream.Stream;
-import static org.apache.kafka.connect.runtime.distributed.ExtendedAssignment.duplicate;
-import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1;
-import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2;
+import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor.ClusterAssignment;
 import static org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.WorkerLoad;
+import static org.apache.kafka.connect.util.ConnectUtils.transformValues;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.runners.Parameterized.Parameter;
-import static org.junit.runners.Parameterized.Parameters;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
-@RunWith(Parameterized.class)
-public class IncrementalCooperativeAssignorTest {
-    @Rule
-    public MockitoRule rule = MockitoJUnit.rule();
-
-    @Mock
-    private WorkerCoordinator coordinator;
-
-    @Captor
-    ArgumentCaptor<Map<String, ExtendedAssignment>> assignmentsCapture;
-
-    @Parameters
-    public static Iterable<?> mode() {
-        return Arrays.asList(CONNECT_PROTOCOL_V1, CONNECT_PROTOCOL_V2);
-    }

Review Comment:
   Turns out the logic there was both unnecessary (it's guaranteed that all metadata will have the same protocol) and incorrect (the serialization logic never propagated the correct protocol version). I've pushed a patch and a couple of tests to fix this; LMKWYT.

##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##########
@@ -1375,24 +1171,6 @@ private void assertCompleteAllocation() {
         });
     }
-    private void verifyCoordinatorInteractions() {
-        verify(coordinator, times(rebalanceNum)).configSnapshot();
-        verify(coordinator, times(rebalanceNum)).leaderState(any());
-        verify(coordinator, times(2 * rebalanceNum)).generationId();
-        verify(coordinator, times(rebalanceNum)).memberId();
-        verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId();
-    }

Review Comment:
   Ack, done. The new test does nothing to verify the content of the leader state, but it does verify that `WorkerCoordinator::leaderState` is invoked, which is the same coverage that the current tests provide.

   I've also augmented the existing test logic to check the content of the `ClusterAssignment` (specifically, the return value of the `allAssignedConnectors` and `allAssignedTasks` methods for each worker in the cluster) in `applyAssignments`. As long as we use the `ClusterAssignment` object correctly (and it's fairly trivial to verify that we do) by passing its state to the coordinator, then this should ensure that the proper leader state is recorded.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org