Repository: kafka
Updated Branches:
  refs/heads/trunk 190239441 -> 71f7e7c3a


http://git-wip-us.apache.org/repos/asf/kafka/blob/71f7e7c3/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 8fc6dbd..e4ebc89 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -39,7 +39,6 @@ import org.apache.kafka.connect.storage.StatusBackingStore;
 import org.apache.kafka.connect.util.Callback;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.FutureCallback;
-import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.easymock.IAnswer;
 import org.junit.Before;
@@ -169,14 +168,15 @@ public class DistributedHerderTest {
         EasyMock.expect(member.memberId()).andStubReturn("member");
         expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
         expectPostRebalanceCatchup(SNAPSHOT);
-        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), 
EasyMock.<ConnectorContext>anyObject(),
+        worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, 
String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
-        PowerMock.expectLastCall();
+        PowerMock.expectLastCall().andReturn(true);
         EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
 
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, 
null)).andReturn(TASK_CONFIGS);
-        worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), 
EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), 
EasyMock.eq(TargetState.STARTED));
-        PowerMock.expectLastCall();
+        worker.startTask(EasyMock.eq(TASK1), EasyMock.<Map<String, 
String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        PowerMock.expectLastCall().andReturn(true);
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
 
@@ -193,13 +193,14 @@ public class DistributedHerderTest {
         EasyMock.expect(member.memberId()).andStubReturn("member");
         expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
         expectPostRebalanceCatchup(SNAPSHOT);
-        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), 
EasyMock.<ConnectorContext>anyObject(),
+        worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, 
String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
-        PowerMock.expectLastCall();
+        PowerMock.expectLastCall().andReturn(true);
         EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, 
null)).andReturn(TASK_CONFIGS);
-        worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), 
EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), 
EasyMock.eq(TargetState.STARTED));
-        PowerMock.expectLastCall();
+        worker.startTask(EasyMock.eq(TASK1), EasyMock.<Map<String, 
String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        PowerMock.expectLastCall().andReturn(true);
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
 
@@ -207,9 +208,9 @@ public class DistributedHerderTest {
                 1, Arrays.asList(CONN1), Arrays.<ConnectorTaskId>asList());
 
         // and the new assignment started
-        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), 
EasyMock.<ConnectorContext>anyObject(),
+        worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, 
String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
-        PowerMock.expectLastCall();
+        PowerMock.expectLastCall().andReturn(true);
         EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, 
null)).andReturn(TASK_CONFIGS);
         member.poll(EasyMock.anyInt());
@@ -229,13 +230,14 @@ public class DistributedHerderTest {
         EasyMock.expect(member.memberId()).andStubReturn("member");
         expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
         expectPostRebalanceCatchup(SNAPSHOT);
-        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), 
EasyMock.<ConnectorContext>anyObject(),
+        worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, 
String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
-        PowerMock.expectLastCall();
+        PowerMock.expectLastCall().andReturn(true);
         EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, 
null)).andReturn(TASK_CONFIGS);
-        worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), 
EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), 
EasyMock.eq(TargetState.STARTED));
-        PowerMock.expectLastCall();
+        worker.startTask(EasyMock.eq(TASK1), EasyMock.<Map<String, 
String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        PowerMock.expectLastCall().andReturn(true);
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
 
@@ -243,9 +245,9 @@ public class DistributedHerderTest {
                 1, Arrays.asList(CONN1), Arrays.<ConnectorTaskId>asList());
 
         // and the new assignment started
-        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), 
EasyMock.<ConnectorContext>anyObject(),
+        worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, 
String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
-        PowerMock.expectLastCall();
+        PowerMock.expectLastCall().andReturn(true);
         EasyMock.expect(worker.isRunning(CONN1)).andReturn(false);
 
         // worker is not running, so we should see no call to 
connectorTaskConfigs()
@@ -263,13 +265,9 @@ public class DistributedHerderTest {
 
     @Test
     public void testHaltCleansUpWorker() {
-        
EasyMock.expect(worker.connectorNames()).andReturn(Collections.singleton(CONN1));
-        worker.stopConnector(CONN1);
-        PowerMock.expectLastCall();
-        
EasyMock.expect(worker.taskIds()).andReturn(Collections.singleton(TASK1));
-        worker.stopTasks(Collections.singleton(TASK1));
+        worker.stopConnectors();
         PowerMock.expectLastCall();
-        worker.awaitStopTasks(Collections.singleton(TASK1));
+        worker.stopAndAwaitTasks();
         PowerMock.expectLastCall();
         member.stop();
         PowerMock.expectLastCall();
@@ -342,9 +340,9 @@ public class DistributedHerderTest {
         // Start with one connector
         expectRebalance(1, Arrays.asList(CONN1), 
Collections.<ConnectorTaskId>emptyList());
         expectPostRebalanceCatchup(SNAPSHOT);
-        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), 
EasyMock.<ConnectorContext>anyObject(),
+        worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, 
String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
-        PowerMock.expectLastCall();
+        PowerMock.expectLastCall().andReturn(true);
         EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, 
null)).andReturn(TASK_CONFIGS);
 
@@ -377,9 +375,9 @@ public class DistributedHerderTest {
         expectPostRebalanceCatchup(SNAPSHOT);
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
-        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), 
EasyMock.<ConnectorContext>anyObject(),
+        worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, 
String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
-        PowerMock.expectLastCall();
+        PowerMock.expectLastCall().andReturn(true);
         EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
 
         // now handle the connector restart
@@ -390,13 +388,11 @@ public class DistributedHerderTest {
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
 
-        EasyMock.expect(worker.ownsConnector(CONN1)).andReturn(true);
-
         worker.stopConnector(CONN1);
-        PowerMock.expectLastCall();
-        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), 
EasyMock.<ConnectorContext>anyObject(),
+        PowerMock.expectLastCall().andReturn(true);
+        worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, 
String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
-        PowerMock.expectLastCall();
+        PowerMock.expectLastCall().andReturn(true);
         EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
 
         PowerMock.replayAll();
@@ -461,8 +457,6 @@ public class DistributedHerderTest {
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
 
-        EasyMock.expect(worker.ownsConnector(CONN1)).andReturn(false);
-
         PowerMock.replayAll();
 
         herder.tick();
@@ -498,7 +492,6 @@ public class DistributedHerderTest {
         PowerMock.expectLastCall();
 
         String ownerUrl = "ownerUrl";
-        EasyMock.expect(worker.ownsConnector(CONN1)).andReturn(false);
         EasyMock.expect(member.ownerUrl(CONN1)).andReturn(ownerUrl);
 
         PowerMock.replayAll();
@@ -530,8 +523,9 @@ public class DistributedHerderTest {
         expectPostRebalanceCatchup(SNAPSHOT);
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
-        worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), 
EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), 
EasyMock.eq(TargetState.STARTED));
-        PowerMock.expectLastCall();
+        worker.startTask(EasyMock.eq(TASK0), EasyMock.<Map<String, 
String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        PowerMock.expectLastCall().andReturn(true);
 
         // now handle the task restart
         member.wakeup();
@@ -541,12 +535,11 @@ public class DistributedHerderTest {
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
 
-        EasyMock.expect(worker.ownsTask(TASK0)).andReturn(true);
-
         worker.stopAndAwaitTask(TASK0);
-        PowerMock.expectLastCall();
-        worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), 
EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), 
EasyMock.eq(TargetState.STARTED));
-        PowerMock.expectLastCall();
+        PowerMock.expectLastCall().andReturn(true);
+        worker.startTask(EasyMock.eq(TASK0), EasyMock.<Map<String, 
String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        PowerMock.expectLastCall().andReturn(true);
 
         PowerMock.replayAll();
 
@@ -602,7 +595,6 @@ public class DistributedHerderTest {
         PowerMock.expectLastCall();
 
         // now handle the task restart
-        EasyMock.expect(worker.ownsTask(TASK0)).andReturn(false);
         member.wakeup();
         PowerMock.expectLastCall();
         member.ensureActive();
@@ -638,7 +630,6 @@ public class DistributedHerderTest {
 
         // now handle the task restart
         String ownerUrl = "ownerUrl";
-        EasyMock.expect(worker.ownsTask(TASK0)).andReturn(false);
         EasyMock.expect(member.ownerUrl(TASK0)).andReturn(ownerUrl);
         member.wakeup();
         PowerMock.expectLastCall();
@@ -687,10 +678,10 @@ public class DistributedHerderTest {
         // Performs rebalance and gets new assignment
         expectRebalance(Collections.<String>emptyList(), 
Collections.<ConnectorTaskId>emptyList(),
                 ConnectProtocol.Assignment.NO_ERROR, 1, Arrays.asList(CONN1), 
Collections.<ConnectorTaskId>emptyList());
-        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), 
EasyMock.<ConnectorContext>anyObject(),
+        worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, 
String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        PowerMock.expectLastCall().andReturn(true);
         EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
-        PowerMock.expectLastCall();
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, 
null)).andReturn(TASK_CONFIGS);
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
@@ -715,9 +706,9 @@ public class DistributedHerderTest {
         // join
         expectRebalance(1, Arrays.asList(CONN1), 
Collections.<ConnectorTaskId>emptyList());
         expectPostRebalanceCatchup(SNAPSHOT);
-        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), 
EasyMock.<ConnectorContext>anyObject(),
+        worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, 
String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
-        PowerMock.expectLastCall();
+        PowerMock.expectLastCall().andReturn(true);
         EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, 
null)).andReturn(TASK_CONFIGS);
         member.poll(EasyMock.anyInt());
@@ -729,10 +720,10 @@ public class DistributedHerderTest {
         PowerMock.expectLastCall();
         EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT); // for 
this test, it doesn't matter if we use the same config snapshot
         worker.stopConnector(CONN1);
-        PowerMock.expectLastCall();
-        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), 
EasyMock.<ConnectorContext>anyObject(),
+        PowerMock.expectLastCall().andReturn(true);
+        worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, 
String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
-        PowerMock.expectLastCall();
+        PowerMock.expectLastCall().andReturn(true);
         EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, 
null)).andReturn(TASK_CONFIGS);
         member.poll(EasyMock.anyInt());
@@ -757,9 +748,9 @@ public class DistributedHerderTest {
         // join
         expectRebalance(1, Arrays.asList(CONN1), 
Collections.<ConnectorTaskId>emptyList());
         expectPostRebalanceCatchup(SNAPSHOT);
-        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), 
EasyMock.<ConnectorContext>anyObject(),
+        worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, 
String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
-        PowerMock.expectLastCall();
+        PowerMock.expectLastCall().andReturn(true);
         EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, 
null)).andReturn(TASK_CONFIGS);
         member.poll(EasyMock.anyInt());
@@ -773,8 +764,6 @@ public class DistributedHerderTest {
         
EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT_PAUSED_CONN1);
         PowerMock.expectLastCall();
 
-        EasyMock.expect(worker.ownsConnector(CONN1)).andReturn(true);
-
         worker.setTargetState(CONN1, TargetState.PAUSED);
         PowerMock.expectLastCall();
 
@@ -798,9 +787,9 @@ public class DistributedHerderTest {
         // start with the connector paused
         expectRebalance(1, Arrays.asList(CONN1), 
Collections.<ConnectorTaskId>emptyList());
         expectPostRebalanceCatchup(SNAPSHOT_PAUSED_CONN1);
-        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), 
EasyMock.<ConnectorContext>anyObject(),
+        worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, 
String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.PAUSED));
-        PowerMock.expectLastCall();
+        PowerMock.expectLastCall().andReturn(true);
 
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
@@ -814,7 +803,6 @@ public class DistributedHerderTest {
         PowerMock.expectLastCall();
 
         // we expect reconfiguration after resuming
-        EasyMock.expect(worker.ownsConnector(CONN1)).andReturn(true);
         EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, 
null)).andReturn(TASK_CONFIGS);
 
@@ -841,8 +829,9 @@ public class DistributedHerderTest {
         // join
         expectRebalance(1, Collections.<String>emptyList(), 
Collections.singletonList(TASK0));
         expectPostRebalanceCatchup(SNAPSHOT);
-        worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), 
EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), 
EasyMock.eq(TargetState.STARTED));
-        PowerMock.expectLastCall();
+        worker.startTask(EasyMock.eq(TASK0), EasyMock.<Map<String, 
String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        PowerMock.expectLastCall().andReturn(true);
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
 
@@ -877,8 +866,9 @@ public class DistributedHerderTest {
         // join
         expectRebalance(1, Collections.<String>emptyList(), 
Collections.singletonList(TASK0));
         expectPostRebalanceCatchup(SNAPSHOT);
-        worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), 
EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), 
EasyMock.eq(TargetState.STARTED));
-        PowerMock.expectLastCall();
+        worker.startTask(EasyMock.eq(TASK0), EasyMock.<Map<String, 
String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        PowerMock.expectLastCall().andReturn(true);
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
 
@@ -890,8 +880,6 @@ public class DistributedHerderTest {
         
EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT_PAUSED_CONN1);
         PowerMock.expectLastCall();
 
-        EasyMock.expect(worker.ownsConnector(CONN1)).andReturn(false);
-
         worker.setTargetState(CONN1, TargetState.PAUSED);
         PowerMock.expectLastCall();
 
@@ -918,8 +906,9 @@ public class DistributedHerderTest {
         // join
         expectRebalance(1, Collections.<String>emptyList(), 
Collections.singletonList(TASK0));
         expectPostRebalanceCatchup(SNAPSHOT_PAUSED_CONN1);
-        worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), 
EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), 
EasyMock.eq(TargetState.PAUSED));
-        PowerMock.expectLastCall();
+        worker.startTask(EasyMock.eq(TASK0), EasyMock.<Map<String, 
String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
+                EasyMock.eq(herder), EasyMock.eq(TargetState.PAUSED));
+        PowerMock.expectLastCall().andReturn(true);
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
 
@@ -931,11 +920,11 @@ public class DistributedHerderTest {
         EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT);
         PowerMock.expectLastCall();
 
-        EasyMock.expect(worker.ownsConnector(CONN1)).andReturn(false);
-
         worker.setTargetState(CONN1, TargetState.STARTED);
         PowerMock.expectLastCall();
 
+        EasyMock.expect(worker.isRunning(CONN1)).andReturn(false);
+
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
 
@@ -970,8 +959,9 @@ public class DistributedHerderTest {
         expectRebalance(Collections.<String>emptyList(), 
Collections.<ConnectorTaskId>emptyList(),
                 ConnectProtocol.Assignment.NO_ERROR, 1, 
Collections.<String>emptyList(),
                 Arrays.asList(TASK0));
-        worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), 
EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), 
EasyMock.eq(TargetState.STARTED));
-        PowerMock.expectLastCall();
+        worker.startTask(EasyMock.eq(TASK0), EasyMock.<Map<String, 
String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        PowerMock.expectLastCall().andReturn(true);
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
 
@@ -1004,12 +994,13 @@ public class DistributedHerderTest {
         expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
         expectPostRebalanceCatchup(SNAPSHOT);
 
-        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), 
EasyMock.<ConnectorContext>anyObject(),
+        worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, 
String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
-        PowerMock.expectLastCall();
+        PowerMock.expectLastCall().andReturn(true);
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, 
null)).andReturn(TASK_CONFIGS);
-        worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), 
EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), 
EasyMock.eq(TargetState.STARTED));
-        PowerMock.expectLastCall();
+        worker.startTask(EasyMock.eq(TASK1), EasyMock.<Map<String, 
String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        PowerMock.expectLastCall().andReturn(true);
         EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
@@ -1070,9 +1061,9 @@ public class DistributedHerderTest {
         EasyMock.expect(member.memberId()).andStubReturn("leader");
         expectRebalance(1, Arrays.asList(CONN1), 
Collections.<ConnectorTaskId>emptyList());
         expectPostRebalanceCatchup(SNAPSHOT);
-        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), 
EasyMock.<ConnectorContext>anyObject(),
+        worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, 
String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
-        PowerMock.expectLastCall();
+        PowerMock.expectLastCall().andReturn(true);
         EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, 
null)).andReturn(TASK_CONFIGS);
 
@@ -1098,11 +1089,10 @@ public class DistributedHerderTest {
         // connector without rebalance
         
EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT_UPDATED_CONN1_CONFIG);
         worker.stopConnector(CONN1);
-        PowerMock.expectLastCall();
-        Capture<ConnectorConfig> capturedUpdatedConfig = EasyMock.newCapture();
-        worker.startConnector(EasyMock.capture(capturedUpdatedConfig), 
EasyMock.<ConnectorContext>anyObject(),
+        PowerMock.expectLastCall().andReturn(true);
+        worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, 
String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
-        PowerMock.expectLastCall();
+        PowerMock.expectLastCall().andReturn(true);
         EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, 
null)).andReturn(TASK_CONFIGS);
 
@@ -1147,7 +1137,6 @@ public class DistributedHerderTest {
         // This requires inter-worker communication, so needs the REST API
     }
 
-
     private void expectRebalance(final long offset,
                                  final List<String> assignedConnectors,
                                  final List<ConnectorTaskId> assignedTasks) {
@@ -1175,17 +1164,13 @@ public class DistributedHerderTest {
         });
 
         if (revokedConnectors != null) {
-            for (String connector : revokedConnectors) {
-                worker.stopConnector(connector);
-                PowerMock.expectLastCall();
-            }
+            worker.stopConnectors(revokedConnectors);
+            PowerMock.expectLastCall().andReturn(revokedConnectors);
         }
 
-        if (revokedTasks != null && !revokedTasks.isEmpty()) {
-            worker.stopTasks(revokedTasks);
-            PowerMock.expectLastCall();
-            worker.awaitStopTasks(revokedTasks);
-            PowerMock.expectLastCall();
+        if (revokedTasks != null) {
+            worker.stopAndAwaitTasks(revokedTasks);
+            PowerMock.expectLastCall().andReturn(revokedTasks);
         }
 
         if (revokedConnectors != null) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/71f7e7c3/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index 3772586..971d84f 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.ConnectorContext;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.errors.AlreadyExistsException;
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.NotFoundException;
 import org.apache.kafka.connect.runtime.AbstractStatus;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
@@ -166,11 +167,11 @@ public class StandaloneHerderTest {
         expectAdd(SourceSink.SOURCE);
 
         worker.stopConnector(CONNECTOR_NAME);
-        EasyMock.expectLastCall();
+        EasyMock.expectLastCall().andReturn(true);
 
-        worker.startConnector(EasyMock.eq(new 
ConnectorConfig(connectorConfig(SourceSink.SOURCE))),
+        worker.startConnector(EasyMock.eq(CONNECTOR_NAME), 
EasyMock.eq(connectorConfig(SourceSink.SOURCE)),
                 EasyMock.anyObject(HerderConnectorContext.class), 
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
-        EasyMock.expectLastCall();
+        EasyMock.expectLastCall().andReturn(true);
 
         PowerMock.replayAll();
 
@@ -184,42 +185,15 @@ public class StandaloneHerderTest {
     }
 
     @Test
-    public void testRestartConnectorFailureOnStop() throws Exception {
-        expectAdd(SourceSink.SOURCE);
-
-        RuntimeException e = new RuntimeException();
-        worker.stopConnector(CONNECTOR_NAME);
-        EasyMock.expectLastCall().andThrow(e);
-
-        // the connector will not be started after the failure in start
-
-        PowerMock.replayAll();
-
-        herder.putConnectorConfig(CONNECTOR_NAME, 
connectorConfig(SourceSink.SOURCE), false, createCallback);
-
-        FutureCallback<Void> cb = new FutureCallback<>();
-        herder.restartConnector(CONNECTOR_NAME, cb);
-        try {
-            cb.get(1000L, TimeUnit.MILLISECONDS);
-            fail();
-        } catch (ExecutionException exception) {
-            assertEquals(e, exception.getCause());
-        }
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
     public void testRestartConnectorFailureOnStart() throws Exception {
         expectAdd(SourceSink.SOURCE);
 
         worker.stopConnector(CONNECTOR_NAME);
-        EasyMock.expectLastCall();
+        EasyMock.expectLastCall().andReturn(true);
 
-        RuntimeException e = new RuntimeException();
-        worker.startConnector(EasyMock.eq(new 
ConnectorConfig(connectorConfig(SourceSink.SOURCE))),
+        worker.startConnector(EasyMock.eq(CONNECTOR_NAME), 
EasyMock.eq(connectorConfig(SourceSink.SOURCE)),
                 EasyMock.anyObject(HerderConnectorContext.class), 
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
-        EasyMock.expectLastCall().andThrow(e);
+        EasyMock.expectLastCall().andReturn(false);
 
         PowerMock.replayAll();
 
@@ -231,7 +205,7 @@ public class StandaloneHerderTest {
             cb.get(1000L, TimeUnit.MILLISECONDS);
             fail();
         } catch (ExecutionException exception) {
-            assertEquals(e, exception.getCause());
+            assertEquals(ConnectException.class, 
exception.getCause().getClass());
         }
 
         PowerMock.verifyAll();
@@ -243,12 +217,10 @@ public class StandaloneHerderTest {
         expectAdd(SourceSink.SOURCE);
 
         worker.stopAndAwaitTask(taskId);
-        EasyMock.expectLastCall();
+        EasyMock.expectLastCall().andReturn(true);
 
-        ConnectorConfig connConfig = new 
ConnectorConfig(connectorConfig(SourceSink.SOURCE));
-        TaskConfig taskConfig = new TaskConfig(taskConfig(SourceSink.SOURCE));
-        worker.startTask(taskId, taskConfig, connConfig, herder, 
TargetState.STARTED);
-        EasyMock.expectLastCall();
+        worker.startTask(taskId, connectorConfig(SourceSink.SOURCE), 
taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED);
+        EasyMock.expectLastCall().andReturn(true);
 
         PowerMock.replayAll();
 
@@ -262,44 +234,15 @@ public class StandaloneHerderTest {
     }
 
     @Test
-    public void testRestartTaskFailureOnStop() throws Exception {
-        ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0);
-        expectAdd(SourceSink.SOURCE);
-
-        RuntimeException e = new RuntimeException();
-        worker.stopAndAwaitTask(taskId);
-        EasyMock.expectLastCall().andThrow(e);
-
-        // task will not be started after the failure in stop
-
-        PowerMock.replayAll();
-
-        herder.putConnectorConfig(CONNECTOR_NAME, 
connectorConfig(SourceSink.SOURCE), false, createCallback);
-
-        FutureCallback<Void> cb = new FutureCallback<>();
-        herder.restartTask(taskId, cb);
-        try {
-            cb.get(1000L, TimeUnit.MILLISECONDS);
-            fail("Expected restart callback to raise an exception");
-        } catch (ExecutionException exception) {
-            assertEquals(e, exception.getCause());
-        }
-        PowerMock.verifyAll();
-    }
-
-    @Test
     public void testRestartTaskFailureOnStart() throws Exception {
         ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0);
         expectAdd(SourceSink.SOURCE);
 
         worker.stopAndAwaitTask(taskId);
-        EasyMock.expectLastCall();
+        EasyMock.expectLastCall().andReturn(true);
 
-        RuntimeException e = new RuntimeException();
-        ConnectorConfig connConfig = new 
ConnectorConfig(connectorConfig(SourceSink.SOURCE));
-        TaskConfig taskConfig = new TaskConfig(taskConfig(SourceSink.SOURCE));
-        worker.startTask(taskId, taskConfig, connConfig, herder, 
TargetState.STARTED);
-        EasyMock.expectLastCall().andThrow(e);
+        worker.startTask(taskId, connectorConfig(SourceSink.SOURCE), 
taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED);
+        EasyMock.expectLastCall().andReturn(false);
 
         PowerMock.replayAll();
 
@@ -311,7 +254,7 @@ public class StandaloneHerderTest {
             cb.get(1000L, TimeUnit.MILLISECONDS);
             fail("Expected restart callback to raise an exception");
         } catch (ExecutionException exception) {
-            assertEquals(e, exception.getCause());
+            assertEquals(ConnectException.class, 
exception.getCause().getClass());
         }
 
         PowerMock.verifyAll();
@@ -409,11 +352,11 @@ public class StandaloneHerderTest {
         EasyMock.expectLastCall();
         // Update config, which requires stopping and restarting
         worker.stopConnector(CONNECTOR_NAME);
-        EasyMock.expectLastCall();
-        Capture<ConnectorConfig> capturedConfig = EasyMock.newCapture();
-        worker.startConnector(EasyMock.capture(capturedConfig), 
EasyMock.<ConnectorContext>anyObject(),
+        EasyMock.expectLastCall().andReturn(true);
+        Capture<Map<String, String>> capturedConfig = EasyMock.newCapture();
+        worker.startConnector(EasyMock.eq(CONNECTOR_NAME), 
EasyMock.capture(capturedConfig), EasyMock.<ConnectorContext>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
-        EasyMock.expectLastCall();
+        EasyMock.expectLastCall().andReturn(true);
         EasyMock.expect(worker.isRunning(CONNECTOR_NAME)).andReturn(true);
         // Generate same task config, which should result in no additional 
action to restart tasks
         EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, 
DEFAULT_MAX_TASKS, null))
@@ -432,7 +375,7 @@ public class StandaloneHerderTest {
         herder.putConnectorConfig(CONNECTOR_NAME, connConfig, false, 
createCallback);
         herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb);
         herder.putConnectorConfig(CONNECTOR_NAME, newConnConfig, true, 
putConnectorConfigCb);
-        assertEquals("bar", capturedConfig.getValue().originals().get("foo"));
+        assertEquals("bar", capturedConfig.getValue().get("foo"));
         herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb);
 
         PowerMock.verifyAll();
@@ -456,26 +399,24 @@ public class StandaloneHerderTest {
 
         Map<String, String> connectorProps = connectorConfig(sourceSink);
 
-        worker.startConnector(EasyMock.eq(new 
ConnectorConfig(connectorProps)), 
EasyMock.anyObject(HerderConnectorContext.class),
+        worker.startConnector(EasyMock.eq(CONNECTOR_NAME), 
EasyMock.eq(connectorProps), EasyMock.anyObject(HerderConnectorContext.class),
                               EasyMock.eq(herder), 
EasyMock.eq(TargetState.STARTED));
-        EasyMock.expectLastCall();
+        EasyMock.expectLastCall().andReturn(true);
         EasyMock.expect(worker.isRunning(CONNECTOR_NAME)).andReturn(true);
 
         ConnectorInfo connInfo = new ConnectorInfo(CONNECTOR_NAME, 
connectorProps, Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)));
         createCallback.onCompletion(null, new Herder.Created<>(true, 
connInfo));
         EasyMock.expectLastCall();
 
-        // And we should instantiate the tasks. For a sink task, we should see 
added properties for
-        // the input topic partitions
-        ConnectorConfig connConfig = new 
ConnectorConfig(connectorConfig(sourceSink));
+        // And we should instantiate the tasks. For a sink task, we should see 
added properties for the input topic partitions
+
         Map<String, String> generatedTaskProps = taskConfig(sourceSink);
-        TaskConfig taskConfig = new TaskConfig(generatedTaskProps);
 
         EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, 
DEFAULT_MAX_TASKS, sourceSink == SourceSink.SINK ? TOPICS_LIST : null))
             .andReturn(Collections.singletonList(generatedTaskProps));
 
-        worker.startTask(new ConnectorTaskId(CONNECTOR_NAME, 0), taskConfig, 
connConfig, herder, TargetState.STARTED);
-        EasyMock.expectLastCall();
+        worker.startTask(new ConnectorTaskId(CONNECTOR_NAME, 0), 
connectorConfig(sourceSink), generatedTaskProps, herder, TargetState.STARTED);
+        EasyMock.expectLastCall().andReturn(true);
 
         worker.isSinkConnector(CONNECTOR_NAME);
         PowerMock.expectLastCall().andReturn(sourceSink == SourceSink.SINK);
@@ -483,12 +424,10 @@ public class StandaloneHerderTest {
 
     private void expectStop() {
         ConnectorTaskId task = new ConnectorTaskId(CONNECTOR_NAME, 0);
-        worker.stopTasks(Collections.singletonList(task));
-        EasyMock.expectLastCall();
-        worker.awaitStopTasks(Collections.singletonList(task));
-        EasyMock.expectLastCall();
+        worker.stopAndAwaitTasks(Collections.singletonList(task));
+        EasyMock.expectLastCall().andReturn(Collections.singleton(task));
         worker.stopConnector(CONNECTOR_NAME);
-        EasyMock.expectLastCall();
+        EasyMock.expectLastCall().andReturn(true);
     }
 
     private void expectDestroy() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/71f7e7c3/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java
index 45ccdd5..0e20b6a 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.connect.runtime.TaskStatus;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.KafkaBasedLog;
 import org.easymock.Capture;
+import org.easymock.EasyMock;
 import org.easymock.EasyMockSupport;
 import org.easymock.IAnswer;
 import org.junit.Test;
@@ -186,6 +187,33 @@ public class KafkaStatusBackingStoreTest extends 
EasyMockSupport {
     }
 
     @Test
+    public void putSafeWithNoPreviousValueIsPropagated() {
+        final Converter converter = mock(Converter.class);
+        final KafkaBasedLog<String, byte[]> kafkaBasedLog = 
mock(KafkaBasedLog.class);
+        final KafkaStatusBackingStore store = new KafkaStatusBackingStore(new 
MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
+
+        final byte[] value = new byte[0];
+
+        final Capture<Struct> statusValueStruct = newCapture();
+        converter.fromConnectData(eq(STATUS_TOPIC), anyObject(Schema.class), 
capture(statusValueStruct));
+        EasyMock.expectLastCall().andReturn(value);
+
+        kafkaBasedLog.send(eq("status-connector-" + CONNECTOR), eq(value), 
anyObject(Callback.class));
+        expectLastCall();
+
+        replayAll();
+
+        final ConnectorStatus status = new ConnectorStatus(CONNECTOR, 
ConnectorStatus.State.FAILED, WORKER_ID, 0);
+        store.putSafe(status);
+
+        verifyAll();
+
+        assertEquals(status.state().toString(), 
statusValueStruct.getValue().get(KafkaStatusBackingStore.STATE_KEY_NAME));
+        assertEquals(status.workerId(), 
statusValueStruct.getValue().get(KafkaStatusBackingStore.WORKER_ID_KEY_NAME));
+        assertEquals(status.generation(), 
statusValueStruct.getValue().get(KafkaStatusBackingStore.GENERATION_KEY_NAME));
+    }
+
+    @Test
     public void putSafeOverridesValueSetBySameWorker() {
         final byte[] value = new byte[0];
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/71f7e7c3/tests/kafkatest/services/connect.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/connect.py 
b/tests/kafkatest/services/connect.py
index ebc19b0..bd2c9b9 100644
--- a/tests/kafkatest/services/connect.py
+++ b/tests/kafkatest/services/connect.py
@@ -379,13 +379,12 @@ class MockSink(object):
 
 class MockSource(object):
 
-    def __init__(self, cc, topics, mode=None, delay_sec=10, 
name="mock-source"):
+    def __init__(self, cc, mode=None, delay_sec=10, name="mock-source"):
         self.cc = cc
         self.logger = self.cc.logger
         self.name = name
         self.mode = mode
         self.delay_sec = delay_sec
-        self.topics = topics
 
     def start(self):
         self.logger.info("Creating connector MockSourceConnector %s", 
self.name)
@@ -393,8 +392,6 @@ class MockSource(object):
             'name': self.name,
             'connector.class': 
'org.apache.kafka.connect.tools.MockSourceConnector',
             'tasks.max': 1,
-            'topics': ",".join(self.topics),
             'mock_mode': self.mode,
             'delay_ms': self.delay_sec * 1000
         })
-        

http://git-wip-us.apache.org/repos/asf/kafka/blob/71f7e7c3/tests/kafkatest/tests/connect/connect_distributed_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py 
b/tests/kafkatest/tests/connect/connect_distributed_test.py
index 1902c59..ca8ff68 100644
--- a/tests/kafkatest/tests/connect/connect_distributed_test.py
+++ b/tests/kafkatest/tests/connect/connect_distributed_test.py
@@ -23,7 +23,7 @@ from kafkatest.services.security.security_config import 
SecurityConfig
 from ducktape.utils.util import wait_until
 from ducktape.mark import matrix
 import subprocess, itertools, time
-from collections import Counter
+from collections import Counter, namedtuple
 import operator
 
 class ConnectDistributedTest(Test):
@@ -155,7 +155,43 @@ class ConnectDistributedTest(Test):
         wait_until(lambda: self.connector_is_running(self.sink), 
timeout_sec=10,
                    err_msg="Failed to see connector transition to the RUNNING 
state")
 
-    
+    @matrix(delete_before_reconfig=[False, True])
+    def test_bad_connector_class(self, delete_before_reconfig):
+        """
+        For the same connector name, first configure it with a bad connector 
class name such that it fails to start, verify that it enters a FAILED state.
+        Restart should also fail.
+        Then try to rectify by reconfiguring it as a MockConnector and 
verifying it successfully transitions to RUNNING.
+        """
+        self.setup_services()
+        self.cc.set_configs(lambda node: 
self.render("connect-distributed.properties", node=node))
+        self.cc.start()
+
+        connector_name = 'bad-to-good-test'
+
+        connector = namedtuple('BadConnector', ['name', 
'tasks'])(connector_name, 1)
+        config = {
+            'name': connector.name,
+            'tasks.max': connector.tasks,
+            'connector.class': 'java.util.HashMap'
+        }
+        self.cc.create_connector(config)
+
+        wait_until(lambda: self.connector_is_failed(connector), 
timeout_sec=10, err_msg="Failed to see connector transition to FAILED state")
+
+        try:
+            self.cc.restart_connector(connector_name)
+        except ConnectRestError:
+            pass
+        else:
+            raise AssertionError("Expected restart of %s to fail" % 
connector_name)
+
+        if delete_before_reconfig:
+            self.cc.delete_connector(connector_name)
+
+        config['connector.class'] = 
'org.apache.kafka.connect.tools.MockSourceConnector'
+        self.cc.set_connector_config(connector_name, config)
+        wait_until(lambda: self.connector_is_running(connector), 
timeout_sec=10, err_msg="Failed to see connector transition to the RUNNING 
state")
+
     @matrix(connector_type=["source", "sink"])
     def test_restart_failed_task(self, connector_type):
         self.setup_services()
@@ -166,7 +202,7 @@ class ConnectDistributedTest(Test):
         if connector_type == "sink":
             connector = MockSink(self.cc, self.topics.keys(), 
mode='task-failure', delay_sec=5)
         else:
-            connector = MockSource(self.cc, self.topics.keys(), 
mode='task-failure', delay_sec=5)
+            connector = MockSource(self.cc, mode='task-failure', delay_sec=5)
             
         connector.start()
 

Reply via email to