This is an automated email from the ASF dual-hosted git repository.

gharris pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4099774da9c KAFKA-16084: Simplify and deduplicate standalone herder 
test mocking (#15389)
4099774da9c is described below

commit 4099774da9ccb9ebb2c53c37e15d7f58b1a66a4d
Author: Ahmed Sobeh <ahmed.so...@aiven.io>
AuthorDate: Tue Mar 26 22:24:37 2024 +0100

    KAFKA-16084: Simplify and deduplicate standalone herder test mocking 
(#15389)
    
    Reviewers: Greg Harris <greg.har...@aiven.io>
---
 .../runtime/standalone/StandaloneHerderTest.java   | 340 +++++++++------------
 1 file changed, 136 insertions(+), 204 deletions(-)

diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index 41091bbdf46..bed08ffa2eb 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -68,7 +68,6 @@ import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -115,6 +114,7 @@ public class StandaloneHerderTest {
     private static final String TOPICS_LIST_STR = "topic1,topic2";
     private static final String WORKER_ID = "localhost:8083";
     private static final String KAFKA_CLUSTER_ID = "I4ZmrWqfT2e-upky_4fdPA";
+    private static final Long WAIT_TIME_MS = 15000L;
 
     private enum SourceSink {
         SOURCE, SINK
@@ -122,30 +122,28 @@ public class StandaloneHerderTest {
 
     private StandaloneHerder herder;
 
-    private Connector connector;
     @Mock
     protected Worker worker;
-    @Mock protected WorkerConfigTransformer transformer;
-    @Mock private Plugins plugins;
+    @Mock
+    protected WorkerConfigTransformer transformer;
+    @Mock
+    private Plugins plugins;
     @Mock
     private PluginClassLoader pluginLoader;
     @Mock
     private LoaderSwap loaderSwap;
     protected FutureCallback<Herder.Created<ConnectorInfo>> createCallback;
-    @Mock protected StatusBackingStore statusBackingStore;
+    @Mock
+    protected StatusBackingStore statusBackingStore;
     private final SampleConnectorClientConfigOverridePolicy
-        noneConnectorClientConfigOverridePolicy = new 
SampleConnectorClientConfigOverridePolicy();
+            noneConnectorClientConfigOverridePolicy = new 
SampleConnectorClientConfigOverridePolicy();
 
     @Before
     public void setup() throws ExecutionException, InterruptedException {
-        worker = mock(Worker.class);
         herder = mock(StandaloneHerder.class, withSettings()
-            .useConstructor(worker, WORKER_ID, KAFKA_CLUSTER_ID, 
statusBackingStore, new MemoryConfigBackingStore(transformer), 
noneConnectorClientConfigOverridePolicy, new MockTime())
-            .defaultAnswer(CALLS_REAL_METHODS));
+                .useConstructor(worker, WORKER_ID, KAFKA_CLUSTER_ID, 
statusBackingStore, new MemoryConfigBackingStore(transformer), 
noneConnectorClientConfigOverridePolicy, new MockTime())
+                .defaultAnswer(CALLS_REAL_METHODS));
         createCallback = new FutureCallback<>();
-        plugins = mock(Plugins.class);
-        pluginLoader = mock(PluginClassLoader.class);
-        loaderSwap = mock(LoaderSwap.class);
         final ArgumentCaptor<Map<String, String>> configCapture = 
ArgumentCaptor.forClass(Map.class);
         when(transformer.transform(eq(CONNECTOR_NAME), 
configCapture.capture())).thenAnswer(invocation -> configCapture.getValue());
     }
@@ -153,26 +151,24 @@ public class StandaloneHerderTest {
     @After
     public void tearDown() {
         verifyNoMoreInteractions(worker, statusBackingStore);
+        herder.stop();
     }
 
     @Test
     public void testCreateSourceConnector() throws Exception {
-        connector = mock(BogusSourceConnector.class);
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
-        Connector connectorMock = mock(SourceConnector.class);
-        expectConfigValidation(connectorMock, true, config);
+        expectConfigValidation(SourceSink.SOURCE, config);
 
         herder.putConnectorConfig(CONNECTOR_NAME, config, false, 
createCallback);
-        Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
+        Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS);
         assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
     }
 
     @Test
     public void testCreateConnectorFailedValidation() {
         // Basic validation should be performed and return an error, but 
should still evaluate the connector's config
-        connector = mock(BogusSourceConnector.class);
 
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
         config.remove(ConnectorConfig.NAME_CONFIG);
@@ -193,7 +189,7 @@ public class StandaloneHerderTest {
 
         herder.putConnectorConfig(CONNECTOR_NAME, config, false, 
createCallback);
 
-        ExecutionException exception = assertThrows(ExecutionException.class, 
() -> createCallback.get(1000L, TimeUnit.SECONDS));
+        ExecutionException exception = assertThrows(ExecutionException.class, 
() -> createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS));
         if (BadRequestException.class != exception.getCause().getClass()) {
             throw new AssertionError(exception.getCause());
         }
@@ -202,23 +198,21 @@ public class StandaloneHerderTest {
 
     @Test
     public void testCreateConnectorAlreadyExists() throws Throwable {
-        connector = mock(BogusSourceConnector.class);
         // First addition should succeed
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
-        Connector connectorMock = mock(SourceConnector.class);
-        expectConfigValidation(connectorMock, true, config, config);
+        expectConfigValidation(SourceSink.SOURCE, config, config);
 
         herder.putConnectorConfig(CONNECTOR_NAME, config, false, 
createCallback);
-        Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
+        Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS);
         assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
 
         // Second should fail
         FutureCallback<Herder.Created<ConnectorInfo>> failedCreateCallback = 
new FutureCallback<>();
         // No new connector is created
         herder.putConnectorConfig(CONNECTOR_NAME, config, false, 
failedCreateCallback);
-        ExecutionException exception = assertThrows(ExecutionException.class, 
() -> failedCreateCallback.get(1000L, TimeUnit.SECONDS));
+        ExecutionException exception = assertThrows(ExecutionException.class, 
() -> failedCreateCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS));
         if (AlreadyExistsException.class != exception.getCause().getClass()) {
             throw new AssertionError(exception.getCause());
         }
@@ -227,59 +221,47 @@ public class StandaloneHerderTest {
 
     @Test
     public void testCreateSinkConnector() throws Exception {
-        connector = mock(BogusSinkConnector.class);
         expectAdd(SourceSink.SINK);
 
         Map<String, String> config = connectorConfig(SourceSink.SINK);
-        Connector connectorMock = mock(SinkConnector.class);
-        expectConfigValidation(connectorMock, true, config);
+        expectConfigValidation(SourceSink.SINK, config);
 
         herder.putConnectorConfig(CONNECTOR_NAME, config, false, 
createCallback);
-        Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
+        Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS);
         assertEquals(createdInfo(SourceSink.SINK), connectorInfo.result());
     }
 
     @Test
     public void testCreateConnectorWithStoppedInitialState() throws Exception {
-        connector = mock(BogusSinkConnector.class);
         Map<String, String> config = connectorConfig(SourceSink.SINK);
-        Connector connectorMock = mock(SinkConnector.class);
-        expectConfigValidation(connectorMock, false, config);
-        when(plugins.newConnector(anyString())).thenReturn(connectorMock);
+        expectConfigValidation(SourceSink.SINK, config);
 
         // Only the connector should be created; we expect no tasks to be 
spawned for a connector created with a paused or stopped initial state
-        final ArgumentCaptor<Callback<TargetState>> onStart = 
ArgumentCaptor.forClass(Callback.class);
-        doAnswer(invocation -> {
-            onStart.getValue().onCompletion(null, TargetState.STOPPED);
-            return true;
-        }).when(worker).startConnector(eq(CONNECTOR_NAME), eq(config), 
any(HerderConnectorContext.class),
-            eq(herder), eq(TargetState.STOPPED), onStart.capture());
+        mockStartConnector(config, null, TargetState.STOPPED, null);
 
         when(worker.isRunning(CONNECTOR_NAME)).thenReturn(true);
         when(herder.connectorType(any())).thenReturn(ConnectorType.SINK);
 
         herder.putConnectorConfig(CONNECTOR_NAME, config, TargetState.STOPPED, 
false, createCallback);
-        Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
+        Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS);
         assertEquals(
-            new ConnectorInfo(CONNECTOR_NAME, 
connectorConfig(SourceSink.SINK), Collections.emptyList(), ConnectorType.SINK),
-            connectorInfo.result()
+                new ConnectorInfo(CONNECTOR_NAME, 
connectorConfig(SourceSink.SINK), Collections.emptyList(), ConnectorType.SINK),
+                connectorInfo.result()
         );
         verify(loaderSwap).close();
     }
 
     @Test
     public void testDestroyConnector() throws Exception {
-        connector = mock(BogusSourceConnector.class);
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
-        Connector connectorMock = mock(SourceConnector.class);
-        expectConfigValidation(connectorMock, true, config);
+        expectConfigValidation(SourceSink.SOURCE, config);
 
         
when(statusBackingStore.getAll(CONNECTOR_NAME)).thenReturn(Collections.emptyList());
 
         herder.putConnectorConfig(CONNECTOR_NAME, config, false, 
createCallback);
-        Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
+        Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS);
         assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
 
         FutureCallback<Herder.Created<ConnectorInfo>> deleteCallback = new 
FutureCallback<>();
@@ -288,7 +270,7 @@ public class StandaloneHerderTest {
         verify(herder).onDeletion(CONNECTOR_NAME);
         verify(statusBackingStore).put(new TaskStatus(new 
ConnectorTaskId(CONNECTOR_NAME, 0), TaskStatus.State.DESTROYED, WORKER_ID, 0));
         verify(statusBackingStore).put(new ConnectorStatus(CONNECTOR_NAME, 
ConnectorStatus.State.DESTROYED, WORKER_ID, 0));
-        deleteCallback.get(1000L, TimeUnit.MILLISECONDS);
+        deleteCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS);
 
         // Second deletion should fail since the connector is gone
         FutureCallback<Herder.Created<ConnectorInfo>> failedDeleteCallback = 
new FutureCallback<>();
@@ -296,7 +278,7 @@ public class StandaloneHerderTest {
         ExecutionException e = assertThrows(
                 "Should have thrown NotFoundException",
                 ExecutionException.class,
-                () -> failedDeleteCallback.get(1000L, TimeUnit.MILLISECONDS)
+                () -> failedDeleteCallback.get(WAIT_TIME_MS, 
TimeUnit.MILLISECONDS)
         );
         assertInstanceOf(NotFoundException.class, e.getCause());
     }
@@ -306,29 +288,23 @@ public class StandaloneHerderTest {
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
-        Connector connectorMock = mock(SourceConnector.class);
-        expectConfigValidation(connectorMock, true, config);
+        expectConfigValidation(SourceSink.SOURCE, config);
 
-        final ArgumentCaptor<Callback<TargetState>> onStart = 
ArgumentCaptor.forClass(Callback.class);
-        doAnswer(invocation -> {
-            onStart.getValue().onCompletion(null, TargetState.STARTED);
-            return true;
-        }).when(worker).startConnector(eq(CONNECTOR_NAME), eq(config), 
any(HerderConnectorContext.class),
-            eq(herder), eq(TargetState.STARTED), onStart.capture());
+        mockStartConnector(config, TargetState.STARTED, TargetState.STARTED, 
null);
 
         
when(worker.connectorNames()).thenReturn(Collections.singleton(CONNECTOR_NAME));
         when(worker.getPlugins()).thenReturn(plugins);
         // same task configs as earlier, so don't expect a new set of tasks to 
be brought up
         when(worker.connectorTaskConfigs(CONNECTOR_NAME, new 
SourceConnectorConfig(plugins, config, true)))
-            
.thenReturn(Collections.singletonList(taskConfig(SourceSink.SOURCE)));
+                
.thenReturn(Collections.singletonList(taskConfig(SourceSink.SOURCE)));
 
         herder.putConnectorConfig(CONNECTOR_NAME, config, false, 
createCallback);
-        Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
+        Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS);
         assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
 
         FutureCallback<Void> restartCallback = new FutureCallback<>();
         herder.restartConnector(CONNECTOR_NAME, restartCallback);
-        restartCallback.get(1000L, TimeUnit.MILLISECONDS);
+        restartCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS);
         verify(worker).stopAndAwaitConnector(eq(CONNECTOR_NAME));
     }
 
@@ -338,19 +314,13 @@ public class StandaloneHerderTest {
 
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
         ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0);
-        Connector connectorMock = mock(SourceConnector.class);
-        expectConfigValidation(connectorMock, true, config);
+        expectConfigValidation(SourceSink.SOURCE, config);
 
         herder.putConnectorConfig(CONNECTOR_NAME, config, false, 
createCallback);
-        Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
+        Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS);
         assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
 
-        final ArgumentCaptor<Callback<TargetState>> onStart = 
ArgumentCaptor.forClass(Callback.class);
-        doAnswer(invocation -> {
-            onStart.getValue().onCompletion(null, TargetState.STARTED);
-            return true;
-        }).when(worker).startConnector(eq(CONNECTOR_NAME), eq(config), 
any(HerderConnectorContext.class),
-                eq(herder), eq(TargetState.STARTED), onStart.capture());
+        mockStartConnector(config, TargetState.STARTED, TargetState.STARTED, 
null);
 
         
when(worker.connectorNames()).thenReturn(Collections.singleton(CONNECTOR_NAME));
         when(worker.getPlugins()).thenReturn(plugins);
@@ -366,7 +336,7 @@ public class StandaloneHerderTest {
         expectStop();
         herder.restartConnector(CONNECTOR_NAME, restartCallback);
         verify(statusBackingStore).put(new TaskStatus(taskId, 
TaskStatus.State.DESTROYED, WORKER_ID, 0));
-        restartCallback.get(1000L, TimeUnit.MILLISECONDS);
+        restartCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS);
     }
 
     @Test
@@ -374,28 +344,21 @@ public class StandaloneHerderTest {
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
-        Connector connectorMock = mock(SourceConnector.class);
-        expectConfigValidation(connectorMock, true, config);
+        expectConfigValidation(SourceSink.SOURCE, config);
 
         doNothing().when(worker).stopAndAwaitConnector(CONNECTOR_NAME);
 
-        final ArgumentCaptor<Callback<TargetState>> onStart = 
ArgumentCaptor.forClass(Callback.class);
 
         Exception exception = new ConnectException("Failed to start 
connector");
 
         herder.putConnectorConfig(CONNECTOR_NAME, config, false, 
createCallback);
-        Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
+        Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS);
         assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
-
         FutureCallback<Void> restartCallback = new FutureCallback<>();
-        doAnswer(invocation -> {
-            onStart.getValue().onCompletion(exception, null);
-            return true;
-        }).when(worker).startConnector(eq(CONNECTOR_NAME), eq(config), 
any(HerderConnectorContext.class),
-            eq(herder), eq(TargetState.STARTED), onStart.capture());
+        mockStartConnector(config, null, TargetState.STARTED, exception);
         herder.restartConnector(CONNECTOR_NAME, restartCallback);
         try {
-            restartCallback.get(1000L, TimeUnit.MILLISECONDS);
+            restartCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS);
             fail();
         } catch (ExecutionException e) {
             assertEquals(exception, e.getCause());
@@ -408,8 +371,8 @@ public class StandaloneHerderTest {
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> connectorConfig = 
connectorConfig(SourceSink.SOURCE);
-        Connector connectorMock = mock(SourceConnector.class);
-        expectConfigValidation(connectorMock, true, connectorConfig);
+
+        expectConfigValidation(SourceSink.SOURCE, connectorConfig);
 
         doNothing().when(worker).stopAndAwaitTask(taskId);
 
@@ -426,16 +389,16 @@ public class StandaloneHerderTest {
                 new HashSet<>(),
                 transformer);
         when(worker.startSourceTask(taskId, configState, connectorConfig, 
taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED))
-            .thenReturn(true);
+                .thenReturn(true);
 
         herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, 
createCallback);
-        Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
-        createCallback.get(1000L, TimeUnit.SECONDS);
+        Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS);
+        createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS);
         assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
 
         FutureCallback<Void> restartTaskCallback = new FutureCallback<>();
         herder.restartTask(taskId, restartTaskCallback);
-        restartTaskCallback.get(1000L, TimeUnit.MILLISECONDS);
+        restartTaskCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS);
     }
 
     @Test
@@ -444,8 +407,7 @@ public class StandaloneHerderTest {
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> connectorConfig = 
connectorConfig(SourceSink.SOURCE);
-        Connector connectorMock = mock(SourceConnector.class);
-        expectConfigValidation(connectorMock, true, connectorConfig);
+        expectConfigValidation(SourceSink.SOURCE, connectorConfig);
 
         ClusterConfigState configState = new ClusterConfigState(
                 -1,
@@ -460,17 +422,17 @@ public class StandaloneHerderTest {
                 new HashSet<>(),
                 transformer);
         when(worker.startSourceTask(taskId, configState, connectorConfig, 
taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED))
-            .thenReturn(false);
+                .thenReturn(false);
 
         herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, 
createCallback);
-        Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.MILLISECONDS);
+        Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS);
         assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
 
         FutureCallback<Void> cb = new FutureCallback<>();
         herder.restartTask(taskId, cb);
         verify(worker).stopAndAwaitTask(taskId);
         try {
-            cb.get(1000L, TimeUnit.MILLISECONDS);
+            cb.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS);
             fail("Expected restart callback to raise an exception");
         } catch (ExecutionException exception) {
             assertEquals(ConnectException.class, 
exception.getCause().getClass());
@@ -482,7 +444,7 @@ public class StandaloneHerderTest {
         FutureCallback<ConnectorStateInfo> restartCallback = new 
FutureCallback<>();
         RestartRequest restartRequest = new RestartRequest("UnknownConnector", 
false, true);
         herder.restartConnectorAndTasks(restartRequest, restartCallback);
-        ExecutionException ee = assertThrows(ExecutionException.class, () -> 
restartCallback.get(1000L, TimeUnit.MILLISECONDS));
+        ExecutionException ee = assertThrows(ExecutionException.class, () -> 
restartCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS));
         assertInstanceOf(NotFoundException.class, ee.getCause());
     }
 
@@ -491,20 +453,18 @@ public class StandaloneHerderTest {
         RestartRequest restartRequest = new RestartRequest(CONNECTOR_NAME, 
false, true);
         
doReturn(Optional.empty()).when(herder).buildRestartPlan(restartRequest);
 
-        connector = mock(BogusSinkConnector.class);
         expectAdd(SourceSink.SINK);
 
         Map<String, String> connectorConfig = connectorConfig(SourceSink.SINK);
-        Connector connectorMock = mock(SinkConnector.class);
-        expectConfigValidation(connectorMock, true, connectorConfig);
+        expectConfigValidation(SourceSink.SINK, connectorConfig);
 
         herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, 
createCallback);
-        Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
+        Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS);
         assertEquals(createdInfo(SourceSink.SINK), connectorInfo.result());
 
         FutureCallback<ConnectorStateInfo> restartCallback = new 
FutureCallback<>();
         herder.restartConnectorAndTasks(restartRequest, restartCallback);
-        ExecutionException ee = assertThrows(ExecutionException.class, () -> 
restartCallback.get(1000L, TimeUnit.MILLISECONDS));
+        ExecutionException ee = assertThrows(ExecutionException.class, () -> 
restartCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS));
         assertInstanceOf(NotFoundException.class, ee.getCause());
         assertTrue(ee.getMessage().contains("Status for connector"));
     }
@@ -519,20 +479,18 @@ public class StandaloneHerderTest {
         
when(restartPlan.restartConnectorStateInfo()).thenReturn(connectorStateInfo);
         
doReturn(Optional.of(restartPlan)).when(herder).buildRestartPlan(restartRequest);
 
-        connector = mock(BogusSinkConnector.class);
         expectAdd(SourceSink.SINK);
 
         Map<String, String> connectorConfig = connectorConfig(SourceSink.SINK);
-        Connector connectorMock = mock(SinkConnector.class);
-        expectConfigValidation(connectorMock, true, connectorConfig);
+        expectConfigValidation(SourceSink.SINK, connectorConfig);
 
         herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, 
createCallback);
-        Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
+        Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS);
         assertEquals(createdInfo(SourceSink.SINK), connectorInfo.result());
 
         FutureCallback<ConnectorStateInfo> restartCallback = new 
FutureCallback<>();
         herder.restartConnectorAndTasks(restartRequest, restartCallback);
-        assertEquals(connectorStateInfo, restartCallback.get(1000L, 
TimeUnit.MILLISECONDS));
+        assertEquals(connectorStateInfo, restartCallback.get(WAIT_TIME_MS, 
TimeUnit.MILLISECONDS));
     }
 
     @Test
@@ -545,29 +503,22 @@ public class StandaloneHerderTest {
         
when(restartPlan.restartConnectorStateInfo()).thenReturn(connectorStateInfo);
         
doReturn(Optional.of(restartPlan)).when(herder).buildRestartPlan(restartRequest);
 
-        connector = mock(BogusSinkConnector.class);
         expectAdd(SourceSink.SINK);
 
         Map<String, String> connectorConfig = connectorConfig(SourceSink.SINK);
-        Connector connectorMock = mock(SinkConnector.class);
-        expectConfigValidation(connectorMock, true, connectorConfig);
+        expectConfigValidation(SourceSink.SINK, connectorConfig);
 
         doNothing().when(worker).stopAndAwaitConnector(CONNECTOR_NAME);
 
-        final ArgumentCaptor<Callback<TargetState>> onStart = 
ArgumentCaptor.forClass(Callback.class);
-        doAnswer(invocation -> {
-            onStart.getValue().onCompletion(null, TargetState.STARTED);
-            return true;
-        }).when(worker).startConnector(eq(CONNECTOR_NAME), 
eq(connectorConfig), any(HerderConnectorContext.class),
-            eq(herder), eq(TargetState.STARTED), onStart.capture());
+        mockStartConnector(connectorConfig, null, TargetState.STARTED, null);
 
         herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, 
createCallback);
-        Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
+        Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS);
         assertEquals(createdInfo(SourceSink.SINK), connectorInfo.result());
 
         FutureCallback<ConnectorStateInfo> restartCallback = new 
FutureCallback<>();
         herder.restartConnectorAndTasks(restartRequest, restartCallback);
-        assertEquals(connectorStateInfo, restartCallback.get(1000L, 
TimeUnit.MILLISECONDS));
+        assertEquals(connectorStateInfo, restartCallback.get(WAIT_TIME_MS, 
TimeUnit.MILLISECONDS));
 
         verifyConnectorStatusRestart();
     }
@@ -586,12 +537,10 @@ public class StandaloneHerderTest {
         
when(restartPlan.restartConnectorStateInfo()).thenReturn(connectorStateInfo);
         
doReturn(Optional.of(restartPlan)).when(herder).buildRestartPlan(restartRequest);
 
-        connector = mock(BogusSinkConnector.class);
         expectAdd(SourceSink.SINK);
 
         Map<String, String> connectorConfig = connectorConfig(SourceSink.SINK);
-        Connector connectorMock = mock(SinkConnector.class);
-        expectConfigValidation(connectorMock, true, connectorConfig);
+        expectConfigValidation(SourceSink.SINK, connectorConfig);
 
         
doNothing().when(worker).stopAndAwaitTasks(Collections.singletonList(taskId));
 
@@ -608,15 +557,15 @@ public class StandaloneHerderTest {
                 new HashSet<>(),
                 transformer);
         when(worker.startSinkTask(taskId, configState, connectorConfig, 
taskConfig(SourceSink.SINK), herder, TargetState.STARTED))
-            .thenReturn(true);
+                .thenReturn(true);
 
         herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, 
createCallback);
-        Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
+        Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS);
         assertEquals(createdInfo(SourceSink.SINK), connectorInfo.result());
 
         FutureCallback<ConnectorStateInfo> restartCallback = new 
FutureCallback<>();
         herder.restartConnectorAndTasks(restartRequest, restartCallback);
-        assertEquals(connectorStateInfo, restartCallback.get(1000L, 
TimeUnit.MILLISECONDS));
+        assertEquals(connectorStateInfo, restartCallback.get(WAIT_TIME_MS, 
TimeUnit.MILLISECONDS));
         ArgumentCaptor<TaskStatus> taskStatus = 
ArgumentCaptor.forClass(TaskStatus.class);
         verify(statusBackingStore).put(taskStatus.capture());
         assertEquals(AbstractStatus.State.RESTARTING, 
taskStatus.getValue().state());
@@ -639,22 +588,15 @@ public class StandaloneHerderTest {
 
         ArgumentCaptor<TaskStatus> taskStatus = 
ArgumentCaptor.forClass(TaskStatus.class);
 
-        connector = mock(BogusSinkConnector.class);
         expectAdd(SourceSink.SINK);
 
         Map<String, String> connectorConfig = connectorConfig(SourceSink.SINK);
-        Connector connectorMock = mock(SinkConnector.class);
-        expectConfigValidation(connectorMock, true, connectorConfig);
+        expectConfigValidation(SourceSink.SINK, connectorConfig);
 
         doNothing().when(worker).stopAndAwaitConnector(CONNECTOR_NAME);
         
doNothing().when(worker).stopAndAwaitTasks(Collections.singletonList(taskId));
 
-        final ArgumentCaptor<Callback<TargetState>> onStart = 
ArgumentCaptor.forClass(Callback.class);
-        doAnswer(invocation -> {
-            onStart.getValue().onCompletion(null, TargetState.STARTED);
-            return true;
-        }).when(worker).startConnector(eq(CONNECTOR_NAME), 
eq(connectorConfig), any(HerderConnectorContext.class),
-            eq(herder), eq(TargetState.STARTED), onStart.capture());
+        mockStartConnector(connectorConfig, null, TargetState.STARTED, null);
 
         ClusterConfigState configState = new ClusterConfigState(
                 -1,
@@ -669,15 +611,15 @@ public class StandaloneHerderTest {
                 new HashSet<>(),
                 transformer);
         when(worker.startSinkTask(taskId, configState, connectorConfig, 
taskConfig(SourceSink.SINK), herder, TargetState.STARTED))
-            .thenReturn(true);
+                .thenReturn(true);
 
         herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, 
createCallback);
-        Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
+        Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS);
         assertEquals(createdInfo(SourceSink.SINK), connectorInfo.result());
 
         FutureCallback<ConnectorStateInfo> restartCallback = new 
FutureCallback<>();
         herder.restartConnectorAndTasks(restartRequest, restartCallback);
-        assertEquals(connectorStateInfo, restartCallback.get(1000L, 
TimeUnit.MILLISECONDS));
+        assertEquals(connectorStateInfo, restartCallback.get(WAIT_TIME_MS, 
TimeUnit.MILLISECONDS));
 
         verifyConnectorStatusRestart();
         verify(statusBackingStore).put(taskStatus.capture());
@@ -687,17 +629,15 @@ public class StandaloneHerderTest {
 
     @Test
     public void testCreateAndStop() throws Exception {
-        connector = mock(BogusSourceConnector.class);
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> connectorConfig = 
connectorConfig(SourceSink.SOURCE);
-        Connector connectorMock = mock(SourceConnector.class);
-        expectConfigValidation(connectorMock, true, connectorConfig);
+        expectConfigValidation(SourceSink.SOURCE, connectorConfig);
 
         expectStop();
 
         herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, 
createCallback);
-        Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
+        Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS);
         assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
 
         // herder.stop() should stop any running connectors and tasks even if 
destroyConnector was not invoked
@@ -726,22 +666,21 @@ public class StandaloneHerderTest {
         
doNothing().when(tasksConfigCb).onCompletion(any(NotFoundException.class), 
isNull());
         
doNothing().when(connectorConfigCb).onCompletion(any(NotFoundException.class), 
isNull());
 
-        // Create connector
-        connector = mock(BogusSourceConnector.class);
+
         expectAdd(SourceSink.SOURCE);
-        expectConfigValidation(connector, true, connConfig);
+        expectConfigValidation(SourceSink.SOURCE, connConfig);
 
         // Validate accessors with 1 connector
         doNothing().when(listConnectorsCb).onCompletion(null, 
singleton(CONNECTOR_NAME));
-        ConnectorInfo connInfo = new ConnectorInfo(CONNECTOR_NAME, connConfig, 
Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)),
-            ConnectorType.SOURCE);
+        ConnectorInfo connInfo = new ConnectorInfo(CONNECTOR_NAME, connConfig, 
singletonList(new ConnectorTaskId(CONNECTOR_NAME, 0)),
+                ConnectorType.SOURCE);
         doNothing().when(connectorInfoCb).onCompletion(null, connInfo);
 
         TaskInfo taskInfo = new TaskInfo(new ConnectorTaskId(CONNECTOR_NAME, 
0), taskConfig(SourceSink.SOURCE));
-        doNothing().when(taskConfigsCb).onCompletion(null, 
Arrays.asList(taskInfo));
+        doNothing().when(taskConfigsCb).onCompletion(null, 
singletonList(taskInfo));
 
         Map<ConnectorTaskId, Map<String, String>> tasksConfig = 
Collections.singletonMap(new ConnectorTaskId(CONNECTOR_NAME, 0),
-            taskConfig(SourceSink.SOURCE));
+                taskConfig(SourceSink.SOURCE));
         doNothing().when(tasksConfigCb).onCompletion(null, tasksConfig);
 
         // All operations are synchronous for StandaloneHerder, so we don't 
need to actually wait after making each call
@@ -752,7 +691,7 @@ public class StandaloneHerderTest {
         herder.tasksConfig(CONNECTOR_NAME, tasksConfigCb);
 
         herder.putConnectorConfig(CONNECTOR_NAME, connConfig, false, 
createCallback);
-        Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
+        Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS);
         assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
 
         reset(transformer);
@@ -773,11 +712,9 @@ public class StandaloneHerderTest {
 
         Callback<Map<String, String>> connectorConfigCb = mock(Callback.class);
 
-        // Create
-        connector = mock(BogusSourceConnector.class);
+
         expectAdd(SourceSink.SOURCE);
-        Connector connectorMock = mock(SourceConnector.class);
-        expectConfigValidation(connectorMock, true, connConfig);
+        expectConfigValidation(SourceSink.SOURCE, connConfig, newConnConfig);
 
         // Should get first config
         doNothing().when(connectorConfigCb).onCompletion(null, connConfig);
@@ -789,15 +726,14 @@ public class StandaloneHerderTest {
             onStart.getValue().onCompletion(null, TargetState.STARTED);
             return true;
         }).when(worker).startConnector(eq(CONNECTOR_NAME), 
capturedConfig.capture(), any(),
-            eq(herder), eq(TargetState.STARTED), onStart.capture());
+                eq(herder), eq(TargetState.STARTED), onStart.capture());
         // Generate same task config, which should result in no additional 
action to restart tasks
         when(worker.connectorTaskConfigs(CONNECTOR_NAME, new 
SourceConnectorConfig(plugins, newConnConfig, true)))
-            .thenReturn(singletonList(taskConfig(SourceSink.SOURCE)));
+                .thenReturn(singletonList(taskConfig(SourceSink.SOURCE)));
 
-        expectConfigValidation(connectorMock, false, newConnConfig);
 
         herder.putConnectorConfig(CONNECTOR_NAME, connConfig, false, 
createCallback);
-        Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
+        Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS);
         assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
 
         herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb);
@@ -806,8 +742,8 @@ public class StandaloneHerderTest {
         doNothing().when(connectorConfigCb).onCompletion(null, newConnConfig);
         herder.putConnectorConfig(CONNECTOR_NAME, newConnConfig, true, 
reconfigureCallback);
         Herder.Created<ConnectorInfo> newConnectorInfo = 
reconfigureCallback.get(1000L, TimeUnit.SECONDS);
-        ConnectorInfo newConnInfo = new ConnectorInfo(CONNECTOR_NAME, 
newConnConfig, Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)),
-            ConnectorType.SOURCE);
+        ConnectorInfo newConnInfo = new ConnectorInfo(CONNECTOR_NAME, 
newConnConfig, singletonList(new ConnectorTaskId(CONNECTOR_NAME, 0)),
+                ConnectorType.SOURCE);
         assertEquals(newConnInfo, newConnectorInfo.result());
 
         assertEquals("bar", capturedConfig.getValue().get("foo"));
@@ -834,9 +770,9 @@ public class StandaloneHerderTest {
         List<String> errors = new ArrayList<>(singletonList(error));
         String key = "foo.invalid.key";
         when(connectorMock.validate(config)).thenReturn(
-            new Config(
-                Arrays.asList(new ConfigValue(key, null, 
Collections.emptyList(), errors))
-            )
+                new Config(
+                        singletonList(new ConfigValue(key, null, 
Collections.emptyList(), errors))
+                )
         );
         ConfigDef configDef = new ConfigDef();
         configDef.define(key, ConfigDef.Type.STRING, 
ConfigDef.Importance.HIGH, "");
@@ -853,7 +789,7 @@ public class StandaloneHerderTest {
         ExecutionException e = assertThrows(
                 "Should have failed to configure connector",
                 ExecutionException.class,
-                () -> createCallback.get(1000L, TimeUnit.SECONDS)
+                () -> createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
         );
         assertNotNull(e.getCause());
         Throwable cause = e.getCause();
@@ -869,12 +805,10 @@ public class StandaloneHerderTest {
 
     @Test
     public void testTargetStates() throws Exception {
-        connector = mock(BogusSourceConnector.class);
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> connectorConfig = 
connectorConfig(SourceSink.SOURCE);
-        Connector connectorMock = mock(SourceConnector.class);
-        expectConfigValidation(connectorMock, true, connectorConfig);
+        expectConfigValidation(SourceSink.SOURCE, connectorConfig);
 
         // We pause, then stop, the connector
         expectTargetState(CONNECTOR_NAME, TargetState.PAUSED);
@@ -887,14 +821,14 @@ public class StandaloneHerderTest {
         FutureCallback<List<TaskInfo>> taskConfigsCallback = new 
FutureCallback<>();
 
         herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, 
createCallback);
-        Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
+        Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS);
         assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
         herder.pauseConnector(CONNECTOR_NAME);
         herder.stopConnector(CONNECTOR_NAME, stopCallback);
         verify(statusBackingStore).put(new TaskStatus(new 
ConnectorTaskId(CONNECTOR_NAME, 0), AbstractStatus.State.DESTROYED, WORKER_ID, 
0));
-        stopCallback.get(10L, TimeUnit.SECONDS);
+        stopCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS);
         herder.taskConfigs(CONNECTOR_NAME, taskConfigsCallback);
-        assertEquals(Collections.emptyList(), taskConfigsCallback.get(1, 
TimeUnit.SECONDS));
+        assertEquals(Collections.emptyList(), 
taskConfigsCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS));
 
         // herder.stop() should stop any running connectors and tasks even if 
destroyConnector was not invoked
         herder.stop();
@@ -910,12 +844,12 @@ public class StandaloneHerderTest {
         herder.alterConnectorOffsets("unknown-connector",
                 
Collections.singletonMap(Collections.singletonMap("partitionKey", 
"partitionValue"), Collections.singletonMap("offsetKey", "offsetValue")),
                 alterOffsetsCallback);
-        ExecutionException e = assertThrows(ExecutionException.class, () -> 
alterOffsetsCallback.get(1000L, TimeUnit.MILLISECONDS));
+        ExecutionException e = assertThrows(ExecutionException.class, () -> 
alterOffsetsCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS));
         assertInstanceOf(NotFoundException.class, e.getCause());
 
         FutureCallback<Message> resetOffsetsCallback = new FutureCallback<>();
         herder.resetConnectorOffsets("unknown-connector", 
resetOffsetsCallback);
-        e = assertThrows(ExecutionException.class, () -> 
resetOffsetsCallback.get(1000L, TimeUnit.MILLISECONDS));
+        e = assertThrows(ExecutionException.class, () -> 
resetOffsetsCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS));
         assertInstanceOf(NotFoundException.class, e.getCause());
     }
 
@@ -938,12 +872,12 @@ public class StandaloneHerderTest {
         herder.alterConnectorOffsets(CONNECTOR_NAME,
                 
Collections.singletonMap(Collections.singletonMap("partitionKey", 
"partitionValue"), Collections.singletonMap("offsetKey", "offsetValue")),
                 alterOffsetsCallback);
-        ExecutionException e = assertThrows(ExecutionException.class, () -> 
alterOffsetsCallback.get(1000L, TimeUnit.MILLISECONDS));
+        ExecutionException e = assertThrows(ExecutionException.class, () -> 
alterOffsetsCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS));
         assertInstanceOf(BadRequestException.class, e.getCause());
 
         FutureCallback<Message> resetOffsetsCallback = new FutureCallback<>();
         herder.resetConnectorOffsets(CONNECTOR_NAME, resetOffsetsCallback);
-        e = assertThrows(ExecutionException.class, () -> 
resetOffsetsCallback.get(1000L, TimeUnit.MILLISECONDS));
+        e = assertThrows(ExecutionException.class, () -> 
resetOffsetsCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS));
         assertInstanceOf(BadRequestException.class, e.getCause());
     }
 
@@ -1004,33 +938,25 @@ public class StandaloneHerderTest {
 
     @Test()
     public void testRequestTaskReconfigurationDoesNotDeadlock() throws 
Exception {
-        connector = mock(BogusSourceConnector.class);
         expectAdd(SourceSink.SOURCE);
 
         // Start the connector
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
-        Connector connectorMock = mock(SourceConnector.class);
-        expectConfigValidation(connectorMock, true, config);
-
+        // Prepare for connector and task config update
+        Map<String, String> newConfig = connectorConfig(SourceSink.SOURCE);
+        newConfig.put("dummy-connector-property", "yes");
+        expectConfigValidation(SourceSink.SOURCE, config, newConfig);
+        mockStartConnector(newConfig, TargetState.STARTED, 
TargetState.STARTED, null);
         herder.putConnectorConfig(CONNECTOR_NAME, config, false, 
createCallback);
 
         // Wait on connector to start
-        Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.MILLISECONDS);
+        Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS);
         assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
 
         // Prepare for task config update
         
when(worker.connectorNames()).thenReturn(Collections.singleton(CONNECTOR_NAME));
         expectStop();
 
-        // Prepare for connector and task config update
-        Map<String, String> newConfig = connectorConfig(SourceSink.SOURCE);
-        newConfig.put("dummy-connector-property", "yes");
-        final ArgumentCaptor<Callback<TargetState>> onStart = 
ArgumentCaptor.forClass(Callback.class);
-        doAnswer(invocation -> {
-            onStart.getValue().onCompletion(null, TargetState.STARTED);
-            return true;
-        }).when(worker).startConnector(eq(CONNECTOR_NAME), eq(newConfig), 
any(HerderConnectorContext.class),
-                eq(herder), eq(TargetState.STARTED), onStart.capture());
 
         // Common invocations
         when(worker.startSourceTask(eq(new ConnectorTaskId(CONNECTOR_NAME, 
0)), any(), any(), any(), eq(herder), 
eq(TargetState.STARTED))).thenReturn(true);
@@ -1045,15 +971,14 @@ public class StandaloneHerderTest {
 
         // Set new config on the connector and tasks
         FutureCallback<Herder.Created<ConnectorInfo>> reconfigureCallback = 
new FutureCallback<>();
-        expectConfigValidation(connectorMock, false, newConfig);
         herder.putConnectorConfig(CONNECTOR_NAME, newConfig, true, 
reconfigureCallback);
 
         // Reconfigure the tasks
         herder.requestTaskReconfiguration(CONNECTOR_NAME);
 
         // Wait on connector update
-        Herder.Created<ConnectorInfo> updatedConnectorInfo = 
reconfigureCallback.get(1000L, TimeUnit.MILLISECONDS);
-        ConnectorInfo expectedConnectorInfo = new 
ConnectorInfo(CONNECTOR_NAME, newConfig, Arrays.asList(new 
ConnectorTaskId(CONNECTOR_NAME, 0)), ConnectorType.SOURCE);
+        Herder.Created<ConnectorInfo> updatedConnectorInfo = 
reconfigureCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS);
+        ConnectorInfo expectedConnectorInfo = new 
ConnectorInfo(CONNECTOR_NAME, newConfig, singletonList(new 
ConnectorTaskId(CONNECTOR_NAME, 0)), ConnectorType.SOURCE);
         assertEquals(expectedConnectorInfo, updatedConnectorInfo.result());
 
         verify(statusBackingStore, times(2)).put(new TaskStatus(new 
ConnectorTaskId(CONNECTOR_NAME, 0), TaskStatus.State.DESTROYED, WORKER_ID, 0));
@@ -1062,15 +987,10 @@ public class StandaloneHerderTest {
     private void expectAdd(SourceSink sourceSink) {
         Map<String, String> connectorProps = connectorConfig(sourceSink);
         ConnectorConfig connConfig = sourceSink == SourceSink.SOURCE ?
-            new SourceConnectorConfig(plugins, connectorProps, true) :
-            new SinkConnectorConfig(plugins, connectorProps);
+                new SourceConnectorConfig(plugins, connectorProps, true) :
+                new SinkConnectorConfig(plugins, connectorProps);
 
-        final ArgumentCaptor<Callback<TargetState>> onStart = 
ArgumentCaptor.forClass(Callback.class);
-        doAnswer(invocation -> {
-            onStart.getValue().onCompletion(null, TargetState.STARTED);
-            return true;
-        }).when(worker).startConnector(eq(CONNECTOR_NAME), eq(connectorProps), 
any(HerderConnectorContext.class),
-            eq(herder), eq(TargetState.STARTED), onStart.capture());
+        mockStartConnector(connectorProps, TargetState.STARTED, 
TargetState.STARTED, null);
         when(worker.isRunning(CONNECTOR_NAME)).thenReturn(true);
         if (sourceSink == SourceSink.SOURCE) {
             when(worker.isTopicCreationEnabled()).thenReturn(true);
@@ -1081,7 +1001,7 @@ public class StandaloneHerderTest {
         Map<String, String> generatedTaskProps = taskConfig(sourceSink);
 
         when(worker.connectorTaskConfigs(CONNECTOR_NAME, connConfig))
-            .thenReturn(singletonList(generatedTaskProps));
+                .thenReturn(singletonList(generatedTaskProps));
 
         ClusterConfigState configState = new ClusterConfigState(
                 -1,
@@ -1127,8 +1047,8 @@ public class StandaloneHerderTest {
 
     private ConnectorInfo createdInfo(SourceSink sourceSink) {
         return new ConnectorInfo(CONNECTOR_NAME, connectorConfig(sourceSink),
-            Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)),
-            SourceSink.SOURCE == sourceSink ? ConnectorType.SOURCE : 
ConnectorType.SINK);
+                singletonList(new ConnectorTaskId(CONNECTOR_NAME, 0)),
+                SourceSink.SOURCE == sourceSink ? ConnectorType.SOURCE : 
ConnectorType.SINK);
     }
 
     private void expectStop() {
@@ -1168,38 +1088,40 @@ public class StandaloneHerderTest {
     }
 
     private void expectConfigValidation(
-            Connector connectorMock,
-            boolean shouldCreateConnector,
+            SourceSink sourceSink,
             Map<String, String>... configs
     ) {
         // config validation
+        Connector connectorMock = sourceSink == SourceSink.SOURCE ? 
mock(SourceConnector.class) : mock(SinkConnector.class);
         when(worker.configTransformer()).thenReturn(transformer);
         final ArgumentCaptor<Map<String, String>> configCapture = 
ArgumentCaptor.forClass(Map.class);
         
when(transformer.transform(configCapture.capture())).thenAnswer(invocation -> 
configCapture.getValue());
         when(worker.getPlugins()).thenReturn(plugins);
         when(plugins.connectorLoader(anyString())).thenReturn(pluginLoader);
         when(plugins.withClassLoader(pluginLoader)).thenReturn(loaderSwap);
-        if (shouldCreateConnector) {
-            when(worker.getPlugins()).thenReturn(plugins);
-            when(plugins.newConnector(anyString())).thenReturn(connectorMock);
-        }
+
+        // Assume the connector should always be created
+        when(worker.getPlugins()).thenReturn(plugins);
+        when(plugins.newConnector(anyString())).thenReturn(connectorMock);
         when(connectorMock.config()).thenReturn(new ConfigDef());
 
-        for (Map<String, String> config : configs)
+        // Set up validation for each config
+        for (Map<String, String> config : configs) {
             when(connectorMock.validate(config)).thenReturn(new 
Config(Collections.emptyList()));
+        }
     }
 
     // We need to use a real class here due to some issue with mocking 
java.lang.Class
-    private abstract class BogusSourceConnector extends SourceConnector {
+    private static abstract class BogusSourceConnector extends SourceConnector 
{
     }
 
-    private abstract class BogusSourceTask extends SourceTask {
+    private static abstract class BogusSourceTask extends SourceTask {
     }
 
-    private abstract class BogusSinkConnector extends SinkConnector {
+    private static abstract class BogusSinkConnector extends SinkConnector {
     }
 
-    private abstract class BogusSinkTask extends SourceTask {
+    private static abstract class BogusSinkTask extends SourceTask {
     }
 
     private void verifyConnectorStatusRestart() {
@@ -1208,4 +1130,14 @@ public class StandaloneHerderTest {
         assertEquals(CONNECTOR_NAME, connectorStatus.getValue().id());
         assertEquals(AbstractStatus.State.RESTARTING, 
connectorStatus.getValue().state());
     }
+
+    private void mockStartConnector(Map<String, String> config, TargetState 
result, TargetState targetState, Exception exception) {
+        final ArgumentCaptor<Callback<TargetState>> onStart = 
ArgumentCaptor.forClass(Callback.class);
+        doAnswer(invocation -> {
+            onStart.getValue().onCompletion(exception, result);
+            return true;
+        }).when(worker).startConnector(eq(CONNECTOR_NAME), eq(config),
+                any(HerderConnectorContext.class),
+                eq(herder), eq(targetState), onStart.capture());
+    }
 }

Reply via email to