[GitHub] [kafka] C0urante commented on a diff in pull request #11983: KAFKA-13763 (2): Refactor IncrementalCooperativeAssignor for improved unit testing

2022-05-08 Thread GitBox


C0urante commented on code in PR #11983:
URL: https://github.com/apache/kafka/pull/11983#discussion_r867521081


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeConnectProtocol.java:
##
@@ -148,18 +148,18 @@ public class IncrementalCooperativeConnectProtocol {
  *   Current Assignment => [Byte]
  * 
  */
-public static ByteBuffer serializeMetadata(ExtendedWorkerState 
workerState, boolean sessioned) {
+public static ByteBuffer serializeMetadata(ExtendedWorkerState 
workerState) {

Review Comment:
   No worries, done! Sorry for the delay.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #11983: KAFKA-13763 (2): Refactor IncrementalCooperativeAssignor for improved unit testing

2022-05-03 Thread GitBox


C0urante commented on code in PR #11983:
URL: https://github.com/apache/kafka/pull/11983#discussion_r863740301


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeConnectProtocol.java:
##
@@ -148,18 +148,18 @@ public class IncrementalCooperativeConnectProtocol {
  *   Current Assignment => [Byte]
  * 
  */
-public static ByteBuffer serializeMetadata(ExtendedWorkerState 
workerState, boolean sessioned) {
+public static ByteBuffer serializeMetadata(ExtendedWorkerState 
workerState) {

Review Comment:
   Ah, good catch! I've pushed a fix but tried to keep the improvement to the 
serialization API; if you'd prefer to go back to the `sessioned` boolean LMK.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #11983: KAFKA-13763 (2): Refactor IncrementalCooperativeAssignor for improved unit testing

2022-05-02 Thread GitBox


C0urante commented on code in PR #11983:
URL: https://github.com/apache/kafka/pull/11983#discussion_r863330742


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocolCompatibilityTest.java:
##
@@ -51,95 +41,58 @@ public class ConnectProtocolCompatibilityTest {
 private ConnectorTaskId taskId2x0 = new ConnectorTaskId(connectorId2, 0);
 private ConnectorTaskId taskId3x0 = new ConnectorTaskId(connectorId3, 0);
 
-@Rule
-public MockitoRule rule = MockitoJUnit.rule();
-
-@Mock
-private KafkaConfigBackingStore configStorage;
-private ClusterConfigState configState;
-
-@Before
-public void setup() {
-configStorage = mock(KafkaConfigBackingStore.class);
-configState = new ClusterConfigState(
-1L,
-null,
-Collections.singletonMap(connectorId1, 1),
-Collections.singletonMap(connectorId1, new HashMap<>()),
-Collections.singletonMap(connectorId1, TargetState.STARTED),
-Collections.singletonMap(taskId1x0, new HashMap<>()),
-Collections.emptySet());
-}
-
-@After
-public void teardown() {
-verifyNoMoreInteractions(configStorage);
-}

Review Comment:
   All of this is completely unnecessary and can be removed without diminishing 
testing coverage.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #11983: KAFKA-13763 (2): Refactor IncrementalCooperativeAssignor for improved unit testing

2022-05-02 Thread GitBox


C0urante commented on code in PR #11983:
URL: https://github.com/apache/kafka/pull/11983#discussion_r863330276


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##
@@ -336,13 +332,16 @@ ClusterAssignment performTaskAssignment(
 log.debug("Incremental connector assignments: {}", 
incrementalConnectorAssignments);
 log.debug("Incremental task assignments: {}", 
incrementalTaskAssignments);
 
+Map> revokedConnectors = 
transformValues(toRevoke, ConnectorsAndTasks::connectors);
+Map> revokedTasks = 
transformValues(toRevoke, ConnectorsAndTasks::tasks);
+
 return new ClusterAssignment(
 incrementalConnectorAssignments,
 incrementalTaskAssignments,
-transformValues(toRevoke, ConnectorsAndTasks::connectors),
-transformValues(toRevoke, ConnectorsAndTasks::tasks),
-connectorAssignments,
-taskAssignments
+revokedConnectors,
+revokedTasks,
+diff(connectorAssignments, revokedConnectors),
+diff(taskAssignments, revokedTasks)

Review Comment:
   I don't think so; it looks like we compute load-balancing revocations later 
on, around line 300. At line 279, the `completeWorkerAssignment` that we derive 
`connectorAssignments` and `taskAssignments` from only has the `deleted` 
connectors and tasks removed from it; everything else is still included.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #11983: KAFKA-13763 (2): Refactor IncrementalCooperativeAssignor for improved unit testing

2022-05-02 Thread GitBox


C0urante commented on code in PR #11983:
URL: https://github.com/apache/kafka/pull/11983#discussion_r863330276


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##
@@ -336,13 +332,16 @@ ClusterAssignment performTaskAssignment(
 log.debug("Incremental connector assignments: {}", 
incrementalConnectorAssignments);
 log.debug("Incremental task assignments: {}", 
incrementalTaskAssignments);
 
+Map> revokedConnectors = 
transformValues(toRevoke, ConnectorsAndTasks::connectors);
+Map> revokedTasks = 
transformValues(toRevoke, ConnectorsAndTasks::tasks);
+
 return new ClusterAssignment(
 incrementalConnectorAssignments,
 incrementalTaskAssignments,
-transformValues(toRevoke, ConnectorsAndTasks::connectors),
-transformValues(toRevoke, ConnectorsAndTasks::tasks),
-connectorAssignments,
-taskAssignments
+revokedConnectors,
+revokedTasks,
+diff(connectorAssignments, revokedConnectors),
+diff(taskAssignments, revokedTasks)

Review Comment:
   I don't think so; it looks like we compute load-balancing revocations later 
on, around line 300. At line 279, the `completeWorkerAssignment` that we derive 
`connectorAssignments` and `taskAssignments` from only has the `deleted` 
connectors and tasks removed from it; everything else (including load-balancing 
revocations and duplicated assignments) is still included.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #11983: KAFKA-13763 (2): Refactor IncrementalCooperativeAssignor for improved unit testing

2022-05-02 Thread GitBox


C0urante commented on code in PR #11983:
URL: https://github.com/apache/kafka/pull/11983#discussion_r863329552


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeConnectProtocol.java:
##
@@ -230,15 +230,16 @@ public static ExtendedWorkerState 
deserializeMetadata(ByteBuffer buffer) {
  *   ScheduledDelay => Int32
  * 
  */
-public static ByteBuffer serializeAssignment(ExtendedAssignment 
assignment) {
+public static ByteBuffer serializeAssignment(ExtendedAssignment 
assignment, boolean sessioned) {
 // comparison depends on reference equality for now
 if (assignment == null || 
ExtendedAssignment.empty().equals(assignment)) {
 return null;
 }
 Struct struct = assignment.toStruct();
-ByteBuffer buffer = 
ByteBuffer.allocate(CONNECT_PROTOCOL_HEADER_V1.sizeOf()
+Struct protocolHeader = sessioned ? CONNECT_PROTOCOL_HEADER_V2 : 
CONNECT_PROTOCOL_HEADER_V1;
+ByteBuffer buffer = ByteBuffer.allocate(protocolHeader.sizeOf()
 + 
ASSIGNMENT_V1.sizeOf(struct));
-CONNECT_PROTOCOL_HEADER_V1.writeTo(buffer);
+protocolHeader.writeTo(buffer);

Review Comment:
   Ah yeah, good call! Much cleaner than what we had before. It was a little 
more involved than I initially thought to make this change but IMO the end 
result is cleaner and easier to read, so hopefully it's worth the inflation in 
the diff here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #11983: KAFKA-13763 (2): Refactor IncrementalCooperativeAssignor for improved unit testing

2022-05-02 Thread GitBox


C0urante commented on code in PR #11983:
URL: https://github.com/apache/kafka/pull/11983#discussion_r863308185


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##
@@ -108,18 +107,15 @@ public Map performAssignment(String 
leaderId, String protoco
 log.debug("Max config offset root: {}, local snapshot config offsets 
root: {}",
   maxOffset, coordinator.configSnapshot().offset());
 
-short protocolVersion = memberConfigs.values().stream()
-.allMatch(state -> state.assignment().version() == 
CONNECT_PROTOCOL_V2)
-? CONNECT_PROTOCOL_V2
-: CONNECT_PROTOCOL_V1;
+short protocolVersion = 
ConnectProtocolCompatibility.fromProtocol(protocol).protocolVersion();

Review Comment:
   Yep, exactly 
   Should've known that when I implemented KIP-507 originally but was still 
getting my bearings with the group coordinator logic.



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##
@@ -108,18 +107,15 @@ public Map performAssignment(String 
leaderId, String protoco
 log.debug("Max config offset root: {}, local snapshot config offsets 
root: {}",
   maxOffset, coordinator.configSnapshot().offset());
 
-short protocolVersion = memberConfigs.values().stream()
-.allMatch(state -> state.assignment().version() == 
CONNECT_PROTOCOL_V2)
-? CONNECT_PROTOCOL_V2
-: CONNECT_PROTOCOL_V1;
+short protocolVersion = 
ConnectProtocolCompatibility.fromProtocol(protocol).protocolVersion();

Review Comment:
   Yep, exactly 
   Should've known that when I implemented KIP-507 originally but was still 
getting my bearings with the group coordinator logic. Better late than never!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #11983: KAFKA-13763 (2): Refactor IncrementalCooperativeAssignor for improved unit testing

2022-05-02 Thread GitBox


C0urante commented on code in PR #11983:
URL: https://github.com/apache/kafka/pull/11983#discussion_r863308048


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##
@@ -336,13 +332,16 @@ ClusterAssignment performTaskAssignment(
 log.debug("Incremental connector assignments: {}", 
incrementalConnectorAssignments);
 log.debug("Incremental task assignments: {}", 
incrementalTaskAssignments);
 
+Map> revokedConnectors = 
transformValues(toRevoke, ConnectorsAndTasks::connectors);
+Map> revokedTasks = 
transformValues(toRevoke, ConnectorsAndTasks::tasks);
+
 return new ClusterAssignment(
 incrementalConnectorAssignments,
 incrementalTaskAssignments,
-transformValues(toRevoke, ConnectorsAndTasks::connectors),
-transformValues(toRevoke, ConnectorsAndTasks::tasks),
-connectorAssignments,
-taskAssignments
+revokedConnectors,
+revokedTasks,
+diff(connectorAssignments, revokedConnectors),
+diff(taskAssignments, revokedTasks)

Review Comment:
   Yep! There was a bug here. FWIW the same issue is already fixed by 
https://github.com/apache/kafka/pull/12019.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #11983: KAFKA-13763 (2): Refactor IncrementalCooperativeAssignor for improved unit testing

2022-04-28 Thread GitBox


C0urante commented on code in PR #11983:
URL: https://github.com/apache/kafka/pull/11983#discussion_r861366880


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##
@@ -44,93 +35,49 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
-import java.util.stream.Stream;
 
-import static 
org.apache.kafka.connect.runtime.distributed.ExtendedAssignment.duplicate;
-import static 
org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1;
-import static 
org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2;
+import static 
org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor.ClusterAssignment;
 import static 
org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.WorkerLoad;
+import static org.apache.kafka.connect.util.ConnectUtils.transformValues;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.runners.Parameterized.Parameter;
-import static org.junit.runners.Parameterized.Parameters;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
-@RunWith(Parameterized.class)
-public class IncrementalCooperativeAssignorTest {
-@Rule
-public MockitoRule rule = MockitoJUnit.rule();
-
-@Mock
-private WorkerCoordinator coordinator;
-
-@Captor
-ArgumentCaptor> assignmentsCapture;
-
-@Parameters
-public static Iterable mode() {
-return Arrays.asList(CONNECT_PROTOCOL_V1, CONNECT_PROTOCOL_V2);
-}

Review Comment:
   Turns out the logic there was both unnecessary (it's guaranteed that all 
metadata will have the same protocol) and incorrect (the serialization logic 
never propagated the correct protocol version).
   
   I've pushed a patch and a couple of tests to fix this; LMKWYT.



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##
@@ -1375,24 +1171,6 @@ private void assertCompleteAllocation() {
 });
 }
 
-private void verifyCoordinatorInteractions() {
-verify(coordinator, times(rebalanceNum)).configSnapshot();
-verify(coordinator, times(rebalanceNum)).leaderState(any());
-verify(coordinator, times(2 * rebalanceNum)).generationId();
-verify(coordinator, times(rebalanceNum)).memberId();
-verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId();
-}

Review Comment:
   Ack, done. The new test does nothing to verify the content of the leader 
state, but it does verify that `WorkerCoordinator::leaderState` is invoked, 
which is the same coverage that the current tests provide.
   
   I've also augmented the existing test logic to check the content of the 
`ClusterAssignment` (specifically, the return value of the 
`allAssignedConnectors` and `allAssignedTasks` methods for each worker in the 
cluster) in `applyAssignments`. As long as we use the `ClusterAssignment` 
object correctly (and it's fairly trivial to verify that we do) by passing its 
state to the coordinator, then this should ensure that the proper leader state 
is recorded.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #11983: KAFKA-13763 (2): Refactor IncrementalCooperativeAssignor for improved unit testing

2022-04-28 Thread GitBox


C0urante commented on code in PR #11983:
URL: https://github.com/apache/kafka/pull/11983#discussion_r861366648


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##
@@ -1142,107 +982,65 @@ private void removeConnector(String connector) {
 );
 }
 
-private void updateConfigSnapshot() {
-when(coordinator.configSnapshot()).thenReturn(configState());
-}
-
 private ClusterConfigState configState() {
 Map taskCounts = new HashMap<>(connectors);
-Map> connectorConfigs = 
taskCounts.keySet().stream().collect(Collectors.toMap(
-Function.identity(),
-connector -> Collections.emptyMap()
-));
-Map targetStates = 
taskCounts.keySet().stream().collect(Collectors.toMap(
-Function.identity(),
-connector -> TargetState.STARTED
-));
+Map> connectorConfigs = 
transformValues(taskCounts, c -> Collections.emptyMap());
+Map targetStates = transformValues(taskCounts, c 
-> TargetState.STARTED);
 Map> taskConfigs = 
taskCounts.entrySet().stream()
 .flatMap(e -> IntStream.range(0, e.getValue()).mapToObj(i -> 
new ConnectorTaskId(e.getKey(), i)))
 .collect(Collectors.toMap(
 Function.identity(),
 connectorTaskId -> Collections.emptyMap()
 ));
 return new ClusterConfigState(
-offset,
+16,

Review Comment:
    sounds good, done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #11983: KAFKA-13763 (2): Refactor IncrementalCooperativeAssignor for improved unit testing

2022-04-25 Thread GitBox


C0urante commented on code in PR #11983:
URL: https://github.com/apache/kafka/pull/11983#discussion_r857610529


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##
@@ -1375,24 +1171,6 @@ private void assertCompleteAllocation() {
 });
 }
 
-private void verifyCoordinatorInteractions() {
-verify(coordinator, times(rebalanceNum)).configSnapshot();
-verify(coordinator, times(rebalanceNum)).leaderState(any());
-verify(coordinator, times(2 * rebalanceNum)).generationId();
-verify(coordinator, times(rebalanceNum)).memberId();
-verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId();
-}

Review Comment:
   Not at the moment, no. The existing tests don't give us that much anyways:
   
   - It doesn't matter at all how many times we request the current config 
snapshot, generation ID, member ID, or last completed generation ID; these 
methods all have trivial implementations
   - Although it does matter that we invoke `Coordinator::leaderState` during 
the rebalance, the existing tests only verify that we invoke it an expected 
number of times, but without any assertions about what the actual state given 
to the coordinator is



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##
@@ -44,93 +35,49 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
-import java.util.stream.Stream;
 
-import static 
org.apache.kafka.connect.runtime.distributed.ExtendedAssignment.duplicate;
-import static 
org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1;
-import static 
org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2;
+import static 
org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor.ClusterAssignment;
 import static 
org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.WorkerLoad;
+import static org.apache.kafka.connect.util.ConnectUtils.transformValues;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.runners.Parameterized.Parameter;
-import static org.junit.runners.Parameterized.Parameters;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
-@RunWith(Parameterized.class)
-public class IncrementalCooperativeAssignorTest {
-@Rule
-public MockitoRule rule = MockitoJUnit.rule();
-
-@Mock
-private WorkerCoordinator coordinator;
-
-@Captor
-ArgumentCaptor> assignmentsCapture;
-
-@Parameters
-public static Iterable mode() {
-return Arrays.asList(CONNECT_PROTOCOL_V1, CONNECT_PROTOCOL_V2);
-}

Review Comment:
   Yeah, the existing tests provide basically no guarantees for these cases, 
and the actual allocation logic (which is 95% of what should be tested for this 
class) doesn't vary at all based on the protocol version, which can be verified 
by the lack of a `protocolVersion` argument in the visible-for-testing 
`performTaskAssignment` method.



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##
@@ -745,22 +779,108 @@ protected void assignTasks(List 
workerAssignment, Collection workerAssignment(Map memberConfigs,
+private static List workerAssignment(Map memberAssignments,
  ConnectorsAndTasks 
toExclude) {
 ConnectorsAndTasks ignore = new ConnectorsAndTasks.Builder()
 .with(new HashSet<>(toExclude.connectors()), new 
HashSet<>(toExclude.tasks()))
 .build();
 
-return memberConfigs.entrySet().stream()
+return memberAssignments.entrySet().stream()
 .map(e -> new WorkerLoad.Builder(e.getKey()).with(
-e.getValue().assignment().connectors().stream()
+e.getValue().connectors().stream()
 .filter(v -> !ignore.connectors().contains(v))
 .collect(Collectors.toList()),
-e.getValue().assignment().tasks().stream()
+e.getValue().tasks().stream()
 .filter(v -> !ignore.tasks().contains(v))