C0urante commented on code in PR #11974:
URL: https://github.com/apache/kafka/pull/11974#discussion_r842022697


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##########
@@ -1275,89 +1136,68 @@ private static ClusterConfigState clusterConfigState(long offset,
                                                          int connectorNum,
                                                          int taskNum) {
         int connectorNumEnd = connectorStart + connectorNum - 1;
+
+        Map<String, Integer> connectorTaskCounts = fillMap(connectorStart, connectorNumEnd, i -> "connector" + i, () -> taskNum);
+        Map<String, Map<String, String>> connectorConfigs = fillMap(connectorStart, connectorNumEnd, i -> "connector" + i, HashMap::new);
+        Map<String, TargetState> connectorTargetStates = fillMap(connectorStart, connectorNumEnd, i -> "connector" + i, () -> TargetState.STARTED);
+        Map<ConnectorTaskId, Map<String, String>> taskConfigs = fillMap(
+                0,
+                connectorNum * taskNum,
+                i -> new ConnectorTaskId("connector" + i / connectorNum + 1, i),
+                HashMap::new
+        );
+
         return new ClusterConfigState(
                 offset,
                 null,
-                connectorTaskCounts(connectorStart, connectorNumEnd, taskNum),
-                connectorConfigs(connectorStart, connectorNumEnd),
-                connectorTargetStates(connectorStart, connectorNumEnd, TargetState.STARTED),
-                taskConfigs(0, connectorNum, connectorNum * taskNum),
+                connectorTaskCounts,
+                connectorConfigs,
+                connectorTargetStates,
+                taskConfigs,
                 Collections.emptySet());
     }
 
-    private static Map<String, ExtendedWorkerState> memberConfigs(String givenLeader,
-                                                                  long givenOffset,
-                                                                  Map<String, ExtendedAssignment> givenAssignments) {
-        return givenAssignments.entrySet().stream()
-                .collect(Collectors.toMap(
-                    Map.Entry::getKey,
-                    e -> new ExtendedWorkerState(expectedLeaderUrl(givenLeader), givenOffset, e.getValue())));
-    }
-
-    private static Map<String, ExtendedWorkerState> memberConfigs(String givenLeader,
+    private Map<String, ExtendedWorkerState> memberConfigs(String givenLeader,
                                                                   long givenOffset,
                                                                   int start,
                                                                   int connectorNum) {
-        return IntStream.range(start, connectorNum + 1)
-                .mapToObj(i -> new SimpleEntry<>("worker" + i, new ExtendedWorkerState(expectedLeaderUrl(givenLeader), givenOffset, null)))
-                .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue));
-    }
-
-    private static Map<String, Integer> connectorTaskCounts(int start,
-                                                            int connectorNum,
-                                                            int taskCounts) {
-        return IntStream.range(start, connectorNum + 1)
-                .mapToObj(i -> new SimpleEntry<>("connector" + i, taskCounts))
-                .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue));
-    }
-
-    private static Map<String, Map<String, String>> connectorConfigs(int start, int connectorNum) {
-        return IntStream.range(start, connectorNum + 1)
-                .mapToObj(i -> new SimpleEntry<>("connector" + i, new HashMap<String, String>()))
-                .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue));
-    }
-
-    private static Map<String, TargetState> connectorTargetStates(int start,
-                                                                  int connectorNum,
-                                                                  TargetState state) {
-        return IntStream.range(start, connectorNum + 1)
-                .mapToObj(i -> new SimpleEntry<>("connector" + i, state))
-                .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue));
+        return fillMap(

Review Comment:
   Agree that it's more readable, and in some ways it's actually more flexible 
since you can now specify an arbitrary set of worker names instead of having 
them generated for you. Done 👍 
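   For readers following along, a rough sketch of what a generic `fillMap` helper with this shape could look like, based only on the call sites visible in the hunk above. This is a hypothetical illustration, not the PR's actual helper (which may differ, e.g. in whether the end index is inclusive):
   
   ```java
   // Hypothetical sketch of a generic map-filling helper matching the call sites above.
   // Assumes an inclusive [start, end] range; the helper actually added by the PR may differ.
   private static <K, V> Map<K, V> fillMap(int start, int end, Function<Integer, K> keyMapper, Supplier<V> valueSupplier) {
       return IntStream.rangeClosed(start, end)
               .boxed()
               .collect(Collectors.toMap(keyMapper, i -> valueSupplier.get()));
   }
   ```
   
   The `Function`, `Supplier`, `Collectors`, and `IntStream` types used here are already imported per the import hunk quoted later in this message.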



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##########
@@ -1275,89 +1136,68 @@ private static ClusterConfigState clusterConfigState(long offset,
                                                          int connectorNum,
                                                          int taskNum) {
         int connectorNumEnd = connectorStart + connectorNum - 1;
+
+        Map<String, Integer> connectorTaskCounts = fillMap(connectorStart, connectorNumEnd, i -> "connector" + i, () -> taskNum);
+        Map<String, Map<String, String>> connectorConfigs = fillMap(connectorStart, connectorNumEnd, i -> "connector" + i, HashMap::new);
+        Map<String, TargetState> connectorTargetStates = fillMap(connectorStart, connectorNumEnd, i -> "connector" + i, () -> TargetState.STARTED);
+        Map<ConnectorTaskId, Map<String, String>> taskConfigs = fillMap(
+                0,
+                connectorNum * taskNum,
+                i -> new ConnectorTaskId("connector" + i / connectorNum + 1, i),
+                HashMap::new
+        );
+
         return new ClusterConfigState(
                 offset,
                 null,
-                connectorTaskCounts(connectorStart, connectorNumEnd, taskNum),
-                connectorConfigs(connectorStart, connectorNumEnd),
-                connectorTargetStates(connectorStart, connectorNumEnd, TargetState.STARTED),
-                taskConfigs(0, connectorNum, connectorNum * taskNum),
+                connectorTaskCounts,
+                connectorConfigs,
+                connectorTargetStates,
+                taskConfigs,
                 Collections.emptySet());
     }
 
-    private static Map<String, ExtendedWorkerState> memberConfigs(String givenLeader,
-                                                                  long givenOffset,
-                                                                  Map<String, ExtendedAssignment> givenAssignments) {
-        return givenAssignments.entrySet().stream()
-                .collect(Collectors.toMap(
-                    Map.Entry::getKey,
-                    e -> new ExtendedWorkerState(expectedLeaderUrl(givenLeader), givenOffset, e.getValue())));
-    }
-
-    private static Map<String, ExtendedWorkerState> memberConfigs(String givenLeader,
+    private Map<String, ExtendedWorkerState> memberConfigs(String givenLeader,
                                                                   long givenOffset,
                                                                   int start,
                                                                   int connectorNum) {

Review Comment:
   Yep, good catch 👍 
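   For context, a hypothetical sketch of how the consolidated `memberConfigs` overload could delegate to the generic `fillMap` helper. The hunk above is truncated before the method body, so this is illustrative only and the exact code in the PR may differ:
   
   ```java
   // Illustrative sketch only: the surviving memberConfigs overload delegating to the
   // generic fillMap helper, generating worker names and identical worker states.
   private Map<String, ExtendedWorkerState> memberConfigs(String givenLeader,
                                                          long givenOffset,
                                                          int start,
                                                          int connectorNum) {
       return fillMap(
               start,
               connectorNum,
               i -> "worker" + i,
               () -> new ExtendedWorkerState(expectedLeaderUrl(givenLeader), givenOffset, null)
       );
   }
   ```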



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##########
@@ -34,25 +34,30 @@
 import org.mockito.junit.MockitoJUnit;
 import org.mockito.junit.MockitoRule;
 
-import java.util.AbstractMap.SimpleEntry;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.kafka.connect.runtime.distributed.ExtendedAssignment.duplicate;
 import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1;
 import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2;
 import static org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.WorkerLoad;
-import static org.hamcrest.CoreMatchers.hasItems;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.runners.Parameterized.Parameter;

Review Comment:
   Ah, good catch! I've corrected this in the test, although it's worth noting 
that in https://github.com/apache/kafka/pull/11983 I'm proposing that we remove 
the `protocolVersion` parameter entirely since it's not accomplishing a whole 
lot here.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##########
@@ -34,25 +34,30 @@
 import org.mockito.junit.MockitoJUnit;
 import org.mockito.junit.MockitoRule;
 
-import java.util.AbstractMap.SimpleEntry;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.kafka.connect.runtime.distributed.ExtendedAssignment.duplicate;
 import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1;
 import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2;
 import static org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.WorkerLoad;
-import static org.hamcrest.CoreMatchers.hasItems;
 import static org.hamcrest.CoreMatchers.is;

Review Comment:
   Good call, done 👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org