This is an automated email from the ASF dual-hosted git repository. gharris pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 4099774da9c KAFKA-16084: Simplify and deduplicate standalone herder test mocking (#15389) 4099774da9c is described below commit 4099774da9ccb9ebb2c53c37e15d7f58b1a66a4d Author: Ahmed Sobeh <ahmed.so...@aiven.io> AuthorDate: Tue Mar 26 22:24:37 2024 +0100 KAFKA-16084: Simplify and deduplicate standalone herder test mocking (#15389) Reviewers: Greg Harris <greg.har...@aiven.io> --- .../runtime/standalone/StandaloneHerderTest.java | 340 +++++++++------------ 1 file changed, 136 insertions(+), 204 deletions(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java index 41091bbdf46..bed08ffa2eb 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java @@ -68,7 +68,6 @@ import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -115,6 +114,7 @@ public class StandaloneHerderTest { private static final String TOPICS_LIST_STR = "topic1,topic2"; private static final String WORKER_ID = "localhost:8083"; private static final String KAFKA_CLUSTER_ID = "I4ZmrWqfT2e-upky_4fdPA"; + private static final Long WAIT_TIME_MS = 15000L; private enum SourceSink { SOURCE, SINK @@ -122,30 +122,28 @@ public class StandaloneHerderTest { private StandaloneHerder herder; - private Connector connector; @Mock protected Worker worker; - @Mock protected WorkerConfigTransformer transformer; - @Mock private Plugins plugins; + @Mock + protected WorkerConfigTransformer transformer; + @Mock + private Plugins plugins; @Mock private PluginClassLoader pluginLoader; @Mock private LoaderSwap loaderSwap; protected FutureCallback<Herder.Created<ConnectorInfo>> createCallback; - @Mock protected StatusBackingStore statusBackingStore; + @Mock + protected StatusBackingStore statusBackingStore; private final SampleConnectorClientConfigOverridePolicy - noneConnectorClientConfigOverridePolicy = new SampleConnectorClientConfigOverridePolicy(); + noneConnectorClientConfigOverridePolicy = new SampleConnectorClientConfigOverridePolicy(); @Before public void setup() throws ExecutionException, InterruptedException { - worker = mock(Worker.class); herder = mock(StandaloneHerder.class, withSettings() - .useConstructor(worker, WORKER_ID, KAFKA_CLUSTER_ID, statusBackingStore, new MemoryConfigBackingStore(transformer), noneConnectorClientConfigOverridePolicy, new MockTime()) - .defaultAnswer(CALLS_REAL_METHODS)); + .useConstructor(worker, WORKER_ID, KAFKA_CLUSTER_ID, statusBackingStore, new MemoryConfigBackingStore(transformer), noneConnectorClientConfigOverridePolicy, new MockTime()) + .defaultAnswer(CALLS_REAL_METHODS)); createCallback = new FutureCallback<>(); - plugins = mock(Plugins.class); - pluginLoader = mock(PluginClassLoader.class); - loaderSwap = mock(LoaderSwap.class); final ArgumentCaptor<Map<String, String>> configCapture = ArgumentCaptor.forClass(Map.class); when(transformer.transform(eq(CONNECTOR_NAME), configCapture.capture())).thenAnswer(invocation -> configCapture.getValue()); } @@ -153,26 +151,24 @@ public class StandaloneHerderTest { @After public void tearDown() { verifyNoMoreInteractions(worker, statusBackingStore); + herder.stop(); } @Test public void testCreateSourceConnector() throws Exception { - connector = mock(BogusSourceConnector.class); expectAdd(SourceSink.SOURCE); Map<String, String> config = connectorConfig(SourceSink.SOURCE); - Connector connectorMock = mock(SourceConnector.class); - expectConfigValidation(connectorMock, true, config); + expectConfigValidation(SourceSink.SOURCE, config); herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback); - Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); + Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS); assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result()); } @Test public void testCreateConnectorFailedValidation() { // Basic validation should be performed and return an error, but should still evaluate the connector's config - connector = mock(BogusSourceConnector.class); Map<String, String> config = connectorConfig(SourceSink.SOURCE); config.remove(ConnectorConfig.NAME_CONFIG); @@ -193,7 +189,7 @@ public class StandaloneHerderTest { herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback); - ExecutionException exception = assertThrows(ExecutionException.class, () -> createCallback.get(1000L, TimeUnit.SECONDS)); + ExecutionException exception = assertThrows(ExecutionException.class, () -> createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS)); if (BadRequestException.class != exception.getCause().getClass()) { throw new AssertionError(exception.getCause()); } @@ -202,23 +198,21 @@ public class StandaloneHerderTest { @Test public void testCreateConnectorAlreadyExists() throws Throwable { - connector = mock(BogusSourceConnector.class); // First addition should succeed expectAdd(SourceSink.SOURCE); Map<String, String> config = connectorConfig(SourceSink.SOURCE); - Connector connectorMock = mock(SourceConnector.class); - expectConfigValidation(connectorMock, true, config, config); + expectConfigValidation(SourceSink.SOURCE, config, config); herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback); - Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); + Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS); assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result()); // Second should fail FutureCallback<Herder.Created<ConnectorInfo>> failedCreateCallback = new FutureCallback<>(); // No new connector is created herder.putConnectorConfig(CONNECTOR_NAME, config, false, failedCreateCallback); - ExecutionException exception = assertThrows(ExecutionException.class, () -> failedCreateCallback.get(1000L, TimeUnit.SECONDS)); + ExecutionException exception = assertThrows(ExecutionException.class, () -> failedCreateCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS)); if (AlreadyExistsException.class != exception.getCause().getClass()) { throw new AssertionError(exception.getCause()); } @@ -227,59 +221,47 @@ public class StandaloneHerderTest { @Test public void testCreateSinkConnector() throws Exception { - connector = mock(BogusSinkConnector.class); expectAdd(SourceSink.SINK); Map<String, String> config = connectorConfig(SourceSink.SINK); - Connector connectorMock = mock(SinkConnector.class); - expectConfigValidation(connectorMock, true, config); + expectConfigValidation(SourceSink.SINK, config); herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback); - Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); + Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS); assertEquals(createdInfo(SourceSink.SINK), connectorInfo.result()); } @Test public void testCreateConnectorWithStoppedInitialState() throws Exception { - connector = mock(BogusSinkConnector.class); Map<String, String> config = connectorConfig(SourceSink.SINK); - Connector connectorMock = mock(SinkConnector.class); - expectConfigValidation(connectorMock, false, config); - when(plugins.newConnector(anyString())).thenReturn(connectorMock); + expectConfigValidation(SourceSink.SINK, config); // Only the connector should be created; we expect no tasks to be spawned for a connector created with a paused or stopped initial state - final ArgumentCaptor<Callback<TargetState>> onStart = ArgumentCaptor.forClass(Callback.class); - doAnswer(invocation -> { - onStart.getValue().onCompletion(null, TargetState.STOPPED); - return true; - }).when(worker).startConnector(eq(CONNECTOR_NAME), eq(config), any(HerderConnectorContext.class), - eq(herder), eq(TargetState.STOPPED), onStart.capture()); + mockStartConnector(config, null, TargetState.STOPPED, null); when(worker.isRunning(CONNECTOR_NAME)).thenReturn(true); when(herder.connectorType(any())).thenReturn(ConnectorType.SINK); herder.putConnectorConfig(CONNECTOR_NAME, config, TargetState.STOPPED, false, createCallback); - Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); + Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS); assertEquals( - new ConnectorInfo(CONNECTOR_NAME, connectorConfig(SourceSink.SINK), Collections.emptyList(), ConnectorType.SINK), - connectorInfo.result() + new ConnectorInfo(CONNECTOR_NAME, connectorConfig(SourceSink.SINK), Collections.emptyList(), ConnectorType.SINK), + connectorInfo.result() ); verify(loaderSwap).close(); } @Test public void testDestroyConnector() throws Exception { - connector = mock(BogusSourceConnector.class); expectAdd(SourceSink.SOURCE); Map<String, String> config = connectorConfig(SourceSink.SOURCE); - Connector connectorMock = mock(SourceConnector.class); - expectConfigValidation(connectorMock, true, config); + expectConfigValidation(SourceSink.SOURCE, config); when(statusBackingStore.getAll(CONNECTOR_NAME)).thenReturn(Collections.emptyList()); herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback); - Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); + Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS); assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result()); FutureCallback<Herder.Created<ConnectorInfo>> deleteCallback = new FutureCallback<>(); @@ -288,7 +270,7 @@ public class StandaloneHerderTest { verify(herder).onDeletion(CONNECTOR_NAME); verify(statusBackingStore).put(new TaskStatus(new ConnectorTaskId(CONNECTOR_NAME, 0), TaskStatus.State.DESTROYED, WORKER_ID, 0)); verify(statusBackingStore).put(new ConnectorStatus(CONNECTOR_NAME, ConnectorStatus.State.DESTROYED, WORKER_ID, 0)); - deleteCallback.get(1000L, TimeUnit.MILLISECONDS); + deleteCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS); // Second deletion should fail since the connector is gone FutureCallback<Herder.Created<ConnectorInfo>> failedDeleteCallback = new FutureCallback<>(); @@ -296,7 +278,7 @@ public class StandaloneHerderTest { ExecutionException e = assertThrows( "Should have thrown NotFoundException", ExecutionException.class, - () -> failedDeleteCallback.get(1000L, TimeUnit.MILLISECONDS) + () -> failedDeleteCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS) ); assertInstanceOf(NotFoundException.class, e.getCause()); } @@ -306,29 +288,23 @@ public class StandaloneHerderTest { expectAdd(SourceSink.SOURCE); Map<String, String> config = connectorConfig(SourceSink.SOURCE); - Connector connectorMock = mock(SourceConnector.class); - expectConfigValidation(connectorMock, true, config); + expectConfigValidation(SourceSink.SOURCE, config); - final ArgumentCaptor<Callback<TargetState>> onStart = ArgumentCaptor.forClass(Callback.class); - doAnswer(invocation -> { - onStart.getValue().onCompletion(null, TargetState.STARTED); - return true; - }).when(worker).startConnector(eq(CONNECTOR_NAME), eq(config), any(HerderConnectorContext.class), - eq(herder), eq(TargetState.STARTED), onStart.capture()); + mockStartConnector(config, TargetState.STARTED, TargetState.STARTED, null); when(worker.connectorNames()).thenReturn(Collections.singleton(CONNECTOR_NAME)); when(worker.getPlugins()).thenReturn(plugins); // same task configs as earlier, so don't expect a new set of tasks to be brought up when(worker.connectorTaskConfigs(CONNECTOR_NAME, new SourceConnectorConfig(plugins, config, true))) - .thenReturn(Collections.singletonList(taskConfig(SourceSink.SOURCE))); + .thenReturn(Collections.singletonList(taskConfig(SourceSink.SOURCE))); herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback); - Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); + Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS); assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result()); FutureCallback<Void> restartCallback = new FutureCallback<>(); herder.restartConnector(CONNECTOR_NAME, restartCallback); - restartCallback.get(1000L, TimeUnit.MILLISECONDS); + restartCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS); verify(worker).stopAndAwaitConnector(eq(CONNECTOR_NAME)); } @@ -338,19 +314,13 @@ public class StandaloneHerderTest { Map<String, String> config = connectorConfig(SourceSink.SOURCE); ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0); - Connector connectorMock = mock(SourceConnector.class); - expectConfigValidation(connectorMock, true, config); + expectConfigValidation(SourceSink.SOURCE, config); herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback); - Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); + Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS); assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result()); - final ArgumentCaptor<Callback<TargetState>> onStart = ArgumentCaptor.forClass(Callback.class); - doAnswer(invocation -> { - onStart.getValue().onCompletion(null, TargetState.STARTED); - return true; - }).when(worker).startConnector(eq(CONNECTOR_NAME), eq(config), any(HerderConnectorContext.class), - eq(herder), eq(TargetState.STARTED), onStart.capture()); + mockStartConnector(config, TargetState.STARTED, TargetState.STARTED, null); when(worker.connectorNames()).thenReturn(Collections.singleton(CONNECTOR_NAME)); when(worker.getPlugins()).thenReturn(plugins); @@ -366,7 +336,7 @@ public class StandaloneHerderTest { expectStop(); herder.restartConnector(CONNECTOR_NAME, restartCallback); verify(statusBackingStore).put(new TaskStatus(taskId, TaskStatus.State.DESTROYED, WORKER_ID, 0)); - restartCallback.get(1000L, TimeUnit.MILLISECONDS); + restartCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS); } @Test @@ -374,28 +344,21 @@ public class StandaloneHerderTest { expectAdd(SourceSink.SOURCE); Map<String, String> config = connectorConfig(SourceSink.SOURCE); - Connector connectorMock = mock(SourceConnector.class); - expectConfigValidation(connectorMock, true, config); + expectConfigValidation(SourceSink.SOURCE, config); doNothing().when(worker).stopAndAwaitConnector(CONNECTOR_NAME); - final ArgumentCaptor<Callback<TargetState>> onStart = ArgumentCaptor.forClass(Callback.class); Exception exception = new ConnectException("Failed to start connector"); herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback); - Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); + Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS); assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result()); - FutureCallback<Void> restartCallback = new FutureCallback<>(); - doAnswer(invocation -> { - onStart.getValue().onCompletion(exception, null); - return true; - }).when(worker).startConnector(eq(CONNECTOR_NAME), eq(config), any(HerderConnectorContext.class), - eq(herder), eq(TargetState.STARTED), onStart.capture()); + mockStartConnector(config, null, TargetState.STARTED, exception); herder.restartConnector(CONNECTOR_NAME, restartCallback); try { - restartCallback.get(1000L, TimeUnit.MILLISECONDS); + restartCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS); fail(); } catch (ExecutionException e) { assertEquals(exception, e.getCause()); @@ -408,8 +371,8 @@ public class StandaloneHerderTest { expectAdd(SourceSink.SOURCE); Map<String, String> connectorConfig = connectorConfig(SourceSink.SOURCE); - Connector connectorMock = mock(SourceConnector.class); - expectConfigValidation(connectorMock, true, connectorConfig); + + expectConfigValidation(SourceSink.SOURCE, connectorConfig); doNothing().when(worker).stopAndAwaitTask(taskId); @@ -426,16 +389,16 @@ public class StandaloneHerderTest { new HashSet<>(), transformer); when(worker.startSourceTask(taskId, configState, connectorConfig, taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED)) - .thenReturn(true); + .thenReturn(true); herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, createCallback); - Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); - createCallback.get(1000L, TimeUnit.SECONDS); + Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS); + createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS); assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result()); FutureCallback<Void> restartTaskCallback = new FutureCallback<>(); herder.restartTask(taskId, restartTaskCallback); - restartTaskCallback.get(1000L, TimeUnit.MILLISECONDS); + restartTaskCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS); } @Test @@ -444,8 +407,7 @@ public class StandaloneHerderTest { expectAdd(SourceSink.SOURCE); Map<String, String> connectorConfig = connectorConfig(SourceSink.SOURCE); - Connector connectorMock = mock(SourceConnector.class); - expectConfigValidation(connectorMock, true, connectorConfig); + expectConfigValidation(SourceSink.SOURCE, connectorConfig); ClusterConfigState configState = new ClusterConfigState( -1, @@ -460,17 +422,17 @@ public class StandaloneHerderTest { new HashSet<>(), transformer); when(worker.startSourceTask(taskId, configState, connectorConfig, taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED)) - .thenReturn(false); + .thenReturn(false); herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, createCallback); - Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.MILLISECONDS); + Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS); assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result()); FutureCallback<Void> cb = new FutureCallback<>(); herder.restartTask(taskId, cb); verify(worker).stopAndAwaitTask(taskId); try { - cb.get(1000L, TimeUnit.MILLISECONDS); + cb.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS); fail("Expected restart callback to raise an exception"); } catch (ExecutionException exception) { assertEquals(ConnectException.class, exception.getCause().getClass()); @@ -482,7 +444,7 @@ public class StandaloneHerderTest { FutureCallback<ConnectorStateInfo> restartCallback = new FutureCallback<>(); RestartRequest restartRequest = new RestartRequest("UnknownConnector", false, true); herder.restartConnectorAndTasks(restartRequest, restartCallback); - ExecutionException ee = assertThrows(ExecutionException.class, () -> restartCallback.get(1000L, TimeUnit.MILLISECONDS)); + ExecutionException ee = assertThrows(ExecutionException.class, () -> restartCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS)); assertInstanceOf(NotFoundException.class, ee.getCause()); } @@ -491,20 +453,18 @@ public class StandaloneHerderTest { RestartRequest restartRequest = new RestartRequest(CONNECTOR_NAME, false, true); doReturn(Optional.empty()).when(herder).buildRestartPlan(restartRequest); - connector = mock(BogusSinkConnector.class); expectAdd(SourceSink.SINK); Map<String, String> connectorConfig = connectorConfig(SourceSink.SINK); - Connector connectorMock = mock(SinkConnector.class); - expectConfigValidation(connectorMock, true, connectorConfig); + expectConfigValidation(SourceSink.SINK, connectorConfig); herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, createCallback); - Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); + Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS); assertEquals(createdInfo(SourceSink.SINK), connectorInfo.result()); FutureCallback<ConnectorStateInfo> restartCallback = new FutureCallback<>(); herder.restartConnectorAndTasks(restartRequest, restartCallback); - ExecutionException ee = assertThrows(ExecutionException.class, () -> restartCallback.get(1000L, TimeUnit.MILLISECONDS)); + ExecutionException ee = assertThrows(ExecutionException.class, () -> restartCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS)); assertInstanceOf(NotFoundException.class, ee.getCause()); assertTrue(ee.getMessage().contains("Status for connector")); } @@ -519,20 +479,18 @@ public class StandaloneHerderTest { when(restartPlan.restartConnectorStateInfo()).thenReturn(connectorStateInfo); doReturn(Optional.of(restartPlan)).when(herder).buildRestartPlan(restartRequest); - connector = mock(BogusSinkConnector.class); expectAdd(SourceSink.SINK); Map<String, String> connectorConfig = connectorConfig(SourceSink.SINK); - Connector connectorMock = mock(SinkConnector.class); - expectConfigValidation(connectorMock, true, connectorConfig); + expectConfigValidation(SourceSink.SINK, connectorConfig); herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, createCallback); - Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); + Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS); assertEquals(createdInfo(SourceSink.SINK), connectorInfo.result()); FutureCallback<ConnectorStateInfo> restartCallback = new FutureCallback<>(); herder.restartConnectorAndTasks(restartRequest, restartCallback); - assertEquals(connectorStateInfo, restartCallback.get(1000L, TimeUnit.MILLISECONDS)); + assertEquals(connectorStateInfo, restartCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS)); } @Test @@ -545,29 +503,22 @@ public class StandaloneHerderTest { when(restartPlan.restartConnectorStateInfo()).thenReturn(connectorStateInfo); doReturn(Optional.of(restartPlan)).when(herder).buildRestartPlan(restartRequest); - connector = mock(BogusSinkConnector.class); expectAdd(SourceSink.SINK); Map<String, String> connectorConfig = connectorConfig(SourceSink.SINK); - Connector connectorMock = mock(SinkConnector.class); - expectConfigValidation(connectorMock, true, connectorConfig); + expectConfigValidation(SourceSink.SINK, connectorConfig); doNothing().when(worker).stopAndAwaitConnector(CONNECTOR_NAME); - final ArgumentCaptor<Callback<TargetState>> onStart = ArgumentCaptor.forClass(Callback.class); - doAnswer(invocation -> { - onStart.getValue().onCompletion(null, TargetState.STARTED); - return true; - }).when(worker).startConnector(eq(CONNECTOR_NAME), eq(connectorConfig), any(HerderConnectorContext.class), - eq(herder), eq(TargetState.STARTED), onStart.capture()); + mockStartConnector(connectorConfig, null, TargetState.STARTED, null); herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, createCallback); - Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); + Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS); assertEquals(createdInfo(SourceSink.SINK), connectorInfo.result()); FutureCallback<ConnectorStateInfo> restartCallback = new FutureCallback<>(); herder.restartConnectorAndTasks(restartRequest, restartCallback); - assertEquals(connectorStateInfo, restartCallback.get(1000L, TimeUnit.MILLISECONDS)); + assertEquals(connectorStateInfo, restartCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS)); verifyConnectorStatusRestart(); } @@ -586,12 +537,10 @@ public class StandaloneHerderTest { when(restartPlan.restartConnectorStateInfo()).thenReturn(connectorStateInfo); doReturn(Optional.of(restartPlan)).when(herder).buildRestartPlan(restartRequest); - connector = mock(BogusSinkConnector.class); expectAdd(SourceSink.SINK); Map<String, String> connectorConfig = connectorConfig(SourceSink.SINK); - Connector connectorMock = mock(SinkConnector.class); - expectConfigValidation(connectorMock, true, connectorConfig); + expectConfigValidation(SourceSink.SINK, connectorConfig); doNothing().when(worker).stopAndAwaitTasks(Collections.singletonList(taskId)); @@ -608,15 +557,15 @@ public class StandaloneHerderTest { new HashSet<>(), transformer); when(worker.startSinkTask(taskId, configState, connectorConfig, taskConfig(SourceSink.SINK), herder, TargetState.STARTED)) - .thenReturn(true); + .thenReturn(true); herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, createCallback); - Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); + Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS); assertEquals(createdInfo(SourceSink.SINK), connectorInfo.result()); FutureCallback<ConnectorStateInfo> restartCallback = new FutureCallback<>(); herder.restartConnectorAndTasks(restartRequest, restartCallback); - assertEquals(connectorStateInfo, restartCallback.get(1000L, TimeUnit.MILLISECONDS)); + assertEquals(connectorStateInfo, restartCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS)); ArgumentCaptor<TaskStatus> taskStatus = ArgumentCaptor.forClass(TaskStatus.class); verify(statusBackingStore).put(taskStatus.capture()); assertEquals(AbstractStatus.State.RESTARTING, taskStatus.getValue().state()); @@ -639,22 +588,15 @@ public class StandaloneHerderTest { ArgumentCaptor<TaskStatus> taskStatus = ArgumentCaptor.forClass(TaskStatus.class); - connector = mock(BogusSinkConnector.class); expectAdd(SourceSink.SINK); Map<String, String> connectorConfig = connectorConfig(SourceSink.SINK); - Connector connectorMock = mock(SinkConnector.class); - expectConfigValidation(connectorMock, true, connectorConfig); + expectConfigValidation(SourceSink.SINK, connectorConfig); doNothing().when(worker).stopAndAwaitConnector(CONNECTOR_NAME); doNothing().when(worker).stopAndAwaitTasks(Collections.singletonList(taskId)); - final ArgumentCaptor<Callback<TargetState>> onStart = ArgumentCaptor.forClass(Callback.class); - doAnswer(invocation -> { - onStart.getValue().onCompletion(null, TargetState.STARTED); - return true; - }).when(worker).startConnector(eq(CONNECTOR_NAME), eq(connectorConfig), any(HerderConnectorContext.class), - eq(herder), eq(TargetState.STARTED), onStart.capture()); + mockStartConnector(connectorConfig, null, TargetState.STARTED, null); ClusterConfigState configState = new ClusterConfigState( -1, @@ -669,15 +611,15 @@ public class StandaloneHerderTest { new HashSet<>(), transformer); when(worker.startSinkTask(taskId, configState, connectorConfig, taskConfig(SourceSink.SINK), herder, TargetState.STARTED)) - .thenReturn(true); + .thenReturn(true); herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, createCallback); - Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); + Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS); assertEquals(createdInfo(SourceSink.SINK), connectorInfo.result()); FutureCallback<ConnectorStateInfo> restartCallback = new FutureCallback<>(); herder.restartConnectorAndTasks(restartRequest, restartCallback); - assertEquals(connectorStateInfo, restartCallback.get(1000L, TimeUnit.MILLISECONDS)); + assertEquals(connectorStateInfo, restartCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS)); verifyConnectorStatusRestart(); verify(statusBackingStore).put(taskStatus.capture()); @@ -687,17 +629,15 @@ public class StandaloneHerderTest { @Test public void testCreateAndStop() throws Exception { - connector = mock(BogusSourceConnector.class); expectAdd(SourceSink.SOURCE); Map<String, String> connectorConfig = connectorConfig(SourceSink.SOURCE); - Connector connectorMock = mock(SourceConnector.class); - expectConfigValidation(connectorMock, true, connectorConfig); + expectConfigValidation(SourceSink.SOURCE, connectorConfig); expectStop(); herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, createCallback); - Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); + Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS); assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result()); // herder.stop() should stop any running connectors and tasks even if destroyConnector was not invoked @@ -726,22 +666,21 @@ public class StandaloneHerderTest { doNothing().when(tasksConfigCb).onCompletion(any(NotFoundException.class), isNull()); doNothing().when(connectorConfigCb).onCompletion(any(NotFoundException.class), isNull()); - // Create connector - connector = mock(BogusSourceConnector.class); + expectAdd(SourceSink.SOURCE); - expectConfigValidation(connector, true, connConfig); + expectConfigValidation(SourceSink.SOURCE, connConfig); // Validate accessors with 1 connector doNothing().when(listConnectorsCb).onCompletion(null, singleton(CONNECTOR_NAME)); - ConnectorInfo connInfo = new ConnectorInfo(CONNECTOR_NAME, connConfig, Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)), - ConnectorType.SOURCE); + ConnectorInfo connInfo = new ConnectorInfo(CONNECTOR_NAME, connConfig, singletonList(new ConnectorTaskId(CONNECTOR_NAME, 0)), + ConnectorType.SOURCE); doNothing().when(connectorInfoCb).onCompletion(null, connInfo); TaskInfo taskInfo = new TaskInfo(new ConnectorTaskId(CONNECTOR_NAME, 0), taskConfig(SourceSink.SOURCE)); - doNothing().when(taskConfigsCb).onCompletion(null, Arrays.asList(taskInfo)); + doNothing().when(taskConfigsCb).onCompletion(null, singletonList(taskInfo)); Map<ConnectorTaskId, Map<String, String>> tasksConfig = Collections.singletonMap(new ConnectorTaskId(CONNECTOR_NAME, 0), - taskConfig(SourceSink.SOURCE)); + taskConfig(SourceSink.SOURCE)); doNothing().when(tasksConfigCb).onCompletion(null, tasksConfig); // All operations are synchronous for StandaloneHerder, so we don't need to actually wait after making each call @@ -752,7 +691,7 @@ public class StandaloneHerderTest { herder.tasksConfig(CONNECTOR_NAME, tasksConfigCb); herder.putConnectorConfig(CONNECTOR_NAME, connConfig, false, createCallback); - Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); + Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS); assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result()); reset(transformer); @@ -773,11 +712,9 @@ public class StandaloneHerderTest { Callback<Map<String, String>> connectorConfigCb = mock(Callback.class); - // Create - connector = mock(BogusSourceConnector.class); + expectAdd(SourceSink.SOURCE); - Connector connectorMock = mock(SourceConnector.class); - expectConfigValidation(connectorMock, true, connConfig); + expectConfigValidation(SourceSink.SOURCE, connConfig, newConnConfig); // Should get first config doNothing().when(connectorConfigCb).onCompletion(null, connConfig); @@ -789,15 +726,14 @@ public class StandaloneHerderTest { onStart.getValue().onCompletion(null, TargetState.STARTED); return true; }).when(worker).startConnector(eq(CONNECTOR_NAME), capturedConfig.capture(), any(), - eq(herder), eq(TargetState.STARTED), onStart.capture()); + eq(herder), eq(TargetState.STARTED), onStart.capture()); // Generate same task config, which should result in no additional action to restart tasks when(worker.connectorTaskConfigs(CONNECTOR_NAME, new SourceConnectorConfig(plugins, newConnConfig, true))) - .thenReturn(singletonList(taskConfig(SourceSink.SOURCE))); + .thenReturn(singletonList(taskConfig(SourceSink.SOURCE))); - expectConfigValidation(connectorMock, false, newConnConfig); herder.putConnectorConfig(CONNECTOR_NAME, connConfig, false, createCallback); - Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); + Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS); assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result()); herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb); @@ -806,8 +742,8 @@ public class StandaloneHerderTest { doNothing().when(connectorConfigCb).onCompletion(null, newConnConfig); herder.putConnectorConfig(CONNECTOR_NAME, newConnConfig, true, reconfigureCallback); Herder.Created<ConnectorInfo> newConnectorInfo = reconfigureCallback.get(1000L, TimeUnit.SECONDS); - ConnectorInfo newConnInfo = new ConnectorInfo(CONNECTOR_NAME, newConnConfig, Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)), - ConnectorType.SOURCE); + ConnectorInfo newConnInfo = new ConnectorInfo(CONNECTOR_NAME, newConnConfig, singletonList(new ConnectorTaskId(CONNECTOR_NAME, 0)), + ConnectorType.SOURCE); assertEquals(newConnInfo, newConnectorInfo.result()); assertEquals("bar", capturedConfig.getValue().get("foo")); @@ -834,9 +770,9 @@ public class StandaloneHerderTest { List<String> errors = new ArrayList<>(singletonList(error)); String key = "foo.invalid.key"; when(connectorMock.validate(config)).thenReturn( - new Config( - Arrays.asList(new ConfigValue(key, null, Collections.emptyList(), errors)) - ) + new Config( + singletonList(new ConfigValue(key, null, Collections.emptyList(), errors)) + ) ); ConfigDef configDef = new ConfigDef(); configDef.define(key, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, ""); @@ -853,7 +789,7 @@ public class StandaloneHerderTest { ExecutionException e = assertThrows( "Should have failed to configure connector", ExecutionException.class, - () -> createCallback.get(1000L, TimeUnit.SECONDS) + () -> createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS) ); assertNotNull(e.getCause()); Throwable cause = e.getCause(); @@ -869,12 +805,10 @@ public class StandaloneHerderTest { @Test public void testTargetStates() throws Exception { - connector = mock(BogusSourceConnector.class); expectAdd(SourceSink.SOURCE); Map<String, String> connectorConfig = connectorConfig(SourceSink.SOURCE); - Connector connectorMock = mock(SourceConnector.class); - expectConfigValidation(connectorMock, true, connectorConfig); + expectConfigValidation(SourceSink.SOURCE, connectorConfig); // We pause, then stop, the connector expectTargetState(CONNECTOR_NAME, TargetState.PAUSED); @@ -887,14 +821,14 @@ public class StandaloneHerderTest { FutureCallback<List<TaskInfo>> taskConfigsCallback = new FutureCallback<>(); herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, createCallback); - Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); + Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS); assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result()); herder.pauseConnector(CONNECTOR_NAME); herder.stopConnector(CONNECTOR_NAME, stopCallback); verify(statusBackingStore).put(new TaskStatus(new ConnectorTaskId(CONNECTOR_NAME, 0), AbstractStatus.State.DESTROYED, WORKER_ID, 0)); - stopCallback.get(10L, TimeUnit.SECONDS); + stopCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS); herder.taskConfigs(CONNECTOR_NAME, taskConfigsCallback); - assertEquals(Collections.emptyList(), taskConfigsCallback.get(1, TimeUnit.SECONDS)); + assertEquals(Collections.emptyList(), taskConfigsCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS)); // herder.stop() should stop any running connectors and tasks even if destroyConnector was not invoked herder.stop(); @@ -910,12 +844,12 @@ public class StandaloneHerderTest { herder.alterConnectorOffsets("unknown-connector", Collections.singletonMap(Collections.singletonMap("partitionKey", "partitionValue"), Collections.singletonMap("offsetKey", "offsetValue")), alterOffsetsCallback); - ExecutionException e = assertThrows(ExecutionException.class, () -> alterOffsetsCallback.get(1000L, TimeUnit.MILLISECONDS)); + ExecutionException e = assertThrows(ExecutionException.class, () -> alterOffsetsCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS)); assertInstanceOf(NotFoundException.class, e.getCause()); FutureCallback<Message> resetOffsetsCallback = new FutureCallback<>(); herder.resetConnectorOffsets("unknown-connector", resetOffsetsCallback); - e = assertThrows(ExecutionException.class, () -> resetOffsetsCallback.get(1000L, TimeUnit.MILLISECONDS)); + e = assertThrows(ExecutionException.class, () -> resetOffsetsCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS)); assertInstanceOf(NotFoundException.class, e.getCause()); } @@ -938,12 +872,12 @@ public class StandaloneHerderTest { herder.alterConnectorOffsets(CONNECTOR_NAME, Collections.singletonMap(Collections.singletonMap("partitionKey", "partitionValue"), Collections.singletonMap("offsetKey", "offsetValue")), alterOffsetsCallback); - ExecutionException e = assertThrows(ExecutionException.class, () -> alterOffsetsCallback.get(1000L, TimeUnit.MILLISECONDS)); + ExecutionException e = assertThrows(ExecutionException.class, () -> alterOffsetsCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS)); assertInstanceOf(BadRequestException.class, e.getCause()); FutureCallback<Message> resetOffsetsCallback = new FutureCallback<>(); herder.resetConnectorOffsets(CONNECTOR_NAME, resetOffsetsCallback); - e = assertThrows(ExecutionException.class, () -> resetOffsetsCallback.get(1000L, TimeUnit.MILLISECONDS)); + e = assertThrows(ExecutionException.class, () -> resetOffsetsCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS)); assertInstanceOf(BadRequestException.class, e.getCause()); } @@ -1004,33 +938,25 @@ public class StandaloneHerderTest { @Test() public void testRequestTaskReconfigurationDoesNotDeadlock() throws Exception { - connector = mock(BogusSourceConnector.class); expectAdd(SourceSink.SOURCE); // Start the connector Map<String, String> config = connectorConfig(SourceSink.SOURCE); - Connector connectorMock = mock(SourceConnector.class); - expectConfigValidation(connectorMock, true, config); - + // Prepare for connector and task config update + Map<String, String> newConfig = connectorConfig(SourceSink.SOURCE); + newConfig.put("dummy-connector-property", "yes"); + expectConfigValidation(SourceSink.SOURCE, config, newConfig); + mockStartConnector(newConfig, TargetState.STARTED, TargetState.STARTED, null); herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback); // Wait on connector to start - Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.MILLISECONDS); + Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS); assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result()); // Prepare for task config update when(worker.connectorNames()).thenReturn(Collections.singleton(CONNECTOR_NAME)); expectStop(); - // Prepare for connector and task config update - Map<String, String> newConfig = connectorConfig(SourceSink.SOURCE); - newConfig.put("dummy-connector-property", "yes"); - final ArgumentCaptor<Callback<TargetState>> onStart = ArgumentCaptor.forClass(Callback.class); - doAnswer(invocation -> { - onStart.getValue().onCompletion(null, TargetState.STARTED); - return true; - }).when(worker).startConnector(eq(CONNECTOR_NAME), eq(newConfig), any(HerderConnectorContext.class), - eq(herder), eq(TargetState.STARTED), onStart.capture()); // Common invocations when(worker.startSourceTask(eq(new ConnectorTaskId(CONNECTOR_NAME, 0)), any(), any(), any(), eq(herder), eq(TargetState.STARTED))).thenReturn(true); @@ -1045,15 +971,14 @@ public class StandaloneHerderTest { // Set new config on the connector and tasks FutureCallback<Herder.Created<ConnectorInfo>> reconfigureCallback = new FutureCallback<>(); - expectConfigValidation(connectorMock, false, newConfig); herder.putConnectorConfig(CONNECTOR_NAME, newConfig, true, reconfigureCallback); // Reconfigure the tasks herder.requestTaskReconfiguration(CONNECTOR_NAME); // Wait on connector update - Herder.Created<ConnectorInfo> updatedConnectorInfo = reconfigureCallback.get(1000L, TimeUnit.MILLISECONDS); - ConnectorInfo expectedConnectorInfo = new ConnectorInfo(CONNECTOR_NAME, newConfig, Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)), ConnectorType.SOURCE); + Herder.Created<ConnectorInfo> updatedConnectorInfo = reconfigureCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS); + ConnectorInfo expectedConnectorInfo = new ConnectorInfo(CONNECTOR_NAME, newConfig, singletonList(new ConnectorTaskId(CONNECTOR_NAME, 0)), ConnectorType.SOURCE); assertEquals(expectedConnectorInfo, updatedConnectorInfo.result()); verify(statusBackingStore, times(2)).put(new TaskStatus(new ConnectorTaskId(CONNECTOR_NAME, 0), TaskStatus.State.DESTROYED, WORKER_ID, 0)); @@ -1062,15 +987,10 @@ public class StandaloneHerderTest { private void expectAdd(SourceSink sourceSink) { Map<String, String> connectorProps = connectorConfig(sourceSink); ConnectorConfig connConfig = sourceSink == SourceSink.SOURCE ? - new SourceConnectorConfig(plugins, connectorProps, true) : - new SinkConnectorConfig(plugins, connectorProps); + new SourceConnectorConfig(plugins, connectorProps, true) : + new SinkConnectorConfig(plugins, connectorProps); - final ArgumentCaptor<Callback<TargetState>> onStart = ArgumentCaptor.forClass(Callback.class); - doAnswer(invocation -> { - onStart.getValue().onCompletion(null, TargetState.STARTED); - return true; - }).when(worker).startConnector(eq(CONNECTOR_NAME), eq(connectorProps), any(HerderConnectorContext.class), - eq(herder), eq(TargetState.STARTED), onStart.capture()); + mockStartConnector(connectorProps, TargetState.STARTED, TargetState.STARTED, null); when(worker.isRunning(CONNECTOR_NAME)).thenReturn(true); if (sourceSink == SourceSink.SOURCE) { when(worker.isTopicCreationEnabled()).thenReturn(true); @@ -1081,7 +1001,7 @@ public class StandaloneHerderTest { Map<String, String> generatedTaskProps = taskConfig(sourceSink); when(worker.connectorTaskConfigs(CONNECTOR_NAME, connConfig)) - .thenReturn(singletonList(generatedTaskProps)); + .thenReturn(singletonList(generatedTaskProps)); ClusterConfigState configState = new ClusterConfigState( -1, @@ -1127,8 +1047,8 @@ public class StandaloneHerderTest { private ConnectorInfo createdInfo(SourceSink sourceSink) { return new ConnectorInfo(CONNECTOR_NAME, connectorConfig(sourceSink), - Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)), - SourceSink.SOURCE == sourceSink ? ConnectorType.SOURCE : ConnectorType.SINK); + singletonList(new ConnectorTaskId(CONNECTOR_NAME, 0)), + SourceSink.SOURCE == sourceSink ? ConnectorType.SOURCE : ConnectorType.SINK); } private void expectStop() { @@ -1168,38 +1088,40 @@ public class StandaloneHerderTest { } private void expectConfigValidation( - Connector connectorMock, - boolean shouldCreateConnector, + SourceSink sourceSink, Map<String, String>... configs ) { // config validation + Connector connectorMock = sourceSink == SourceSink.SOURCE ? mock(SourceConnector.class) : mock(SinkConnector.class); when(worker.configTransformer()).thenReturn(transformer); final ArgumentCaptor<Map<String, String>> configCapture = ArgumentCaptor.forClass(Map.class); when(transformer.transform(configCapture.capture())).thenAnswer(invocation -> configCapture.getValue()); when(worker.getPlugins()).thenReturn(plugins); when(plugins.connectorLoader(anyString())).thenReturn(pluginLoader); when(plugins.withClassLoader(pluginLoader)).thenReturn(loaderSwap); - if (shouldCreateConnector) { - when(worker.getPlugins()).thenReturn(plugins); - when(plugins.newConnector(anyString())).thenReturn(connectorMock); - } + + // Assume the connector should always be created + when(worker.getPlugins()).thenReturn(plugins); + when(plugins.newConnector(anyString())).thenReturn(connectorMock); when(connectorMock.config()).thenReturn(new ConfigDef()); - for (Map<String, String> config : configs) + // Set up validation for each config + for (Map<String, String> config : configs) { when(connectorMock.validate(config)).thenReturn(new Config(Collections.emptyList())); + } } // We need to use a real class here due to some issue with mocking java.lang.Class - private abstract class BogusSourceConnector extends SourceConnector { + private static abstract class BogusSourceConnector extends SourceConnector { } - private abstract class BogusSourceTask extends SourceTask { + private static abstract class BogusSourceTask extends SourceTask { } - private abstract class BogusSinkConnector extends SinkConnector { + private static abstract class BogusSinkConnector extends SinkConnector { } - private abstract class BogusSinkTask extends SourceTask { + private static abstract class BogusSinkTask extends SourceTask { } private void verifyConnectorStatusRestart() { @@ -1208,4 +1130,14 @@ public class StandaloneHerderTest { assertEquals(CONNECTOR_NAME, connectorStatus.getValue().id()); assertEquals(AbstractStatus.State.RESTARTING, connectorStatus.getValue().state()); } + + private void mockStartConnector(Map<String, String> config, TargetState result, TargetState targetState, Exception exception) { + final ArgumentCaptor<Callback<TargetState>> onStart = ArgumentCaptor.forClass(Callback.class); + doAnswer(invocation -> { + onStart.getValue().onCompletion(exception, result); + return true; + }).when(worker).startConnector(eq(CONNECTOR_NAME), eq(config), + any(HerderConnectorContext.class), + eq(herder), eq(targetState), onStart.capture()); + } }