Repository: kafka Updated Branches: refs/heads/trunk 190239441 -> 71f7e7c3a
http://git-wip-us.apache.org/repos/asf/kafka/blob/71f7e7c3/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index 8fc6dbd..e4ebc89 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -39,7 +39,6 @@ import org.apache.kafka.connect.storage.StatusBackingStore; import org.apache.kafka.connect.util.Callback; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.FutureCallback; -import org.easymock.Capture; import org.easymock.EasyMock; import org.easymock.IAnswer; import org.junit.Before; @@ -169,14 +168,15 @@ public class DistributedHerderTest { EasyMock.expect(member.memberId()).andStubReturn("member"); expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1)); expectPostRebalanceCatchup(SNAPSHOT); - worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(), + worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); - PowerMock.expectLastCall(); + PowerMock.expectLastCall().andReturn(true); EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); - worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); - PowerMock.expectLastCall(); + worker.startTask(EasyMock.eq(TASK1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(), + EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); + PowerMock.expectLastCall().andReturn(true); member.poll(EasyMock.anyInt()); PowerMock.expectLastCall(); @@ -193,13 +193,14 @@ public class DistributedHerderTest { EasyMock.expect(member.memberId()).andStubReturn("member"); expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1)); expectPostRebalanceCatchup(SNAPSHOT); - worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(), + worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); - PowerMock.expectLastCall(); + PowerMock.expectLastCall().andReturn(true); EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); - worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); - PowerMock.expectLastCall(); + worker.startTask(EasyMock.eq(TASK1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(), + EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); + PowerMock.expectLastCall().andReturn(true); member.poll(EasyMock.anyInt()); PowerMock.expectLastCall(); @@ -207,9 +208,9 @@ public class DistributedHerderTest { 1, Arrays.asList(CONN1), Arrays.<ConnectorTaskId>asList()); // and the new assignment started - worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(), + worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); - PowerMock.expectLastCall(); + PowerMock.expectLastCall().andReturn(true); EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); member.poll(EasyMock.anyInt()); @@ -229,13 +230,14 @@ public class DistributedHerderTest { EasyMock.expect(member.memberId()).andStubReturn("member"); expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1)); expectPostRebalanceCatchup(SNAPSHOT); - worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(), + worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); - PowerMock.expectLastCall(); + PowerMock.expectLastCall().andReturn(true); EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); - worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); - PowerMock.expectLastCall(); + worker.startTask(EasyMock.eq(TASK1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(), + EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); + PowerMock.expectLastCall().andReturn(true); member.poll(EasyMock.anyInt()); PowerMock.expectLastCall(); @@ -243,9 +245,9 @@ public class DistributedHerderTest { 1, Arrays.asList(CONN1), Arrays.<ConnectorTaskId>asList()); // and the new assignment started - worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(), + worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); - PowerMock.expectLastCall(); + PowerMock.expectLastCall().andReturn(true); EasyMock.expect(worker.isRunning(CONN1)).andReturn(false); // worker is not running, so we should see no call to connectorTaskConfigs() @@ -263,13 +265,9 @@ public class DistributedHerderTest { @Test public void testHaltCleansUpWorker() { - EasyMock.expect(worker.connectorNames()).andReturn(Collections.singleton(CONN1)); - worker.stopConnector(CONN1); - PowerMock.expectLastCall(); - EasyMock.expect(worker.taskIds()).andReturn(Collections.singleton(TASK1)); - worker.stopTasks(Collections.singleton(TASK1)); + worker.stopConnectors(); PowerMock.expectLastCall(); - worker.awaitStopTasks(Collections.singleton(TASK1)); + worker.stopAndAwaitTasks(); PowerMock.expectLastCall(); member.stop(); PowerMock.expectLastCall(); @@ -342,9 +340,9 @@ public class DistributedHerderTest { // Start with one connector expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList()); expectPostRebalanceCatchup(SNAPSHOT); - worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(), + worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); - PowerMock.expectLastCall(); + PowerMock.expectLastCall().andReturn(true); EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); @@ -377,9 +375,9 @@ public class DistributedHerderTest { expectPostRebalanceCatchup(SNAPSHOT); member.poll(EasyMock.anyInt()); PowerMock.expectLastCall(); - worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(), + worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); - PowerMock.expectLastCall(); + PowerMock.expectLastCall().andReturn(true); EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); // now handle the connector restart @@ -390,13 +388,11 @@ public class DistributedHerderTest { member.poll(EasyMock.anyInt()); PowerMock.expectLastCall(); - EasyMock.expect(worker.ownsConnector(CONN1)).andReturn(true); - worker.stopConnector(CONN1); - PowerMock.expectLastCall(); - worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(), + PowerMock.expectLastCall().andReturn(true); + worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); - PowerMock.expectLastCall(); + PowerMock.expectLastCall().andReturn(true); EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); PowerMock.replayAll(); @@ -461,8 +457,6 @@ public class DistributedHerderTest { member.poll(EasyMock.anyInt()); PowerMock.expectLastCall(); - EasyMock.expect(worker.ownsConnector(CONN1)).andReturn(false); - PowerMock.replayAll(); herder.tick(); @@ -498,7 +492,6 @@ public class DistributedHerderTest { PowerMock.expectLastCall(); String ownerUrl = "ownerUrl"; - EasyMock.expect(worker.ownsConnector(CONN1)).andReturn(false); EasyMock.expect(member.ownerUrl(CONN1)).andReturn(ownerUrl); PowerMock.replayAll(); @@ -530,8 +523,9 @@ public class DistributedHerderTest { expectPostRebalanceCatchup(SNAPSHOT); member.poll(EasyMock.anyInt()); PowerMock.expectLastCall(); - worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); - PowerMock.expectLastCall(); + worker.startTask(EasyMock.eq(TASK0), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(), + EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); + PowerMock.expectLastCall().andReturn(true); // now handle the task restart member.wakeup(); @@ -541,12 +535,11 @@ public class DistributedHerderTest { member.poll(EasyMock.anyInt()); PowerMock.expectLastCall(); - EasyMock.expect(worker.ownsTask(TASK0)).andReturn(true); - worker.stopAndAwaitTask(TASK0); - PowerMock.expectLastCall(); - worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); - PowerMock.expectLastCall(); + PowerMock.expectLastCall().andReturn(true); + worker.startTask(EasyMock.eq(TASK0), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(), + EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); + PowerMock.expectLastCall().andReturn(true); PowerMock.replayAll(); @@ -602,7 +595,6 @@ public class DistributedHerderTest { PowerMock.expectLastCall(); // now handle the task restart - EasyMock.expect(worker.ownsTask(TASK0)).andReturn(false); member.wakeup(); PowerMock.expectLastCall(); member.ensureActive(); @@ -638,7 +630,6 @@ public class DistributedHerderTest { // now handle the task restart String ownerUrl = "ownerUrl"; - EasyMock.expect(worker.ownsTask(TASK0)).andReturn(false); EasyMock.expect(member.ownerUrl(TASK0)).andReturn(ownerUrl); member.wakeup(); PowerMock.expectLastCall(); @@ -687,10 +678,10 @@ public class DistributedHerderTest { // Performs rebalance and gets new assignment expectRebalance(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(), ConnectProtocol.Assignment.NO_ERROR, 1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList()); - worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(), + worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); + PowerMock.expectLastCall().andReturn(true); EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); - PowerMock.expectLastCall(); EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); member.poll(EasyMock.anyInt()); PowerMock.expectLastCall(); @@ -715,9 +706,9 @@ public class DistributedHerderTest { // join expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList()); expectPostRebalanceCatchup(SNAPSHOT); - worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(), + worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); - PowerMock.expectLastCall(); + PowerMock.expectLastCall().andReturn(true); EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); member.poll(EasyMock.anyInt()); @@ -729,10 +720,10 @@ public class DistributedHerderTest { PowerMock.expectLastCall(); EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT); // for this test, it doesn't matter if we use the same config snapshot worker.stopConnector(CONN1); - PowerMock.expectLastCall(); - worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(), + PowerMock.expectLastCall().andReturn(true); + worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); - PowerMock.expectLastCall(); + PowerMock.expectLastCall().andReturn(true); EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); member.poll(EasyMock.anyInt()); @@ -757,9 +748,9 @@ public class DistributedHerderTest { // join expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList()); expectPostRebalanceCatchup(SNAPSHOT); - worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(), + worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); - PowerMock.expectLastCall(); + PowerMock.expectLastCall().andReturn(true); EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); member.poll(EasyMock.anyInt()); @@ -773,8 +764,6 @@ public class DistributedHerderTest { EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT_PAUSED_CONN1); PowerMock.expectLastCall(); - EasyMock.expect(worker.ownsConnector(CONN1)).andReturn(true); - worker.setTargetState(CONN1, TargetState.PAUSED); PowerMock.expectLastCall(); @@ -798,9 +787,9 @@ public class DistributedHerderTest { // start with the connector paused expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList()); expectPostRebalanceCatchup(SNAPSHOT_PAUSED_CONN1); - worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(), + worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.PAUSED)); - PowerMock.expectLastCall(); + PowerMock.expectLastCall().andReturn(true); member.poll(EasyMock.anyInt()); PowerMock.expectLastCall(); @@ -814,7 +803,6 @@ public class DistributedHerderTest { PowerMock.expectLastCall(); // we expect reconfiguration after resuming - EasyMock.expect(worker.ownsConnector(CONN1)).andReturn(true); EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); @@ -841,8 +829,9 @@ public class DistributedHerderTest { // join expectRebalance(1, Collections.<String>emptyList(), Collections.singletonList(TASK0)); expectPostRebalanceCatchup(SNAPSHOT); - worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); - PowerMock.expectLastCall(); + worker.startTask(EasyMock.eq(TASK0), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(), + EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); + PowerMock.expectLastCall().andReturn(true); member.poll(EasyMock.anyInt()); PowerMock.expectLastCall(); @@ -877,8 +866,9 @@ public class DistributedHerderTest { // join expectRebalance(1, Collections.<String>emptyList(), Collections.singletonList(TASK0)); expectPostRebalanceCatchup(SNAPSHOT); - worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); - PowerMock.expectLastCall(); + worker.startTask(EasyMock.eq(TASK0), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(), + EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); + PowerMock.expectLastCall().andReturn(true); member.poll(EasyMock.anyInt()); PowerMock.expectLastCall(); @@ -890,8 +880,6 @@ public class DistributedHerderTest { EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT_PAUSED_CONN1); PowerMock.expectLastCall(); - EasyMock.expect(worker.ownsConnector(CONN1)).andReturn(false); - worker.setTargetState(CONN1, TargetState.PAUSED); PowerMock.expectLastCall(); @@ -918,8 +906,9 @@ public class DistributedHerderTest { // join expectRebalance(1, Collections.<String>emptyList(), Collections.singletonList(TASK0)); expectPostRebalanceCatchup(SNAPSHOT_PAUSED_CONN1); - worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.PAUSED)); - PowerMock.expectLastCall(); + worker.startTask(EasyMock.eq(TASK0), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(), + EasyMock.eq(herder), EasyMock.eq(TargetState.PAUSED)); + PowerMock.expectLastCall().andReturn(true); member.poll(EasyMock.anyInt()); PowerMock.expectLastCall(); @@ -931,11 +920,11 @@ public class DistributedHerderTest { EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT); PowerMock.expectLastCall(); - EasyMock.expect(worker.ownsConnector(CONN1)).andReturn(false); - worker.setTargetState(CONN1, TargetState.STARTED); PowerMock.expectLastCall(); + EasyMock.expect(worker.isRunning(CONN1)).andReturn(false); + member.poll(EasyMock.anyInt()); PowerMock.expectLastCall(); @@ -970,8 +959,9 @@ public class DistributedHerderTest { expectRebalance(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(), ConnectProtocol.Assignment.NO_ERROR, 1, Collections.<String>emptyList(), Arrays.asList(TASK0)); - worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); - PowerMock.expectLastCall(); + worker.startTask(EasyMock.eq(TASK0), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(), + EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); + PowerMock.expectLastCall().andReturn(true); member.poll(EasyMock.anyInt()); PowerMock.expectLastCall(); @@ -1004,12 +994,13 @@ public class DistributedHerderTest { expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1)); expectPostRebalanceCatchup(SNAPSHOT); - worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(), + worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); - PowerMock.expectLastCall(); + PowerMock.expectLastCall().andReturn(true); EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); - worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); - PowerMock.expectLastCall(); + worker.startTask(EasyMock.eq(TASK1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(), + EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); + PowerMock.expectLastCall().andReturn(true); EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); member.poll(EasyMock.anyInt()); PowerMock.expectLastCall(); @@ -1070,9 +1061,9 @@ public class DistributedHerderTest { EasyMock.expect(member.memberId()).andStubReturn("leader"); expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList()); expectPostRebalanceCatchup(SNAPSHOT); - worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(), + worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); - PowerMock.expectLastCall(); + PowerMock.expectLastCall().andReturn(true); EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); @@ -1098,11 +1089,10 @@ public class DistributedHerderTest { // connector without rebalance EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT_UPDATED_CONN1_CONFIG); worker.stopConnector(CONN1); - PowerMock.expectLastCall(); - Capture<ConnectorConfig> capturedUpdatedConfig = EasyMock.newCapture(); - worker.startConnector(EasyMock.capture(capturedUpdatedConfig), EasyMock.<ConnectorContext>anyObject(), + PowerMock.expectLastCall().andReturn(true); + worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); - PowerMock.expectLastCall(); + PowerMock.expectLastCall().andReturn(true); EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); @@ -1147,7 +1137,6 @@ public class DistributedHerderTest { // This requires inter-worker communication, so needs the REST API } - private void expectRebalance(final long offset, final List<String> assignedConnectors, final List<ConnectorTaskId> assignedTasks) { @@ -1175,17 +1164,13 @@ public class DistributedHerderTest { }); if (revokedConnectors != null) { - for (String connector : revokedConnectors) { - worker.stopConnector(connector); - PowerMock.expectLastCall(); - } + worker.stopConnectors(revokedConnectors); + PowerMock.expectLastCall().andReturn(revokedConnectors); } - if (revokedTasks != null && !revokedTasks.isEmpty()) { - worker.stopTasks(revokedTasks); - PowerMock.expectLastCall(); - worker.awaitStopTasks(revokedTasks); - PowerMock.expectLastCall(); + if (revokedTasks != null) { + worker.stopAndAwaitTasks(revokedTasks); + PowerMock.expectLastCall().andReturn(revokedTasks); } if (revokedConnectors != null) { http://git-wip-us.apache.org/repos/asf/kafka/blob/71f7e7c3/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java index 3772586..971d84f 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.ConnectorContext; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.errors.AlreadyExistsException; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.NotFoundException; import org.apache.kafka.connect.runtime.AbstractStatus; import org.apache.kafka.connect.runtime.ConnectorConfig; @@ -166,11 +167,11 @@ public class StandaloneHerderTest { expectAdd(SourceSink.SOURCE); worker.stopConnector(CONNECTOR_NAME); - EasyMock.expectLastCall(); + EasyMock.expectLastCall().andReturn(true); - worker.startConnector(EasyMock.eq(new ConnectorConfig(connectorConfig(SourceSink.SOURCE))), + worker.startConnector(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(connectorConfig(SourceSink.SOURCE)), EasyMock.anyObject(HerderConnectorContext.class), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); - EasyMock.expectLastCall(); + EasyMock.expectLastCall().andReturn(true); PowerMock.replayAll(); @@ -184,42 +185,15 @@ public class StandaloneHerderTest { } @Test - public void testRestartConnectorFailureOnStop() throws Exception { - expectAdd(SourceSink.SOURCE); - - RuntimeException e = new RuntimeException(); - worker.stopConnector(CONNECTOR_NAME); - EasyMock.expectLastCall().andThrow(e); - - // the connector will not be started after the failure in start - - PowerMock.replayAll(); - - herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE), false, createCallback); - - FutureCallback<Void> cb = new FutureCallback<>(); - herder.restartConnector(CONNECTOR_NAME, cb); - try { - cb.get(1000L, TimeUnit.MILLISECONDS); - fail(); - } catch (ExecutionException exception) { - assertEquals(e, exception.getCause()); - } - - PowerMock.verifyAll(); - } - - @Test public void testRestartConnectorFailureOnStart() throws Exception { expectAdd(SourceSink.SOURCE); worker.stopConnector(CONNECTOR_NAME); - EasyMock.expectLastCall(); + EasyMock.expectLastCall().andReturn(true); - RuntimeException e = new RuntimeException(); - worker.startConnector(EasyMock.eq(new ConnectorConfig(connectorConfig(SourceSink.SOURCE))), + worker.startConnector(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(connectorConfig(SourceSink.SOURCE)), EasyMock.anyObject(HerderConnectorContext.class), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); - EasyMock.expectLastCall().andThrow(e); + EasyMock.expectLastCall().andReturn(false); PowerMock.replayAll(); @@ -231,7 +205,7 @@ public class StandaloneHerderTest { cb.get(1000L, TimeUnit.MILLISECONDS); fail(); } catch (ExecutionException exception) { - assertEquals(e, exception.getCause()); + assertEquals(ConnectException.class, exception.getCause().getClass()); } PowerMock.verifyAll(); @@ -243,12 +217,10 @@ public class StandaloneHerderTest { expectAdd(SourceSink.SOURCE); worker.stopAndAwaitTask(taskId); - EasyMock.expectLastCall(); + EasyMock.expectLastCall().andReturn(true); - ConnectorConfig connConfig = new ConnectorConfig(connectorConfig(SourceSink.SOURCE)); - TaskConfig taskConfig = new TaskConfig(taskConfig(SourceSink.SOURCE)); - worker.startTask(taskId, taskConfig, connConfig, herder, TargetState.STARTED); - EasyMock.expectLastCall(); + worker.startTask(taskId, connectorConfig(SourceSink.SOURCE), taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED); + EasyMock.expectLastCall().andReturn(true); PowerMock.replayAll(); @@ -262,44 +234,15 @@ public class StandaloneHerderTest { } @Test - public void testRestartTaskFailureOnStop() throws Exception { - ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0); - expectAdd(SourceSink.SOURCE); - - RuntimeException e = new RuntimeException(); - worker.stopAndAwaitTask(taskId); - EasyMock.expectLastCall().andThrow(e); - - // task will not be started after the failure in stop - - PowerMock.replayAll(); - - herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE), false, createCallback); - - FutureCallback<Void> cb = new FutureCallback<>(); - herder.restartTask(taskId, cb); - try { - cb.get(1000L, TimeUnit.MILLISECONDS); - fail("Expected restart callback to raise an exception"); - } catch (ExecutionException exception) { - assertEquals(e, exception.getCause()); - } - PowerMock.verifyAll(); - } - - @Test public void testRestartTaskFailureOnStart() throws Exception { ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0); expectAdd(SourceSink.SOURCE); worker.stopAndAwaitTask(taskId); - EasyMock.expectLastCall(); + EasyMock.expectLastCall().andReturn(true); - RuntimeException e = new RuntimeException(); - ConnectorConfig connConfig = new ConnectorConfig(connectorConfig(SourceSink.SOURCE)); - TaskConfig taskConfig = new TaskConfig(taskConfig(SourceSink.SOURCE)); - worker.startTask(taskId, taskConfig, connConfig, herder, TargetState.STARTED); - EasyMock.expectLastCall().andThrow(e); + worker.startTask(taskId, connectorConfig(SourceSink.SOURCE), taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED); + EasyMock.expectLastCall().andReturn(false); PowerMock.replayAll(); @@ -311,7 +254,7 @@ public class StandaloneHerderTest { cb.get(1000L, TimeUnit.MILLISECONDS); fail("Expected restart callback to raise an exception"); } catch (ExecutionException exception) { - assertEquals(e, exception.getCause()); + assertEquals(ConnectException.class, exception.getCause().getClass()); } PowerMock.verifyAll(); @@ -409,11 +352,11 @@ public class StandaloneHerderTest { EasyMock.expectLastCall(); // Update config, which requires stopping and restarting worker.stopConnector(CONNECTOR_NAME); - EasyMock.expectLastCall(); - Capture<ConnectorConfig> capturedConfig = EasyMock.newCapture(); - worker.startConnector(EasyMock.capture(capturedConfig), EasyMock.<ConnectorContext>anyObject(), + EasyMock.expectLastCall().andReturn(true); + Capture<Map<String, String>> capturedConfig = EasyMock.newCapture(); + worker.startConnector(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(capturedConfig), EasyMock.<ConnectorContext>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); - EasyMock.expectLastCall(); + EasyMock.expectLastCall().andReturn(true); EasyMock.expect(worker.isRunning(CONNECTOR_NAME)).andReturn(true); // Generate same task config, which should result in no additional action to restart tasks EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, null)) @@ -432,7 +375,7 @@ public class StandaloneHerderTest { herder.putConnectorConfig(CONNECTOR_NAME, connConfig, false, createCallback); herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb); herder.putConnectorConfig(CONNECTOR_NAME, newConnConfig, true, putConnectorConfigCb); - assertEquals("bar", capturedConfig.getValue().originals().get("foo")); + assertEquals("bar", capturedConfig.getValue().get("foo")); herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb); PowerMock.verifyAll(); @@ -456,26 +399,24 @@ public class StandaloneHerderTest { Map<String, String> connectorProps = connectorConfig(sourceSink); - worker.startConnector(EasyMock.eq(new ConnectorConfig(connectorProps)), EasyMock.anyObject(HerderConnectorContext.class), + worker.startConnector(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(connectorProps), EasyMock.anyObject(HerderConnectorContext.class), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); - EasyMock.expectLastCall(); + EasyMock.expectLastCall().andReturn(true); EasyMock.expect(worker.isRunning(CONNECTOR_NAME)).andReturn(true); ConnectorInfo connInfo = new ConnectorInfo(CONNECTOR_NAME, connectorProps, Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0))); createCallback.onCompletion(null, new Herder.Created<>(true, connInfo)); EasyMock.expectLastCall(); - // And we should instantiate the tasks. For a sink task, we should see added properties for - // the input topic partitions - ConnectorConfig connConfig = new ConnectorConfig(connectorConfig(sourceSink)); + // And we should instantiate the tasks. For a sink task, we should see added properties for the input topic partitions + Map<String, String> generatedTaskProps = taskConfig(sourceSink); - TaskConfig taskConfig = new TaskConfig(generatedTaskProps); EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, sourceSink == SourceSink.SINK ? TOPICS_LIST : null)) .andReturn(Collections.singletonList(generatedTaskProps)); - worker.startTask(new ConnectorTaskId(CONNECTOR_NAME, 0), taskConfig, connConfig, herder, TargetState.STARTED); - EasyMock.expectLastCall(); + worker.startTask(new ConnectorTaskId(CONNECTOR_NAME, 0), connectorConfig(sourceSink), generatedTaskProps, herder, TargetState.STARTED); + EasyMock.expectLastCall().andReturn(true); worker.isSinkConnector(CONNECTOR_NAME); PowerMock.expectLastCall().andReturn(sourceSink == SourceSink.SINK); @@ -483,12 +424,10 @@ public class StandaloneHerderTest { private void expectStop() { ConnectorTaskId task = new ConnectorTaskId(CONNECTOR_NAME, 0); - worker.stopTasks(Collections.singletonList(task)); - EasyMock.expectLastCall(); - worker.awaitStopTasks(Collections.singletonList(task)); - EasyMock.expectLastCall(); + worker.stopAndAwaitTasks(Collections.singletonList(task)); + EasyMock.expectLastCall().andReturn(Collections.singleton(task)); worker.stopConnector(CONNECTOR_NAME); - EasyMock.expectLastCall(); + EasyMock.expectLastCall().andReturn(true); } private void expectDestroy() { http://git-wip-us.apache.org/repos/asf/kafka/blob/71f7e7c3/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java index 45ccdd5..0e20b6a 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java @@ -30,6 +30,7 @@ import org.apache.kafka.connect.runtime.TaskStatus; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.KafkaBasedLog; import org.easymock.Capture; +import org.easymock.EasyMock; import org.easymock.EasyMockSupport; import org.easymock.IAnswer; import org.junit.Test; @@ -186,6 +187,33 @@ public class KafkaStatusBackingStoreTest extends EasyMockSupport { } @Test + public void putSafeWithNoPreviousValueIsPropagated() { + final Converter converter = mock(Converter.class); + final KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class); + final KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog); + + final byte[] value = new byte[0]; + + final Capture<Struct> statusValueStruct = newCapture(); + converter.fromConnectData(eq(STATUS_TOPIC), anyObject(Schema.class), capture(statusValueStruct)); + EasyMock.expectLastCall().andReturn(value); + + kafkaBasedLog.send(eq("status-connector-" + CONNECTOR), eq(value), anyObject(Callback.class)); + expectLastCall(); + + replayAll(); + + final ConnectorStatus status = new ConnectorStatus(CONNECTOR, ConnectorStatus.State.FAILED, WORKER_ID, 0); + store.putSafe(status); + + verifyAll(); + + assertEquals(status.state().toString(), statusValueStruct.getValue().get(KafkaStatusBackingStore.STATE_KEY_NAME)); + assertEquals(status.workerId(), statusValueStruct.getValue().get(KafkaStatusBackingStore.WORKER_ID_KEY_NAME)); + assertEquals(status.generation(), statusValueStruct.getValue().get(KafkaStatusBackingStore.GENERATION_KEY_NAME)); + } + + @Test public void putSafeOverridesValueSetBySameWorker() { final byte[] value = new byte[0]; http://git-wip-us.apache.org/repos/asf/kafka/blob/71f7e7c3/tests/kafkatest/services/connect.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py index ebc19b0..bd2c9b9 100644 --- a/tests/kafkatest/services/connect.py +++ b/tests/kafkatest/services/connect.py @@ -379,13 +379,12 @@ class MockSink(object): class MockSource(object): - def __init__(self, cc, topics, mode=None, delay_sec=10, name="mock-source"): + def __init__(self, cc, mode=None, delay_sec=10, name="mock-source"): self.cc = cc self.logger = self.cc.logger self.name = name self.mode = mode self.delay_sec = delay_sec - self.topics = topics def start(self): self.logger.info("Creating connector MockSourceConnector %s", self.name) @@ -393,8 +392,6 @@ class MockSource(object): 'name': self.name, 'connector.class': 'org.apache.kafka.connect.tools.MockSourceConnector', 'tasks.max': 1, - 'topics': ",".join(self.topics), 'mock_mode': self.mode, 'delay_ms': self.delay_sec * 1000 }) - http://git-wip-us.apache.org/repos/asf/kafka/blob/71f7e7c3/tests/kafkatest/tests/connect/connect_distributed_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py index 1902c59..ca8ff68 100644 --- a/tests/kafkatest/tests/connect/connect_distributed_test.py +++ b/tests/kafkatest/tests/connect/connect_distributed_test.py @@ -23,7 +23,7 @@ from kafkatest.services.security.security_config import SecurityConfig from ducktape.utils.util import wait_until from ducktape.mark import matrix import subprocess, itertools, time -from collections import Counter +from collections import Counter, namedtuple import operator class ConnectDistributedTest(Test): @@ -155,7 +155,43 @@ class ConnectDistributedTest(Test): wait_until(lambda: self.connector_is_running(self.sink), timeout_sec=10, err_msg="Failed to see connector transition to the RUNNING state") - + @matrix(delete_before_reconfig=[False, True]) + def test_bad_connector_class(self, delete_before_reconfig): + """ + For the same connector name, first configure it with a bad connector class name such that it fails to start, verify that it enters a FAILED state. + Restart should also fail. + Then try to rectify by reconfiguring it as a MockConnector and verifying it successfully transitions to RUNNING. + """ + self.setup_services() + self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node)) + self.cc.start() + + connector_name = 'bad-to-good-test' + + connector = namedtuple('BadConnector', ['name', 'tasks'])(connector_name, 1) + config = { + 'name': connector.name, + 'tasks.max': connector.tasks, + 'connector.class': 'java.util.HashMap' + } + self.cc.create_connector(config) + + wait_until(lambda: self.connector_is_failed(connector), timeout_sec=10, err_msg="Failed to see connector transition to FAILED state") + + try: + self.cc.restart_connector(connector_name) + except ConnectRestError: + pass + else: + raise AssertionError("Expected restart of %s to fail" % connector_name) + + if delete_before_reconfig: + self.cc.delete_connector(connector_name) + + config['connector.class'] = 'org.apache.kafka.connect.tools.MockSourceConnector' + self.cc.set_connector_config(connector_name, config) + wait_until(lambda: self.connector_is_running(connector), timeout_sec=10, err_msg="Failed to see connector transition to the RUNNING state") + @matrix(connector_type=["source", "sink"]) def test_restart_failed_task(self, connector_type): self.setup_services() @@ -166,7 +202,7 @@ class ConnectDistributedTest(Test): if connector_type == "sink": connector = MockSink(self.cc, self.topics.keys(), mode='task-failure', delay_sec=5) else: - connector = MockSource(self.cc, self.topics.keys(), mode='task-failure', delay_sec=5) + connector = MockSource(self.cc, mode='task-failure', delay_sec=5) connector.start()
