C0urante commented on code in PR #11974: URL: https://github.com/apache/kafka/pull/11974#discussion_r842022697
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java: ########## @@ -1275,89 +1136,68 @@ private static ClusterConfigState clusterConfigState(long offset, int connectorNum, int taskNum) { int connectorNumEnd = connectorStart + connectorNum - 1; + + Map<String, Integer> connectorTaskCounts = fillMap(connectorStart, connectorNumEnd, i -> "connector" + i, () -> taskNum); + Map<String, Map<String, String>> connectorConfigs = fillMap(connectorStart, connectorNumEnd, i -> "connector" + i, HashMap::new); + Map<String, TargetState> connectorTargetStates = fillMap(connectorStart, connectorNumEnd, i -> "connector" + i, () -> TargetState.STARTED); + Map<ConnectorTaskId, Map<String, String>> taskConfigs = fillMap( + 0, + connectorNum * taskNum, + i -> new ConnectorTaskId("connector" + i / connectorNum + 1, i), + HashMap::new + ); + return new ClusterConfigState( offset, null, - connectorTaskCounts(connectorStart, connectorNumEnd, taskNum), - connectorConfigs(connectorStart, connectorNumEnd), - connectorTargetStates(connectorStart, connectorNumEnd, TargetState.STARTED), - taskConfigs(0, connectorNum, connectorNum * taskNum), + connectorTaskCounts, + connectorConfigs, + connectorTargetStates, + taskConfigs, Collections.emptySet()); } - private static Map<String, ExtendedWorkerState> memberConfigs(String givenLeader, - long givenOffset, - Map<String, ExtendedAssignment> givenAssignments) { - return givenAssignments.entrySet().stream() - .collect(Collectors.toMap( - Map.Entry::getKey, - e -> new ExtendedWorkerState(expectedLeaderUrl(givenLeader), givenOffset, e.getValue()))); - } - - private static Map<String, ExtendedWorkerState> memberConfigs(String givenLeader, + private Map<String, ExtendedWorkerState> memberConfigs(String givenLeader, long givenOffset, int start, int connectorNum) { - return IntStream.range(start, connectorNum + 1) - .mapToObj(i -> new SimpleEntry<>("worker" + i, new ExtendedWorkerState(expectedLeaderUrl(givenLeader), givenOffset, null))) - .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue)); - } - - private static Map<String, Integer> connectorTaskCounts(int start, - int connectorNum, - int taskCounts) { - return IntStream.range(start, connectorNum + 1) - .mapToObj(i -> new SimpleEntry<>("connector" + i, taskCounts)) - .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue)); - } - - private static Map<String, Map<String, String>> connectorConfigs(int start, int connectorNum) { - return IntStream.range(start, connectorNum + 1) - .mapToObj(i -> new SimpleEntry<>("connector" + i, new HashMap<String, String>())) - .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue)); - } - - private static Map<String, TargetState> connectorTargetStates(int start, - int connectorNum, - TargetState state) { - return IntStream.range(start, connectorNum + 1) - .mapToObj(i -> new SimpleEntry<>("connector" + i, state)) - .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue)); + return fillMap( Review Comment: Agree that it's more readable, and in some ways it's actually more flexible since you can now specify an arbitrary set of worker names instead of having them generated for you. Done 👍 ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java: ########## @@ -1275,89 +1136,68 @@ private static ClusterConfigState clusterConfigState(long offset, int connectorNum, int taskNum) { int connectorNumEnd = connectorStart + connectorNum - 1; + + Map<String, Integer> connectorTaskCounts = fillMap(connectorStart, connectorNumEnd, i -> "connector" + i, () -> taskNum); + Map<String, Map<String, String>> connectorConfigs = fillMap(connectorStart, connectorNumEnd, i -> "connector" + i, HashMap::new); + Map<String, TargetState> connectorTargetStates = fillMap(connectorStart, connectorNumEnd, i -> "connector" + i, () -> TargetState.STARTED); + Map<ConnectorTaskId, Map<String, String>> taskConfigs = fillMap( + 0, + connectorNum * taskNum, + i -> new ConnectorTaskId("connector" + i / connectorNum + 1, i), + HashMap::new + ); + return new ClusterConfigState( offset, null, - connectorTaskCounts(connectorStart, connectorNumEnd, taskNum), - connectorConfigs(connectorStart, connectorNumEnd), - connectorTargetStates(connectorStart, connectorNumEnd, TargetState.STARTED), - taskConfigs(0, connectorNum, connectorNum * taskNum), + connectorTaskCounts, + connectorConfigs, + connectorTargetStates, + taskConfigs, Collections.emptySet()); } - private static Map<String, ExtendedWorkerState> memberConfigs(String givenLeader, - long givenOffset, - Map<String, ExtendedAssignment> givenAssignments) { - return givenAssignments.entrySet().stream() - .collect(Collectors.toMap( - Map.Entry::getKey, - e -> new ExtendedWorkerState(expectedLeaderUrl(givenLeader), givenOffset, e.getValue()))); - } - - private static Map<String, ExtendedWorkerState> memberConfigs(String givenLeader, + private Map<String, ExtendedWorkerState> memberConfigs(String givenLeader, long givenOffset, int start, int connectorNum) { Review Comment: Yep, good catch 👍 ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java: ########## @@ -34,25 +34,30 @@ import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; -import java.util.AbstractMap.SimpleEntry; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; +import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; +import static org.apache.kafka.connect.runtime.distributed.ExtendedAssignment.duplicate; import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1; import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2; import static org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.WorkerLoad; -import static org.hamcrest.CoreMatchers.hasItems; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.runners.Parameterized.Parameter; Review Comment: Ah, good catch! I've corrected this in the test, although it's worth noting that in https://github.com/apache/kafka/pull/11983 I'm proposing that we remove the `protocolVersion` parameter entirely since it's not accomplishing a whole lot here. ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java: ########## @@ -34,25 +34,30 @@ import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; -import java.util.AbstractMap.SimpleEntry; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; +import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; +import static org.apache.kafka.connect.runtime.distributed.ExtendedAssignment.duplicate; import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1; import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2; import static org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.WorkerLoad; -import static org.hamcrest.CoreMatchers.hasItems; import static org.hamcrest.CoreMatchers.is; Review Comment: Good call, done 👍 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org