Repository: kafka
Updated Branches:
  refs/heads/trunk 65acff32d -> bc9ef716a
KAFKA-4553; Improve round robin assignment in Connect to avoid uneven distributions of connectors and tasks

Author: Ewen Cheslack-Postava <m...@ewencp.org>

Reviewers: Konstantine Karantasis <konstant...@confluent.io>, Jason Gustafson <ja...@confluent.io>

Closes #2272 from ewencp/kafka-4553-better-connect-round-robin

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bc9ef716
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bc9ef716
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bc9ef716

Branch: refs/heads/trunk
Commit: bc9ef716af8ac387d63b22ee973a9fc6f864bebb
Parents: 65acff3
Author: Ewen Cheslack-Postava <m...@ewencp.org>
Authored: Mon Dec 19 12:37:58 2016 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Mon Dec 19 12:37:58 2016 -0800

----------------------------------------------------------------------
 .../runtime/distributed/WorkerCoordinator.java  |  38 +++---
 .../distributed/WorkerCoordinatorTest.java      | 134 ++++++++++++++-----
 2 files changed, 123 insertions(+), 49 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc9ef716/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 88a0a8d..58525c5 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -154,25 +154,25 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
     protected Map<String, ByteBuffer> 
performAssignment(String leaderId, String protocol, Map<String, ByteBuffer> allMemberMetadata) { log.debug("Performing task assignment"); - Map<String, ConnectProtocol.WorkerState> allConfigs = new HashMap<>(); + Map<String, ConnectProtocol.WorkerState> memberConfigs = new HashMap<>(); for (Map.Entry<String, ByteBuffer> entry : allMemberMetadata.entrySet()) - allConfigs.put(entry.getKey(), ConnectProtocol.deserializeMetadata(entry.getValue())); + memberConfigs.put(entry.getKey(), ConnectProtocol.deserializeMetadata(entry.getValue())); - long maxOffset = findMaxMemberConfigOffset(allConfigs); + long maxOffset = findMaxMemberConfigOffset(memberConfigs); Long leaderOffset = ensureLeaderConfig(maxOffset); if (leaderOffset == null) - return fillAssignmentsAndSerialize(allConfigs.keySet(), ConnectProtocol.Assignment.CONFIG_MISMATCH, - leaderId, allConfigs.get(leaderId).url(), maxOffset, + return fillAssignmentsAndSerialize(memberConfigs.keySet(), ConnectProtocol.Assignment.CONFIG_MISMATCH, + leaderId, memberConfigs.get(leaderId).url(), maxOffset, new HashMap<String, List<String>>(), new HashMap<String, List<ConnectorTaskId>>()); - return performTaskAssignment(leaderId, leaderOffset, allConfigs); + return performTaskAssignment(leaderId, leaderOffset, memberConfigs); } - private long findMaxMemberConfigOffset(Map<String, ConnectProtocol.WorkerState> allConfigs) { + private long findMaxMemberConfigOffset(Map<String, ConnectProtocol.WorkerState> memberConfigs) { // The new config offset is the maximum seen by any member. We always perform assignment using this offset, // even if some members have fallen behind. The config offset used to generate the assignment is included in // the response so members that have fallen behind will not use the assignment until they have caught up. 
Long maxOffset = null; - for (Map.Entry<String, ConnectProtocol.WorkerState> stateEntry : allConfigs.entrySet()) { + for (Map.Entry<String, ConnectProtocol.WorkerState> stateEntry : memberConfigs.entrySet()) { long memberRootOffset = stateEntry.getValue().offset(); if (maxOffset == null) maxOffset = memberRootOffset; @@ -205,13 +205,18 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos return maxOffset; } - private Map<String, ByteBuffer> performTaskAssignment(String leaderId, long maxOffset, Map<String, ConnectProtocol.WorkerState> allConfigs) { + private Map<String, ByteBuffer> performTaskAssignment(String leaderId, long maxOffset, Map<String, ConnectProtocol.WorkerState> memberConfigs) { Map<String, List<String>> connectorAssignments = new HashMap<>(); Map<String, List<ConnectorTaskId>> taskAssignments = new HashMap<>(); - // Perform round-robin task assignment - CircularIterator<String> memberIt = new CircularIterator<>(sorted(allConfigs.keySet())); - for (String connectorId : sorted(configSnapshot.connectors())) { + // Perform round-robin task assignment. Assign all connectors and then all tasks because assigning both the + // connector and its tasks can lead to very uneven distribution of work in some common cases (e.g. for connectors + // that generate only 1 task each; in a cluster of 2 or an even # of nodes, only even nodes will be assigned + // connectors and only odd nodes will be assigned tasks, but tasks are, on average, actually more resource + // intensive than connectors). 
+ List<String> connectorsSorted = sorted(configSnapshot.connectors()); + CircularIterator<String> memberIt = new CircularIterator<>(sorted(memberConfigs.keySet())); + for (String connectorId : connectorsSorted) { String connectorAssignedTo = memberIt.next(); log.trace("Assigning connector {} to {}", connectorId, connectorAssignedTo); List<String> memberConnectors = connectorAssignments.get(connectorAssignedTo); @@ -220,7 +225,8 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos connectorAssignments.put(connectorAssignedTo, memberConnectors); } memberConnectors.add(connectorId); - + } + for (String connectorId : connectorsSorted) { for (ConnectorTaskId taskId : sorted(configSnapshot.tasks(connectorId))) { String taskAssignedTo = memberIt.next(); log.trace("Assigning task {} to {}", taskId, taskAssignedTo); @@ -233,10 +239,10 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos } } - this.leaderState = new LeaderState(allConfigs, connectorAssignments, taskAssignments); + this.leaderState = new LeaderState(memberConfigs, connectorAssignments, taskAssignments); - return fillAssignmentsAndSerialize(allConfigs.keySet(), ConnectProtocol.Assignment.NO_ERROR, - leaderId, allConfigs.get(leaderId).url(), maxOffset, connectorAssignments, taskAssignments); + return fillAssignmentsAndSerialize(memberConfigs.keySet(), ConnectProtocol.Assignment.NO_ERROR, + leaderId, memberConfigs.get(leaderId).url(), maxOffset, connectorAssignments, taskAssignments); } private Map<String, ByteBuffer> fillAssignmentsAndSerialize(Collection<String> members, http://git-wip-us.apache.org/repos/asf/kafka/blob/bc9ef716/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java index 3aff8f2..92393a1 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java @@ -58,11 +58,13 @@ public class WorkerCoordinatorTest { private static final String LEADER_URL = "leaderUrl:8083"; private static final String MEMBER_URL = "memberUrl:8083"; - private String connectorId = "connector"; + private String connectorId1 = "connector1"; private String connectorId2 = "connector2"; - private ConnectorTaskId taskId0 = new ConnectorTaskId(connectorId, 0); - private ConnectorTaskId taskId1 = new ConnectorTaskId(connectorId, 1); - private ConnectorTaskId taskId2 = new ConnectorTaskId(connectorId2, 0); + private String connectorId3 = "connector3"; + private ConnectorTaskId taskId1x0 = new ConnectorTaskId(connectorId1, 0); + private ConnectorTaskId taskId1x1 = new ConnectorTaskId(connectorId1, 1); + private ConnectorTaskId taskId2x0 = new ConnectorTaskId(connectorId2, 0); + private ConnectorTaskId taskId3x0 = new ConnectorTaskId(connectorId3, 0); private String groupId = "test-group"; private int sessionTimeoutMs = 10; @@ -82,6 +84,7 @@ public class WorkerCoordinatorTest { private ClusterConfigState configState1; private ClusterConfigState configState2; + private ClusterConfigState configStateSingleTaskConnectors; @Before public void setup() { @@ -110,32 +113,60 @@ public class WorkerCoordinatorTest { rebalanceListener); configState1 = new ClusterConfigState( - 1L, Collections.singletonMap(connectorId, 1), - Collections.singletonMap(connectorId, (Map<String, String>) new HashMap<String, String>()), - Collections.singletonMap(connectorId, TargetState.STARTED), - Collections.singletonMap(taskId0, (Map<String, String>) new HashMap<String, String>()), + 1L, + Collections.singletonMap(connectorId1, 1), + 
Collections.singletonMap(connectorId1, (Map<String, String>) new HashMap<String, String>()), + Collections.singletonMap(connectorId1, TargetState.STARTED), + Collections.singletonMap(taskId1x0, (Map<String, String>) new HashMap<String, String>()), Collections.<String>emptySet() ); + Map<String, Integer> configState2ConnectorTaskCounts = new HashMap<>(); - configState2ConnectorTaskCounts.put(connectorId, 2); + configState2ConnectorTaskCounts.put(connectorId1, 2); configState2ConnectorTaskCounts.put(connectorId2, 1); Map<String, Map<String, String>> configState2ConnectorConfigs = new HashMap<>(); - configState2ConnectorConfigs.put(connectorId, new HashMap<String, String>()); + configState2ConnectorConfigs.put(connectorId1, new HashMap<String, String>()); configState2ConnectorConfigs.put(connectorId2, new HashMap<String, String>()); - Map<String, TargetState> targetStates = new HashMap<>(); - targetStates.put(connectorId, TargetState.STARTED); - targetStates.put(connectorId2, TargetState.STARTED); + Map<String, TargetState> configState2TargetStates = new HashMap<>(); + configState2TargetStates.put(connectorId1, TargetState.STARTED); + configState2TargetStates.put(connectorId2, TargetState.STARTED); Map<ConnectorTaskId, Map<String, String>> configState2TaskConfigs = new HashMap<>(); - configState2TaskConfigs.put(taskId0, new HashMap<String, String>()); - configState2TaskConfigs.put(taskId1, new HashMap<String, String>()); - configState2TaskConfigs.put(taskId2, new HashMap<String, String>()); + configState2TaskConfigs.put(taskId1x0, new HashMap<String, String>()); + configState2TaskConfigs.put(taskId1x1, new HashMap<String, String>()); + configState2TaskConfigs.put(taskId2x0, new HashMap<String, String>()); configState2 = new ClusterConfigState( - 2L, configState2ConnectorTaskCounts, + 2L, + configState2ConnectorTaskCounts, configState2ConnectorConfigs, - targetStates, + configState2TargetStates, configState2TaskConfigs, Collections.<String>emptySet() ); + + Map<String, 
Integer> configStateSingleTaskConnectorsConnectorTaskCounts = new HashMap<>(); + configStateSingleTaskConnectorsConnectorTaskCounts.put(connectorId1, 1); + configStateSingleTaskConnectorsConnectorTaskCounts.put(connectorId2, 1); + configStateSingleTaskConnectorsConnectorTaskCounts.put(connectorId3, 1); + Map<String, Map<String, String>> configStateSingleTaskConnectorsConnectorConfigs = new HashMap<>(); + configStateSingleTaskConnectorsConnectorConfigs.put(connectorId1, new HashMap<String, String>()); + configStateSingleTaskConnectorsConnectorConfigs.put(connectorId2, new HashMap<String, String>()); + configStateSingleTaskConnectorsConnectorConfigs.put(connectorId3, new HashMap<String, String>()); + Map<String, TargetState> configStateSingleTaskConnectorsTargetStates = new HashMap<>(); + configStateSingleTaskConnectorsTargetStates.put(connectorId1, TargetState.STARTED); + configStateSingleTaskConnectorsTargetStates.put(connectorId2, TargetState.STARTED); + configStateSingleTaskConnectorsTargetStates.put(connectorId3, TargetState.STARTED); + Map<ConnectorTaskId, Map<String, String>> configStateSingleTaskConnectorsTaskConfigs = new HashMap<>(); + configStateSingleTaskConnectorsTaskConfigs.put(taskId1x0, new HashMap<String, String>()); + configStateSingleTaskConnectorsTaskConfigs.put(taskId2x0, new HashMap<String, String>()); + configStateSingleTaskConnectorsTaskConfigs.put(taskId3x0, new HashMap<String, String>()); + configStateSingleTaskConnectors = new ClusterConfigState( + 2L, + configStateSingleTaskConnectorsConnectorTaskCounts, + configStateSingleTaskConnectorsConnectorConfigs, + configStateSingleTaskConnectorsTargetStates, + configStateSingleTaskConnectorsTaskConfigs, + Collections.<String>emptySet() + ); } @After @@ -187,7 +218,7 @@ public class WorkerCoordinatorTest { sync.generationId() == 1 && sync.groupAssignment().containsKey(consumerId); } - }, syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.singletonList(connectorId), + 
}, syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.singletonList(connectorId1), Collections.<ConnectorTaskId>emptyList(), Errors.NONE.code())); coordinator.ensureActiveGroup(); @@ -197,7 +228,7 @@ public class WorkerCoordinatorTest { assertFalse(rebalanceListener.assignment.failed()); assertEquals(1L, rebalanceListener.assignment.offset()); assertEquals("leader", rebalanceListener.assignment.leader()); - assertEquals(Collections.singletonList(connectorId), rebalanceListener.assignment.connectors()); + assertEquals(Collections.singletonList(connectorId1), rebalanceListener.assignment.connectors()); assertEquals(Collections.emptyList(), rebalanceListener.assignment.tasks()); PowerMock.verifyAll(); @@ -225,7 +256,7 @@ public class WorkerCoordinatorTest { sync.groupAssignment().isEmpty(); } }, syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.<String>emptyList(), - Collections.singletonList(taskId0), Errors.NONE.code())); + Collections.singletonList(taskId1x0), Errors.NONE.code())); coordinator.ensureActiveGroup(); assertFalse(coordinator.needRejoin()); @@ -234,7 +265,7 @@ public class WorkerCoordinatorTest { assertFalse(rebalanceListener.assignment.failed()); assertEquals(1L, rebalanceListener.assignment.offset()); assertEquals(Collections.emptyList(), rebalanceListener.assignment.connectors()); - assertEquals(Collections.singletonList(taskId0), rebalanceListener.assignment.tasks()); + assertEquals(Collections.singletonList(taskId1x0), rebalanceListener.assignment.tasks()); PowerMock.verifyAll(); } @@ -270,7 +301,7 @@ public class WorkerCoordinatorTest { Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(), Errors.NONE.code())); client.prepareResponse(joinGroupFollowerResponse(1, memberId, "leader", Errors.NONE.code())); client.prepareResponse(matcher, syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L, - Collections.<String>emptyList(), 
Collections.singletonList(taskId0), Errors.NONE.code())); + Collections.<String>emptyList(), Collections.singletonList(taskId1x0), Errors.NONE.code())); coordinator.ensureActiveGroup(); PowerMock.verifyAll(); @@ -289,7 +320,7 @@ public class WorkerCoordinatorTest { // join the group once client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code())); client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.<String>emptyList(), - Collections.singletonList(taskId0), Errors.NONE.code())); + Collections.singletonList(taskId1x0), Errors.NONE.code())); coordinator.ensureActiveGroup(); assertEquals(0, rebalanceListener.revokedCount); @@ -297,22 +328,22 @@ public class WorkerCoordinatorTest { assertFalse(rebalanceListener.assignment.failed()); assertEquals(1L, rebalanceListener.assignment.offset()); assertEquals(Collections.emptyList(), rebalanceListener.assignment.connectors()); - assertEquals(Collections.singletonList(taskId0), rebalanceListener.assignment.tasks()); + assertEquals(Collections.singletonList(taskId1x0), rebalanceListener.assignment.tasks()); // and join the group again coordinator.requestRejoin(); client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code())); - client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.singletonList(connectorId), + client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.singletonList(connectorId1), Collections.<ConnectorTaskId>emptyList(), Errors.NONE.code())); coordinator.ensureActiveGroup(); assertEquals(1, rebalanceListener.revokedCount); assertEquals(Collections.emptyList(), rebalanceListener.revokedConnectors); - assertEquals(Collections.singletonList(taskId0), rebalanceListener.revokedTasks); + assertEquals(Collections.singletonList(taskId1x0), rebalanceListener.revokedTasks); assertEquals(2, 
rebalanceListener.assignedCount); assertFalse(rebalanceListener.assignment.failed()); assertEquals(1L, rebalanceListener.assignment.offset()); - assertEquals(Collections.singletonList(connectorId), rebalanceListener.assignment.connectors()); + assertEquals(Collections.singletonList(connectorId1), rebalanceListener.assignment.connectors()); assertEquals(Collections.emptyList(), rebalanceListener.assignment.tasks()); PowerMock.verifyAll(); @@ -341,7 +372,7 @@ public class WorkerCoordinatorTest { assertEquals(false, leaderAssignment.failed()); assertEquals("leader", leaderAssignment.leader()); assertEquals(1, leaderAssignment.offset()); - assertEquals(Collections.singletonList(connectorId), leaderAssignment.connectors()); + assertEquals(Collections.singletonList(connectorId1), leaderAssignment.connectors()); assertEquals(Collections.emptyList(), leaderAssignment.tasks()); ConnectProtocol.Assignment memberAssignment = ConnectProtocol.deserializeAssignment(result.get("member")); @@ -349,7 +380,7 @@ public class WorkerCoordinatorTest { assertEquals("leader", memberAssignment.leader()); assertEquals(1, memberAssignment.offset()); assertEquals(Collections.emptyList(), memberAssignment.connectors()); - assertEquals(Collections.singletonList(taskId0), memberAssignment.tasks()); + assertEquals(Collections.singletonList(taskId1x0), memberAssignment.tasks()); PowerMock.verifyAll(); } @@ -377,15 +408,52 @@ public class WorkerCoordinatorTest { assertEquals(false, leaderAssignment.failed()); assertEquals("leader", leaderAssignment.leader()); assertEquals(1, leaderAssignment.offset()); - assertEquals(Collections.singletonList(connectorId), leaderAssignment.connectors()); - assertEquals(Arrays.asList(taskId1, taskId2), leaderAssignment.tasks()); + assertEquals(Collections.singletonList(connectorId1), leaderAssignment.connectors()); + assertEquals(Arrays.asList(taskId1x0, taskId2x0), leaderAssignment.tasks()); + + ConnectProtocol.Assignment memberAssignment = 
ConnectProtocol.deserializeAssignment(result.get("member")); + assertEquals(false, memberAssignment.failed()); + assertEquals("leader", memberAssignment.leader()); + assertEquals(1, memberAssignment.offset()); + assertEquals(Collections.singletonList(connectorId2), memberAssignment.connectors()); + assertEquals(Collections.singletonList(taskId1x1), memberAssignment.tasks()); + + PowerMock.verifyAll(); + } + + @Test + public void testLeaderPerformAssignmentSingleTaskConnectors() throws Exception { + // Since all the protocol responses are mocked, the other tests validate doSync runs, but don't validate its + // output. So we test it directly here. + + EasyMock.expect(configStorage.snapshot()).andReturn(configStateSingleTaskConnectors); + + PowerMock.replayAll(); + + // Prime the current configuration state + coordinator.metadata(); + + Map<String, ByteBuffer> configs = new HashMap<>(); + // Mark everyone as in sync with configState1 + configs.put("leader", ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(LEADER_URL, 1L))); + configs.put("member", ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(MEMBER_URL, 1L))); + Map<String, ByteBuffer> result = Whitebox.invokeMethod(coordinator, "performAssignment", "leader", WorkerCoordinator.DEFAULT_SUBPROTOCOL, configs); + + // Round robin assignment when there are the same number of connectors and tasks should result in each being + // evenly distributed across the workers, i.e. 
round robin assignment of connectors first, then followed by tasks + ConnectProtocol.Assignment leaderAssignment = ConnectProtocol.deserializeAssignment(result.get("leader")); + assertEquals(false, leaderAssignment.failed()); + assertEquals("leader", leaderAssignment.leader()); + assertEquals(1, leaderAssignment.offset()); + assertEquals(Arrays.asList(connectorId1, connectorId3), leaderAssignment.connectors()); + assertEquals(Arrays.asList(taskId2x0), leaderAssignment.tasks()); ConnectProtocol.Assignment memberAssignment = ConnectProtocol.deserializeAssignment(result.get("member")); assertEquals(false, memberAssignment.failed()); assertEquals("leader", memberAssignment.leader()); assertEquals(1, memberAssignment.offset()); assertEquals(Collections.singletonList(connectorId2), memberAssignment.connectors()); - assertEquals(Collections.singletonList(taskId0), memberAssignment.tasks()); + assertEquals(Arrays.asList(taskId1x0, taskId3x0), memberAssignment.tasks()); PowerMock.verifyAll(); }