C0urante commented on code in PR #11983:
URL: https://github.com/apache/kafka/pull/11983#discussion_r857610529


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##########
@@ -1375,24 +1171,6 @@ private void assertCompleteAllocation() {
         });
     }
 
-    private void verifyCoordinatorInteractions() {
-        verify(coordinator, times(rebalanceNum)).configSnapshot();
-        verify(coordinator, times(rebalanceNum)).leaderState(any());
-        verify(coordinator, times(2 * rebalanceNum)).generationId();
-        verify(coordinator, times(rebalanceNum)).memberId();
-        verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId();
-    }

Review Comment:
   Not at the moment, no. The existing tests don't give us that much anyway:
   
   - It doesn't matter at all how many times we request the current config snapshot, generation ID, member ID, or last completed generation ID; these methods all have trivial implementations
   - Although it does matter that we invoke `Coordinator::leaderState` during the rebalance, the existing tests only verify that it is invoked an expected number of times, without making any assertions about the actual state passed to the coordinator
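   To illustrate the second point, here's a minimal plain-Java sketch (hypothetical, simplified stand-ins for `WorkerCoordinator` and `LeaderState`, not the real Connect classes or the Mockito-based test): a recording stub lets a test assert on the *contents* of the state handed to the coordinator, which is a much stronger guarantee than counting invocations.

```java
import java.util.List;
import java.util.Objects;

public class LeaderStateAssertionSketch {

    // Hypothetical stand-in for the real LeaderState value class.
    static class LeaderState {
        final List<String> connectors;
        LeaderState(List<String> connectors) {
            this.connectors = connectors;
        }
    }

    // Hypothetical stand-in for WorkerCoordinator that records the last state it was given.
    static class RecordingCoordinator {
        LeaderState lastLeaderState;
        void leaderState(LeaderState state) {
            this.lastLeaderState = state;
        }
    }

    public static void main(String[] args) {
        RecordingCoordinator coordinator = new RecordingCoordinator();

        // The code under test would call this during a rebalance:
        coordinator.leaderState(new LeaderState(List.of("connector-1", "connector-2")));

        // Assert on what was actually passed, not just how many times it was passed:
        if (!Objects.equals(List.of("connector-1", "connector-2"),
                coordinator.lastLeaderState.connectors)) {
            throw new AssertionError("unexpected leader state: "
                    + coordinator.lastLeaderState.connectors);
        }
        System.out.println("leader state contents verified");
    }
}
```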



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##########
@@ -44,93 +35,49 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
-import java.util.stream.Stream;
 
-import static org.apache.kafka.connect.runtime.distributed.ExtendedAssignment.duplicate;
-import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1;
-import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2;
+import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor.ClusterAssignment;
 import static org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.WorkerLoad;
+import static org.apache.kafka.connect.util.ConnectUtils.transformValues;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.runners.Parameterized.Parameter;
-import static org.junit.runners.Parameterized.Parameters;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
-@RunWith(Parameterized.class)
-public class IncrementalCooperativeAssignorTest {
-    @Rule
-    public MockitoRule rule = MockitoJUnit.rule();
-
-    @Mock
-    private WorkerCoordinator coordinator;
-
-    @Captor
-    ArgumentCaptor<Map<String, ExtendedAssignment>> assignmentsCapture;
-
-    @Parameters
-    public static Iterable<?> mode() {
-        return Arrays.asList(CONNECT_PROTOCOL_V1, CONNECT_PROTOCOL_V2);
-    }

Review Comment:
   Yeah, the existing tests provide basically no guarantees for these cases, and the actual allocation logic (which is 95% of what should be tested for this class) doesn't vary at all based on the protocol version. This can be verified by the absence of a `protocolVersion` argument in the visible-for-testing `performTaskAssignment` method.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -745,22 +779,108 @@ protected void assignTasks(List<WorkerLoad> workerAssignment, Collection<Connect
         }
     }
 
-    private static List<WorkerLoad> workerAssignment(Map<String, ExtendedWorkerState> memberConfigs,
+    private static List<WorkerLoad> workerAssignment(Map<String, ConnectorsAndTasks> memberAssignments,
                                                      ConnectorsAndTasks toExclude) {
         ConnectorsAndTasks ignore = new ConnectorsAndTasks.Builder()
                 .with(new HashSet<>(toExclude.connectors()), new HashSet<>(toExclude.tasks()))
                 .build();
 
-        return memberConfigs.entrySet().stream()
+        return memberAssignments.entrySet().stream()
                 .map(e -> new WorkerLoad.Builder(e.getKey()).with(
-                        e.getValue().assignment().connectors().stream()
+                        e.getValue().connectors().stream()
                                 .filter(v -> !ignore.connectors().contains(v))
                                 .collect(Collectors.toList()),
-                        e.getValue().assignment().tasks().stream()
+                        e.getValue().tasks().stream()
                                 .filter(v -> !ignore.tasks().contains(v))
                                 .collect(Collectors.toList())
                         ).build()
                 ).collect(Collectors.toList());
     }
 
+    static class ClusterAssignment {

Review Comment:
   👍 SGTM, if nothing else it can make debugging easier.
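   For reference, a minimal sketch of what such a value class can buy for debugging (hypothetical, simplified field types; the real `ClusterAssignment` nested in `IncrementalCooperativeAssignor` may differ, though the `allConnectorAssignments`/`allTaskAssignments` names come up later in this thread): bundling the computed maps in one object gives a single place to log or inspect the whole cluster's allocation.

```java
import java.util.Map;
import java.util.Set;

public class ClusterAssignmentSketch {

    // Hypothetical stand-in for the ClusterAssignment class added in the PR.
    static class ClusterAssignment {
        final Map<String, Set<String>> allConnectorAssignments;
        final Map<String, Set<Integer>> allTaskAssignments;

        ClusterAssignment(Map<String, Set<String>> allConnectorAssignments,
                          Map<String, Set<Integer>> allTaskAssignments) {
            this.allConnectorAssignments = allConnectorAssignments;
            this.allTaskAssignments = allTaskAssignments;
        }

        @Override
        public String toString() {
            // One line that shows the entire computed allocation at a glance.
            return "ClusterAssignment{connectors=" + allConnectorAssignments
                    + ", tasks=" + allTaskAssignments + "}";
        }
    }

    public static void main(String[] args) {
        ClusterAssignment assignment = new ClusterAssignment(
                Map.of("worker-1", Set.of("conn-a")),
                Map.of("worker-1", Set.of(0, 1)));
        // Printing (or logging at debug level) the whole assignment in one shot:
        System.out.println(assignment);
    }
}
```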



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -298,20 +329,21 @@ protected Map<String, ByteBuffer> performTaskAssignment(String leaderId, long ma
         Map<String, Collection<ConnectorTaskId>> incrementalTaskAssignments =
                 diff(taskAssignments, currentTaskAssignments);
 
+        previousAssignment = computePreviousAssignment(toRevoke, connectorAssignments, taskAssignments, lostAssignments);
+        previousGenerationId = currentGenerationId;
+        previousMembers = memberAssignments.keySet();
+
         log.debug("Incremental connector assignments: {}", incrementalConnectorAssignments);
         log.debug("Incremental task assignments: {}", incrementalTaskAssignments);
 
-        coordinator.leaderState(new LeaderState(memberConfigs, connectorAssignments, taskAssignments));

Review Comment:
   Sure!
   
   Working backwards from the existing code base at this line, we see the call to `coordinator.leaderState` with the arguments `connectorAssignments` and `taskAssignments`:
https://github.com/apache/kafka/blob/2b64f1a57160884754f0a3652c0bd34de21e9ad1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L304
   
   These two arguments are defined earlier in `performTaskAssignment`:
   
   
   In this PR's branch, we see that the `ClusterAssignment` returned from 
`performTaskAssignment` uses the same `connectorAssignments` and 
`taskAssignments` as its `allConnectorAssignments` and `allTaskAssignments` 
fields: 
https://github.com/C0urante/kafka/blob/b45f73f572ef65b2dbd7c2b73a53a4fb1927f6a4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L344-L345
   
   And these two arguments are defined in the same way, and in the same place, in `performTaskAssignment`:
https://github.com/C0urante/kafka/blob/b45f73f572ef65b2dbd7c2b73a53a4fb1927f6a4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L282-L287
   
   
   Does that help?
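   The equivalence being traced above can also be sketched in isolation (hypothetical, simplified types, not the actual Connect code): the same `connectorAssignments`/`taskAssignments` maps that used to be passed directly to `coordinator.leaderState(...)` now travel inside the `ClusterAssignment` returned from `performTaskAssignment`, so the caller hands identical data to the coordinator.

```java
import java.util.List;
import java.util.Map;

public class AssignmentEquivalenceSketch {

    // Hypothetical stand-in for the ClusterAssignment value class in the PR.
    record ClusterAssignment(Map<String, List<String>> allConnectorAssignments,
                             Map<String, List<Integer>> allTaskAssignments) { }

    public static void main(String[] args) {
        Map<String, List<String>> connectorAssignments = Map.of("worker-1", List.of("conn-a"));
        Map<String, List<Integer>> taskAssignments = Map.of("worker-1", List.of(0, 1));

        // Before the refactor, these maps went straight into the leader state:
        //     coordinator.leaderState(new LeaderState(memberConfigs,
        //             connectorAssignments, taskAssignments));
        // After, the same maps are carried by the returned value object:
        ClusterAssignment result = new ClusterAssignment(connectorAssignments, taskAssignments);

        // The data handed to the coordinator is unchanged by the refactor:
        boolean sameData = result.allConnectorAssignments().equals(connectorAssignments)
                && result.allTaskAssignments().equals(taskAssignments);
        System.out.println(sameData);
    }
}
```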



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##########
@@ -1142,107 +982,65 @@ private void removeConnector(String connector) {
         );
     }
 
-    private void updateConfigSnapshot() {
-        when(coordinator.configSnapshot()).thenReturn(configState());
-    }
-
     private ClusterConfigState configState() {
         Map<String, Integer> taskCounts = new HashMap<>(connectors);
-        Map<String, Map<String, String>> connectorConfigs = taskCounts.keySet().stream().collect(Collectors.toMap(
-                Function.identity(),
-                connector -> Collections.emptyMap()
-        ));
-        Map<String, TargetState> targetStates = taskCounts.keySet().stream().collect(Collectors.toMap(
-                Function.identity(),
-                connector -> TargetState.STARTED
-        ));
+        Map<String, Map<String, String>> connectorConfigs = transformValues(taskCounts, c -> Collections.emptyMap());
+        Map<String, TargetState> targetStates = transformValues(taskCounts, c -> TargetState.STARTED);
         Map<ConnectorTaskId, Map<String, String>> taskConfigs = taskCounts.entrySet().stream()
                 .flatMap(e -> IntStream.range(0, e.getValue()).mapToObj(i -> new ConnectorTaskId(e.getKey(), i)))
                 .collect(Collectors.toMap(
                         Function.identity(),
                         connectorTaskId -> Collections.emptyMap()
                 ));
         return new ClusterConfigState(
-                offset,
+                16,

Review Comment:
   It's just an arbitrary number. Could isolate it in a class constant named 
something like `CONFIG_OFFSET`, or add a comment stating that the offset isn't 
relevant in the existing tests. Thoughts?
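   A sketch of the first suggestion (using the hypothetical `CONFIG_OFFSET` name floated above): naming the arbitrary value makes its irrelevance to the tests explicit at every use site.

```java
public class ConfigOffsetSketch {

    // Arbitrary config offset; the assignor tests never depend on this value.
    // CONFIG_OFFSET is the hypothetical constant name suggested in the review.
    private static final long CONFIG_OFFSET = 16;

    public static void main(String[] args) {
        // Wherever the test previously passed a bare 16, it would pass the constant:
        System.out.println(CONFIG_OFFSET);
    }
}
```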



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
