mimaison commented on a change in pull request #11817:
URL: https://github.com/apache/kafka/pull/11817#discussion_r819461371



##########
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
##########
@@ -289,54 +314,60 @@ public void testStartAndStopConnector() throws Throwable {
         assertStatistics(worker, 1, 0);
         assertStartupStatistics(worker, 1, 0, 0, 0);
         worker.stopAndAwaitConnector(CONNECTOR_ID);
+
         assertStatistics(worker, 0, 0);
         assertStartupStatistics(worker, 1, 0, 0, 0);
         assertEquals(Collections.emptySet(), worker.connectorNames());
+
         // Nothing should be left, so this should effectively be a nop
         worker.stop();
         assertStatistics(worker, 0, 0);
 
-        PowerMock.verifyAll();
+
+        verify(plugins, times(2)).currentThreadLoader();
+        verify(plugins).delegatingLoader();
+        verify(delegatingLoader).connectorLoader(connectorClass);
+        verify(plugins).newConnector(connectorClass);
+        verify(sourceConnector, times(2)).version();
+        verify(sourceConnector).initialize(any(ConnectorContext.class));
+        verify(sourceConnector).start(connectorProps);
+        verify(connectorStatusListener).onStartup(CONNECTOR_ID);
+
+        pluginsMockedStatic.verify(() -> 
Plugins.compareAndSwapLoaders(pluginLoader), times(2));
+        pluginsMockedStatic.verify(() -> 
Plugins.compareAndSwapLoaders(delegatingLoader), times(2));
+        connectUtilsMockedStatic.verify(() -> 
ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class)));
+
+        verify(sourceConnector).stop();
+        verify(connectorStatusListener).onShutdown(CONNECTOR_ID);
+        verify(ctx).close();
         MockFileConfigProvider.assertClosed(mockFileProviderTestId);
     }
 
-    private void expectFileConfigProvider() {
-        EasyMock.expect(plugins.newConfigProvider(EasyMock.anyObject(),
-                    EasyMock.eq("config.providers.file"), 
EasyMock.anyObject()))
-                .andAnswer(() -> {
-                    MockFileConfigProvider mockFileConfigProvider = new 
MockFileConfigProvider();
-                    
mockFileConfigProvider.configure(Collections.singletonMap("testId", 
mockFileProviderTestId));
-                    return mockFileConfigProvider;
-                });
+    private void mockFileConfigProvider() {
+        MockFileConfigProvider mockFileConfigProvider = new 
MockFileConfigProvider();
+        mockFileConfigProvider.configure(Collections.singletonMap("testId", 
mockFileProviderTestId));
+        when(plugins.newConfigProvider(any(AbstractConfig.class),
+                                               eq("config.providers.file"),
+                                               any(ClassLoaderUsage.class)))

Review comment:
       Can we align these 2 lines with `any` above?

##########
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
##########
@@ -213,69 +234,73 @@ public void setup() {
 
         // Some common defaults. They might change on individual tests
         connectorProps = anyConnectorConfigMap();
-        PowerMock.mockStatic(Plugins.class);
+
+        pluginsMockedStatic = mockStatic(Plugins.class);
+
+        // pass through things that aren't explicitly mocked out
+        connectUtilsMockedStatic = mockStatic(ConnectUtils.class, new 
CallsRealMethods());
+        connectUtilsMockedStatic.when(() -> 
ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class))).thenReturn(CLUSTER_ID);
+
+        // Make calls to new WorkerSourceTask() return a mock to avoid the 
source task trying to connect to a broker.
+        sourceTaskMockedConstruction = 
mockConstructionWithAnswer(WorkerSourceTask.class, invocation -> {
+
+            // provide implementations of three methods used during testing
+            switch (invocation.getMethod().getName()) {
+                case "id":
+                    return TASK_ID;
+                case "loader":
+                    return pluginLoader;
+                case "awaitStop":
+                    return true;
+                default:
+                    return null;
+            }
+        });
+    }
+
+    @After
+    public void teardown() {
+        // Critical to always close MockedStatics
+        // Ideal would be to use try-with-resources in an individual test, but 
it introduced a rather large level of
+        // indentation of most test bodies, hence sticking with setup() / 
teardown()
+        pluginsMockedStatic.close();
+        connectUtilsMockedStatic.close();
+        sourceTaskMockedConstruction.close();
+
+        mockitoSession.finishMocking();
     }
 
     @Test
     public void testStartAndStopConnector() throws Throwable {
-        expectConverters();
-        expectStartStorage();
 
         final String connectorClass = WorkerTestConnector.class.getName();
-
-        // Create
-        
EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
-        
EasyMock.expect(plugins.delegatingLoader()).andReturn(delegatingLoader);
-        
EasyMock.expect(delegatingLoader.connectorLoader(connectorClass)).andReturn(pluginLoader);
-        EasyMock.expect(plugins.newConnector(connectorClass))
-                .andReturn(sourceConnector);
-        EasyMock.expect(sourceConnector.version()).andReturn("1.0");
-
         connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, 
connectorClass);
 
-        EasyMock.expect(sourceConnector.version()).andReturn("1.0");
-
-        expectFileConfigProvider();
-        EasyMock.expect(Plugins.compareAndSwapLoaders(pluginLoader))
-                .andReturn(delegatingLoader)
-                .times(3);
-        sourceConnector.initialize(anyObject(ConnectorContext.class));
-        EasyMock.expectLastCall();
-        sourceConnector.start(connectorProps);
-        EasyMock.expectLastCall();
-
-        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader))
-                .andReturn(pluginLoader).times(3);
-
-        connectorStatusListener.onStartup(CONNECTOR_ID);
-        EasyMock.expectLastCall();
-
-        // Remove
-        sourceConnector.stop();
-        EasyMock.expectLastCall();
-
-        connectorStatusListener.onShutdown(CONNECTOR_ID);
-        EasyMock.expectLastCall();
-
-        ctx.close();
-        expectLastCall();
-
-        expectStopStorage();
-        expectClusterId();
+        // Create
+        when(plugins.currentThreadLoader()).thenReturn(delegatingLoader);
+        when(plugins.delegatingLoader()).thenReturn(delegatingLoader);
+        
when(delegatingLoader.connectorLoader(connectorClass)).thenReturn(pluginLoader);
+        when(plugins.newConnector(connectorClass)).thenReturn(sourceConnector);
+        when(sourceConnector.version()).thenReturn("1.0");
 
-        PowerMock.replayAll();
+        pluginsMockedStatic.when(() -> 
Plugins.compareAndSwapLoaders(pluginLoader)).thenReturn(delegatingLoader);
+        pluginsMockedStatic.when(() -> 
Plugins.compareAndSwapLoaders(delegatingLoader)).thenReturn(pluginLoader);
+        connectUtilsMockedStatic.when(() -> 
ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class)))
+                                .thenReturn(CLUSTER_ID);
 
         worker = new Worker(WORKER_ID, new MockTime(), plugins, config, 
offsetBackingStore, noneConnectorClientConfigOverridePolicy);
-        worker.herder = herder;
         worker.start();
 
         assertEquals(Collections.emptySet(), worker.connectorNames());
 
         FutureCallback<TargetState> onFirstStart = new FutureCallback<>();
+
         worker.startConnector(CONNECTOR_ID, connectorProps, ctx, 
connectorStatusListener, TargetState.STARTED, onFirstStart);
+
         // Wait for the connector to actually start
         assertEquals(TargetState.STARTED, onFirstStart.get(1000, 
TimeUnit.MILLISECONDS));
-        assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), 
worker.connectorNames());
+        assertEquals(new HashSet<>(Collections.singletonList(CONNECTOR_ID)), 
worker.connectorNames());

Review comment:
       We can use `Collections.singleton(CONNECTOR_ID)`. There are a few other 
places where we can use `Collections.singleton()` instead of `new HashSet()`.

##########
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
##########
@@ -1468,36 +1287,7 @@ private void expectTaskHeaderConverter(ClassLoaderUsage 
classLoaderUsage, Header
         return props;
     }
 
-    private void expectClusterId() {
-        PowerMock.mockStaticPartial(ConnectUtils.class, 
"lookupKafkaClusterId");
-        
EasyMock.expect(ConnectUtils.lookupKafkaClusterId(EasyMock.anyObject())).andReturn("test-cluster").anyTimes();
-    }
 
-    private void expectNewWorkerTask() throws Exception {
-        PowerMock.expectNew(
-                WorkerSourceTask.class, EasyMock.eq(TASK_ID),
-                EasyMock.eq(task),
-                anyObject(TaskStatus.Listener.class),
-                EasyMock.eq(TargetState.STARTED),
-                anyObject(JsonConverter.class),
-                anyObject(JsonConverter.class),
-                anyObject(JsonConverter.class),
-                EasyMock.eq(new TransformationChain<>(Collections.emptyList(), 
NOOP_OPERATOR)),
-                anyObject(KafkaProducer.class),
-                anyObject(TopicAdmin.class),
-                EasyMock.<Map<String, TopicCreationGroup>>anyObject(),
-                anyObject(OffsetStorageReader.class),
-                anyObject(OffsetStorageWriter.class),
-                EasyMock.eq(config),
-                anyObject(ClusterConfigState.class),
-                anyObject(ConnectMetrics.class),
-                EasyMock.eq(pluginLoader),
-                anyObject(Time.class),
-                anyObject(RetryWithToleranceOperator.class),
-                anyObject(StatusBackingStore.class),
-                anyObject(Executor.class))
-                .andReturn(workerTask);
-    }
     /* Name here needs to be unique as we are testing the aliasing mechanism */
     public static class WorkerTestConnector extends SourceConnector {

Review comment:
       In trunk we now have sample plugins (`SampleSourceConnector`, 
`SampleSinkConnector`, etc) in this folder. Can we reuse them and get rid of 
some of these inner classes?

##########
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
##########
@@ -909,73 +820,45 @@ public void testStartTaskFailure() {
         assertStartupStatistics(worker, 0, 0, 1, 1);
         assertEquals(Collections.emptySet(), worker.taskIds());
 
-        PowerMock.verifyAll();
+        verify(taskStatusListener).onFailure(eq(TASK_ID), 
any(ConfigException.class));
+        pluginsMockedStatic.verify(() ->  
Plugins.compareAndSwapLoaders(pluginLoader));
+        pluginsMockedStatic.verify(() ->  
Plugins.compareAndSwapLoaders(delegatingLoader));
     }
 
     @Test
-    public void testCleanupTasksOnStop() throws Exception {
-        expectConverters();
-        expectStartStorage();
-        expectFileConfigProvider();
-
-        EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
+    public void testCleanupTasksOnStop() {
+        mockInternalConverters();
+        mockStorage();
+        mockFileConfigProvider();
 
-        
EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
-        expectNewWorkerTask();
-        Map<String, String> origProps = new HashMap<>();
-        origProps.put(TaskConfig.TASK_CLASS_CONFIG, 
TestSourceTask.class.getName());
-
-        TaskConfig taskConfig = new TaskConfig(origProps);
-        // We should expect this call, but the pluginLoader being swapped in 
is only mocked.
-        // 
EasyMock.expect(pluginLoader.loadClass(TestSourceTask.class.getName()))
-        //        .andReturn((Class) TestSourceTask.class);
-        EasyMock.expect(plugins.newTask(TestSourceTask.class)).andReturn(task);
-        EasyMock.expect(task.version()).andReturn("1.0");
-
-        workerTask.initialize(taskConfig);
-        EasyMock.expectLastCall();
+        when(plugins.currentThreadLoader()).thenReturn(delegatingLoader);
+        when(plugins.newTask(TestSourceTask.class)).thenReturn(task);
+        when(task.version()).thenReturn("1.0");
 
         // Expect that the worker will create converters and will not 
initially find them using the current classloader ...
         assertNotNull(taskKeyConverter);
         assertNotNull(taskValueConverter);
         assertNotNull(taskHeaderConverter);
-        expectTaskKeyConverters(ClassLoaderUsage.CURRENT_CLASSLOADER, null);
-        expectTaskKeyConverters(ClassLoaderUsage.PLUGINS, taskKeyConverter);
-        expectTaskValueConverters(ClassLoaderUsage.CURRENT_CLASSLOADER, null);
-        expectTaskValueConverters(ClassLoaderUsage.PLUGINS, 
taskValueConverter);
-        expectTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, null);
-        expectTaskHeaderConverter(ClassLoaderUsage.PLUGINS, 
taskHeaderConverter);
-
-        EasyMock.expect(executorService.submit(workerTask)).andReturn(null);
-
-        
EasyMock.expect(plugins.delegatingLoader()).andReturn(delegatingLoader);
-        
EasyMock.expect(delegatingLoader.connectorLoader(WorkerTestConnector.class.getName()))
-                .andReturn(pluginLoader);
-
-        
EasyMock.expect(Plugins.compareAndSwapLoaders(pluginLoader)).andReturn(delegatingLoader)
-                .times(2);
-
-        EasyMock.expect(workerTask.loader()).andReturn(pluginLoader);
+        mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, 
WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, null);
+        mockTaskConverter(ClassLoaderUsage.PLUGINS, 
WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, taskKeyConverter);
+        mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, 
WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, null);
+        mockTaskConverter(ClassLoaderUsage.PLUGINS, 
WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, taskValueConverter);
+        mockTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, null);
+        mockTaskHeaderConverter(ClassLoaderUsage.PLUGINS, taskHeaderConverter);
 
-        
EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader)
-                .times(2);
-        plugins.connectorClass(WorkerTestConnector.class.getName());
-        EasyMock.expectLastCall().andReturn(WorkerTestConnector.class);
-        // Remove on Worker.stop()
-        workerTask.stop();
-        EasyMock.expectLastCall();
+        
when(executorService.submit(any(WorkerSourceTask.class))).thenReturn(null);
 
-        
EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andReturn(true);
-        // Note that in this case we *do not* commit offsets since it's an 
unclean shutdown
-        EasyMock.expectLastCall();
+        when(plugins.delegatingLoader()).thenReturn(delegatingLoader);
+        
when(delegatingLoader.connectorLoader(WorkerTestConnector.class.getName())).thenReturn(pluginLoader);
+        
doReturn(WorkerTestConnector.class).when(plugins).connectorClass(WorkerTestConnector.class.getName());
 
-        workerTask.removeMetrics();
-        EasyMock.expectLastCall();
+        pluginsMockedStatic.when(() -> 
Plugins.compareAndSwapLoaders(pluginLoader)).thenReturn(delegatingLoader);
+        pluginsMockedStatic.when(() -> 
Plugins.compareAndSwapLoaders(delegatingLoader)).thenReturn(pluginLoader);
 
-        expectStopStorage();
-        expectClusterId();
+        Map<String, String> origProps = new HashMap<>();
+        origProps.put(TaskConfig.TASK_CLASS_CONFIG, 
TestSourceTask.class.getName());

Review comment:
       We can use `Collections.singletonMap` here

##########
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
##########
@@ -1166,11 +1058,13 @@ public void testConsumerConfigsWithOverrides() {
         expectedConfigs.put("client.id", "consumer-test-id");
         expectedConfigs.put("metrics.context.connect.kafka.cluster.id", 
CLUSTER_ID);
 
-        
EasyMock.expect(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX)).andReturn(new
 HashMap<>());
-        PowerMock.replayAll();
+        
when(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX)).thenReturn(new
 HashMap<>());
+
         assertEquals(expectedConfigs, Worker.consumerConfigs(new 
ConnectorTaskId("test", 1), configWithOverrides, connectorConfig,
             null, noneConnectorClientConfigOverridePolicy, CLUSTER_ID));
 
+        
verify(connectorConfig).originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX);
+

Review comment:
       We can remove this extra empty line. There's a bunch more of empty extra 
lines below that we can remove too.

##########
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
##########
@@ -1076,22 +949,39 @@ public void testConverterOverrides() throws Exception {
         assertStatistics(worker, 0, 0);
 
         // We've mocked the Plugin.newConverter method, so we don't currently 
configure the converters
+        verify(plugins).newTask(TestSourceTask.class);
+        WorkerSourceTask instantiatedTask = 
sourceTaskMockedConstruction.constructed().get(0);
+        verify(instantiatedTask).initialize(taskConfig);
+        verify(executorService).submit(any(WorkerSourceTask.class));
+        verify(plugins).delegatingLoader();
+        
verify(delegatingLoader).connectorLoader(WorkerTestConnector.class.getName());
+        pluginsMockedStatic.verify(() -> 
Plugins.compareAndSwapLoaders(pluginLoader), times(2));
+        pluginsMockedStatic.verify(() -> 
Plugins.compareAndSwapLoaders(delegatingLoader), times(2));
+        verify(plugins).connectorClass(WorkerTestConnector.class.getName());
+

Review comment:
       We can remove this extra empty line

##########
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
##########
@@ -1076,22 +949,39 @@ public void testConverterOverrides() throws Exception {
         assertStatistics(worker, 0, 0);
 
         // We've mocked the Plugin.newConverter method, so we don't currently 
configure the converters
+        verify(plugins).newTask(TestSourceTask.class);
+        WorkerSourceTask instantiatedTask = 
sourceTaskMockedConstruction.constructed().get(0);
+        verify(instantiatedTask).initialize(taskConfig);
+        verify(executorService).submit(any(WorkerSourceTask.class));
+        verify(plugins).delegatingLoader();
+        
verify(delegatingLoader).connectorLoader(WorkerTestConnector.class.getName());
+        pluginsMockedStatic.verify(() -> 
Plugins.compareAndSwapLoaders(pluginLoader), times(2));
+        pluginsMockedStatic.verify(() -> 
Plugins.compareAndSwapLoaders(delegatingLoader), times(2));
+        verify(plugins).connectorClass(WorkerTestConnector.class.getName());
+
 
-        PowerMock.verifyAll();
+        // Remove
+        verify(instantiatedTask).stop();
+        verify(instantiatedTask).awaitStop(anyLong());
+        verify(instantiatedTask).removeMetrics();
+
+        verify(plugins, times(2)).currentThreadLoader();
+        verifyStorage();
     }
 
     @Test
     public void testProducerConfigsWithoutOverrides() {
-        
EasyMock.expect(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX)).andReturn(
-            new HashMap<>());
-        PowerMock.replayAll();
+        
when(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX)).thenReturn(new
 HashMap<>());
         Map<String, String> expectedConfigs = new 
HashMap<>(defaultProducerConfigs);
         expectedConfigs.put("client.id", "connector-producer-job-0");
         expectedConfigs.put("metrics.context.connect.kafka.cluster.id", 
CLUSTER_ID);
         assertEquals(expectedConfigs,
                      Worker.producerConfigs(TASK_ID, "connector-producer-" + 
TASK_ID, config, connectorConfig, null, 
noneConnectorClientConfigOverridePolicy, CLUSTER_ID));
+
+        
verify(connectorConfig).originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX);
     }
 
+

Review comment:
       We can remove this extra empty line




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to