This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 964e73178b5 KAFKA-14132; Replace EasyMock with Mockito in StandaloneHerderTest (#12728) 964e73178b5 is described below commit 964e73178b5b8363cd2685ce6872905ef0c04dee Author: Matthew de Detrich <matthew.dedetr...@aiven.io> AuthorDate: Thu Dec 7 16:01:17 2023 +0100 KAFKA-14132; Replace EasyMock with Mockito in StandaloneHerderTest (#12728) Reviewers: Mickael Maison <mickael.mai...@gmail.com>, Chris Egerton <chr...@aiven.io> --- build.gradle | 1 - .../runtime/standalone/StandaloneHerderTest.java | 779 +++++++++------------ 2 files changed, 321 insertions(+), 459 deletions(-) diff --git a/build.gradle b/build.gradle index 080daf28ac9..f5e8283cdc6 100644 --- a/build.gradle +++ b/build.gradle @@ -419,7 +419,6 @@ subprojects { testsToExclude.addAll([ // connect tests "**/KafkaConfigBackingStoreTest.*", - "**/StandaloneHerderTest.*", "**/WorkerSinkTaskTest.*" ]) } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java index 7e4126aa141..213f029a0b2 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java @@ -39,7 +39,6 @@ import org.apache.kafka.connect.runtime.TaskConfig; import org.apache.kafka.connect.runtime.TaskStatus; import org.apache.kafka.connect.runtime.Worker; import org.apache.kafka.connect.runtime.WorkerConfigTransformer; -import org.apache.kafka.connect.runtime.WorkerConnector; import org.apache.kafka.connect.runtime.distributed.SampleConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.runtime.isolation.LoaderSwap; import org.apache.kafka.connect.runtime.rest.entities.Message; @@ -60,15 +59,13 @@ import org.apache.kafka.connect.storage.StatusBackingStore; import org.apache.kafka.connect.util.Callback; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.FutureCallback; -import org.easymock.Capture; -import org.easymock.EasyMock; +import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.powermock.api.easymock.PowerMock; -import org.powermock.api.easymock.annotation.Mock; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; import java.util.ArrayList; import java.util.Arrays; @@ -88,19 +85,30 @@ import static java.util.Collections.singletonMap; import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX; import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG; import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG; -import static org.easymock.EasyMock.anyObject; -import static org.easymock.EasyMock.capture; -import static org.easymock.EasyMock.eq; -import static org.easymock.EasyMock.isNull; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; - -@RunWith(PowerMockRunner.class) +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.CALLS_REAL_METHODS; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.isNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.withSettings; + +@RunWith(MockitoJUnitRunner.StrictStubs.class) @SuppressWarnings("unchecked") -@PrepareForTest({StandaloneHerder.class, WorkerConnector.class}) public class StandaloneHerderTest { private static final String CONNECTOR_NAME = "test"; private static final String TOPICS_LIST_STR = "topic1,topic2"; @@ -114,7 +122,8 @@ public class StandaloneHerderTest { private StandaloneHerder herder; private Connector connector; - @Mock protected Worker worker; + @Mock + protected Worker worker; @Mock protected WorkerConfigTransformer transformer; @Mock private Plugins plugins; @Mock @@ -127,63 +136,59 @@ public class StandaloneHerderTest { noneConnectorClientConfigOverridePolicy = new SampleConnectorClientConfigOverridePolicy(); @Before - public void setup() { - worker = PowerMock.createMock(Worker.class); - String[] methodNames = new String[]{"connectorType", "buildRestartPlan", "recordRestarting"}; - herder = PowerMock.createPartialMock(StandaloneHerder.class, methodNames, - worker, WORKER_ID, KAFKA_CLUSTER_ID, statusBackingStore, new MemoryConfigBackingStore(transformer), noneConnectorClientConfigOverridePolicy, new MockTime()); + public void setup() throws ExecutionException, InterruptedException { + worker = mock(Worker.class); + herder = mock(StandaloneHerder.class, withSettings() + .useConstructor(worker, WORKER_ID, KAFKA_CLUSTER_ID, statusBackingStore, new MemoryConfigBackingStore(transformer), noneConnectorClientConfigOverridePolicy, new MockTime()) + .defaultAnswer(CALLS_REAL_METHODS)); createCallback = new FutureCallback<>(); - plugins = PowerMock.createMock(Plugins.class); - pluginLoader = PowerMock.createMock(PluginClassLoader.class); - loaderSwap = PowerMock.createMock(LoaderSwap.class); - PowerMock.mockStatic(WorkerConnector.class); - Capture<Map<String, String>> configCapture = Capture.newInstance(); - EasyMock.expect(transformer.transform(eq(CONNECTOR_NAME), EasyMock.capture(configCapture))).andAnswer(configCapture::getValue).anyTimes(); + plugins = mock(Plugins.class); + pluginLoader = mock(PluginClassLoader.class); + loaderSwap = mock(LoaderSwap.class); + final ArgumentCaptor<Map<String, String>> configCapture = ArgumentCaptor.forClass(Map.class); + when(transformer.transform(eq(CONNECTOR_NAME), configCapture.capture())).thenAnswer(invocation -> configCapture.getValue()); + } + + @After + public void tearDown() { + verifyNoMoreInteractions(worker, statusBackingStore); } @Test public void testCreateSourceConnector() throws Exception { - connector = PowerMock.createMock(BogusSourceConnector.class); + connector = mock(BogusSourceConnector.class); expectAdd(SourceSink.SOURCE); Map<String, String> config = connectorConfig(SourceSink.SOURCE); - Connector connectorMock = PowerMock.createMock(SourceConnector.class); + Connector connectorMock = mock(SourceConnector.class); expectConfigValidation(connectorMock, true, config); - PowerMock.replayAll(); - herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback); Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result()); - - PowerMock.verifyAll(); } @Test public void testCreateConnectorFailedValidation() { // Basic validation should be performed and return an error, but should still evaluate the connector's config - connector = PowerMock.createMock(BogusSourceConnector.class); + connector = mock(BogusSourceConnector.class); Map<String, String> config = connectorConfig(SourceSink.SOURCE); config.remove(ConnectorConfig.NAME_CONFIG); - Connector connectorMock = PowerMock.createMock(SourceConnector.class); - EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2); - final Capture<Map<String, String>> configCapture = EasyMock.newCapture(); - EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue); - EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4); - EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock); - EasyMock.expect(plugins.connectorLoader(EasyMock.anyString())).andReturn(pluginLoader); - EasyMock.expect(plugins.withClassLoader(pluginLoader)).andReturn(loaderSwap); + Connector connectorMock = mock(SourceConnector.class); + when(worker.configTransformer()).thenReturn(transformer); + final ArgumentCaptor<Map<String, String>> configCapture = ArgumentCaptor.forClass(Map.class); + when(transformer.transform(configCapture.capture())).thenAnswer(invocation -> configCapture.getValue()); + when(worker.getPlugins()).thenReturn(plugins); + when(plugins.newConnector(anyString())).thenReturn(connectorMock); + when(plugins.connectorLoader(anyString())).thenReturn(pluginLoader); + when(plugins.withClassLoader(pluginLoader)).thenReturn(loaderSwap); - EasyMock.expect(connectorMock.config()).andStubReturn(new ConfigDef()); + when(connectorMock.config()).thenReturn(new ConfigDef()); ConfigValue validatedValue = new ConfigValue("foo.bar"); - EasyMock.expect(connectorMock.validate(config)).andReturn(new Config(singletonList(validatedValue))); - loaderSwap.close(); - EasyMock.expectLastCall(); - - PowerMock.replayAll(); + when(connectorMock.validate(config)).thenReturn(new Config(singletonList(validatedValue))); herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback); @@ -191,82 +196,66 @@ public class StandaloneHerderTest { if (BadRequestException.class != exception.getCause().getClass()) { throw new AssertionError(exception.getCause()); } - PowerMock.verifyAll(); + verify(loaderSwap).close(); } @Test public void testCreateConnectorAlreadyExists() throws Throwable { - connector = PowerMock.createMock(BogusSourceConnector.class); + connector = mock(BogusSourceConnector.class); // First addition should succeed expectAdd(SourceSink.SOURCE); Map<String, String> config = connectorConfig(SourceSink.SOURCE); - Connector connectorMock = PowerMock.createMock(SourceConnector.class); + Connector connectorMock = mock(SourceConnector.class); expectConfigValidation(connectorMock, true, config, config); - EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2); - final Capture<Map<String, String>> configCapture = EasyMock.newCapture(); - EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue); - EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3); - EasyMock.expect(plugins.connectorLoader(EasyMock.anyString())).andReturn(pluginLoader); - EasyMock.expect(plugins.withClassLoader(pluginLoader)).andReturn(loaderSwap); - // No new connector is created - loaderSwap.close(); - EasyMock.expectLastCall(); - - PowerMock.replayAll(); - herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback); Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result()); // Second should fail FutureCallback<Herder.Created<ConnectorInfo>> failedCreateCallback = new FutureCallback<>(); + // No new connector is created herder.putConnectorConfig(CONNECTOR_NAME, config, false, failedCreateCallback); ExecutionException exception = assertThrows(ExecutionException.class, () -> failedCreateCallback.get(1000L, TimeUnit.SECONDS)); if (AlreadyExistsException.class != exception.getCause().getClass()) { throw new AssertionError(exception.getCause()); } - PowerMock.verifyAll(); + verify(loaderSwap, times(2)).close(); } @Test public void testCreateSinkConnector() throws Exception { - connector = PowerMock.createMock(BogusSinkConnector.class); + connector = mock(BogusSinkConnector.class); expectAdd(SourceSink.SINK); Map<String, String> config = connectorConfig(SourceSink.SINK); - Connector connectorMock = PowerMock.createMock(SinkConnector.class); + Connector connectorMock = mock(SinkConnector.class); expectConfigValidation(connectorMock, true, config); - PowerMock.replayAll(); herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback); Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); assertEquals(createdInfo(SourceSink.SINK), connectorInfo.result()); - - PowerMock.verifyAll(); } @Test public void testCreateConnectorWithStoppedInitialState() throws Exception { - connector = PowerMock.createMock(BogusSinkConnector.class); + connector = mock(BogusSinkConnector.class); Map<String, String> config = connectorConfig(SourceSink.SINK); - Connector connectorMock = PowerMock.createMock(SinkConnector.class); + Connector connectorMock = mock(SinkConnector.class); expectConfigValidation(connectorMock, false, config); - EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock); + when(plugins.newConnector(anyString())).thenReturn(connectorMock); // Only the connector should be created; we expect no tasks to be spawned for a connector created with a paused or stopped initial state - Capture<Callback<TargetState>> onStart = EasyMock.newCapture(); - worker.startConnector(eq(CONNECTOR_NAME), eq(config), EasyMock.anyObject(HerderConnectorContext.class), - eq(herder), eq(TargetState.STOPPED), EasyMock.capture(onStart)); - EasyMock.expectLastCall().andAnswer(() -> { + final ArgumentCaptor<Callback<TargetState>> onStart = ArgumentCaptor.forClass(Callback.class); + doAnswer(invocation -> { onStart.getValue().onCompletion(null, TargetState.STOPPED); return true; - }); - EasyMock.expect(worker.isRunning(CONNECTOR_NAME)).andReturn(true).anyTimes(); - EasyMock.expect(herder.connectorType(anyObject())).andReturn(ConnectorType.SINK); + }).when(worker).startConnector(eq(CONNECTOR_NAME), eq(config), any(HerderConnectorContext.class), + eq(herder), eq(TargetState.STOPPED), onStart.capture()); - PowerMock.replayAll(); + when(worker.isRunning(CONNECTOR_NAME)).thenReturn(true); + when(herder.connectorType(any())).thenReturn(ConnectorType.SINK); herder.putConnectorConfig(CONNECTOR_NAME, config, TargetState.STOPPED, false, createCallback); Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); @@ -274,46 +263,41 @@ public class StandaloneHerderTest { new ConnectorInfo(CONNECTOR_NAME, connectorConfig(SourceSink.SINK), Collections.emptyList(), ConnectorType.SINK), connectorInfo.result() ); - - PowerMock.verifyAll(); + verify(loaderSwap).close(); } @Test public void testDestroyConnector() throws Exception { - connector = PowerMock.createMock(BogusSourceConnector.class); + connector = mock(BogusSourceConnector.class); expectAdd(SourceSink.SOURCE); Map<String, String> config = connectorConfig(SourceSink.SOURCE); - Connector connectorMock = PowerMock.createMock(SourceConnector.class); + Connector connectorMock = mock(SourceConnector.class); expectConfigValidation(connectorMock, true, config); - EasyMock.expect(statusBackingStore.getAll(CONNECTOR_NAME)).andReturn(Collections.emptyList()); - statusBackingStore.put(new ConnectorStatus(CONNECTOR_NAME, AbstractStatus.State.DESTROYED, WORKER_ID, 0)); - statusBackingStore.put(new TaskStatus(new ConnectorTaskId(CONNECTOR_NAME, 0), TaskStatus.State.DESTROYED, WORKER_ID, 0)); - - expectDestroy(); - - PowerMock.replayAll(); + when(statusBackingStore.getAll(CONNECTOR_NAME)).thenReturn(Collections.emptyList()); herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback); Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result()); FutureCallback<Herder.Created<ConnectorInfo>> deleteCallback = new FutureCallback<>(); + expectDestroy(); herder.deleteConnectorConfig(CONNECTOR_NAME, deleteCallback); + verify(herder).onDeletion(CONNECTOR_NAME); + verify(statusBackingStore).put(new TaskStatus(new ConnectorTaskId(CONNECTOR_NAME, 0), TaskStatus.State.DESTROYED, WORKER_ID, 0)); + verify(statusBackingStore).put(new ConnectorStatus(CONNECTOR_NAME, ConnectorStatus.State.DESTROYED, WORKER_ID, 0)); deleteCallback.get(1000L, TimeUnit.MILLISECONDS); // Second deletion should fail since the connector is gone FutureCallback<Herder.Created<ConnectorInfo>> failedDeleteCallback = new FutureCallback<>(); herder.deleteConnectorConfig(CONNECTOR_NAME, failedDeleteCallback); - try { - failedDeleteCallback.get(1000L, TimeUnit.MILLISECONDS); - fail("Should have thrown NotFoundException"); - } catch (ExecutionException e) { - assertTrue(e.getCause() instanceof NotFoundException); - } - - PowerMock.verifyAll(); + ExecutionException e = assertThrows( + "Should have thrown NotFoundException", + ExecutionException.class, + () -> failedDeleteCallback.get(1000L, TimeUnit.MILLISECONDS) + ); + assertTrue(e.getCause() instanceof NotFoundException); } @Test @@ -321,26 +305,21 @@ public class StandaloneHerderTest { expectAdd(SourceSink.SOURCE); Map<String, String> config = connectorConfig(SourceSink.SOURCE); - Connector connectorMock = PowerMock.createMock(SourceConnector.class); + Connector connectorMock = mock(SourceConnector.class); expectConfigValidation(connectorMock, true, config); - worker.stopAndAwaitConnector(CONNECTOR_NAME); - EasyMock.expectLastCall(); - - Capture<Callback<TargetState>> onStart = EasyMock.newCapture(); - worker.startConnector(eq(CONNECTOR_NAME), eq(config), EasyMock.anyObject(HerderConnectorContext.class), - eq(herder), eq(TargetState.STARTED), EasyMock.capture(onStart)); - EasyMock.expectLastCall().andAnswer(() -> { + final ArgumentCaptor<Callback<TargetState>> onStart = ArgumentCaptor.forClass(Callback.class); + doAnswer(invocation -> { onStart.getValue().onCompletion(null, TargetState.STARTED); return true; - }); + }).when(worker).startConnector(eq(CONNECTOR_NAME), eq(config), any(HerderConnectorContext.class), + eq(herder), eq(TargetState.STARTED), onStart.capture()); - EasyMock.expect(worker.connectorNames()).andReturn(Collections.singleton(CONNECTOR_NAME)); - EasyMock.expect(worker.getPlugins()).andReturn(plugins); + when(worker.connectorNames()).thenReturn(Collections.singleton(CONNECTOR_NAME)); + when(worker.getPlugins()).thenReturn(plugins); // same task configs as earlier, so don't expect a new set of tasks to be brought up - EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, new SourceConnectorConfig(plugins, config, true))).andReturn(Collections.singletonList(taskConfig(SourceSink.SOURCE))); - - PowerMock.replayAll(); + when(worker.connectorTaskConfigs(CONNECTOR_NAME, new SourceConnectorConfig(plugins, config, true))) + .thenReturn(Collections.singletonList(taskConfig(SourceSink.SOURCE))); herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback); Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); @@ -349,8 +328,7 @@ public class StandaloneHerderTest { FutureCallback<Void> restartCallback = new FutureCallback<>(); herder.restartConnector(CONNECTOR_NAME, restartCallback); restartCallback.get(1000L, TimeUnit.MILLISECONDS); - - PowerMock.verifyAll(); + verify(worker).stopAndAwaitConnector(eq(CONNECTOR_NAME)); } @Test @@ -359,45 +337,35 @@ public class StandaloneHerderTest { Map<String, String> config = connectorConfig(SourceSink.SOURCE); ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0); - Connector connectorMock = PowerMock.createMock(SourceConnector.class); + Connector connectorMock = mock(SourceConnector.class); expectConfigValidation(connectorMock, true, config); - worker.stopAndAwaitConnector(CONNECTOR_NAME); - EasyMock.expectLastCall(); + herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback); + Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); + assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result()); - Capture<Callback<TargetState>> onStart = EasyMock.newCapture(); - worker.startConnector(eq(CONNECTOR_NAME), eq(config), EasyMock.anyObject(HerderConnectorContext.class), - eq(herder), eq(TargetState.STARTED), EasyMock.capture(onStart)); - EasyMock.expectLastCall().andAnswer(() -> { + final ArgumentCaptor<Callback<TargetState>> onStart = ArgumentCaptor.forClass(Callback.class); + doAnswer(invocation -> { onStart.getValue().onCompletion(null, TargetState.STARTED); return true; - }); + }).when(worker).startConnector(eq(CONNECTOR_NAME), eq(config), any(HerderConnectorContext.class), + eq(herder), eq(TargetState.STARTED), onStart.capture()); - EasyMock.expect(worker.connectorNames()).andReturn(Collections.singleton(CONNECTOR_NAME)); - EasyMock.expect(worker.getPlugins()).andReturn(plugins); + when(worker.connectorNames()).thenReturn(Collections.singleton(CONNECTOR_NAME)); + when(worker.getPlugins()).thenReturn(plugins); // changed task configs, expect a new set of tasks to be brought up (and the old ones to be stopped) Map<String, String> taskConfigs = taskConfig(SourceSink.SOURCE); taskConfigs.put("k", "v"); - EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, new SourceConnectorConfig(plugins, config, true))).andReturn(Collections.singletonList(taskConfigs)); + when(worker.connectorTaskConfigs(CONNECTOR_NAME, new SourceConnectorConfig(plugins, config, true))) + .thenReturn(Collections.singletonList(taskConfigs)); - worker.stopAndAwaitTasks(Collections.singletonList(taskId)); - EasyMock.expectLastCall(); - statusBackingStore.put(new TaskStatus(new ConnectorTaskId(CONNECTOR_NAME, 0), AbstractStatus.State.DESTROYED, WORKER_ID, 0)); - EasyMock.expectLastCall(); - worker.startSourceTask(eq(new ConnectorTaskId(CONNECTOR_NAME, 0)), EasyMock.anyObject(), eq(connectorConfig(SourceSink.SOURCE)), eq(taskConfigs), eq(herder), eq(TargetState.STARTED)); - EasyMock.expectLastCall().andReturn(true); - - PowerMock.replayAll(); - - herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback); - Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); - assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result()); + when(worker.startSourceTask(eq(new ConnectorTaskId(CONNECTOR_NAME, 0)), any(), eq(connectorConfig(SourceSink.SOURCE)), eq(taskConfigs), eq(herder), eq(TargetState.STARTED))).thenReturn(true); FutureCallback<Void> restartCallback = new FutureCallback<>(); + expectStop(); herder.restartConnector(CONNECTOR_NAME, restartCallback); + verify(statusBackingStore).put(new TaskStatus(taskId, TaskStatus.State.DESTROYED, WORKER_ID, 0)); restartCallback.get(1000L, TimeUnit.MILLISECONDS); - - PowerMock.verifyAll(); } @Test @@ -405,28 +373,25 @@ public class StandaloneHerderTest { expectAdd(SourceSink.SOURCE); Map<String, String> config = connectorConfig(SourceSink.SOURCE); - Connector connectorMock = PowerMock.createMock(SourceConnector.class); + Connector connectorMock = mock(SourceConnector.class); expectConfigValidation(connectorMock, true, config); - worker.stopAndAwaitConnector(CONNECTOR_NAME); - EasyMock.expectLastCall(); + doNothing().when(worker).stopAndAwaitConnector(CONNECTOR_NAME); - Capture<Callback<TargetState>> onStart = EasyMock.newCapture(); - worker.startConnector(eq(CONNECTOR_NAME), eq(config), EasyMock.anyObject(HerderConnectorContext.class), - eq(herder), eq(TargetState.STARTED), EasyMock.capture(onStart)); - Exception exception = new ConnectException("Failed to start connector"); - EasyMock.expectLastCall().andAnswer(() -> { - onStart.getValue().onCompletion(exception, null); - return true; - }); + final ArgumentCaptor<Callback<TargetState>> onStart = ArgumentCaptor.forClass(Callback.class); - PowerMock.replayAll(); + Exception exception = new ConnectException("Failed to start connector"); herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback); Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result()); FutureCallback<Void> restartCallback = new FutureCallback<>(); + doAnswer(invocation -> { + onStart.getValue().onCompletion(exception, null); + return true; + }).when(worker).startConnector(eq(CONNECTOR_NAME), eq(config), any(HerderConnectorContext.class), + eq(herder), eq(TargetState.STARTED), onStart.capture()); herder.restartConnector(CONNECTOR_NAME, restartCallback); try { restartCallback.get(1000L, TimeUnit.MILLISECONDS); @@ -434,8 +399,6 @@ public class StandaloneHerderTest { } catch (ExecutionException e) { assertEquals(exception, e.getCause()); } - - PowerMock.verifyAll(); } @Test @@ -444,11 +407,10 @@ public class StandaloneHerderTest { expectAdd(SourceSink.SOURCE); Map<String, String> connectorConfig = connectorConfig(SourceSink.SOURCE); - Connector connectorMock = PowerMock.createMock(SourceConnector.class); + Connector connectorMock = mock(SourceConnector.class); expectConfigValidation(connectorMock, true, connectorConfig); - worker.stopAndAwaitTask(taskId); - EasyMock.expectLastCall(); + doNothing().when(worker).stopAndAwaitTask(taskId); ClusterConfigState configState = new ClusterConfigState( -1, @@ -462,10 +424,8 @@ public class StandaloneHerderTest { new HashSet<>(), new HashSet<>(), transformer); - worker.startSourceTask(taskId, configState, connectorConfig, taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED); - EasyMock.expectLastCall().andReturn(true); - - PowerMock.replayAll(); + when(worker.startSourceTask(taskId, configState, connectorConfig, taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED)) + .thenReturn(true); herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, createCallback); Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); @@ -475,8 +435,6 @@ public class StandaloneHerderTest { FutureCallback<Void> restartTaskCallback = new FutureCallback<>(); herder.restartTask(taskId, restartTaskCallback); restartTaskCallback.get(1000L, TimeUnit.MILLISECONDS); - - PowerMock.verifyAll(); } @Test @@ -485,12 +443,9 @@ public class StandaloneHerderTest { expectAdd(SourceSink.SOURCE); Map<String, String> connectorConfig = connectorConfig(SourceSink.SOURCE); - Connector connectorMock = PowerMock.createMock(SourceConnector.class); + Connector connectorMock = mock(SourceConnector.class); expectConfigValidation(connectorMock, true, connectorConfig); - worker.stopAndAwaitTask(taskId); - EasyMock.expectLastCall(); - ClusterConfigState configState = new ClusterConfigState( -1, null, @@ -503,10 +458,8 @@ public class StandaloneHerderTest { new HashSet<>(), new HashSet<>(), transformer); - worker.startSourceTask(taskId, configState, connectorConfig, taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED); - EasyMock.expectLastCall().andReturn(false); - - PowerMock.replayAll(); + when(worker.startSourceTask(taskId, configState, connectorConfig, taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED)) + .thenReturn(false); herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, createCallback); Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.MILLISECONDS); @@ -514,41 +467,35 @@ public class StandaloneHerderTest { FutureCallback<Void> cb = new FutureCallback<>(); herder.restartTask(taskId, cb); + verify(worker).stopAndAwaitTask(taskId); try { cb.get(1000L, TimeUnit.MILLISECONDS); fail("Expected restart callback to raise an exception"); } catch (ExecutionException exception) { assertEquals(ConnectException.class, exception.getCause().getClass()); } - - PowerMock.verifyAll(); } @Test public void testRestartConnectorAndTasksUnknownConnector() { - PowerMock.replayAll(); - FutureCallback<ConnectorStateInfo> restartCallback = new FutureCallback<>(); RestartRequest restartRequest = new RestartRequest("UnknownConnector", false, true); herder.restartConnectorAndTasks(restartRequest, restartCallback); ExecutionException ee = assertThrows(ExecutionException.class, () -> restartCallback.get(1000L, TimeUnit.MILLISECONDS)); assertTrue(ee.getCause() instanceof NotFoundException); - - PowerMock.verifyAll(); } @Test public void testRestartConnectorAndTasksNoStatus() throws Exception { RestartRequest restartRequest = new RestartRequest(CONNECTOR_NAME, false, true); - EasyMock.expect(herder.buildRestartPlan(restartRequest)).andReturn(Optional.empty()).anyTimes(); + doReturn(Optional.empty()).when(herder).buildRestartPlan(restartRequest); - connector = PowerMock.createMock(BogusSinkConnector.class); + connector = mock(BogusSinkConnector.class); expectAdd(SourceSink.SINK); Map<String, String> connectorConfig = connectorConfig(SourceSink.SINK); - Connector connectorMock = PowerMock.createMock(SinkConnector.class); + Connector connectorMock = mock(SinkConnector.class); expectConfigValidation(connectorMock, true, connectorConfig); - PowerMock.replayAll(); herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, createCallback); Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); @@ -559,29 +506,25 @@ public class StandaloneHerderTest { ExecutionException ee = assertThrows(ExecutionException.class, () -> restartCallback.get(1000L, TimeUnit.MILLISECONDS)); assertTrue(ee.getCause() instanceof NotFoundException); assertTrue(ee.getMessage().contains("Status for connector")); - PowerMock.verifyAll(); } @Test public void testRestartConnectorAndTasksNoRestarts() throws Exception { RestartRequest restartRequest = new RestartRequest(CONNECTOR_NAME, false, true); - RestartPlan restartPlan = PowerMock.createMock(RestartPlan.class); - ConnectorStateInfo connectorStateInfo = PowerMock.createMock(ConnectorStateInfo.class); - EasyMock.expect(restartPlan.shouldRestartConnector()).andReturn(false).anyTimes(); - EasyMock.expect(restartPlan.shouldRestartTasks()).andReturn(false).anyTimes(); - EasyMock.expect(restartPlan.restartConnectorStateInfo()).andReturn(connectorStateInfo).anyTimes(); - EasyMock.expect(herder.buildRestartPlan(restartRequest)) - .andReturn(Optional.of(restartPlan)).anyTimes(); - - connector = PowerMock.createMock(BogusSinkConnector.class); + RestartPlan restartPlan = mock(RestartPlan.class); + ConnectorStateInfo connectorStateInfo = mock(ConnectorStateInfo.class); + when(restartPlan.shouldRestartConnector()).thenReturn(false); + when(restartPlan.shouldRestartTasks()).thenReturn(false); + when(restartPlan.restartConnectorStateInfo()).thenReturn(connectorStateInfo); + doReturn(Optional.of(restartPlan)).when(herder).buildRestartPlan(restartRequest); + + connector = mock(BogusSinkConnector.class); expectAdd(SourceSink.SINK); Map<String, String> connectorConfig = connectorConfig(SourceSink.SINK); - Connector connectorMock = PowerMock.createMock(SinkConnector.class); + Connector connectorMock = mock(SinkConnector.class); expectConfigValidation(connectorMock, true, connectorConfig); - PowerMock.replayAll(); - herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, createCallback); Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); assertEquals(createdInfo(SourceSink.SINK), connectorInfo.result()); @@ -589,42 +532,33 @@ public class StandaloneHerderTest { FutureCallback<ConnectorStateInfo> restartCallback = new FutureCallback<>(); herder.restartConnectorAndTasks(restartRequest, restartCallback); assertEquals(connectorStateInfo, restartCallback.get(1000L, TimeUnit.MILLISECONDS)); - PowerMock.verifyAll(); } @Test public void testRestartConnectorAndTasksOnlyConnector() throws Exception { RestartRequest restartRequest = new RestartRequest(CONNECTOR_NAME, false, true); - RestartPlan restartPlan = PowerMock.createMock(RestartPlan.class); - ConnectorStateInfo connectorStateInfo = PowerMock.createMock(ConnectorStateInfo.class); - EasyMock.expect(restartPlan.shouldRestartConnector()).andReturn(true).anyTimes(); - EasyMock.expect(restartPlan.shouldRestartTasks()).andReturn(false).anyTimes(); - EasyMock.expect(restartPlan.restartConnectorStateInfo()).andReturn(connectorStateInfo).anyTimes(); - EasyMock.expect(herder.buildRestartPlan(restartRequest)) - .andReturn(Optional.of(restartPlan)).anyTimes(); - - herder.onRestart(CONNECTOR_NAME); - EasyMock.expectLastCall(); - - connector = PowerMock.createMock(BogusSinkConnector.class); + RestartPlan restartPlan = mock(RestartPlan.class); + ConnectorStateInfo connectorStateInfo = mock(ConnectorStateInfo.class); + when(restartPlan.shouldRestartConnector()).thenReturn(true); + when(restartPlan.shouldRestartTasks()).thenReturn(false); + when(restartPlan.restartConnectorStateInfo()).thenReturn(connectorStateInfo); + doReturn(Optional.of(restartPlan)).when(herder).buildRestartPlan(restartRequest); + + connector = mock(BogusSinkConnector.class); expectAdd(SourceSink.SINK); Map<String, String> connectorConfig = connectorConfig(SourceSink.SINK); - Connector connectorMock = PowerMock.createMock(SinkConnector.class); + Connector connectorMock = mock(SinkConnector.class); expectConfigValidation(connectorMock, true, connectorConfig); - worker.stopAndAwaitConnector(CONNECTOR_NAME); - EasyMock.expectLastCall(); + doNothing().when(worker).stopAndAwaitConnector(CONNECTOR_NAME); - Capture<Callback<TargetState>> onStart = EasyMock.newCapture(); - worker.startConnector(eq(CONNECTOR_NAME), eq(connectorConfig), EasyMock.anyObject(HerderConnectorContext.class), - eq(herder), eq(TargetState.STARTED), EasyMock.capture(onStart)); - EasyMock.expectLastCall().andAnswer(() -> { + final ArgumentCaptor<Callback<TargetState>> onStart = ArgumentCaptor.forClass(Callback.class); + doAnswer(invocation -> { onStart.getValue().onCompletion(null, TargetState.STARTED); return true; - }); - - PowerMock.replayAll(); + }).when(worker).startConnector(eq(CONNECTOR_NAME), eq(connectorConfig), any(HerderConnectorContext.class), + eq(herder), eq(TargetState.STARTED), onStart.capture()); herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, createCallback); Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); @@ -633,36 +567,32 @@ public class StandaloneHerderTest { FutureCallback<ConnectorStateInfo> restartCallback = new FutureCallback<>(); herder.restartConnectorAndTasks(restartRequest, restartCallback); assertEquals(connectorStateInfo, restartCallback.get(1000L, TimeUnit.MILLISECONDS)); - PowerMock.verifyAll(); + + verifyConnectorStatusRestart(); } @Test public void testRestartConnectorAndTasksOnlyTasks() throws Exception { ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0); RestartRequest restartRequest = new RestartRequest(CONNECTOR_NAME, false, true); - RestartPlan restartPlan = PowerMock.createMock(RestartPlan.class); - ConnectorStateInfo connectorStateInfo = PowerMock.createMock(ConnectorStateInfo.class); - EasyMock.expect(restartPlan.shouldRestartConnector()).andReturn(false).anyTimes(); - EasyMock.expect(restartPlan.shouldRestartTasks()).andReturn(true).anyTimes(); - EasyMock.expect(restartPlan.restartTaskCount()).andReturn(1).anyTimes(); - EasyMock.expect(restartPlan.totalTaskCount()).andReturn(1).anyTimes(); - EasyMock.expect(restartPlan.taskIdsToRestart()).andReturn(Collections.singletonList(taskId)).anyTimes(); - EasyMock.expect(restartPlan.restartConnectorStateInfo()).andReturn(connectorStateInfo).anyTimes(); - EasyMock.expect(herder.buildRestartPlan(restartRequest)) - .andReturn(Optional.of(restartPlan)).anyTimes(); - - herder.onRestart(taskId); - EasyMock.expectLastCall(); - - connector = PowerMock.createMock(BogusSinkConnector.class); + RestartPlan restartPlan = mock(RestartPlan.class); + ConnectorStateInfo connectorStateInfo = mock(ConnectorStateInfo.class); + when(restartPlan.shouldRestartConnector()).thenReturn(false); + when(restartPlan.shouldRestartTasks()).thenReturn(true); + when(restartPlan.restartTaskCount()).thenReturn(1); + when(restartPlan.totalTaskCount()).thenReturn(1); + when(restartPlan.taskIdsToRestart()).thenReturn(Collections.singletonList(taskId)); + when(restartPlan.restartConnectorStateInfo()).thenReturn(connectorStateInfo); + doReturn(Optional.of(restartPlan)).when(herder).buildRestartPlan(restartRequest); + + connector = mock(BogusSinkConnector.class); expectAdd(SourceSink.SINK); Map<String, String> connectorConfig = connectorConfig(SourceSink.SINK); - Connector connectorMock = PowerMock.createMock(SinkConnector.class); + Connector connectorMock = mock(SinkConnector.class); expectConfigValidation(connectorMock, true, connectorConfig); - worker.stopAndAwaitTasks(Collections.singletonList(taskId)); - EasyMock.expectLastCall(); + doNothing().when(worker).stopAndAwaitTasks(Collections.singletonList(taskId)); ClusterConfigState configState = new ClusterConfigState( -1, @@ -676,9 +606,8 @@ public class StandaloneHerderTest { new HashSet<>(), new HashSet<>(), transformer); - worker.startSinkTask(taskId, configState, connectorConfig, taskConfig(SourceSink.SINK), herder, TargetState.STARTED); - EasyMock.expectLastCall().andReturn(true); - PowerMock.replayAll(); + when(worker.startSinkTask(taskId, configState, connectorConfig, taskConfig(SourceSink.SINK), herder, TargetState.STARTED)) + .thenReturn(true); herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, createCallback); Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); @@ -687,48 +616,44 @@ public class StandaloneHerderTest { FutureCallback<ConnectorStateInfo> restartCallback = new FutureCallback<>(); herder.restartConnectorAndTasks(restartRequest, restartCallback); assertEquals(connectorStateInfo, restartCallback.get(1000L, TimeUnit.MILLISECONDS)); - PowerMock.verifyAll(); + ArgumentCaptor<TaskStatus> taskStatus = ArgumentCaptor.forClass(TaskStatus.class); + verify(statusBackingStore).put(taskStatus.capture()); + assertEquals(AbstractStatus.State.RESTARTING, taskStatus.getValue().state()); + assertEquals(taskId, taskStatus.getValue().id()); } @Test public void testRestartConnectorAndTasksBoth() throws Exception { ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0); RestartRequest restartRequest = new RestartRequest(CONNECTOR_NAME, false, true); - RestartPlan restartPlan = PowerMock.createMock(RestartPlan.class); - ConnectorStateInfo connectorStateInfo = PowerMock.createMock(ConnectorStateInfo.class); - EasyMock.expect(restartPlan.shouldRestartConnector()).andReturn(true).anyTimes(); - EasyMock.expect(restartPlan.shouldRestartTasks()).andReturn(true).anyTimes(); - EasyMock.expect(restartPlan.restartTaskCount()).andReturn(1).anyTimes(); - EasyMock.expect(restartPlan.totalTaskCount()).andReturn(1).anyTimes(); - EasyMock.expect(restartPlan.taskIdsToRestart()).andReturn(Collections.singletonList(taskId)).anyTimes(); - EasyMock.expect(restartPlan.restartConnectorStateInfo()).andReturn(connectorStateInfo).anyTimes(); - EasyMock.expect(herder.buildRestartPlan(restartRequest)) - .andReturn(Optional.of(restartPlan)).anyTimes(); - - herder.onRestart(CONNECTOR_NAME); - EasyMock.expectLastCall(); - herder.onRestart(taskId); - EasyMock.expectLastCall(); - - connector = PowerMock.createMock(BogusSinkConnector.class); + RestartPlan restartPlan = mock(RestartPlan.class); + ConnectorStateInfo connectorStateInfo = mock(ConnectorStateInfo.class); + when(restartPlan.shouldRestartConnector()).thenReturn(true); + when(restartPlan.shouldRestartTasks()).thenReturn(true); + when(restartPlan.restartTaskCount()).thenReturn(1); + when(restartPlan.totalTaskCount()).thenReturn(1); + when(restartPlan.taskIdsToRestart()).thenReturn(Collections.singletonList(taskId)); + when(restartPlan.restartConnectorStateInfo()).thenReturn(connectorStateInfo); + doReturn(Optional.of(restartPlan)).when(herder).buildRestartPlan(restartRequest); + + ArgumentCaptor<TaskStatus> taskStatus = ArgumentCaptor.forClass(TaskStatus.class); + + connector = mock(BogusSinkConnector.class); expectAdd(SourceSink.SINK); Map<String, String> connectorConfig = connectorConfig(SourceSink.SINK); - Connector connectorMock = PowerMock.createMock(SinkConnector.class); + Connector connectorMock = mock(SinkConnector.class); expectConfigValidation(connectorMock, true, connectorConfig); - worker.stopAndAwaitConnector(CONNECTOR_NAME); - EasyMock.expectLastCall(); - worker.stopAndAwaitTasks(Collections.singletonList(taskId)); - EasyMock.expectLastCall(); + doNothing().when(worker).stopAndAwaitConnector(CONNECTOR_NAME); + doNothing().when(worker).stopAndAwaitTasks(Collections.singletonList(taskId)); - Capture<Callback<TargetState>> onStart = EasyMock.newCapture(); - worker.startConnector(eq(CONNECTOR_NAME), eq(connectorConfig), EasyMock.anyObject(HerderConnectorContext.class), - eq(herder), eq(TargetState.STARTED), EasyMock.capture(onStart)); - EasyMock.expectLastCall().andAnswer(() -> { + final ArgumentCaptor<Callback<TargetState>> onStart = ArgumentCaptor.forClass(Callback.class); + doAnswer(invocation -> { onStart.getValue().onCompletion(null, TargetState.STARTED); return true; - }); + }).when(worker).startConnector(eq(CONNECTOR_NAME), eq(connectorConfig), any(HerderConnectorContext.class), + eq(herder), eq(TargetState.STARTED), onStart.capture()); ClusterConfigState configState = new ClusterConfigState( -1, @@ -742,9 +667,8 @@ public class StandaloneHerderTest { new HashSet<>(), new HashSet<>(), transformer); - worker.startSinkTask(taskId, configState, connectorConfig, taskConfig(SourceSink.SINK), herder, TargetState.STARTED); - EasyMock.expectLastCall().andReturn(true); - PowerMock.replayAll(); + when(worker.startSinkTask(taskId, configState, connectorConfig, taskConfig(SourceSink.SINK), herder, TargetState.STARTED)) + .thenReturn(true); herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, createCallback); Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); @@ -753,39 +677,34 @@ public class StandaloneHerderTest { FutureCallback<ConnectorStateInfo> restartCallback = new FutureCallback<>(); herder.restartConnectorAndTasks(restartRequest, restartCallback); assertEquals(connectorStateInfo, restartCallback.get(1000L, TimeUnit.MILLISECONDS)); - PowerMock.verifyAll(); + + verifyConnectorStatusRestart(); + verify(statusBackingStore).put(taskStatus.capture()); + assertEquals(AbstractStatus.State.RESTARTING, taskStatus.getValue().state()); + assertEquals(taskId, taskStatus.getValue().id()); } @Test public void testCreateAndStop() throws Exception { - connector = PowerMock.createMock(BogusSourceConnector.class); + connector = mock(BogusSourceConnector.class); expectAdd(SourceSink.SOURCE); Map<String, String> connectorConfig = connectorConfig(SourceSink.SOURCE); - Connector connectorMock = PowerMock.createMock(SourceConnector.class); + Connector connectorMock = mock(SourceConnector.class); expectConfigValidation(connectorMock, true, connectorConfig); - // herder.stop() should stop any running connectors and tasks even if destroyConnector was not invoked expectStop(); - statusBackingStore.put(new TaskStatus(new ConnectorTaskId(CONNECTOR_NAME, 0), AbstractStatus.State.DESTROYED, WORKER_ID, 0)); - EasyMock.expectLastCall(); - - statusBackingStore.stop(); - EasyMock.expectLastCall(); - worker.stop(); - EasyMock.expectLastCall(); - - PowerMock.replayAll(); - herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, createCallback); Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result()); + // herder.stop() should stop any running connectors and tasks even if destroyConnector was not invoked herder.stop(); assertTrue(noneConnectorClientConfigOverridePolicy.isClosed()); - - PowerMock.verifyAll(); + verify(worker).stop(); + verify(statusBackingStore).stop(); + verify(statusBackingStore).put(new TaskStatus(new ConnectorTaskId(CONNECTOR_NAME, 0), AbstractStatus.State.DESTROYED, WORKER_ID, 0)); } @Test @@ -793,49 +712,36 @@ public class StandaloneHerderTest { Map<String, String> connConfig = connectorConfig(SourceSink.SOURCE); System.out.println(connConfig); - Callback<Collection<String>> listConnectorsCb = PowerMock.createMock(Callback.class); - Callback<ConnectorInfo> connectorInfoCb = PowerMock.createMock(Callback.class); - Callback<Map<String, String>> connectorConfigCb = PowerMock.createMock(Callback.class); - Callback<List<TaskInfo>> taskConfigsCb = PowerMock.createMock(Callback.class); - Callback<Map<ConnectorTaskId, Map<String, String>>> tasksConfigCb = PowerMock.createMock(Callback.class); + Callback<Collection<String>> listConnectorsCb = mock(Callback.class); + Callback<ConnectorInfo> connectorInfoCb = mock(Callback.class); + Callback<Map<String, String>> connectorConfigCb = mock(Callback.class); + Callback<List<TaskInfo>> taskConfigsCb = mock(Callback.class); + Callback<Map<ConnectorTaskId, Map<String, String>>> tasksConfigCb = mock(Callback.class); // Check accessors with empty worker - listConnectorsCb.onCompletion(null, Collections.EMPTY_SET); - EasyMock.expectLastCall(); - connectorInfoCb.onCompletion(EasyMock.<NotFoundException>anyObject(), EasyMock.isNull()); - EasyMock.expectLastCall(); - connectorConfigCb.onCompletion(EasyMock.<NotFoundException>anyObject(), EasyMock.isNull()); - EasyMock.expectLastCall(); - taskConfigsCb.onCompletion(EasyMock.<NotFoundException>anyObject(), EasyMock.isNull()); - EasyMock.expectLastCall(); - tasksConfigCb.onCompletion(EasyMock.<NotFoundException>anyObject(), EasyMock.isNull()); - EasyMock.expectLastCall(); + doNothing().when(listConnectorsCb).onCompletion(null, Collections.EMPTY_SET); + doNothing().when(connectorInfoCb).onCompletion(any(NotFoundException.class), isNull()); + doNothing().when(taskConfigsCb).onCompletion(any(NotFoundException.class), isNull()); + doNothing().when(tasksConfigCb).onCompletion(any(NotFoundException.class), isNull()); + doNothing().when(connectorConfigCb).onCompletion(any(NotFoundException.class), isNull()); // Create connector - connector = PowerMock.createMock(BogusSourceConnector.class); + connector = mock(BogusSourceConnector.class); expectAdd(SourceSink.SOURCE); expectConfigValidation(connector, true, connConfig); // Validate accessors with 1 connector - listConnectorsCb.onCompletion(null, singleton(CONNECTOR_NAME)); - EasyMock.expectLastCall(); + doNothing().when(listConnectorsCb).onCompletion(null, singleton(CONNECTOR_NAME)); ConnectorInfo connInfo = new ConnectorInfo(CONNECTOR_NAME, connConfig, Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)), ConnectorType.SOURCE); - connectorInfoCb.onCompletion(null, connInfo); - EasyMock.expectLastCall(); - connectorConfigCb.onCompletion(null, connConfig); - EasyMock.expectLastCall(); + doNothing().when(connectorInfoCb).onCompletion(null, connInfo); TaskInfo taskInfo = new TaskInfo(new ConnectorTaskId(CONNECTOR_NAME, 0), taskConfig(SourceSink.SOURCE)); - taskConfigsCb.onCompletion(null, Arrays.asList(taskInfo)); - EasyMock.expectLastCall(); + doNothing().when(taskConfigsCb).onCompletion(null, Arrays.asList(taskInfo)); Map<ConnectorTaskId, Map<String, String>> tasksConfig = Collections.singletonMap(new ConnectorTaskId(CONNECTOR_NAME, 0), taskConfig(SourceSink.SOURCE)); - tasksConfigCb.onCompletion(null, tasksConfig); - EasyMock.expectLastCall(); - - PowerMock.replayAll(); + doNothing().when(tasksConfigCb).onCompletion(null, tasksConfig); // All operations are synchronous for StandaloneHerder, so we don't need to actually wait after making each call herder.connectors(listConnectorsCb); @@ -848,19 +754,14 @@ public class StandaloneHerderTest { Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result()); - EasyMock.reset(transformer); - EasyMock.expect(transformer.transform(eq(CONNECTOR_NAME), EasyMock.anyObject())) - .andThrow(new AssertionError("Config transformation should not occur when requesting connector or task info")) - .anyTimes(); - EasyMock.replay(transformer); - + reset(transformer); herder.connectors(listConnectorsCb); herder.connectorInfo(CONNECTOR_NAME, connectorInfoCb); herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb); herder.taskConfigs(CONNECTOR_NAME, taskConfigsCb); herder.tasksConfig(CONNECTOR_NAME, tasksConfigCb); - - PowerMock.verifyAll(); + // Config transformation should not occur when requesting connector or task info + verify(transformer, never()).transform(eq(CONNECTOR_NAME), any()); } @Test @@ -869,39 +770,30 @@ public class StandaloneHerderTest { Map<String, String> newConnConfig = new HashMap<>(connConfig); newConnConfig.put("foo", "bar"); - Callback<Map<String, String>> connectorConfigCb = PowerMock.createMock(Callback.class); - // Callback<Herder.Created<ConnectorInfo>> putConnectorConfigCb = PowerMock.createMock(Callback.class); + Callback<Map<String, String>> connectorConfigCb = mock(Callback.class); // Create - connector = PowerMock.createMock(BogusSourceConnector.class); + connector = mock(BogusSourceConnector.class); expectAdd(SourceSink.SOURCE); - Connector connectorMock = PowerMock.createMock(SourceConnector.class); + Connector connectorMock = mock(SourceConnector.class); expectConfigValidation(connectorMock, true, connConfig); // Should get first config - connectorConfigCb.onCompletion(null, connConfig); - EasyMock.expectLastCall(); + doNothing().when(connectorConfigCb).onCompletion(null, connConfig); // Update config, which requires stopping and restarting - worker.stopAndAwaitConnector(CONNECTOR_NAME); - EasyMock.expectLastCall(); - Capture<Map<String, String>> capturedConfig = EasyMock.newCapture(); - Capture<Callback<TargetState>> onStart = EasyMock.newCapture(); - worker.startConnector(eq(CONNECTOR_NAME), EasyMock.capture(capturedConfig), EasyMock.anyObject(), - eq(herder), eq(TargetState.STARTED), EasyMock.capture(onStart)); - EasyMock.expectLastCall().andAnswer(() -> { + doNothing().when(worker).stopAndAwaitConnector(CONNECTOR_NAME); + final ArgumentCaptor<Map<String, String>> capturedConfig = ArgumentCaptor.forClass(Map.class); + final ArgumentCaptor<Callback<TargetState>> onStart = ArgumentCaptor.forClass(Callback.class); + doAnswer(invocation -> { onStart.getValue().onCompletion(null, TargetState.STARTED); return true; - }); + }).when(worker).startConnector(eq(CONNECTOR_NAME), capturedConfig.capture(), any(), + eq(herder), eq(TargetState.STARTED), onStart.capture()); // Generate same task config, which should result in no additional action to restart tasks - EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, new SourceConnectorConfig(plugins, newConnConfig, true))) - .andReturn(singletonList(taskConfig(SourceSink.SOURCE))); + when(worker.connectorTaskConfigs(CONNECTOR_NAME, new SourceConnectorConfig(plugins, newConnConfig, true))) + .thenReturn(singletonList(taskConfig(SourceSink.SOURCE))); expectConfigValidation(connectorMock, false, newConnConfig); - connectorConfigCb.onCompletion(null, newConnConfig); - EasyMock.expectLastCall(); - EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes(); - - PowerMock.replayAll(); herder.putConnectorConfig(CONNECTOR_NAME, connConfig, false, createCallback); Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); @@ -910,6 +802,7 @@ public class StandaloneHerderTest { herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb); FutureCallback<Herder.Created<ConnectorInfo>> reconfigureCallback = new FutureCallback<>(); + doNothing().when(connectorConfigCb).onCompletion(null, newConnConfig); herder.putConnectorConfig(CONNECTOR_NAME, newConnConfig, true, reconfigureCallback); Herder.Created<ConnectorInfo> newConnectorInfo = reconfigureCallback.get(1000L, TimeUnit.SECONDS); ConnectorInfo newConnInfo = new ConnectorInfo(CONNECTOR_NAME, newConnConfig, Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)), @@ -918,96 +811,76 @@ public class StandaloneHerderTest { assertEquals("bar", capturedConfig.getValue().get("foo")); herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb); - - PowerMock.verifyAll(); + verifyNoMoreInteractions(connectorConfigCb); } @Test public void testPutTaskConfigs() { - Callback<Void> cb = PowerMock.createMock(Callback.class); - - PowerMock.replayAll(); + Callback<Void> cb = mock(Callback.class); assertThrows(UnsupportedOperationException.class, () -> herder.putTaskConfigs(CONNECTOR_NAME, singletonList(singletonMap("config", "value")), cb, null)); - PowerMock.verifyAll(); } @Test - public void testCorruptConfig() throws Throwable { + public void testCorruptConfig() { Map<String, String> config = new HashMap<>(); config.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME); config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, BogusSinkConnector.class.getName()); config.put(SinkConnectorConfig.TOPICS_CONFIG, TOPICS_LIST_STR); - Connector connectorMock = PowerMock.createMock(SinkConnector.class); + Connector connectorMock = mock(SinkConnector.class); String error = "This is an error in your config!"; List<String> errors = new ArrayList<>(singletonList(error)); String key = "foo.invalid.key"; - EasyMock.expect(connectorMock.validate(config)).andReturn( + when(connectorMock.validate(config)).thenReturn( new Config( Arrays.asList(new ConfigValue(key, null, Collections.emptyList(), errors)) ) ); ConfigDef configDef = new ConfigDef(); configDef.define(key, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, ""); - EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2); - final Capture<Map<String, String>> configCapture = EasyMock.newCapture(); - EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue); - EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3); - EasyMock.expect(plugins.connectorLoader(EasyMock.anyString())).andReturn(pluginLoader); - EasyMock.expect(plugins.withClassLoader(pluginLoader)).andReturn(loaderSwap); - EasyMock.expect(worker.getPlugins()).andStubReturn(plugins); - EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock); - EasyMock.expect(connectorMock.config()).andStubReturn(configDef); - loaderSwap.close(); - EasyMock.expectLastCall(); - - PowerMock.replayAll(); + when(worker.configTransformer()).thenReturn(transformer); + final ArgumentCaptor<Map<String, String>> configCapture = ArgumentCaptor.forClass(Map.class); + when(transformer.transform(configCapture.capture())).thenAnswer(invocation -> configCapture.getValue()); + when(worker.getPlugins()).thenReturn(plugins); + when(plugins.connectorLoader(anyString())).thenReturn(pluginLoader); + when(plugins.withClassLoader(pluginLoader)).thenReturn(loaderSwap); + when(plugins.newConnector(anyString())).thenReturn(connectorMock); + when(connectorMock.config()).thenReturn(configDef); herder.putConnectorConfig(CONNECTOR_NAME, config, true, createCallback); - try { - createCallback.get(1000L, TimeUnit.SECONDS); - fail("Should have failed to configure connector"); - } catch (ExecutionException e) { - assertNotNull(e.getCause()); - Throwable cause = e.getCause(); - assertTrue(cause instanceof BadRequestException); - assertEquals( + ExecutionException e = assertThrows( + "Should have failed to configure connector", + ExecutionException.class, + () -> createCallback.get(1000L, TimeUnit.SECONDS) + ); + assertNotNull(e.getCause()); + Throwable cause = e.getCause(); + assertTrue(cause instanceof BadRequestException); + assertEquals( cause.getMessage(), "Connector configuration is invalid and contains the following 1 error(s):\n" + - error + "\n" + - "You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`" - ); - } - - PowerMock.verifyAll(); + error + "\n" + + "You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`" + ); + verify(loaderSwap).close(); } @Test public void testTargetStates() throws Exception { - connector = PowerMock.createMock(BogusSourceConnector.class); + connector = mock(BogusSourceConnector.class); expectAdd(SourceSink.SOURCE); Map<String, String> connectorConfig = connectorConfig(SourceSink.SOURCE); - Connector connectorMock = PowerMock.createMock(SourceConnector.class); + Connector connectorMock = mock(SourceConnector.class); expectConfigValidation(connectorMock, true, connectorConfig); // We pause, then stop, the connector expectTargetState(CONNECTOR_NAME, TargetState.PAUSED); expectTargetState(CONNECTOR_NAME, TargetState.STOPPED); - // herder.stop() should stop any running connectors and tasks even if destroyConnector was not invoked expectStop(); - statusBackingStore.put(new TaskStatus(new ConnectorTaskId(CONNECTOR_NAME, 0), AbstractStatus.State.DESTROYED, WORKER_ID, 0)); - EasyMock.expectLastCall(); - - statusBackingStore.stop(); - EasyMock.expectLastCall(); - worker.stop(); - EasyMock.expectLastCall(); - - PowerMock.replayAll(); FutureCallback<Void> stopCallback = new FutureCallback<>(); FutureCallback<List<TaskInfo>> taskConfigsCallback = new FutureCallback<>(); @@ -1017,20 +890,21 @@ public class StandaloneHerderTest { assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result()); herder.pauseConnector(CONNECTOR_NAME); herder.stopConnector(CONNECTOR_NAME, stopCallback); + verify(statusBackingStore).put(new TaskStatus(new ConnectorTaskId(CONNECTOR_NAME, 0), AbstractStatus.State.DESTROYED, WORKER_ID, 0)); stopCallback.get(10L, TimeUnit.SECONDS); herder.taskConfigs(CONNECTOR_NAME, taskConfigsCallback); assertEquals(Collections.emptyList(), taskConfigsCallback.get(1, TimeUnit.SECONDS)); + // herder.stop() should stop any running connectors and tasks even if destroyConnector was not invoked herder.stop(); assertTrue(noneConnectorClientConfigOverridePolicy.isClosed()); - - PowerMock.verifyAll(); + verify(worker).stop(); + verify(statusBackingStore).put(new TaskStatus(new ConnectorTaskId(CONNECTOR_NAME, 0), AbstractStatus.State.DESTROYED, WORKER_ID, 0)); + verify(statusBackingStore).stop(); } @Test public void testModifyConnectorOffsetsUnknownConnector() { - PowerMock.replayAll(); - FutureCallback<Message> alterOffsetsCallback = new FutureCallback<>(); herder.alterConnectorOffsets("unknown-connector", Collections.singletonMap(Collections.singletonMap("partitionKey", "partitionValue"), Collections.singletonMap("offsetKey", "offsetValue")), @@ -1042,14 +916,10 @@ public class StandaloneHerderTest { herder.resetConnectorOffsets("unknown-connector", resetOffsetsCallback); e = assertThrows(ExecutionException.class, () -> resetOffsetsCallback.get(1000L, TimeUnit.MILLISECONDS)); assertTrue(e.getCause() instanceof NotFoundException); - - PowerMock.verifyAll(); } @Test public void testModifyConnectorOffsetsConnectorNotInStoppedState() { - PowerMock.replayAll(); - herder.configState = new ClusterConfigState( 10, null, @@ -1074,20 +944,16 @@ public class StandaloneHerderTest { herder.resetConnectorOffsets(CONNECTOR_NAME, resetOffsetsCallback); e = assertThrows(ExecutionException.class, () -> resetOffsetsCallback.get(1000L, TimeUnit.MILLISECONDS)); assertTrue(e.getCause() instanceof BadRequestException); - - PowerMock.verifyAll(); } @Test public void testAlterConnectorOffsets() throws Exception { - Capture<Callback<Message>> workerCallbackCapture = Capture.newInstance(); + ArgumentCaptor<Callback<Message>> workerCallbackCapture = ArgumentCaptor.forClass(Callback.class); Message msg = new Message("The offsets for this connector have been altered successfully"); - worker.modifyConnectorOffsets(eq(CONNECTOR_NAME), eq(connectorConfig(SourceSink.SOURCE)), anyObject(Map.class), capture(workerCallbackCapture)); - EasyMock.expectLastCall().andAnswer(() -> { + doAnswer(invocation -> { workerCallbackCapture.getValue().onCompletion(null, msg); return null; - }); - PowerMock.replayAll(); + }).when(worker).modifyConnectorOffsets(eq(CONNECTOR_NAME), eq(connectorConfig(SourceSink.SOURCE)), any(Map.class), workerCallbackCapture.capture()); herder.configState = new ClusterConfigState( 10, @@ -1106,19 +972,17 @@ public class StandaloneHerderTest { Collections.singletonMap(Collections.singletonMap("partitionKey", "partitionValue"), Collections.singletonMap("offsetKey", "offsetValue")), alterOffsetsCallback); assertEquals(msg, alterOffsetsCallback.get(1000, TimeUnit.MILLISECONDS)); - PowerMock.verifyAll(); } @Test public void testResetConnectorOffsets() throws Exception { - Capture<Callback<Message>> workerCallbackCapture = Capture.newInstance(); + ArgumentCaptor<Callback<Message>> workerCallbackCapture = ArgumentCaptor.forClass(Callback.class); Message msg = new Message("The offsets for this connector have been reset successfully"); - worker.modifyConnectorOffsets(eq(CONNECTOR_NAME), eq(connectorConfig(SourceSink.SOURCE)), isNull(), capture(workerCallbackCapture)); - EasyMock.expectLastCall().andAnswer(() -> { + + doAnswer(invocation -> { workerCallbackCapture.getValue().onCompletion(null, msg); return null; - }); - PowerMock.replayAll(); + }).when(worker).modifyConnectorOffsets(eq(CONNECTOR_NAME), eq(connectorConfig(SourceSink.SOURCE)), isNull(), workerCallbackCapture.capture()); herder.configState = new ClusterConfigState( 10, @@ -1135,7 +999,6 @@ public class StandaloneHerderTest { FutureCallback<Message> resetOffsetsCallback = new FutureCallback<>(); herder.resetConnectorOffsets(CONNECTOR_NAME, resetOffsetsCallback); assertEquals(msg, resetOffsetsCallback.get(1000, TimeUnit.MILLISECONDS)); - PowerMock.verifyAll(); } private void expectAdd(SourceSink sourceSink) { @@ -1144,24 +1007,23 @@ public class StandaloneHerderTest { new SourceConnectorConfig(plugins, connectorProps, true) : new SinkConnectorConfig(plugins, connectorProps); - Capture<Callback<TargetState>> onStart = EasyMock.newCapture(); - worker.startConnector(eq(CONNECTOR_NAME), eq(connectorProps), EasyMock.anyObject(HerderConnectorContext.class), - eq(herder), eq(TargetState.STARTED), EasyMock.capture(onStart)); - EasyMock.expectLastCall().andAnswer(() -> { + final ArgumentCaptor<Callback<TargetState>> onStart = ArgumentCaptor.forClass(Callback.class); + doAnswer(invocation -> { onStart.getValue().onCompletion(null, TargetState.STARTED); return true; - }); - EasyMock.expect(worker.isRunning(CONNECTOR_NAME)).andReturn(true).anyTimes(); + }).when(worker).startConnector(eq(CONNECTOR_NAME), eq(connectorProps), any(HerderConnectorContext.class), + eq(herder), eq(TargetState.STARTED), onStart.capture()); + when(worker.isRunning(CONNECTOR_NAME)).thenReturn(true); if (sourceSink == SourceSink.SOURCE) { - EasyMock.expect(worker.isTopicCreationEnabled()).andReturn(true).anyTimes(); + when(worker.isTopicCreationEnabled()).thenReturn(true); } // And we should instantiate the tasks. For a sink task, we should see added properties for the input topic partitions Map<String, String> generatedTaskProps = taskConfig(sourceSink); - EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, connConfig)) - .andReturn(singletonList(generatedTaskProps)); + when(worker.connectorTaskConfigs(CONNECTOR_NAME, connConfig)) + .thenReturn(singletonList(generatedTaskProps)); ClusterConfigState configState = new ClusterConfigState( -1, @@ -1176,34 +1038,33 @@ public class StandaloneHerderTest { new HashSet<>(), transformer); if (sourceSink.equals(SourceSink.SOURCE)) { - worker.startSourceTask(new ConnectorTaskId(CONNECTOR_NAME, 0), configState, connectorConfig(sourceSink), generatedTaskProps, herder, TargetState.STARTED); + when(worker.startSourceTask(new ConnectorTaskId(CONNECTOR_NAME, 0), configState, connectorConfig(sourceSink), generatedTaskProps, herder, TargetState.STARTED)).thenReturn(true); } else { - worker.startSinkTask(new ConnectorTaskId(CONNECTOR_NAME, 0), configState, connectorConfig(sourceSink), generatedTaskProps, herder, TargetState.STARTED); + when(worker.startSinkTask(new ConnectorTaskId(CONNECTOR_NAME, 0), configState, connectorConfig(sourceSink), generatedTaskProps, herder, TargetState.STARTED)).thenReturn(true); } - EasyMock.expectLastCall().andReturn(true); - - Capture<Map<String, String>> configCapture = EasyMock.newCapture(); - EasyMock.expect(herder.connectorType(EasyMock.capture(configCapture))) - .andStubAnswer(() -> { - String connectorClass = configCapture.getValue().get(ConnectorConfig.CONNECTOR_CLASS_CONFIG); - if (BogusSourceConnector.class.getName().equals(connectorClass)) { - return ConnectorType.SOURCE; - } else if (BogusSinkConnector.class.getName().equals(connectorClass)) { - return ConnectorType.SINK; - } - return ConnectorType.UNKNOWN; - }); - worker.isSinkConnector(CONNECTOR_NAME); - PowerMock.expectLastCall().andReturn(sourceSink == SourceSink.SINK).anyTimes(); + + ArgumentCaptor<Map<String, String>> configCapture = ArgumentCaptor.forClass(Map.class); + when(herder.connectorType(configCapture.capture())).thenAnswer(invocation -> { + String connectorClass = configCapture.getValue().get(ConnectorConfig.CONNECTOR_CLASS_CONFIG); + if (BogusSourceConnector.class.getName().equals(connectorClass)) { + return ConnectorType.SOURCE; + } else if (BogusSinkConnector.class.getName().equals(connectorClass)) { + return ConnectorType.SINK; + } + return ConnectorType.UNKNOWN; + }); + + when(worker.isSinkConnector(CONNECTOR_NAME)) + .thenReturn(sourceSink == SourceSink.SINK); } private void expectTargetState(String connector, TargetState state) { - Capture<Callback<TargetState>> stateChangeCallback = Capture.newInstance(); - worker.setTargetState(eq(connector), eq(state), capture(stateChangeCallback)); - EasyMock.expectLastCall().andAnswer(() -> { + ArgumentCaptor<Callback<TargetState>> stateChangeCallback = ArgumentCaptor.forClass(Callback.class); + + doAnswer(invocation -> { stateChangeCallback.getValue().onCompletion(null, state); return null; - }); + }).when(worker).setTargetState(eq(connector), eq(state), stateChangeCallback.capture()); } private ConnectorInfo createdInfo(SourceSink sourceSink) { @@ -1214,10 +1075,8 @@ public class StandaloneHerderTest { private void expectStop() { ConnectorTaskId task = new ConnectorTaskId(CONNECTOR_NAME, 0); - worker.stopAndAwaitTasks(singletonList(task)); - EasyMock.expectLastCall(); - worker.stopAndAwaitConnector(CONNECTOR_NAME); - EasyMock.expectLastCall(); + doNothing().when(worker).stopAndAwaitTasks(singletonList(task)); + doNothing().when(worker).stopAndAwaitConnector(CONNECTOR_NAME); } private void expectDestroy() { @@ -1256,22 +1115,20 @@ public class StandaloneHerderTest { Map<String, String>... configs ) { // config validation - EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2); - final Capture<Map<String, String>> configCapture = EasyMock.newCapture(); - EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue); - EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4); - EasyMock.expect(plugins.connectorLoader(EasyMock.anyString())).andReturn(pluginLoader); - EasyMock.expect(plugins.withClassLoader(pluginLoader)).andReturn(loaderSwap); + when(worker.configTransformer()).thenReturn(transformer); + final ArgumentCaptor<Map<String, String>> configCapture = ArgumentCaptor.forClass(Map.class); + when(transformer.transform(configCapture.capture())).thenAnswer(invocation -> configCapture.getValue()); + when(worker.getPlugins()).thenReturn(plugins); + when(plugins.connectorLoader(anyString())).thenReturn(pluginLoader); + when(plugins.withClassLoader(pluginLoader)).thenReturn(loaderSwap); if (shouldCreateConnector) { - EasyMock.expect(worker.getPlugins()).andReturn(plugins); - EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock); + when(worker.getPlugins()).thenReturn(plugins); + when(plugins.newConnector(anyString())).thenReturn(connectorMock); } - EasyMock.expect(connectorMock.config()).andStubReturn(new ConfigDef()); + when(connectorMock.config()).thenReturn(new ConfigDef()); for (Map<String, String> config : configs) - EasyMock.expect(connectorMock.validate(config)).andReturn(new Config(Collections.emptyList())); - loaderSwap.close(); - EasyMock.expectLastCall(); + when(connectorMock.validate(config)).thenReturn(new Config(Collections.emptyList())); } // We need to use a real class here due to some issue with mocking java.lang.Class @@ -1287,4 +1144,10 @@ public class StandaloneHerderTest { private abstract class BogusSinkTask extends SourceTask { } + private void verifyConnectorStatusRestart() { + ArgumentCaptor<ConnectorStatus> connectorStatus = ArgumentCaptor.forClass(ConnectorStatus.class); + verify(statusBackingStore).put(connectorStatus.capture()); + assertEquals(CONNECTOR_NAME, connectorStatus.getValue().id()); + assertEquals(AbstractStatus.State.RESTARTING, connectorStatus.getValue().state()); + } }