This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 964e73178b5 KAFKA-14132; Replace EasyMock with Mockito in 
StandaloneHerderTest (#12728)
964e73178b5 is described below

commit 964e73178b5b8363cd2685ce6872905ef0c04dee
Author: Matthew de Detrich <matthew.dedetr...@aiven.io>
AuthorDate: Thu Dec 7 16:01:17 2023 +0100

    KAFKA-14132; Replace EasyMock with Mockito in StandaloneHerderTest (#12728)
    
    Reviewers: Mickael Maison <mickael.mai...@gmail.com>, Chris Egerton 
<chr...@aiven.io>
---
 build.gradle                                       |   1 -
 .../runtime/standalone/StandaloneHerderTest.java   | 779 +++++++++------------
 2 files changed, 321 insertions(+), 459 deletions(-)

diff --git a/build.gradle b/build.gradle
index 080daf28ac9..f5e8283cdc6 100644
--- a/build.gradle
+++ b/build.gradle
@@ -419,7 +419,6 @@ subprojects {
     testsToExclude.addAll([
       // connect tests
       "**/KafkaConfigBackingStoreTest.*",
-      "**/StandaloneHerderTest.*",
       "**/WorkerSinkTaskTest.*"
     ])
   }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index 7e4126aa141..213f029a0b2 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -39,7 +39,6 @@ import org.apache.kafka.connect.runtime.TaskConfig;
 import org.apache.kafka.connect.runtime.TaskStatus;
 import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
-import org.apache.kafka.connect.runtime.WorkerConnector;
 import 
org.apache.kafka.connect.runtime.distributed.SampleConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
 import org.apache.kafka.connect.runtime.rest.entities.Message;
@@ -60,15 +59,13 @@ import org.apache.kafka.connect.storage.StatusBackingStore;
 import org.apache.kafka.connect.util.Callback;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.FutureCallback;
-import org.easymock.Capture;
-import org.easymock.EasyMock;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.powermock.api.easymock.PowerMock;
-import org.powermock.api.easymock.annotation.Mock;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -88,19 +85,30 @@ import static java.util.Collections.singletonMap;
 import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
 import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
 import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.capture;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.isNull;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
-@RunWith(PowerMockRunner.class)
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.CALLS_REAL_METHODS;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.isNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.withSettings;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
 @SuppressWarnings("unchecked")
-@PrepareForTest({StandaloneHerder.class, WorkerConnector.class})
 public class StandaloneHerderTest {
     private static final String CONNECTOR_NAME = "test";
     private static final String TOPICS_LIST_STR = "topic1,topic2";
@@ -114,7 +122,8 @@ public class StandaloneHerderTest {
     private StandaloneHerder herder;
 
     private Connector connector;
-    @Mock protected Worker worker;
+    @Mock
+    protected Worker worker;
     @Mock protected WorkerConfigTransformer transformer;
     @Mock private Plugins plugins;
     @Mock
@@ -127,63 +136,59 @@ public class StandaloneHerderTest {
         noneConnectorClientConfigOverridePolicy = new 
SampleConnectorClientConfigOverridePolicy();
 
     @Before
-    public void setup() {
-        worker = PowerMock.createMock(Worker.class);
-        String[] methodNames = new String[]{"connectorType", 
"buildRestartPlan", "recordRestarting"};
-        herder = PowerMock.createPartialMock(StandaloneHerder.class, 
methodNames,
-                worker, WORKER_ID, KAFKA_CLUSTER_ID, statusBackingStore, new 
MemoryConfigBackingStore(transformer), noneConnectorClientConfigOverridePolicy, 
new MockTime());
+    public void setup() throws ExecutionException, InterruptedException {
+        worker = mock(Worker.class);
+        herder = mock(StandaloneHerder.class, withSettings()
+            .useConstructor(worker, WORKER_ID, KAFKA_CLUSTER_ID, 
statusBackingStore, new MemoryConfigBackingStore(transformer), 
noneConnectorClientConfigOverridePolicy, new MockTime())
+            .defaultAnswer(CALLS_REAL_METHODS));
         createCallback = new FutureCallback<>();
-        plugins = PowerMock.createMock(Plugins.class);
-        pluginLoader = PowerMock.createMock(PluginClassLoader.class);
-        loaderSwap = PowerMock.createMock(LoaderSwap.class);
-        PowerMock.mockStatic(WorkerConnector.class);
-        Capture<Map<String, String>> configCapture = Capture.newInstance();
-        EasyMock.expect(transformer.transform(eq(CONNECTOR_NAME), 
EasyMock.capture(configCapture))).andAnswer(configCapture::getValue).anyTimes();
+        plugins = mock(Plugins.class);
+        pluginLoader = mock(PluginClassLoader.class);
+        loaderSwap = mock(LoaderSwap.class);
+        final ArgumentCaptor<Map<String, String>> configCapture = 
ArgumentCaptor.forClass(Map.class);
+        when(transformer.transform(eq(CONNECTOR_NAME), 
configCapture.capture())).thenAnswer(invocation -> configCapture.getValue());
+    }
+
+    @After
+    public void tearDown() {
+        verifyNoMoreInteractions(worker, statusBackingStore);
     }
 
     @Test
     public void testCreateSourceConnector() throws Exception {
-        connector = PowerMock.createMock(BogusSourceConnector.class);
+        connector = mock(BogusSourceConnector.class);
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
-        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        Connector connectorMock = mock(SourceConnector.class);
         expectConfigValidation(connectorMock, true, config);
 
-        PowerMock.replayAll();
-
         herder.putConnectorConfig(CONNECTOR_NAME, config, false, 
createCallback);
         Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
         assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
-
-        PowerMock.verifyAll();
     }
 
     @Test
     public void testCreateConnectorFailedValidation() {
         // Basic validation should be performed and return an error, but 
should still evaluate the connector's config
-        connector = PowerMock.createMock(BogusSourceConnector.class);
+        connector = mock(BogusSourceConnector.class);
 
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
         config.remove(ConnectorConfig.NAME_CONFIG);
 
-        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
-        
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
-        final Capture<Map<String, String>> configCapture = 
EasyMock.newCapture();
-        
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
-        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4);
-        
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
-        
EasyMock.expect(plugins.connectorLoader(EasyMock.anyString())).andReturn(pluginLoader);
-        
EasyMock.expect(plugins.withClassLoader(pluginLoader)).andReturn(loaderSwap);
+        Connector connectorMock = mock(SourceConnector.class);
+        when(worker.configTransformer()).thenReturn(transformer);
+        final ArgumentCaptor<Map<String, String>> configCapture = 
ArgumentCaptor.forClass(Map.class);
+        
when(transformer.transform(configCapture.capture())).thenAnswer(invocation -> 
configCapture.getValue());
+        when(worker.getPlugins()).thenReturn(plugins);
+        when(plugins.newConnector(anyString())).thenReturn(connectorMock);
+        when(plugins.connectorLoader(anyString())).thenReturn(pluginLoader);
+        when(plugins.withClassLoader(pluginLoader)).thenReturn(loaderSwap);
 
-        EasyMock.expect(connectorMock.config()).andStubReturn(new ConfigDef());
+        when(connectorMock.config()).thenReturn(new ConfigDef());
 
         ConfigValue validatedValue = new ConfigValue("foo.bar");
-        EasyMock.expect(connectorMock.validate(config)).andReturn(new 
Config(singletonList(validatedValue)));
-        loaderSwap.close();
-        EasyMock.expectLastCall();
-
-        PowerMock.replayAll();
+        when(connectorMock.validate(config)).thenReturn(new 
Config(singletonList(validatedValue)));
 
         herder.putConnectorConfig(CONNECTOR_NAME, config, false, 
createCallback);
 
@@ -191,82 +196,66 @@ public class StandaloneHerderTest {
         if (BadRequestException.class != exception.getCause().getClass()) {
             throw new AssertionError(exception.getCause());
         }
-        PowerMock.verifyAll();
+        verify(loaderSwap).close();
     }
 
     @Test
     public void testCreateConnectorAlreadyExists() throws Throwable {
-        connector = PowerMock.createMock(BogusSourceConnector.class);
+        connector = mock(BogusSourceConnector.class);
         // First addition should succeed
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
-        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        Connector connectorMock = mock(SourceConnector.class);
         expectConfigValidation(connectorMock, true, config, config);
 
-        
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
-        final Capture<Map<String, String>> configCapture = 
EasyMock.newCapture();
-        
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
-        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
-        
EasyMock.expect(plugins.connectorLoader(EasyMock.anyString())).andReturn(pluginLoader);
-        
EasyMock.expect(plugins.withClassLoader(pluginLoader)).andReturn(loaderSwap);
-        // No new connector is created
-        loaderSwap.close();
-        EasyMock.expectLastCall();
-
-        PowerMock.replayAll();
-
         herder.putConnectorConfig(CONNECTOR_NAME, config, false, 
createCallback);
         Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
         assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
 
         // Second should fail
         FutureCallback<Herder.Created<ConnectorInfo>> failedCreateCallback = 
new FutureCallback<>();
+        // No new connector is created
         herder.putConnectorConfig(CONNECTOR_NAME, config, false, 
failedCreateCallback);
         ExecutionException exception = assertThrows(ExecutionException.class, 
() -> failedCreateCallback.get(1000L, TimeUnit.SECONDS));
         if (AlreadyExistsException.class != exception.getCause().getClass()) {
             throw new AssertionError(exception.getCause());
         }
-        PowerMock.verifyAll();
+        verify(loaderSwap, times(2)).close();
     }
 
     @Test
     public void testCreateSinkConnector() throws Exception {
-        connector = PowerMock.createMock(BogusSinkConnector.class);
+        connector = mock(BogusSinkConnector.class);
         expectAdd(SourceSink.SINK);
 
         Map<String, String> config = connectorConfig(SourceSink.SINK);
-        Connector connectorMock = PowerMock.createMock(SinkConnector.class);
+        Connector connectorMock = mock(SinkConnector.class);
         expectConfigValidation(connectorMock, true, config);
-        PowerMock.replayAll();
 
         herder.putConnectorConfig(CONNECTOR_NAME, config, false, 
createCallback);
         Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
         assertEquals(createdInfo(SourceSink.SINK), connectorInfo.result());
-
-        PowerMock.verifyAll();
     }
 
     @Test
     public void testCreateConnectorWithStoppedInitialState() throws Exception {
-        connector = PowerMock.createMock(BogusSinkConnector.class);
+        connector = mock(BogusSinkConnector.class);
         Map<String, String> config = connectorConfig(SourceSink.SINK);
-        Connector connectorMock = PowerMock.createMock(SinkConnector.class);
+        Connector connectorMock = mock(SinkConnector.class);
         expectConfigValidation(connectorMock, false, config);
-        
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
+        when(plugins.newConnector(anyString())).thenReturn(connectorMock);
 
         // Only the connector should be created; we expect no tasks to be 
spawned for a connector created with a paused or stopped initial state
-        Capture<Callback<TargetState>> onStart = EasyMock.newCapture();
-        worker.startConnector(eq(CONNECTOR_NAME), eq(config), 
EasyMock.anyObject(HerderConnectorContext.class),
-            eq(herder), eq(TargetState.STOPPED), EasyMock.capture(onStart));
-        EasyMock.expectLastCall().andAnswer(() -> {
+        final ArgumentCaptor<Callback<TargetState>> onStart = 
ArgumentCaptor.forClass(Callback.class);
+        doAnswer(invocation -> {
             onStart.getValue().onCompletion(null, TargetState.STOPPED);
             return true;
-        });
-        
EasyMock.expect(worker.isRunning(CONNECTOR_NAME)).andReturn(true).anyTimes();
-        
EasyMock.expect(herder.connectorType(anyObject())).andReturn(ConnectorType.SINK);
+        }).when(worker).startConnector(eq(CONNECTOR_NAME), eq(config), 
any(HerderConnectorContext.class),
+            eq(herder), eq(TargetState.STOPPED), onStart.capture());
 
-        PowerMock.replayAll();
+        when(worker.isRunning(CONNECTOR_NAME)).thenReturn(true);
+        when(herder.connectorType(any())).thenReturn(ConnectorType.SINK);
 
         herder.putConnectorConfig(CONNECTOR_NAME, config, TargetState.STOPPED, 
false, createCallback);
         Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
@@ -274,46 +263,41 @@ public class StandaloneHerderTest {
             new ConnectorInfo(CONNECTOR_NAME, 
connectorConfig(SourceSink.SINK), Collections.emptyList(), ConnectorType.SINK),
             connectorInfo.result()
         );
-
-        PowerMock.verifyAll();
+        verify(loaderSwap).close();
     }
 
     @Test
     public void testDestroyConnector() throws Exception {
-        connector = PowerMock.createMock(BogusSourceConnector.class);
+        connector = mock(BogusSourceConnector.class);
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
-        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        Connector connectorMock = mock(SourceConnector.class);
         expectConfigValidation(connectorMock, true, config);
 
-        
EasyMock.expect(statusBackingStore.getAll(CONNECTOR_NAME)).andReturn(Collections.emptyList());
-        statusBackingStore.put(new ConnectorStatus(CONNECTOR_NAME, 
AbstractStatus.State.DESTROYED, WORKER_ID, 0));
-        statusBackingStore.put(new TaskStatus(new 
ConnectorTaskId(CONNECTOR_NAME, 0), TaskStatus.State.DESTROYED, WORKER_ID, 0));
-
-        expectDestroy();
-
-        PowerMock.replayAll();
+        
when(statusBackingStore.getAll(CONNECTOR_NAME)).thenReturn(Collections.emptyList());
 
         herder.putConnectorConfig(CONNECTOR_NAME, config, false, 
createCallback);
         Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
         assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
 
         FutureCallback<Herder.Created<ConnectorInfo>> deleteCallback = new 
FutureCallback<>();
+        expectDestroy();
         herder.deleteConnectorConfig(CONNECTOR_NAME, deleteCallback);
+        verify(herder).onDeletion(CONNECTOR_NAME);
+        verify(statusBackingStore).put(new TaskStatus(new 
ConnectorTaskId(CONNECTOR_NAME, 0), TaskStatus.State.DESTROYED, WORKER_ID, 0));
+        verify(statusBackingStore).put(new ConnectorStatus(CONNECTOR_NAME, 
ConnectorStatus.State.DESTROYED, WORKER_ID, 0));
         deleteCallback.get(1000L, TimeUnit.MILLISECONDS);
 
         // Second deletion should fail since the connector is gone
         FutureCallback<Herder.Created<ConnectorInfo>> failedDeleteCallback = 
new FutureCallback<>();
         herder.deleteConnectorConfig(CONNECTOR_NAME, failedDeleteCallback);
-        try {
-            failedDeleteCallback.get(1000L, TimeUnit.MILLISECONDS);
-            fail("Should have thrown NotFoundException");
-        } catch (ExecutionException e) {
-            assertTrue(e.getCause() instanceof NotFoundException);
-        }
-
-        PowerMock.verifyAll();
+        ExecutionException e = assertThrows(
+                "Should have thrown NotFoundException",
+                ExecutionException.class,
+                () -> failedDeleteCallback.get(1000L, TimeUnit.MILLISECONDS)
+        );
+        assertTrue(e.getCause() instanceof NotFoundException);
     }
 
     @Test
@@ -321,26 +305,21 @@ public class StandaloneHerderTest {
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
-        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        Connector connectorMock = mock(SourceConnector.class);
         expectConfigValidation(connectorMock, true, config);
 
-        worker.stopAndAwaitConnector(CONNECTOR_NAME);
-        EasyMock.expectLastCall();
-
-        Capture<Callback<TargetState>> onStart = EasyMock.newCapture();
-        worker.startConnector(eq(CONNECTOR_NAME), eq(config), 
EasyMock.anyObject(HerderConnectorContext.class),
-            eq(herder), eq(TargetState.STARTED), EasyMock.capture(onStart));
-        EasyMock.expectLastCall().andAnswer(() -> {
+        final ArgumentCaptor<Callback<TargetState>> onStart = 
ArgumentCaptor.forClass(Callback.class);
+        doAnswer(invocation -> {
             onStart.getValue().onCompletion(null, TargetState.STARTED);
             return true;
-        });
+        }).when(worker).startConnector(eq(CONNECTOR_NAME), eq(config), 
any(HerderConnectorContext.class),
+            eq(herder), eq(TargetState.STARTED), onStart.capture());
 
-        
EasyMock.expect(worker.connectorNames()).andReturn(Collections.singleton(CONNECTOR_NAME));
-        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
+        
when(worker.connectorNames()).thenReturn(Collections.singleton(CONNECTOR_NAME));
+        when(worker.getPlugins()).thenReturn(plugins);
         // same task configs as earlier, so don't expect a new set of tasks to 
be brought up
-        EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, new 
SourceConnectorConfig(plugins, config, 
true))).andReturn(Collections.singletonList(taskConfig(SourceSink.SOURCE)));
-
-        PowerMock.replayAll();
+        when(worker.connectorTaskConfigs(CONNECTOR_NAME, new 
SourceConnectorConfig(plugins, config, true)))
+            
.thenReturn(Collections.singletonList(taskConfig(SourceSink.SOURCE)));
 
         herder.putConnectorConfig(CONNECTOR_NAME, config, false, 
createCallback);
         Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
@@ -349,8 +328,7 @@ public class StandaloneHerderTest {
         FutureCallback<Void> restartCallback = new FutureCallback<>();
         herder.restartConnector(CONNECTOR_NAME, restartCallback);
         restartCallback.get(1000L, TimeUnit.MILLISECONDS);
-
-        PowerMock.verifyAll();
+        verify(worker).stopAndAwaitConnector(eq(CONNECTOR_NAME));
     }
 
     @Test
@@ -359,45 +337,35 @@ public class StandaloneHerderTest {
 
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
         ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0);
-        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        Connector connectorMock = mock(SourceConnector.class);
         expectConfigValidation(connectorMock, true, config);
 
-        worker.stopAndAwaitConnector(CONNECTOR_NAME);
-        EasyMock.expectLastCall();
+        herder.putConnectorConfig(CONNECTOR_NAME, config, false, 
createCallback);
+        Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
+        assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
 
-        Capture<Callback<TargetState>> onStart = EasyMock.newCapture();
-        worker.startConnector(eq(CONNECTOR_NAME), eq(config), 
EasyMock.anyObject(HerderConnectorContext.class),
-                eq(herder), eq(TargetState.STARTED), 
EasyMock.capture(onStart));
-        EasyMock.expectLastCall().andAnswer(() -> {
+        final ArgumentCaptor<Callback<TargetState>> onStart = 
ArgumentCaptor.forClass(Callback.class);
+        doAnswer(invocation -> {
             onStart.getValue().onCompletion(null, TargetState.STARTED);
             return true;
-        });
+        }).when(worker).startConnector(eq(CONNECTOR_NAME), eq(config), 
any(HerderConnectorContext.class),
+                eq(herder), eq(TargetState.STARTED), onStart.capture());
 
-        
EasyMock.expect(worker.connectorNames()).andReturn(Collections.singleton(CONNECTOR_NAME));
-        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
+        
when(worker.connectorNames()).thenReturn(Collections.singleton(CONNECTOR_NAME));
+        when(worker.getPlugins()).thenReturn(plugins);
         // changed task configs, expect a new set of tasks to be brought up 
(and the old ones to be stopped)
         Map<String, String> taskConfigs = taskConfig(SourceSink.SOURCE);
         taskConfigs.put("k", "v");
-        EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, new 
SourceConnectorConfig(plugins, config, 
true))).andReturn(Collections.singletonList(taskConfigs));
+        when(worker.connectorTaskConfigs(CONNECTOR_NAME, new 
SourceConnectorConfig(plugins, config, true)))
+                .thenReturn(Collections.singletonList(taskConfigs));
 
-        worker.stopAndAwaitTasks(Collections.singletonList(taskId));
-        EasyMock.expectLastCall();
-        statusBackingStore.put(new TaskStatus(new 
ConnectorTaskId(CONNECTOR_NAME, 0), AbstractStatus.State.DESTROYED, WORKER_ID, 
0));
-        EasyMock.expectLastCall();
-        worker.startSourceTask(eq(new ConnectorTaskId(CONNECTOR_NAME, 0)), 
EasyMock.anyObject(), eq(connectorConfig(SourceSink.SOURCE)), eq(taskConfigs), 
eq(herder), eq(TargetState.STARTED));
-        EasyMock.expectLastCall().andReturn(true);
-
-        PowerMock.replayAll();
-
-        herder.putConnectorConfig(CONNECTOR_NAME, config, false, 
createCallback);
-        Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
-        assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
+        when(worker.startSourceTask(eq(new ConnectorTaskId(CONNECTOR_NAME, 
0)), any(), eq(connectorConfig(SourceSink.SOURCE)), eq(taskConfigs), 
eq(herder), eq(TargetState.STARTED))).thenReturn(true);
 
         FutureCallback<Void> restartCallback = new FutureCallback<>();
+        expectStop();
         herder.restartConnector(CONNECTOR_NAME, restartCallback);
+        verify(statusBackingStore).put(new TaskStatus(taskId, 
TaskStatus.State.DESTROYED, WORKER_ID, 0));
         restartCallback.get(1000L, TimeUnit.MILLISECONDS);
-
-        PowerMock.verifyAll();
     }
 
     @Test
@@ -405,28 +373,25 @@ public class StandaloneHerderTest {
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
-        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        Connector connectorMock = mock(SourceConnector.class);
         expectConfigValidation(connectorMock, true, config);
 
-        worker.stopAndAwaitConnector(CONNECTOR_NAME);
-        EasyMock.expectLastCall();
+        doNothing().when(worker).stopAndAwaitConnector(CONNECTOR_NAME);
 
-        Capture<Callback<TargetState>> onStart = EasyMock.newCapture();
-        worker.startConnector(eq(CONNECTOR_NAME), eq(config), 
EasyMock.anyObject(HerderConnectorContext.class),
-            eq(herder), eq(TargetState.STARTED), EasyMock.capture(onStart));
-        Exception exception = new ConnectException("Failed to start 
connector");
-        EasyMock.expectLastCall().andAnswer(() -> {
-            onStart.getValue().onCompletion(exception, null);
-            return true;
-        });
+        final ArgumentCaptor<Callback<TargetState>> onStart = 
ArgumentCaptor.forClass(Callback.class);
 
-        PowerMock.replayAll();
+        Exception exception = new ConnectException("Failed to start 
connector");
 
         herder.putConnectorConfig(CONNECTOR_NAME, config, false, 
createCallback);
         Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
         assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
 
         FutureCallback<Void> restartCallback = new FutureCallback<>();
+        doAnswer(invocation -> {
+            onStart.getValue().onCompletion(exception, null);
+            return true;
+        }).when(worker).startConnector(eq(CONNECTOR_NAME), eq(config), 
any(HerderConnectorContext.class),
+            eq(herder), eq(TargetState.STARTED), onStart.capture());
         herder.restartConnector(CONNECTOR_NAME, restartCallback);
         try {
             restartCallback.get(1000L, TimeUnit.MILLISECONDS);
@@ -434,8 +399,6 @@ public class StandaloneHerderTest {
         } catch (ExecutionException e) {
             assertEquals(exception, e.getCause());
         }
-
-        PowerMock.verifyAll();
     }
 
     @Test
@@ -444,11 +407,10 @@ public class StandaloneHerderTest {
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> connectorConfig = 
connectorConfig(SourceSink.SOURCE);
-        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        Connector connectorMock = mock(SourceConnector.class);
         expectConfigValidation(connectorMock, true, connectorConfig);
 
-        worker.stopAndAwaitTask(taskId);
-        EasyMock.expectLastCall();
+        doNothing().when(worker).stopAndAwaitTask(taskId);
 
         ClusterConfigState configState = new ClusterConfigState(
                 -1,
@@ -462,10 +424,8 @@ public class StandaloneHerderTest {
                 new HashSet<>(),
                 new HashSet<>(),
                 transformer);
-        worker.startSourceTask(taskId, configState, connectorConfig, 
taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED);
-        EasyMock.expectLastCall().andReturn(true);
-
-        PowerMock.replayAll();
+        when(worker.startSourceTask(taskId, configState, connectorConfig, 
taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED))
+            .thenReturn(true);
 
         herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, 
createCallback);
         Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
@@ -475,8 +435,6 @@ public class StandaloneHerderTest {
         FutureCallback<Void> restartTaskCallback = new FutureCallback<>();
         herder.restartTask(taskId, restartTaskCallback);
         restartTaskCallback.get(1000L, TimeUnit.MILLISECONDS);
-
-        PowerMock.verifyAll();
     }
 
     @Test
@@ -485,12 +443,9 @@ public class StandaloneHerderTest {
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> connectorConfig = 
connectorConfig(SourceSink.SOURCE);
-        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        Connector connectorMock = mock(SourceConnector.class);
         expectConfigValidation(connectorMock, true, connectorConfig);
 
-        worker.stopAndAwaitTask(taskId);
-        EasyMock.expectLastCall();
-
         ClusterConfigState configState = new ClusterConfigState(
                 -1,
                 null,
@@ -503,10 +458,8 @@ public class StandaloneHerderTest {
                 new HashSet<>(),
                 new HashSet<>(),
                 transformer);
-        worker.startSourceTask(taskId, configState, connectorConfig, 
taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED);
-        EasyMock.expectLastCall().andReturn(false);
-
-        PowerMock.replayAll();
+        when(worker.startSourceTask(taskId, configState, connectorConfig, 
taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED))
+            .thenReturn(false);
 
         herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, 
createCallback);
         Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.MILLISECONDS);
@@ -514,41 +467,35 @@ public class StandaloneHerderTest {
 
         FutureCallback<Void> cb = new FutureCallback<>();
         herder.restartTask(taskId, cb);
+        verify(worker).stopAndAwaitTask(taskId);
         try {
             cb.get(1000L, TimeUnit.MILLISECONDS);
             fail("Expected restart callback to raise an exception");
         } catch (ExecutionException exception) {
             assertEquals(ConnectException.class, 
exception.getCause().getClass());
         }
-
-        PowerMock.verifyAll();
     }
 
     @Test
     public void testRestartConnectorAndTasksUnknownConnector() {
-        PowerMock.replayAll();
-
         FutureCallback<ConnectorStateInfo> restartCallback = new 
FutureCallback<>();
         RestartRequest restartRequest = new RestartRequest("UnknownConnector", 
false, true);
         herder.restartConnectorAndTasks(restartRequest, restartCallback);
         ExecutionException ee = assertThrows(ExecutionException.class, () -> 
restartCallback.get(1000L, TimeUnit.MILLISECONDS));
         assertTrue(ee.getCause() instanceof NotFoundException);
-
-        PowerMock.verifyAll();
     }
 
     @Test
     public void testRestartConnectorAndTasksNoStatus() throws Exception {
         RestartRequest restartRequest = new RestartRequest(CONNECTOR_NAME, 
false, true);
-        
EasyMock.expect(herder.buildRestartPlan(restartRequest)).andReturn(Optional.empty()).anyTimes();
+        
doReturn(Optional.empty()).when(herder).buildRestartPlan(restartRequest);
 
-        connector = PowerMock.createMock(BogusSinkConnector.class);
+        connector = mock(BogusSinkConnector.class);
         expectAdd(SourceSink.SINK);
 
         Map<String, String> connectorConfig = connectorConfig(SourceSink.SINK);
-        Connector connectorMock = PowerMock.createMock(SinkConnector.class);
+        Connector connectorMock = mock(SinkConnector.class);
         expectConfigValidation(connectorMock, true, connectorConfig);
-        PowerMock.replayAll();
 
         herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, 
createCallback);
         Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
@@ -559,29 +506,25 @@ public class StandaloneHerderTest {
         ExecutionException ee = assertThrows(ExecutionException.class, () -> 
restartCallback.get(1000L, TimeUnit.MILLISECONDS));
         assertTrue(ee.getCause() instanceof NotFoundException);
         assertTrue(ee.getMessage().contains("Status for connector"));
-        PowerMock.verifyAll();
     }
 
     @Test
     public void testRestartConnectorAndTasksNoRestarts() throws Exception {
         RestartRequest restartRequest = new RestartRequest(CONNECTOR_NAME, 
false, true);
-        RestartPlan restartPlan = PowerMock.createMock(RestartPlan.class);
-        ConnectorStateInfo connectorStateInfo = 
PowerMock.createMock(ConnectorStateInfo.class);
-        
EasyMock.expect(restartPlan.shouldRestartConnector()).andReturn(false).anyTimes();
-        
EasyMock.expect(restartPlan.shouldRestartTasks()).andReturn(false).anyTimes();
-        
EasyMock.expect(restartPlan.restartConnectorStateInfo()).andReturn(connectorStateInfo).anyTimes();
-        EasyMock.expect(herder.buildRestartPlan(restartRequest))
-                .andReturn(Optional.of(restartPlan)).anyTimes();
-
-        connector = PowerMock.createMock(BogusSinkConnector.class);
+        RestartPlan restartPlan = mock(RestartPlan.class);
+        ConnectorStateInfo connectorStateInfo = mock(ConnectorStateInfo.class);
+        when(restartPlan.shouldRestartConnector()).thenReturn(false);
+        when(restartPlan.shouldRestartTasks()).thenReturn(false);
+        
when(restartPlan.restartConnectorStateInfo()).thenReturn(connectorStateInfo);
+        
doReturn(Optional.of(restartPlan)).when(herder).buildRestartPlan(restartRequest);
+
+        connector = mock(BogusSinkConnector.class);
         expectAdd(SourceSink.SINK);
 
         Map<String, String> connectorConfig = connectorConfig(SourceSink.SINK);
-        Connector connectorMock = PowerMock.createMock(SinkConnector.class);
+        Connector connectorMock = mock(SinkConnector.class);
         expectConfigValidation(connectorMock, true, connectorConfig);
 
-        PowerMock.replayAll();
-
         herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, 
createCallback);
         Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
         assertEquals(createdInfo(SourceSink.SINK), connectorInfo.result());
@@ -589,42 +532,33 @@ public class StandaloneHerderTest {
         FutureCallback<ConnectorStateInfo> restartCallback = new 
FutureCallback<>();
         herder.restartConnectorAndTasks(restartRequest, restartCallback);
         assertEquals(connectorStateInfo, restartCallback.get(1000L, 
TimeUnit.MILLISECONDS));
-        PowerMock.verifyAll();
     }
 
     @Test
     public void testRestartConnectorAndTasksOnlyConnector() throws Exception {
         RestartRequest restartRequest = new RestartRequest(CONNECTOR_NAME, 
false, true);
-        RestartPlan restartPlan = PowerMock.createMock(RestartPlan.class);
-        ConnectorStateInfo connectorStateInfo = 
PowerMock.createMock(ConnectorStateInfo.class);
-        
EasyMock.expect(restartPlan.shouldRestartConnector()).andReturn(true).anyTimes();
-        
EasyMock.expect(restartPlan.shouldRestartTasks()).andReturn(false).anyTimes();
-        
EasyMock.expect(restartPlan.restartConnectorStateInfo()).andReturn(connectorStateInfo).anyTimes();
-        EasyMock.expect(herder.buildRestartPlan(restartRequest))
-                .andReturn(Optional.of(restartPlan)).anyTimes();
-
-        herder.onRestart(CONNECTOR_NAME);
-        EasyMock.expectLastCall();
-
-        connector = PowerMock.createMock(BogusSinkConnector.class);
+        RestartPlan restartPlan = mock(RestartPlan.class);
+        ConnectorStateInfo connectorStateInfo = mock(ConnectorStateInfo.class);
+        when(restartPlan.shouldRestartConnector()).thenReturn(true);
+        when(restartPlan.shouldRestartTasks()).thenReturn(false);
+        
when(restartPlan.restartConnectorStateInfo()).thenReturn(connectorStateInfo);
+        
doReturn(Optional.of(restartPlan)).when(herder).buildRestartPlan(restartRequest);
+
+        connector = mock(BogusSinkConnector.class);
         expectAdd(SourceSink.SINK);
 
         Map<String, String> connectorConfig = connectorConfig(SourceSink.SINK);
-        Connector connectorMock = PowerMock.createMock(SinkConnector.class);
+        Connector connectorMock = mock(SinkConnector.class);
         expectConfigValidation(connectorMock, true, connectorConfig);
 
-        worker.stopAndAwaitConnector(CONNECTOR_NAME);
-        EasyMock.expectLastCall();
+        doNothing().when(worker).stopAndAwaitConnector(CONNECTOR_NAME);
 
-        Capture<Callback<TargetState>> onStart = EasyMock.newCapture();
-        worker.startConnector(eq(CONNECTOR_NAME), eq(connectorConfig), 
EasyMock.anyObject(HerderConnectorContext.class),
-                eq(herder), eq(TargetState.STARTED), 
EasyMock.capture(onStart));
-        EasyMock.expectLastCall().andAnswer(() -> {
+        final ArgumentCaptor<Callback<TargetState>> onStart = 
ArgumentCaptor.forClass(Callback.class);
+        doAnswer(invocation -> {
             onStart.getValue().onCompletion(null, TargetState.STARTED);
             return true;
-        });
-
-        PowerMock.replayAll();
+        }).when(worker).startConnector(eq(CONNECTOR_NAME), 
eq(connectorConfig), any(HerderConnectorContext.class),
+            eq(herder), eq(TargetState.STARTED), onStart.capture());
 
         herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, 
createCallback);
         Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
@@ -633,36 +567,32 @@ public class StandaloneHerderTest {
         FutureCallback<ConnectorStateInfo> restartCallback = new 
FutureCallback<>();
         herder.restartConnectorAndTasks(restartRequest, restartCallback);
         assertEquals(connectorStateInfo, restartCallback.get(1000L, 
TimeUnit.MILLISECONDS));
-        PowerMock.verifyAll();
+
+        verifyConnectorStatusRestart();
     }
 
     @Test
     public void testRestartConnectorAndTasksOnlyTasks() throws Exception {
         ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0);
         RestartRequest restartRequest = new RestartRequest(CONNECTOR_NAME, 
false, true);
-        RestartPlan restartPlan = PowerMock.createMock(RestartPlan.class);
-        ConnectorStateInfo connectorStateInfo = 
PowerMock.createMock(ConnectorStateInfo.class);
-        
EasyMock.expect(restartPlan.shouldRestartConnector()).andReturn(false).anyTimes();
-        
EasyMock.expect(restartPlan.shouldRestartTasks()).andReturn(true).anyTimes();
-        
EasyMock.expect(restartPlan.restartTaskCount()).andReturn(1).anyTimes();
-        EasyMock.expect(restartPlan.totalTaskCount()).andReturn(1).anyTimes();
-        
EasyMock.expect(restartPlan.taskIdsToRestart()).andReturn(Collections.singletonList(taskId)).anyTimes();
-        
EasyMock.expect(restartPlan.restartConnectorStateInfo()).andReturn(connectorStateInfo).anyTimes();
-        EasyMock.expect(herder.buildRestartPlan(restartRequest))
-                .andReturn(Optional.of(restartPlan)).anyTimes();
-
-        herder.onRestart(taskId);
-        EasyMock.expectLastCall();
-
-        connector = PowerMock.createMock(BogusSinkConnector.class);
+        RestartPlan restartPlan = mock(RestartPlan.class);
+        ConnectorStateInfo connectorStateInfo = mock(ConnectorStateInfo.class);
+        when(restartPlan.shouldRestartConnector()).thenReturn(false);
+        when(restartPlan.shouldRestartTasks()).thenReturn(true);
+        when(restartPlan.restartTaskCount()).thenReturn(1);
+        when(restartPlan.totalTaskCount()).thenReturn(1);
+        
when(restartPlan.taskIdsToRestart()).thenReturn(Collections.singletonList(taskId));
+        
when(restartPlan.restartConnectorStateInfo()).thenReturn(connectorStateInfo);
+        
doReturn(Optional.of(restartPlan)).when(herder).buildRestartPlan(restartRequest);
+
+        connector = mock(BogusSinkConnector.class);
         expectAdd(SourceSink.SINK);
 
         Map<String, String> connectorConfig = connectorConfig(SourceSink.SINK);
-        Connector connectorMock = PowerMock.createMock(SinkConnector.class);
+        Connector connectorMock = mock(SinkConnector.class);
         expectConfigValidation(connectorMock, true, connectorConfig);
 
-        worker.stopAndAwaitTasks(Collections.singletonList(taskId));
-        EasyMock.expectLastCall();
+        
doNothing().when(worker).stopAndAwaitTasks(Collections.singletonList(taskId));
 
         ClusterConfigState configState = new ClusterConfigState(
                 -1,
@@ -676,9 +606,8 @@ public class StandaloneHerderTest {
                 new HashSet<>(),
                 new HashSet<>(),
                 transformer);
-        worker.startSinkTask(taskId, configState, connectorConfig, 
taskConfig(SourceSink.SINK), herder, TargetState.STARTED);
-        EasyMock.expectLastCall().andReturn(true);
-        PowerMock.replayAll();
+        when(worker.startSinkTask(taskId, configState, connectorConfig, 
taskConfig(SourceSink.SINK), herder, TargetState.STARTED))
+            .thenReturn(true);
 
         herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, 
createCallback);
         Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
@@ -687,48 +616,44 @@ public class StandaloneHerderTest {
         FutureCallback<ConnectorStateInfo> restartCallback = new 
FutureCallback<>();
         herder.restartConnectorAndTasks(restartRequest, restartCallback);
         assertEquals(connectorStateInfo, restartCallback.get(1000L, 
TimeUnit.MILLISECONDS));
-        PowerMock.verifyAll();
+        ArgumentCaptor<TaskStatus> taskStatus = 
ArgumentCaptor.forClass(TaskStatus.class);
+        verify(statusBackingStore).put(taskStatus.capture());
+        assertEquals(AbstractStatus.State.RESTARTING, 
taskStatus.getValue().state());
+        assertEquals(taskId, taskStatus.getValue().id());
     }
 
     @Test
     public void testRestartConnectorAndTasksBoth() throws Exception {
         ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0);
         RestartRequest restartRequest = new RestartRequest(CONNECTOR_NAME, 
false, true);
-        RestartPlan restartPlan = PowerMock.createMock(RestartPlan.class);
-        ConnectorStateInfo connectorStateInfo = 
PowerMock.createMock(ConnectorStateInfo.class);
-        
EasyMock.expect(restartPlan.shouldRestartConnector()).andReturn(true).anyTimes();
-        
EasyMock.expect(restartPlan.shouldRestartTasks()).andReturn(true).anyTimes();
-        
EasyMock.expect(restartPlan.restartTaskCount()).andReturn(1).anyTimes();
-        EasyMock.expect(restartPlan.totalTaskCount()).andReturn(1).anyTimes();
-        
EasyMock.expect(restartPlan.taskIdsToRestart()).andReturn(Collections.singletonList(taskId)).anyTimes();
-        
EasyMock.expect(restartPlan.restartConnectorStateInfo()).andReturn(connectorStateInfo).anyTimes();
-        EasyMock.expect(herder.buildRestartPlan(restartRequest))
-                .andReturn(Optional.of(restartPlan)).anyTimes();
-
-        herder.onRestart(CONNECTOR_NAME);
-        EasyMock.expectLastCall();
-        herder.onRestart(taskId);
-        EasyMock.expectLastCall();
-
-        connector = PowerMock.createMock(BogusSinkConnector.class);
+        RestartPlan restartPlan = mock(RestartPlan.class);
+        ConnectorStateInfo connectorStateInfo = mock(ConnectorStateInfo.class);
+        when(restartPlan.shouldRestartConnector()).thenReturn(true);
+        when(restartPlan.shouldRestartTasks()).thenReturn(true);
+        when(restartPlan.restartTaskCount()).thenReturn(1);
+        when(restartPlan.totalTaskCount()).thenReturn(1);
+        
when(restartPlan.taskIdsToRestart()).thenReturn(Collections.singletonList(taskId));
+        
when(restartPlan.restartConnectorStateInfo()).thenReturn(connectorStateInfo);
+        
doReturn(Optional.of(restartPlan)).when(herder).buildRestartPlan(restartRequest);
+
+        ArgumentCaptor<TaskStatus> taskStatus = 
ArgumentCaptor.forClass(TaskStatus.class);
+
+        connector = mock(BogusSinkConnector.class);
         expectAdd(SourceSink.SINK);
 
         Map<String, String> connectorConfig = connectorConfig(SourceSink.SINK);
-        Connector connectorMock = PowerMock.createMock(SinkConnector.class);
+        Connector connectorMock = mock(SinkConnector.class);
         expectConfigValidation(connectorMock, true, connectorConfig);
 
-        worker.stopAndAwaitConnector(CONNECTOR_NAME);
-        EasyMock.expectLastCall();
-        worker.stopAndAwaitTasks(Collections.singletonList(taskId));
-        EasyMock.expectLastCall();
+        doNothing().when(worker).stopAndAwaitConnector(CONNECTOR_NAME);
+        
doNothing().when(worker).stopAndAwaitTasks(Collections.singletonList(taskId));
 
-        Capture<Callback<TargetState>> onStart = EasyMock.newCapture();
-        worker.startConnector(eq(CONNECTOR_NAME), eq(connectorConfig), 
EasyMock.anyObject(HerderConnectorContext.class),
-                eq(herder), eq(TargetState.STARTED), 
EasyMock.capture(onStart));
-        EasyMock.expectLastCall().andAnswer(() -> {
+        final ArgumentCaptor<Callback<TargetState>> onStart = 
ArgumentCaptor.forClass(Callback.class);
+        doAnswer(invocation -> {
             onStart.getValue().onCompletion(null, TargetState.STARTED);
             return true;
-        });
+        }).when(worker).startConnector(eq(CONNECTOR_NAME), 
eq(connectorConfig), any(HerderConnectorContext.class),
+            eq(herder), eq(TargetState.STARTED), onStart.capture());
 
         ClusterConfigState configState = new ClusterConfigState(
                 -1,
@@ -742,9 +667,8 @@ public class StandaloneHerderTest {
                 new HashSet<>(),
                 new HashSet<>(),
                 transformer);
-        worker.startSinkTask(taskId, configState, connectorConfig, 
taskConfig(SourceSink.SINK), herder, TargetState.STARTED);
-        EasyMock.expectLastCall().andReturn(true);
-        PowerMock.replayAll();
+        when(worker.startSinkTask(taskId, configState, connectorConfig, 
taskConfig(SourceSink.SINK), herder, TargetState.STARTED))
+            .thenReturn(true);
 
         herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, 
createCallback);
         Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
@@ -753,39 +677,34 @@ public class StandaloneHerderTest {
         FutureCallback<ConnectorStateInfo> restartCallback = new 
FutureCallback<>();
         herder.restartConnectorAndTasks(restartRequest, restartCallback);
         assertEquals(connectorStateInfo, restartCallback.get(1000L, 
TimeUnit.MILLISECONDS));
-        PowerMock.verifyAll();
+
+        verifyConnectorStatusRestart();
+        verify(statusBackingStore).put(taskStatus.capture());
+        assertEquals(AbstractStatus.State.RESTARTING, 
taskStatus.getValue().state());
+        assertEquals(taskId, taskStatus.getValue().id());
     }
 
     @Test
     public void testCreateAndStop() throws Exception {
-        connector = PowerMock.createMock(BogusSourceConnector.class);
+        connector = mock(BogusSourceConnector.class);
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> connectorConfig = 
connectorConfig(SourceSink.SOURCE);
-        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        Connector connectorMock = mock(SourceConnector.class);
         expectConfigValidation(connectorMock, true, connectorConfig);
 
-        // herder.stop() should stop any running connectors and tasks even if 
destroyConnector was not invoked
         expectStop();
 
-        statusBackingStore.put(new TaskStatus(new 
ConnectorTaskId(CONNECTOR_NAME, 0), AbstractStatus.State.DESTROYED, WORKER_ID, 
0));
-        EasyMock.expectLastCall();
-
-        statusBackingStore.stop();
-        EasyMock.expectLastCall();
-        worker.stop();
-        EasyMock.expectLastCall();
-
-        PowerMock.replayAll();
-
         herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, 
createCallback);
         Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
         assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
 
+        // herder.stop() should stop any running connectors and tasks even if 
destroyConnector was not invoked
         herder.stop();
         assertTrue(noneConnectorClientConfigOverridePolicy.isClosed());
-
-        PowerMock.verifyAll();
+        verify(worker).stop();
+        verify(statusBackingStore).stop();
+        verify(statusBackingStore).put(new TaskStatus(new 
ConnectorTaskId(CONNECTOR_NAME, 0), AbstractStatus.State.DESTROYED, WORKER_ID, 
0));
     }
 
     @Test
@@ -793,49 +712,36 @@ public class StandaloneHerderTest {
         Map<String, String> connConfig = connectorConfig(SourceSink.SOURCE);
         System.out.println(connConfig);
 
-        Callback<Collection<String>> listConnectorsCb = 
PowerMock.createMock(Callback.class);
-        Callback<ConnectorInfo> connectorInfoCb = 
PowerMock.createMock(Callback.class);
-        Callback<Map<String, String>> connectorConfigCb = 
PowerMock.createMock(Callback.class);
-        Callback<List<TaskInfo>> taskConfigsCb = 
PowerMock.createMock(Callback.class);
-        Callback<Map<ConnectorTaskId, Map<String, String>>> tasksConfigCb = 
PowerMock.createMock(Callback.class);
+        Callback<Collection<String>> listConnectorsCb = mock(Callback.class);
+        Callback<ConnectorInfo> connectorInfoCb = mock(Callback.class);
+        Callback<Map<String, String>> connectorConfigCb = mock(Callback.class);
+        Callback<List<TaskInfo>> taskConfigsCb = mock(Callback.class);
+        Callback<Map<ConnectorTaskId, Map<String, String>>> tasksConfigCb = 
mock(Callback.class);
 
         // Check accessors with empty worker
-        listConnectorsCb.onCompletion(null, Collections.EMPTY_SET);
-        EasyMock.expectLastCall();
-        connectorInfoCb.onCompletion(EasyMock.<NotFoundException>anyObject(), 
EasyMock.isNull());
-        EasyMock.expectLastCall();
-        
connectorConfigCb.onCompletion(EasyMock.<NotFoundException>anyObject(), 
EasyMock.isNull());
-        EasyMock.expectLastCall();
-        taskConfigsCb.onCompletion(EasyMock.<NotFoundException>anyObject(), 
EasyMock.isNull());
-        EasyMock.expectLastCall();
-        tasksConfigCb.onCompletion(EasyMock.<NotFoundException>anyObject(), 
EasyMock.isNull());
-        EasyMock.expectLastCall();
+        doNothing().when(listConnectorsCb).onCompletion(null, 
Collections.EMPTY_SET);
+        
doNothing().when(connectorInfoCb).onCompletion(any(NotFoundException.class), 
isNull());
+        
doNothing().when(taskConfigsCb).onCompletion(any(NotFoundException.class), 
isNull());
+        
doNothing().when(tasksConfigCb).onCompletion(any(NotFoundException.class), 
isNull());
+        
doNothing().when(connectorConfigCb).onCompletion(any(NotFoundException.class), 
isNull());
 
         // Create connector
-        connector = PowerMock.createMock(BogusSourceConnector.class);
+        connector = mock(BogusSourceConnector.class);
         expectAdd(SourceSink.SOURCE);
         expectConfigValidation(connector, true, connConfig);
 
         // Validate accessors with 1 connector
-        listConnectorsCb.onCompletion(null, singleton(CONNECTOR_NAME));
-        EasyMock.expectLastCall();
+        doNothing().when(listConnectorsCb).onCompletion(null, 
singleton(CONNECTOR_NAME));
         ConnectorInfo connInfo = new ConnectorInfo(CONNECTOR_NAME, connConfig, 
Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)),
             ConnectorType.SOURCE);
-        connectorInfoCb.onCompletion(null, connInfo);
-        EasyMock.expectLastCall();
-        connectorConfigCb.onCompletion(null, connConfig);
-        EasyMock.expectLastCall();
+        doNothing().when(connectorInfoCb).onCompletion(null, connInfo);
 
         TaskInfo taskInfo = new TaskInfo(new ConnectorTaskId(CONNECTOR_NAME, 
0), taskConfig(SourceSink.SOURCE));
-        taskConfigsCb.onCompletion(null, Arrays.asList(taskInfo));
-        EasyMock.expectLastCall();
+        doNothing().when(taskConfigsCb).onCompletion(null, 
Arrays.asList(taskInfo));
 
         Map<ConnectorTaskId, Map<String, String>> tasksConfig = 
Collections.singletonMap(new ConnectorTaskId(CONNECTOR_NAME, 0),
             taskConfig(SourceSink.SOURCE));
-        tasksConfigCb.onCompletion(null, tasksConfig);
-        EasyMock.expectLastCall();
-
-        PowerMock.replayAll();
+        doNothing().when(tasksConfigCb).onCompletion(null, tasksConfig);
 
         // All operations are synchronous for StandaloneHerder, so we don't 
need to actually wait after making each call
         herder.connectors(listConnectorsCb);
@@ -848,19 +754,14 @@ public class StandaloneHerderTest {
         Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
         assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
 
-        EasyMock.reset(transformer);
-        EasyMock.expect(transformer.transform(eq(CONNECTOR_NAME), 
EasyMock.anyObject()))
-            .andThrow(new AssertionError("Config transformation should not 
occur when requesting connector or task info"))
-            .anyTimes();
-        EasyMock.replay(transformer);
-
+        reset(transformer);
         herder.connectors(listConnectorsCb);
         herder.connectorInfo(CONNECTOR_NAME, connectorInfoCb);
         herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb);
         herder.taskConfigs(CONNECTOR_NAME, taskConfigsCb);
         herder.tasksConfig(CONNECTOR_NAME, tasksConfigCb);
-
-        PowerMock.verifyAll();
+        // Config transformation should not occur when requesting connector or 
task info
+        verify(transformer, never()).transform(eq(CONNECTOR_NAME), any());
     }
 
     @Test
@@ -869,39 +770,30 @@ public class StandaloneHerderTest {
         Map<String, String> newConnConfig = new HashMap<>(connConfig);
         newConnConfig.put("foo", "bar");
 
-        Callback<Map<String, String>> connectorConfigCb = 
PowerMock.createMock(Callback.class);
-        // Callback<Herder.Created<ConnectorInfo>> putConnectorConfigCb = 
PowerMock.createMock(Callback.class);
+        Callback<Map<String, String>> connectorConfigCb = mock(Callback.class);
 
         // Create
-        connector = PowerMock.createMock(BogusSourceConnector.class);
+        connector = mock(BogusSourceConnector.class);
         expectAdd(SourceSink.SOURCE);
-        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        Connector connectorMock = mock(SourceConnector.class);
         expectConfigValidation(connectorMock, true, connConfig);
 
         // Should get first config
-        connectorConfigCb.onCompletion(null, connConfig);
-        EasyMock.expectLastCall();
+        doNothing().when(connectorConfigCb).onCompletion(null, connConfig);
         // Update config, which requires stopping and restarting
-        worker.stopAndAwaitConnector(CONNECTOR_NAME);
-        EasyMock.expectLastCall();
-        Capture<Map<String, String>> capturedConfig = EasyMock.newCapture();
-        Capture<Callback<TargetState>> onStart = EasyMock.newCapture();
-        worker.startConnector(eq(CONNECTOR_NAME), 
EasyMock.capture(capturedConfig), EasyMock.anyObject(),
-                eq(herder), eq(TargetState.STARTED), 
EasyMock.capture(onStart));
-        EasyMock.expectLastCall().andAnswer(() -> {
+        doNothing().when(worker).stopAndAwaitConnector(CONNECTOR_NAME);
+        final ArgumentCaptor<Map<String, String>> capturedConfig = 
ArgumentCaptor.forClass(Map.class);
+        final ArgumentCaptor<Callback<TargetState>> onStart = 
ArgumentCaptor.forClass(Callback.class);
+        doAnswer(invocation -> {
             onStart.getValue().onCompletion(null, TargetState.STARTED);
             return true;
-        });
+        }).when(worker).startConnector(eq(CONNECTOR_NAME), 
capturedConfig.capture(), any(),
+            eq(herder), eq(TargetState.STARTED), onStart.capture());
         // Generate same task config, which should result in no additional 
action to restart tasks
-        EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, new 
SourceConnectorConfig(plugins, newConnConfig, true)))
-                .andReturn(singletonList(taskConfig(SourceSink.SOURCE)));
+        when(worker.connectorTaskConfigs(CONNECTOR_NAME, new 
SourceConnectorConfig(plugins, newConnConfig, true)))
+            .thenReturn(singletonList(taskConfig(SourceSink.SOURCE)));
 
         expectConfigValidation(connectorMock, false, newConnConfig);
-        connectorConfigCb.onCompletion(null, newConnConfig);
-        EasyMock.expectLastCall();
-        EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes();
-
-        PowerMock.replayAll();
 
         herder.putConnectorConfig(CONNECTOR_NAME, connConfig, false, 
createCallback);
         Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
@@ -910,6 +802,7 @@ public class StandaloneHerderTest {
         herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb);
 
         FutureCallback<Herder.Created<ConnectorInfo>> reconfigureCallback = 
new FutureCallback<>();
+        doNothing().when(connectorConfigCb).onCompletion(null, newConnConfig);
         herder.putConnectorConfig(CONNECTOR_NAME, newConnConfig, true, 
reconfigureCallback);
         Herder.Created<ConnectorInfo> newConnectorInfo = 
reconfigureCallback.get(1000L, TimeUnit.SECONDS);
         ConnectorInfo newConnInfo = new ConnectorInfo(CONNECTOR_NAME, 
newConnConfig, Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)),
@@ -918,96 +811,76 @@ public class StandaloneHerderTest {
 
         assertEquals("bar", capturedConfig.getValue().get("foo"));
         herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb);
-
-        PowerMock.verifyAll();
+        verifyNoMoreInteractions(connectorConfigCb);
     }
 
     @Test
     public void testPutTaskConfigs() {
-        Callback<Void> cb = PowerMock.createMock(Callback.class);
-
-        PowerMock.replayAll();
+        Callback<Void> cb = mock(Callback.class);
 
         assertThrows(UnsupportedOperationException.class, () -> 
herder.putTaskConfigs(CONNECTOR_NAME,
                 singletonList(singletonMap("config", "value")), cb, null));
-        PowerMock.verifyAll();
     }
 
     @Test
-    public void testCorruptConfig() throws Throwable {
+    public void testCorruptConfig() {
         Map<String, String> config = new HashMap<>();
         config.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);
         config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, 
BogusSinkConnector.class.getName());
         config.put(SinkConnectorConfig.TOPICS_CONFIG, TOPICS_LIST_STR);
-        Connector connectorMock = PowerMock.createMock(SinkConnector.class);
+        Connector connectorMock = mock(SinkConnector.class);
         String error = "This is an error in your config!";
         List<String> errors = new ArrayList<>(singletonList(error));
         String key = "foo.invalid.key";
-        EasyMock.expect(connectorMock.validate(config)).andReturn(
+        when(connectorMock.validate(config)).thenReturn(
             new Config(
                 Arrays.asList(new ConfigValue(key, null, 
Collections.emptyList(), errors))
             )
         );
         ConfigDef configDef = new ConfigDef();
         configDef.define(key, ConfigDef.Type.STRING, 
ConfigDef.Importance.HIGH, "");
-        
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
-        final Capture<Map<String, String>> configCapture = 
EasyMock.newCapture();
-        
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
-        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
-        
EasyMock.expect(plugins.connectorLoader(EasyMock.anyString())).andReturn(pluginLoader);
-        
EasyMock.expect(plugins.withClassLoader(pluginLoader)).andReturn(loaderSwap);
-        EasyMock.expect(worker.getPlugins()).andStubReturn(plugins);
-        
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
-        EasyMock.expect(connectorMock.config()).andStubReturn(configDef);
-        loaderSwap.close();
-        EasyMock.expectLastCall();
-
-        PowerMock.replayAll();
+        when(worker.configTransformer()).thenReturn(transformer);
+        final ArgumentCaptor<Map<String, String>> configCapture = 
ArgumentCaptor.forClass(Map.class);
+        
when(transformer.transform(configCapture.capture())).thenAnswer(invocation -> 
configCapture.getValue());
+        when(worker.getPlugins()).thenReturn(plugins);
+        when(plugins.connectorLoader(anyString())).thenReturn(pluginLoader);
+        when(plugins.withClassLoader(pluginLoader)).thenReturn(loaderSwap);
+        when(plugins.newConnector(anyString())).thenReturn(connectorMock);
+        when(connectorMock.config()).thenReturn(configDef);
 
         herder.putConnectorConfig(CONNECTOR_NAME, config, true, 
createCallback);
-        try {
-            createCallback.get(1000L, TimeUnit.SECONDS);
-            fail("Should have failed to configure connector");
-        } catch (ExecutionException e) {
-            assertNotNull(e.getCause());
-            Throwable cause = e.getCause();
-            assertTrue(cause instanceof BadRequestException);
-            assertEquals(
+        ExecutionException e = assertThrows(
+                "Should have failed to configure connector",
+                ExecutionException.class,
+                () -> createCallback.get(1000L, TimeUnit.SECONDS)
+        );
+        assertNotNull(e.getCause());
+        Throwable cause = e.getCause();
+        assertTrue(cause instanceof BadRequestException);
+        assertEquals(
                 cause.getMessage(),
                 "Connector configuration is invalid and contains the following 
1 error(s):\n" +
-                    error + "\n" +
-                    "You can also find the above list of errors at the 
endpoint `/connector-plugins/{connectorType}/config/validate`"
-            );
-        }
-
-        PowerMock.verifyAll();
+                        error + "\n" +
+                        "You can also find the above list of errors at the 
endpoint `/connector-plugins/{connectorType}/config/validate`"
+        );
+        verify(loaderSwap).close();
     }
 
     @Test
     public void testTargetStates() throws Exception {
-        connector = PowerMock.createMock(BogusSourceConnector.class);
+        connector = mock(BogusSourceConnector.class);
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> connectorConfig = 
connectorConfig(SourceSink.SOURCE);
-        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        Connector connectorMock = mock(SourceConnector.class);
         expectConfigValidation(connectorMock, true, connectorConfig);
 
         // We pause, then stop, the connector
         expectTargetState(CONNECTOR_NAME, TargetState.PAUSED);
         expectTargetState(CONNECTOR_NAME, TargetState.STOPPED);
 
-        // herder.stop() should stop any running connectors and tasks even if 
destroyConnector was not invoked
         expectStop();
 
-        statusBackingStore.put(new TaskStatus(new 
ConnectorTaskId(CONNECTOR_NAME, 0), AbstractStatus.State.DESTROYED, WORKER_ID, 
0));
-        EasyMock.expectLastCall();
-
-        statusBackingStore.stop();
-        EasyMock.expectLastCall();
-        worker.stop();
-        EasyMock.expectLastCall();
-
-        PowerMock.replayAll();
 
         FutureCallback<Void> stopCallback = new FutureCallback<>();
         FutureCallback<List<TaskInfo>> taskConfigsCallback = new 
FutureCallback<>();
@@ -1017,20 +890,21 @@ public class StandaloneHerderTest {
         assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
         herder.pauseConnector(CONNECTOR_NAME);
         herder.stopConnector(CONNECTOR_NAME, stopCallback);
+        verify(statusBackingStore).put(new TaskStatus(new 
ConnectorTaskId(CONNECTOR_NAME, 0), AbstractStatus.State.DESTROYED, WORKER_ID, 
0));
         stopCallback.get(10L, TimeUnit.SECONDS);
         herder.taskConfigs(CONNECTOR_NAME, taskConfigsCallback);
         assertEquals(Collections.emptyList(), taskConfigsCallback.get(1, 
TimeUnit.SECONDS));
 
+        // herder.stop() should stop any running connectors and tasks even if 
destroyConnector was not invoked
         herder.stop();
         assertTrue(noneConnectorClientConfigOverridePolicy.isClosed());
-
-        PowerMock.verifyAll();
+        verify(worker).stop();
+        verify(statusBackingStore).put(new TaskStatus(new 
ConnectorTaskId(CONNECTOR_NAME, 0), AbstractStatus.State.DESTROYED, WORKER_ID, 
0));
+        verify(statusBackingStore).stop();
     }
 
     @Test
     public void testModifyConnectorOffsetsUnknownConnector() {
-        PowerMock.replayAll();
-
         FutureCallback<Message> alterOffsetsCallback = new FutureCallback<>();
         herder.alterConnectorOffsets("unknown-connector",
                 
Collections.singletonMap(Collections.singletonMap("partitionKey", 
"partitionValue"), Collections.singletonMap("offsetKey", "offsetValue")),
@@ -1042,14 +916,10 @@ public class StandaloneHerderTest {
         herder.resetConnectorOffsets("unknown-connector", 
resetOffsetsCallback);
         e = assertThrows(ExecutionException.class, () -> 
resetOffsetsCallback.get(1000L, TimeUnit.MILLISECONDS));
         assertTrue(e.getCause() instanceof NotFoundException);
-
-        PowerMock.verifyAll();
     }
 
     @Test
     public void testModifyConnectorOffsetsConnectorNotInStoppedState() {
-        PowerMock.replayAll();
-
         herder.configState = new ClusterConfigState(
                 10,
                 null,
@@ -1074,20 +944,16 @@ public class StandaloneHerderTest {
         herder.resetConnectorOffsets(CONNECTOR_NAME, resetOffsetsCallback);
         e = assertThrows(ExecutionException.class, () -> 
resetOffsetsCallback.get(1000L, TimeUnit.MILLISECONDS));
         assertTrue(e.getCause() instanceof BadRequestException);
-
-        PowerMock.verifyAll();
     }
 
     @Test
     public void testAlterConnectorOffsets() throws Exception {
-        Capture<Callback<Message>> workerCallbackCapture = 
Capture.newInstance();
+        ArgumentCaptor<Callback<Message>> workerCallbackCapture = 
ArgumentCaptor.forClass(Callback.class);
         Message msg = new Message("The offsets for this connector have been 
altered successfully");
-        worker.modifyConnectorOffsets(eq(CONNECTOR_NAME), 
eq(connectorConfig(SourceSink.SOURCE)), anyObject(Map.class), 
capture(workerCallbackCapture));
-        EasyMock.expectLastCall().andAnswer(() -> {
+        doAnswer(invocation -> {
             workerCallbackCapture.getValue().onCompletion(null, msg);
             return null;
-        });
-        PowerMock.replayAll();
+        }).when(worker).modifyConnectorOffsets(eq(CONNECTOR_NAME), 
eq(connectorConfig(SourceSink.SOURCE)), any(Map.class), 
workerCallbackCapture.capture());
 
         herder.configState = new ClusterConfigState(
                 10,
@@ -1106,19 +972,17 @@ public class StandaloneHerderTest {
                 
Collections.singletonMap(Collections.singletonMap("partitionKey", 
"partitionValue"), Collections.singletonMap("offsetKey", "offsetValue")),
                 alterOffsetsCallback);
         assertEquals(msg, alterOffsetsCallback.get(1000, 
TimeUnit.MILLISECONDS));
-        PowerMock.verifyAll();
     }
 
     @Test
     public void testResetConnectorOffsets() throws Exception {
-        Capture<Callback<Message>> workerCallbackCapture = 
Capture.newInstance();
+        ArgumentCaptor<Callback<Message>> workerCallbackCapture = 
ArgumentCaptor.forClass(Callback.class);
         Message msg = new Message("The offsets for this connector have been 
reset successfully");
-        worker.modifyConnectorOffsets(eq(CONNECTOR_NAME), 
eq(connectorConfig(SourceSink.SOURCE)), isNull(), 
capture(workerCallbackCapture));
-        EasyMock.expectLastCall().andAnswer(() -> {
+
+        doAnswer(invocation -> {
             workerCallbackCapture.getValue().onCompletion(null, msg);
             return null;
-        });
-        PowerMock.replayAll();
+        }).when(worker).modifyConnectorOffsets(eq(CONNECTOR_NAME), 
eq(connectorConfig(SourceSink.SOURCE)), isNull(), 
workerCallbackCapture.capture());
 
         herder.configState = new ClusterConfigState(
                 10,
@@ -1135,7 +999,6 @@ public class StandaloneHerderTest {
         FutureCallback<Message> resetOffsetsCallback = new FutureCallback<>();
         herder.resetConnectorOffsets(CONNECTOR_NAME, resetOffsetsCallback);
         assertEquals(msg, resetOffsetsCallback.get(1000, 
TimeUnit.MILLISECONDS));
-        PowerMock.verifyAll();
     }
 
     private void expectAdd(SourceSink sourceSink) {
@@ -1144,24 +1007,23 @@ public class StandaloneHerderTest {
             new SourceConnectorConfig(plugins, connectorProps, true) :
             new SinkConnectorConfig(plugins, connectorProps);
 
-        Capture<Callback<TargetState>> onStart = EasyMock.newCapture();
-        worker.startConnector(eq(CONNECTOR_NAME), eq(connectorProps), 
EasyMock.anyObject(HerderConnectorContext.class),
-                eq(herder), eq(TargetState.STARTED), 
EasyMock.capture(onStart));
-        EasyMock.expectLastCall().andAnswer(() -> {
+        final ArgumentCaptor<Callback<TargetState>> onStart = 
ArgumentCaptor.forClass(Callback.class);
+        doAnswer(invocation -> {
             onStart.getValue().onCompletion(null, TargetState.STARTED);
             return true;
-        });
-        
EasyMock.expect(worker.isRunning(CONNECTOR_NAME)).andReturn(true).anyTimes();
+        }).when(worker).startConnector(eq(CONNECTOR_NAME), eq(connectorProps), 
any(HerderConnectorContext.class),
+            eq(herder), eq(TargetState.STARTED), onStart.capture());
+        when(worker.isRunning(CONNECTOR_NAME)).thenReturn(true);
         if (sourceSink == SourceSink.SOURCE) {
-            
EasyMock.expect(worker.isTopicCreationEnabled()).andReturn(true).anyTimes();
+            when(worker.isTopicCreationEnabled()).thenReturn(true);
         }
 
         // And we should instantiate the tasks. For a sink task, we should see 
added properties for the input topic partitions
 
         Map<String, String> generatedTaskProps = taskConfig(sourceSink);
 
-        EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, 
connConfig))
-            .andReturn(singletonList(generatedTaskProps));
+        when(worker.connectorTaskConfigs(CONNECTOR_NAME, connConfig))
+            .thenReturn(singletonList(generatedTaskProps));
 
         ClusterConfigState configState = new ClusterConfigState(
                 -1,
@@ -1176,34 +1038,33 @@ public class StandaloneHerderTest {
                 new HashSet<>(),
                 transformer);
         if (sourceSink.equals(SourceSink.SOURCE)) {
-            worker.startSourceTask(new ConnectorTaskId(CONNECTOR_NAME, 0), 
configState, connectorConfig(sourceSink), generatedTaskProps, herder, 
TargetState.STARTED);
+            when(worker.startSourceTask(new ConnectorTaskId(CONNECTOR_NAME, 
0), configState, connectorConfig(sourceSink), generatedTaskProps, herder, 
TargetState.STARTED)).thenReturn(true);
         } else {
-            worker.startSinkTask(new ConnectorTaskId(CONNECTOR_NAME, 0), 
configState, connectorConfig(sourceSink), generatedTaskProps, herder, 
TargetState.STARTED);
+            when(worker.startSinkTask(new ConnectorTaskId(CONNECTOR_NAME, 0), 
configState, connectorConfig(sourceSink), generatedTaskProps, herder, 
TargetState.STARTED)).thenReturn(true);
         }
-        EasyMock.expectLastCall().andReturn(true);
-
-        Capture<Map<String, String>> configCapture = EasyMock.newCapture();
-        EasyMock.expect(herder.connectorType(EasyMock.capture(configCapture)))
-            .andStubAnswer(() -> {
-                String connectorClass = 
configCapture.getValue().get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
-                if 
(BogusSourceConnector.class.getName().equals(connectorClass)) {
-                    return ConnectorType.SOURCE;
-                } else if 
(BogusSinkConnector.class.getName().equals(connectorClass)) {
-                    return ConnectorType.SINK;
-                }
-                return ConnectorType.UNKNOWN;
-            });
-        worker.isSinkConnector(CONNECTOR_NAME);
-        PowerMock.expectLastCall().andReturn(sourceSink == 
SourceSink.SINK).anyTimes();
+
+        ArgumentCaptor<Map<String, String>> configCapture = 
ArgumentCaptor.forClass(Map.class);
+        
when(herder.connectorType(configCapture.capture())).thenAnswer(invocation -> {
+            String connectorClass = 
configCapture.getValue().get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+            if (BogusSourceConnector.class.getName().equals(connectorClass)) {
+                return ConnectorType.SOURCE;
+            } else if 
(BogusSinkConnector.class.getName().equals(connectorClass)) {
+                return ConnectorType.SINK;
+            }
+            return ConnectorType.UNKNOWN;
+        });
+
+        when(worker.isSinkConnector(CONNECTOR_NAME))
+                .thenReturn(sourceSink == SourceSink.SINK);
     }
 
     private void expectTargetState(String connector, TargetState state) {
-        Capture<Callback<TargetState>> stateChangeCallback = 
Capture.newInstance();
-        worker.setTargetState(eq(connector), eq(state), 
capture(stateChangeCallback));
-        EasyMock.expectLastCall().andAnswer(() -> {
+        ArgumentCaptor<Callback<TargetState>> stateChangeCallback = 
ArgumentCaptor.forClass(Callback.class);
+
+        doAnswer(invocation -> {
             stateChangeCallback.getValue().onCompletion(null, state);
             return null;
-        });
+        }).when(worker).setTargetState(eq(connector), eq(state), 
stateChangeCallback.capture());
     }
 
     private ConnectorInfo createdInfo(SourceSink sourceSink) {
@@ -1214,10 +1075,8 @@ public class StandaloneHerderTest {
 
     private void expectStop() {
         ConnectorTaskId task = new ConnectorTaskId(CONNECTOR_NAME, 0);
-        worker.stopAndAwaitTasks(singletonList(task));
-        EasyMock.expectLastCall();
-        worker.stopAndAwaitConnector(CONNECTOR_NAME);
-        EasyMock.expectLastCall();
+        doNothing().when(worker).stopAndAwaitTasks(singletonList(task));
+        doNothing().when(worker).stopAndAwaitConnector(CONNECTOR_NAME);
     }
 
     private void expectDestroy() {
@@ -1256,22 +1115,20 @@ public class StandaloneHerderTest {
             Map<String, String>... configs
     ) {
         // config validation
-        
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
-        final Capture<Map<String, String>> configCapture = 
EasyMock.newCapture();
-        
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
-        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4);
-        
EasyMock.expect(plugins.connectorLoader(EasyMock.anyString())).andReturn(pluginLoader);
-        
EasyMock.expect(plugins.withClassLoader(pluginLoader)).andReturn(loaderSwap);
+        when(worker.configTransformer()).thenReturn(transformer);
+        final ArgumentCaptor<Map<String, String>> configCapture = 
ArgumentCaptor.forClass(Map.class);
+        
when(transformer.transform(configCapture.capture())).thenAnswer(invocation -> 
configCapture.getValue());
+        when(worker.getPlugins()).thenReturn(plugins);
+        when(plugins.connectorLoader(anyString())).thenReturn(pluginLoader);
+        when(plugins.withClassLoader(pluginLoader)).thenReturn(loaderSwap);
         if (shouldCreateConnector) {
-            EasyMock.expect(worker.getPlugins()).andReturn(plugins);
-            
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
+            when(worker.getPlugins()).thenReturn(plugins);
+            when(plugins.newConnector(anyString())).thenReturn(connectorMock);
         }
-        EasyMock.expect(connectorMock.config()).andStubReturn(new ConfigDef());
+        when(connectorMock.config()).thenReturn(new ConfigDef());
 
         for (Map<String, String> config : configs)
-            EasyMock.expect(connectorMock.validate(config)).andReturn(new 
Config(Collections.emptyList()));
-        loaderSwap.close();
-        EasyMock.expectLastCall();
+            when(connectorMock.validate(config)).thenReturn(new 
Config(Collections.emptyList()));
     }
 
     // We need to use a real class here due to some issue with mocking 
java.lang.Class
@@ -1287,4 +1144,10 @@ public class StandaloneHerderTest {
     private abstract class BogusSinkTask extends SourceTask {
     }
 
+    private void verifyConnectorStatusRestart() {
+        ArgumentCaptor<ConnectorStatus> connectorStatus = 
ArgumentCaptor.forClass(ConnectorStatus.class);
+        verify(statusBackingStore).put(connectorStatus.capture());
+        assertEquals(CONNECTOR_NAME, connectorStatus.getValue().id());
+        assertEquals(AbstractStatus.State.RESTARTING, 
connectorStatus.getValue().state());
+    }
 }

Reply via email to