C0urante commented on code in PR #11983:
URL: https://github.com/apache/kafka/pull/11983#discussion_r861366880


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##########
@@ -44,93 +35,49 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
-import java.util.stream.Stream;
 
-import static org.apache.kafka.connect.runtime.distributed.ExtendedAssignment.duplicate;
-import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1;
-import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2;
+import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor.ClusterAssignment;
 import static org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.WorkerLoad;
+import static org.apache.kafka.connect.util.ConnectUtils.transformValues;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.runners.Parameterized.Parameter;
-import static org.junit.runners.Parameterized.Parameters;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
-@RunWith(Parameterized.class)
-public class IncrementalCooperativeAssignorTest {
-    @Rule
-    public MockitoRule rule = MockitoJUnit.rule();
-
-    @Mock
-    private WorkerCoordinator coordinator;
-
-    @Captor
-    ArgumentCaptor<Map<String, ExtendedAssignment>> assignmentsCapture;
-
-    @Parameters
-    public static Iterable<?> mode() {
-        return Arrays.asList(CONNECT_PROTOCOL_V1, CONNECT_PROTOCOL_V2);
-    }

Review Comment:
   Turns out the logic there was both unnecessary (it's guaranteed that all 
metadata will have the same protocol) and incorrect (the serialization logic 
never propagated the correct protocol version).
   
   I've pushed a patch and a couple of tests to fix this; let me know what you think.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##########
@@ -1375,24 +1171,6 @@ private void assertCompleteAllocation() {
         });
     }
 
-    private void verifyCoordinatorInteractions() {
-        verify(coordinator, times(rebalanceNum)).configSnapshot();
-        verify(coordinator, times(rebalanceNum)).leaderState(any());
-        verify(coordinator, times(2 * rebalanceNum)).generationId();
-        verify(coordinator, times(rebalanceNum)).memberId();
-        verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId();
-    }

Review Comment:
   Ack, done. The new test does nothing to verify the content of the leader 
state, but it does verify that `WorkerCoordinator::leaderState` is invoked, 
which is the same coverage that the current tests provide.
   
   I've also augmented the existing test logic to check the content of the 
`ClusterAssignment` (specifically, the return value of the 
`allAssignedConnectors` and `allAssignedTasks` methods for each worker in the 
cluster) in `applyAssignments`. As long as we use the `ClusterAssignment` 
object correctly by passing its state to the coordinator (and it's fairly 
trivial to verify that we do), this should ensure that the proper leader state 
is recorded.
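
   The two checks described above can be sketched roughly as follows. This is a minimal illustration only, not the code in the PR: the nested `ClusterAssignment` and `RecordingCoordinator` classes are hypothetical stand-ins (tasks are simplified to strings, and `leaderState` is assumed to take the two maps directly), and `recordLeaderState` and `sampleAssignment` are made-up helper names.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

class LeaderStateCheckSketch {

    // Hypothetical stand-in for the assignor's ClusterAssignment. Only the two
    // accessors mentioned in the comment are modeled; task IDs are plain strings.
    static final class ClusterAssignment {
        private final Map<String, Collection<String>> connectors;
        private final Map<String, Collection<String>> tasks;

        ClusterAssignment(Map<String, Collection<String>> connectors,
                          Map<String, Collection<String>> tasks) {
            this.connectors = connectors;
            this.tasks = tasks;
        }

        Map<String, Collection<String>> allAssignedConnectors() { return connectors; }
        Map<String, Collection<String>> allAssignedTasks() { return tasks; }
    }

    // Hypothetical stand-in for the coordinator: counts leaderState calls and
    // remembers what was passed, much as a mock with an argument captor would.
    static final class RecordingCoordinator {
        int leaderStateCalls = 0;
        Map<String, Collection<String>> connectors;
        Map<String, Collection<String>> tasks;

        void leaderState(Map<String, Collection<String>> connectors,
                         Map<String, Collection<String>> tasks) {
            leaderStateCalls++;
            this.connectors = connectors;
            this.tasks = tasks;
        }
    }

    // The step under test: hand the assignment's state to the coordinator.
    static void recordLeaderState(ClusterAssignment assignment, RecordingCoordinator coordinator) {
        coordinator.leaderState(assignment.allAssignedConnectors(), assignment.allAssignedTasks());
    }

    // Two workers, two connectors with two tasks each (illustrative names only).
    static ClusterAssignment sampleAssignment() {
        Map<String, Collection<String>> connectors = new HashMap<>();
        connectors.put("worker1", Arrays.asList("connector1"));
        connectors.put("worker2", Arrays.asList("connector2"));
        Map<String, Collection<String>> tasks = new HashMap<>();
        tasks.put("worker1", Arrays.asList("connector1-0", "connector2-1"));
        tasks.put("worker2", Arrays.asList("connector1-1", "connector2-0"));
        return new ClusterAssignment(connectors, tasks);
    }

    public static void main(String[] args) {
        ClusterAssignment assignment = sampleAssignment();
        RecordingCoordinator coordinator = new RecordingCoordinator();
        recordLeaderState(assignment, coordinator);

        // Check 1: leaderState was invoked (coverage equivalent to the old verify call).
        if (coordinator.leaderStateCalls != 1) {
            throw new AssertionError("leaderState should be invoked exactly once");
        }
        // Check 2: the per-worker content of the assignment is what was recorded.
        if (!coordinator.connectors.equals(assignment.allAssignedConnectors())
                || !coordinator.tasks.equals(assignment.allAssignedTasks())) {
            throw new AssertionError("recorded leader state should match the assignment");
        }
    }
}
```

   In the real test, the invocation check would presumably be a Mockito `verify` on the mocked coordinator rather than a hand-rolled counter; the counter is used here only to keep the sketch free of dependencies.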



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.