Repository: kafka
Updated Branches:
  refs/heads/trunk 65acff32d -> bc9ef716a


KAFKA-4553; Improve round robin assignment in Connect to avoid uneven 
distributions of connectors and tasks

Author: Ewen Cheslack-Postava <m...@ewencp.org>

Reviewers: Konstantine Karantasis <konstant...@confluent.io>, Jason Gustafson 
<ja...@confluent.io>

Closes #2272 from ewencp/kafka-4553-better-connect-round-robin


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bc9ef716
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bc9ef716
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bc9ef716

Branch: refs/heads/trunk
Commit: bc9ef716af8ac387d63b22ee973a9fc6f864bebb
Parents: 65acff3
Author: Ewen Cheslack-Postava <m...@ewencp.org>
Authored: Mon Dec 19 12:37:58 2016 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Mon Dec 19 12:37:58 2016 -0800

----------------------------------------------------------------------
 .../runtime/distributed/WorkerCoordinator.java  |  38 +++---
 .../distributed/WorkerCoordinatorTest.java      | 134 ++++++++++++++-----
 2 files changed, 123 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/bc9ef716/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 88a0a8d..58525c5 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -154,25 +154,25 @@ public final class WorkerCoordinator extends 
AbstractCoordinator implements Clos
     protected Map<String, ByteBuffer> performAssignment(String leaderId, 
String protocol, Map<String, ByteBuffer> allMemberMetadata) {
         log.debug("Performing task assignment");
 
-        Map<String, ConnectProtocol.WorkerState> allConfigs = new HashMap<>();
+        Map<String, ConnectProtocol.WorkerState> memberConfigs = new 
HashMap<>();
         for (Map.Entry<String, ByteBuffer> entry : 
allMemberMetadata.entrySet())
-            allConfigs.put(entry.getKey(), 
ConnectProtocol.deserializeMetadata(entry.getValue()));
+            memberConfigs.put(entry.getKey(), 
ConnectProtocol.deserializeMetadata(entry.getValue()));
 
-        long maxOffset = findMaxMemberConfigOffset(allConfigs);
+        long maxOffset = findMaxMemberConfigOffset(memberConfigs);
         Long leaderOffset = ensureLeaderConfig(maxOffset);
         if (leaderOffset == null)
-            return fillAssignmentsAndSerialize(allConfigs.keySet(), 
ConnectProtocol.Assignment.CONFIG_MISMATCH,
-                    leaderId, allConfigs.get(leaderId).url(), maxOffset,
+            return fillAssignmentsAndSerialize(memberConfigs.keySet(), 
ConnectProtocol.Assignment.CONFIG_MISMATCH,
+                    leaderId, memberConfigs.get(leaderId).url(), maxOffset,
                     new HashMap<String, List<String>>(), new HashMap<String, 
List<ConnectorTaskId>>());
-        return performTaskAssignment(leaderId, leaderOffset, allConfigs);
+        return performTaskAssignment(leaderId, leaderOffset, memberConfigs);
     }
 
-    private long findMaxMemberConfigOffset(Map<String, 
ConnectProtocol.WorkerState> allConfigs) {
+    private long findMaxMemberConfigOffset(Map<String, 
ConnectProtocol.WorkerState> memberConfigs) {
         // The new config offset is the maximum seen by any member. We always 
perform assignment using this offset,
         // even if some members have fallen behind. The config offset used to 
generate the assignment is included in
         // the response so members that have fallen behind will not use the 
assignment until they have caught up.
         Long maxOffset = null;
-        for (Map.Entry<String, ConnectProtocol.WorkerState> stateEntry : 
allConfigs.entrySet()) {
+        for (Map.Entry<String, ConnectProtocol.WorkerState> stateEntry : 
memberConfigs.entrySet()) {
             long memberRootOffset = stateEntry.getValue().offset();
             if (maxOffset == null)
                 maxOffset = memberRootOffset;
@@ -205,13 +205,18 @@ public final class WorkerCoordinator extends 
AbstractCoordinator implements Clos
         return maxOffset;
     }
 
-    private Map<String, ByteBuffer> performTaskAssignment(String leaderId, 
long maxOffset, Map<String, ConnectProtocol.WorkerState> allConfigs) {
+    private Map<String, ByteBuffer> performTaskAssignment(String leaderId, 
long maxOffset, Map<String, ConnectProtocol.WorkerState> memberConfigs) {
         Map<String, List<String>> connectorAssignments = new HashMap<>();
         Map<String, List<ConnectorTaskId>> taskAssignments = new HashMap<>();
 
-        // Perform round-robin task assignment
-        CircularIterator<String> memberIt = new 
CircularIterator<>(sorted(allConfigs.keySet()));
-        for (String connectorId : sorted(configSnapshot.connectors())) {
+        // Perform round-robin task assignment. Assign all connectors and then 
all tasks because assigning both the
+        // connector and its tasks can lead to very uneven distribution of 
work in some common cases (e.g. for connectors
+        // that generate only 1 task each; in a cluster of 2 or an even # of 
nodes, only even nodes will be assigned
+        // connectors and only odd nodes will be assigned tasks, but tasks 
are, on average, actually more resource
+        // intensive than connectors).
+        List<String> connectorsSorted = sorted(configSnapshot.connectors());
+        CircularIterator<String> memberIt = new 
CircularIterator<>(sorted(memberConfigs.keySet()));
+        for (String connectorId : connectorsSorted) {
             String connectorAssignedTo = memberIt.next();
             log.trace("Assigning connector {} to {}", connectorId, 
connectorAssignedTo);
             List<String> memberConnectors = 
connectorAssignments.get(connectorAssignedTo);
@@ -220,7 +225,8 @@ public final class WorkerCoordinator extends 
AbstractCoordinator implements Clos
                 connectorAssignments.put(connectorAssignedTo, 
memberConnectors);
             }
             memberConnectors.add(connectorId);
-
+        }
+        for (String connectorId : connectorsSorted) {
             for (ConnectorTaskId taskId : 
sorted(configSnapshot.tasks(connectorId))) {
                 String taskAssignedTo = memberIt.next();
                 log.trace("Assigning task {} to {}", taskId, taskAssignedTo);
@@ -233,10 +239,10 @@ public final class WorkerCoordinator extends 
AbstractCoordinator implements Clos
             }
         }
 
-        this.leaderState = new LeaderState(allConfigs, connectorAssignments, 
taskAssignments);
+        this.leaderState = new LeaderState(memberConfigs, 
connectorAssignments, taskAssignments);
 
-        return fillAssignmentsAndSerialize(allConfigs.keySet(), 
ConnectProtocol.Assignment.NO_ERROR,
-                leaderId, allConfigs.get(leaderId).url(), maxOffset, 
connectorAssignments, taskAssignments);
+        return fillAssignmentsAndSerialize(memberConfigs.keySet(), 
ConnectProtocol.Assignment.NO_ERROR,
+                leaderId, memberConfigs.get(leaderId).url(), maxOffset, 
connectorAssignments, taskAssignments);
     }
 
     private Map<String, ByteBuffer> 
fillAssignmentsAndSerialize(Collection<String> members,

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc9ef716/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index 3aff8f2..92393a1 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -58,11 +58,13 @@ public class WorkerCoordinatorTest {
     private static final String LEADER_URL = "leaderUrl:8083";
     private static final String MEMBER_URL = "memberUrl:8083";
 
-    private String connectorId = "connector";
+    private String connectorId1 = "connector1";
     private String connectorId2 = "connector2";
-    private ConnectorTaskId taskId0 = new ConnectorTaskId(connectorId, 0);
-    private ConnectorTaskId taskId1 = new ConnectorTaskId(connectorId, 1);
-    private ConnectorTaskId taskId2 = new ConnectorTaskId(connectorId2, 0);
+    private String connectorId3 = "connector3";
+    private ConnectorTaskId taskId1x0 = new ConnectorTaskId(connectorId1, 0);
+    private ConnectorTaskId taskId1x1 = new ConnectorTaskId(connectorId1, 1);
+    private ConnectorTaskId taskId2x0 = new ConnectorTaskId(connectorId2, 0);
+    private ConnectorTaskId taskId3x0 = new ConnectorTaskId(connectorId3, 0);
 
     private String groupId = "test-group";
     private int sessionTimeoutMs = 10;
@@ -82,6 +84,7 @@ public class WorkerCoordinatorTest {
 
     private ClusterConfigState configState1;
     private ClusterConfigState configState2;
+    private ClusterConfigState configStateSingleTaskConnectors;
 
     @Before
     public void setup() {
@@ -110,32 +113,60 @@ public class WorkerCoordinatorTest {
                 rebalanceListener);
 
         configState1 = new ClusterConfigState(
-                1L, Collections.singletonMap(connectorId, 1),
-                Collections.singletonMap(connectorId, (Map<String, String>) 
new HashMap<String, String>()),
-                Collections.singletonMap(connectorId, TargetState.STARTED),
-                Collections.singletonMap(taskId0, (Map<String, String>) new 
HashMap<String, String>()),
+                1L,
+                Collections.singletonMap(connectorId1, 1),
+                Collections.singletonMap(connectorId1, (Map<String, String>) 
new HashMap<String, String>()),
+                Collections.singletonMap(connectorId1, TargetState.STARTED),
+                Collections.singletonMap(taskId1x0, (Map<String, String>) new 
HashMap<String, String>()),
                 Collections.<String>emptySet()
         );
+
         Map<String, Integer> configState2ConnectorTaskCounts = new HashMap<>();
-        configState2ConnectorTaskCounts.put(connectorId, 2);
+        configState2ConnectorTaskCounts.put(connectorId1, 2);
         configState2ConnectorTaskCounts.put(connectorId2, 1);
         Map<String, Map<String, String>> configState2ConnectorConfigs = new 
HashMap<>();
-        configState2ConnectorConfigs.put(connectorId, new HashMap<String, 
String>());
+        configState2ConnectorConfigs.put(connectorId1, new HashMap<String, 
String>());
         configState2ConnectorConfigs.put(connectorId2, new HashMap<String, 
String>());
-        Map<String, TargetState> targetStates = new HashMap<>();
-        targetStates.put(connectorId, TargetState.STARTED);
-        targetStates.put(connectorId2, TargetState.STARTED);
+        Map<String, TargetState> configState2TargetStates = new HashMap<>();
+        configState2TargetStates.put(connectorId1, TargetState.STARTED);
+        configState2TargetStates.put(connectorId2, TargetState.STARTED);
         Map<ConnectorTaskId, Map<String, String>> configState2TaskConfigs = 
new HashMap<>();
-        configState2TaskConfigs.put(taskId0, new HashMap<String, String>());
-        configState2TaskConfigs.put(taskId1, new HashMap<String, String>());
-        configState2TaskConfigs.put(taskId2, new HashMap<String, String>());
+        configState2TaskConfigs.put(taskId1x0, new HashMap<String, String>());
+        configState2TaskConfigs.put(taskId1x1, new HashMap<String, String>());
+        configState2TaskConfigs.put(taskId2x0, new HashMap<String, String>());
         configState2 = new ClusterConfigState(
-                2L, configState2ConnectorTaskCounts,
+                2L,
+                configState2ConnectorTaskCounts,
                 configState2ConnectorConfigs,
-                targetStates,
+                configState2TargetStates,
                 configState2TaskConfigs,
                 Collections.<String>emptySet()
         );
+
+        Map<String, Integer> 
configStateSingleTaskConnectorsConnectorTaskCounts = new HashMap<>();
+        configStateSingleTaskConnectorsConnectorTaskCounts.put(connectorId1, 
1);
+        configStateSingleTaskConnectorsConnectorTaskCounts.put(connectorId2, 
1);
+        configStateSingleTaskConnectorsConnectorTaskCounts.put(connectorId3, 
1);
+        Map<String, Map<String, String>> 
configStateSingleTaskConnectorsConnectorConfigs = new HashMap<>();
+        configStateSingleTaskConnectorsConnectorConfigs.put(connectorId1, new 
HashMap<String, String>());
+        configStateSingleTaskConnectorsConnectorConfigs.put(connectorId2, new 
HashMap<String, String>());
+        configStateSingleTaskConnectorsConnectorConfigs.put(connectorId3, new 
HashMap<String, String>());
+        Map<String, TargetState> configStateSingleTaskConnectorsTargetStates = 
new HashMap<>();
+        configStateSingleTaskConnectorsTargetStates.put(connectorId1, 
TargetState.STARTED);
+        configStateSingleTaskConnectorsTargetStates.put(connectorId2, 
TargetState.STARTED);
+        configStateSingleTaskConnectorsTargetStates.put(connectorId3, 
TargetState.STARTED);
+        Map<ConnectorTaskId, Map<String, String>> 
configStateSingleTaskConnectorsTaskConfigs = new HashMap<>();
+        configStateSingleTaskConnectorsTaskConfigs.put(taskId1x0, new 
HashMap<String, String>());
+        configStateSingleTaskConnectorsTaskConfigs.put(taskId2x0, new 
HashMap<String, String>());
+        configStateSingleTaskConnectorsTaskConfigs.put(taskId3x0, new 
HashMap<String, String>());
+        configStateSingleTaskConnectors = new ClusterConfigState(
+                2L,
+                configStateSingleTaskConnectorsConnectorTaskCounts,
+                configStateSingleTaskConnectorsConnectorConfigs,
+                configStateSingleTaskConnectorsTargetStates,
+                configStateSingleTaskConnectorsTaskConfigs,
+                Collections.<String>emptySet()
+        );
     }
 
     @After
@@ -187,7 +218,7 @@ public class WorkerCoordinatorTest {
                         sync.generationId() == 1 &&
                         sync.groupAssignment().containsKey(consumerId);
             }
-        }, syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 
1L, Collections.singletonList(connectorId),
+        }, syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 
1L, Collections.singletonList(connectorId1),
                 Collections.<ConnectorTaskId>emptyList(), Errors.NONE.code()));
         coordinator.ensureActiveGroup();
 
@@ -197,7 +228,7 @@ public class WorkerCoordinatorTest {
         assertFalse(rebalanceListener.assignment.failed());
         assertEquals(1L, rebalanceListener.assignment.offset());
         assertEquals("leader", rebalanceListener.assignment.leader());
-        assertEquals(Collections.singletonList(connectorId), 
rebalanceListener.assignment.connectors());
+        assertEquals(Collections.singletonList(connectorId1), 
rebalanceListener.assignment.connectors());
         assertEquals(Collections.emptyList(), 
rebalanceListener.assignment.tasks());
 
         PowerMock.verifyAll();
@@ -225,7 +256,7 @@ public class WorkerCoordinatorTest {
                         sync.groupAssignment().isEmpty();
             }
         }, syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 
1L, Collections.<String>emptyList(),
-                Collections.singletonList(taskId0), Errors.NONE.code()));
+                Collections.singletonList(taskId1x0), Errors.NONE.code()));
         coordinator.ensureActiveGroup();
 
         assertFalse(coordinator.needRejoin());
@@ -234,7 +265,7 @@ public class WorkerCoordinatorTest {
         assertFalse(rebalanceListener.assignment.failed());
         assertEquals(1L, rebalanceListener.assignment.offset());
         assertEquals(Collections.emptyList(), 
rebalanceListener.assignment.connectors());
-        assertEquals(Collections.singletonList(taskId0), 
rebalanceListener.assignment.tasks());
+        assertEquals(Collections.singletonList(taskId1x0), 
rebalanceListener.assignment.tasks());
 
         PowerMock.verifyAll();
     }
@@ -270,7 +301,7 @@ public class WorkerCoordinatorTest {
                 Collections.<String>emptyList(), 
Collections.<ConnectorTaskId>emptyList(), Errors.NONE.code()));
         client.prepareResponse(joinGroupFollowerResponse(1, memberId, 
"leader", Errors.NONE.code()));
         client.prepareResponse(matcher, 
syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L,
-                Collections.<String>emptyList(), 
Collections.singletonList(taskId0), Errors.NONE.code()));
+                Collections.<String>emptyList(), 
Collections.singletonList(taskId1x0), Errors.NONE.code()));
         coordinator.ensureActiveGroup();
 
         PowerMock.verifyAll();
@@ -289,7 +320,7 @@ public class WorkerCoordinatorTest {
         // join the group once
         client.prepareResponse(joinGroupFollowerResponse(1, "consumer", 
"leader", Errors.NONE.code()));
         
client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, 
"leader", 1L, Collections.<String>emptyList(),
-                Collections.singletonList(taskId0), Errors.NONE.code()));
+                Collections.singletonList(taskId1x0), Errors.NONE.code()));
         coordinator.ensureActiveGroup();
 
         assertEquals(0, rebalanceListener.revokedCount);
@@ -297,22 +328,22 @@ public class WorkerCoordinatorTest {
         assertFalse(rebalanceListener.assignment.failed());
         assertEquals(1L, rebalanceListener.assignment.offset());
         assertEquals(Collections.emptyList(), 
rebalanceListener.assignment.connectors());
-        assertEquals(Collections.singletonList(taskId0), 
rebalanceListener.assignment.tasks());
+        assertEquals(Collections.singletonList(taskId1x0), 
rebalanceListener.assignment.tasks());
 
         // and join the group again
         coordinator.requestRejoin();
         client.prepareResponse(joinGroupFollowerResponse(1, "consumer", 
"leader", Errors.NONE.code()));
-        
client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, 
"leader", 1L, Collections.singletonList(connectorId),
+        
client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, 
"leader", 1L, Collections.singletonList(connectorId1),
                 Collections.<ConnectorTaskId>emptyList(), Errors.NONE.code()));
         coordinator.ensureActiveGroup();
 
         assertEquals(1, rebalanceListener.revokedCount);
         assertEquals(Collections.emptyList(), 
rebalanceListener.revokedConnectors);
-        assertEquals(Collections.singletonList(taskId0), 
rebalanceListener.revokedTasks);
+        assertEquals(Collections.singletonList(taskId1x0), 
rebalanceListener.revokedTasks);
         assertEquals(2, rebalanceListener.assignedCount);
         assertFalse(rebalanceListener.assignment.failed());
         assertEquals(1L, rebalanceListener.assignment.offset());
-        assertEquals(Collections.singletonList(connectorId), 
rebalanceListener.assignment.connectors());
+        assertEquals(Collections.singletonList(connectorId1), 
rebalanceListener.assignment.connectors());
         assertEquals(Collections.emptyList(), 
rebalanceListener.assignment.tasks());
 
         PowerMock.verifyAll();
@@ -341,7 +372,7 @@ public class WorkerCoordinatorTest {
         assertEquals(false, leaderAssignment.failed());
         assertEquals("leader", leaderAssignment.leader());
         assertEquals(1, leaderAssignment.offset());
-        assertEquals(Collections.singletonList(connectorId), 
leaderAssignment.connectors());
+        assertEquals(Collections.singletonList(connectorId1), 
leaderAssignment.connectors());
         assertEquals(Collections.emptyList(), leaderAssignment.tasks());
 
         ConnectProtocol.Assignment memberAssignment = 
ConnectProtocol.deserializeAssignment(result.get("member"));
@@ -349,7 +380,7 @@ public class WorkerCoordinatorTest {
         assertEquals("leader", memberAssignment.leader());
         assertEquals(1, memberAssignment.offset());
         assertEquals(Collections.emptyList(), memberAssignment.connectors());
-        assertEquals(Collections.singletonList(taskId0), 
memberAssignment.tasks());
+        assertEquals(Collections.singletonList(taskId1x0), 
memberAssignment.tasks());
 
         PowerMock.verifyAll();
     }
@@ -377,15 +408,52 @@ public class WorkerCoordinatorTest {
         assertEquals(false, leaderAssignment.failed());
         assertEquals("leader", leaderAssignment.leader());
         assertEquals(1, leaderAssignment.offset());
-        assertEquals(Collections.singletonList(connectorId), 
leaderAssignment.connectors());
-        assertEquals(Arrays.asList(taskId1, taskId2), 
leaderAssignment.tasks());
+        assertEquals(Collections.singletonList(connectorId1), 
leaderAssignment.connectors());
+        assertEquals(Arrays.asList(taskId1x0, taskId2x0), 
leaderAssignment.tasks());
+
+        ConnectProtocol.Assignment memberAssignment = 
ConnectProtocol.deserializeAssignment(result.get("member"));
+        assertEquals(false, memberAssignment.failed());
+        assertEquals("leader", memberAssignment.leader());
+        assertEquals(1, memberAssignment.offset());
+        assertEquals(Collections.singletonList(connectorId2), 
memberAssignment.connectors());
+        assertEquals(Collections.singletonList(taskId1x1), 
memberAssignment.tasks());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testLeaderPerformAssignmentSingleTaskConnectors() throws 
Exception {
+        // Since all the protocol responses are mocked, the other tests 
validate doSync runs, but don't validate its
+        // output. So we test it directly here.
+
+        
EasyMock.expect(configStorage.snapshot()).andReturn(configStateSingleTaskConnectors);
+
+        PowerMock.replayAll();
+
+        // Prime the current configuration state
+        coordinator.metadata();
+
+        Map<String, ByteBuffer> configs = new HashMap<>();
+        // Mark everyone as in sync with configState1
+        configs.put("leader", ConnectProtocol.serializeMetadata(new 
ConnectProtocol.WorkerState(LEADER_URL, 1L)));
+        configs.put("member", ConnectProtocol.serializeMetadata(new 
ConnectProtocol.WorkerState(MEMBER_URL, 1L)));
+        Map<String, ByteBuffer> result = Whitebox.invokeMethod(coordinator, 
"performAssignment", "leader", WorkerCoordinator.DEFAULT_SUBPROTOCOL, configs);
+
+        // Round robin assignment when there are the same number of connectors 
and tasks should result in each being
+        // evenly distributed across the workers, i.e. round robin assignment 
of connectors first, then followed by tasks
+        ConnectProtocol.Assignment leaderAssignment = 
ConnectProtocol.deserializeAssignment(result.get("leader"));
+        assertEquals(false, leaderAssignment.failed());
+        assertEquals("leader", leaderAssignment.leader());
+        assertEquals(1, leaderAssignment.offset());
+        assertEquals(Arrays.asList(connectorId1, connectorId3), 
leaderAssignment.connectors());
+        assertEquals(Arrays.asList(taskId2x0), leaderAssignment.tasks());
 
         ConnectProtocol.Assignment memberAssignment = 
ConnectProtocol.deserializeAssignment(result.get("member"));
         assertEquals(false, memberAssignment.failed());
         assertEquals("leader", memberAssignment.leader());
         assertEquals(1, memberAssignment.offset());
         assertEquals(Collections.singletonList(connectorId2), 
memberAssignment.connectors());
-        assertEquals(Collections.singletonList(taskId0), 
memberAssignment.tasks());
+        assertEquals(Arrays.asList(taskId1x0, taskId3x0), 
memberAssignment.tasks());
 
         PowerMock.verifyAll();
     }

Reply via email to