mimaison commented on a change in pull request #11817: URL: https://github.com/apache/kafka/pull/11817#discussion_r819461371
########## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java ########## @@ -289,54 +314,60 @@ public void testStartAndStopConnector() throws Throwable { assertStatistics(worker, 1, 0); assertStartupStatistics(worker, 1, 0, 0, 0); worker.stopAndAwaitConnector(CONNECTOR_ID); + assertStatistics(worker, 0, 0); assertStartupStatistics(worker, 1, 0, 0, 0); assertEquals(Collections.emptySet(), worker.connectorNames()); + // Nothing should be left, so this should effectively be a nop worker.stop(); assertStatistics(worker, 0, 0); - PowerMock.verifyAll(); + + verify(plugins, times(2)).currentThreadLoader(); + verify(plugins).delegatingLoader(); + verify(delegatingLoader).connectorLoader(connectorClass); + verify(plugins).newConnector(connectorClass); + verify(sourceConnector, times(2)).version(); + verify(sourceConnector).initialize(any(ConnectorContext.class)); + verify(sourceConnector).start(connectorProps); + verify(connectorStatusListener).onStartup(CONNECTOR_ID); + + pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(pluginLoader), times(2)); + pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(delegatingLoader), times(2)); + connectUtilsMockedStatic.verify(() -> ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class))); + + verify(sourceConnector).stop(); + verify(connectorStatusListener).onShutdown(CONNECTOR_ID); + verify(ctx).close(); MockFileConfigProvider.assertClosed(mockFileProviderTestId); } - private void expectFileConfigProvider() { - EasyMock.expect(plugins.newConfigProvider(EasyMock.anyObject(), - EasyMock.eq("config.providers.file"), EasyMock.anyObject())) - .andAnswer(() -> { - MockFileConfigProvider mockFileConfigProvider = new MockFileConfigProvider(); - mockFileConfigProvider.configure(Collections.singletonMap("testId", mockFileProviderTestId)); - return mockFileConfigProvider; - }); + private void mockFileConfigProvider() { + MockFileConfigProvider mockFileConfigProvider = new MockFileConfigProvider(); + mockFileConfigProvider.configure(Collections.singletonMap("testId", mockFileProviderTestId)); + when(plugins.newConfigProvider(any(AbstractConfig.class), + eq("config.providers.file"), + any(ClassLoaderUsage.class))) Review comment: Can we align these 2 lines with `any` above? ########## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java ########## @@ -213,69 +234,73 @@ public void setup() { // Some common defaults. They might change on individual tests connectorProps = anyConnectorConfigMap(); - PowerMock.mockStatic(Plugins.class); + + pluginsMockedStatic = mockStatic(Plugins.class); + + // pass through things that aren't explicitly mocked out + connectUtilsMockedStatic = mockStatic(ConnectUtils.class, new CallsRealMethods()); + connectUtilsMockedStatic.when(() -> ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class))).thenReturn(CLUSTER_ID); + + // Make calls to new WorkerSourceTask() return a mock to avoid the source task trying to connect to a broker. + sourceTaskMockedConstruction = mockConstructionWithAnswer(WorkerSourceTask.class, invocation -> { + + // provide implementations of three methods used during testing + switch (invocation.getMethod().getName()) { + case "id": + return TASK_ID; + case "loader": + return pluginLoader; + case "awaitStop": + return true; + default: + return null; + } + }); + } + + @After + public void teardown() { + // Critical to always close MockedStatics + // Ideal would be to use try-with-resources in an individual test, but it introduced a rather large level of + // indentation of most test bodies, hence sticking with setup() / teardown() + pluginsMockedStatic.close(); + connectUtilsMockedStatic.close(); + sourceTaskMockedConstruction.close(); + + mockitoSession.finishMocking(); } @Test public void testStartAndStopConnector() throws Throwable { - expectConverters(); - expectStartStorage(); final String connectorClass = WorkerTestConnector.class.getName(); - - // Create - EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2); - EasyMock.expect(plugins.delegatingLoader()).andReturn(delegatingLoader); - EasyMock.expect(delegatingLoader.connectorLoader(connectorClass)).andReturn(pluginLoader); - EasyMock.expect(plugins.newConnector(connectorClass)) - .andReturn(sourceConnector); - EasyMock.expect(sourceConnector.version()).andReturn("1.0"); - connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connectorClass); - EasyMock.expect(sourceConnector.version()).andReturn("1.0"); - - expectFileConfigProvider(); - EasyMock.expect(Plugins.compareAndSwapLoaders(pluginLoader)) - .andReturn(delegatingLoader) - .times(3); - sourceConnector.initialize(anyObject(ConnectorContext.class)); - EasyMock.expectLastCall(); - sourceConnector.start(connectorProps); - EasyMock.expectLastCall(); - - EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)) - .andReturn(pluginLoader).times(3); - - connectorStatusListener.onStartup(CONNECTOR_ID); - EasyMock.expectLastCall(); - - // Remove - sourceConnector.stop(); - EasyMock.expectLastCall(); - - connectorStatusListener.onShutdown(CONNECTOR_ID); - EasyMock.expectLastCall(); - - ctx.close(); - expectLastCall(); - - expectStopStorage(); - expectClusterId(); + // Create + when(plugins.currentThreadLoader()).thenReturn(delegatingLoader); + when(plugins.delegatingLoader()).thenReturn(delegatingLoader); + when(delegatingLoader.connectorLoader(connectorClass)).thenReturn(pluginLoader); + when(plugins.newConnector(connectorClass)).thenReturn(sourceConnector); + when(sourceConnector.version()).thenReturn("1.0"); - PowerMock.replayAll(); + pluginsMockedStatic.when(() -> Plugins.compareAndSwapLoaders(pluginLoader)).thenReturn(delegatingLoader); + pluginsMockedStatic.when(() -> Plugins.compareAndSwapLoaders(delegatingLoader)).thenReturn(pluginLoader); + connectUtilsMockedStatic.when(() -> ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class))) + .thenReturn(CLUSTER_ID); worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy); - worker.herder = herder; worker.start(); assertEquals(Collections.emptySet(), worker.connectorNames()); FutureCallback<TargetState> onFirstStart = new FutureCallback<>(); + worker.startConnector(CONNECTOR_ID, connectorProps, ctx, connectorStatusListener, TargetState.STARTED, onFirstStart); + // Wait for the connector to actually start assertEquals(TargetState.STARTED, onFirstStart.get(1000, TimeUnit.MILLISECONDS)); - assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames()); + assertEquals(new HashSet<>(Collections.singletonList(CONNECTOR_ID)), worker.connectorNames()); Review comment: We can use `Collections.singleton(CONNECTOR_ID)`. There are a few other places where we can use `Collections.singleton()` instead of `new HashSet()`. ########## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java ########## @@ -1468,36 +1287,7 @@ private void expectTaskHeaderConverter(ClassLoaderUsage classLoaderUsage, Header return props; } - private void expectClusterId() { - PowerMock.mockStaticPartial(ConnectUtils.class, "lookupKafkaClusterId"); - EasyMock.expect(ConnectUtils.lookupKafkaClusterId(EasyMock.anyObject())).andReturn("test-cluster").anyTimes(); - } - private void expectNewWorkerTask() throws Exception { - PowerMock.expectNew( - WorkerSourceTask.class, EasyMock.eq(TASK_ID), - EasyMock.eq(task), - anyObject(TaskStatus.Listener.class), - EasyMock.eq(TargetState.STARTED), - anyObject(JsonConverter.class), - anyObject(JsonConverter.class), - anyObject(JsonConverter.class), - EasyMock.eq(new TransformationChain<>(Collections.emptyList(), NOOP_OPERATOR)), - anyObject(KafkaProducer.class), - anyObject(TopicAdmin.class), - EasyMock.<Map<String, TopicCreationGroup>>anyObject(), - anyObject(OffsetStorageReader.class), - anyObject(OffsetStorageWriter.class), - EasyMock.eq(config), - anyObject(ClusterConfigState.class), - anyObject(ConnectMetrics.class), - EasyMock.eq(pluginLoader), - anyObject(Time.class), - anyObject(RetryWithToleranceOperator.class), - anyObject(StatusBackingStore.class), - anyObject(Executor.class)) - .andReturn(workerTask); - } /* Name here needs to be unique as we are testing the aliasing mechanism */ public static class WorkerTestConnector extends SourceConnector { Review comment: In trunk we now have sample plugins (`SampleSourceConnector`, `SampleSinkConnector`, etc) in this folder. Can we reuse them and get rid of some of these inner classes? ########## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java ########## @@ -909,73 +820,45 @@ public void testStartTaskFailure() { assertStartupStatistics(worker, 0, 0, 1, 1); assertEquals(Collections.emptySet(), worker.taskIds()); - PowerMock.verifyAll(); + verify(taskStatusListener).onFailure(eq(TASK_ID), any(ConfigException.class)); + pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(pluginLoader)); + pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(delegatingLoader)); } @Test - public void testCleanupTasksOnStop() throws Exception { - expectConverters(); - expectStartStorage(); - expectFileConfigProvider(); - - EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID); + public void testCleanupTasksOnStop() { + mockInternalConverters(); + mockStorage(); + mockFileConfigProvider(); - EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2); - expectNewWorkerTask(); - Map<String, String> origProps = new HashMap<>(); - origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName()); - - TaskConfig taskConfig = new TaskConfig(origProps); - // We should expect this call, but the pluginLoader being swapped in is only mocked. - // EasyMock.expect(pluginLoader.loadClass(TestSourceTask.class.getName())) - // .andReturn((Class) TestSourceTask.class); - EasyMock.expect(plugins.newTask(TestSourceTask.class)).andReturn(task); - EasyMock.expect(task.version()).andReturn("1.0"); - - workerTask.initialize(taskConfig); - EasyMock.expectLastCall(); + when(plugins.currentThreadLoader()).thenReturn(delegatingLoader); + when(plugins.newTask(TestSourceTask.class)).thenReturn(task); + when(task.version()).thenReturn("1.0"); // Expect that the worker will create converters and will not initially find them using the current classloader ... assertNotNull(taskKeyConverter); assertNotNull(taskValueConverter); assertNotNull(taskHeaderConverter); - expectTaskKeyConverters(ClassLoaderUsage.CURRENT_CLASSLOADER, null); - expectTaskKeyConverters(ClassLoaderUsage.PLUGINS, taskKeyConverter); - expectTaskValueConverters(ClassLoaderUsage.CURRENT_CLASSLOADER, null); - expectTaskValueConverters(ClassLoaderUsage.PLUGINS, taskValueConverter); - expectTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, null); - expectTaskHeaderConverter(ClassLoaderUsage.PLUGINS, taskHeaderConverter); - - EasyMock.expect(executorService.submit(workerTask)).andReturn(null); - - EasyMock.expect(plugins.delegatingLoader()).andReturn(delegatingLoader); - EasyMock.expect(delegatingLoader.connectorLoader(WorkerTestConnector.class.getName())) - .andReturn(pluginLoader); - - EasyMock.expect(Plugins.compareAndSwapLoaders(pluginLoader)).andReturn(delegatingLoader) - .times(2); - - EasyMock.expect(workerTask.loader()).andReturn(pluginLoader); + mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, null); + mockTaskConverter(ClassLoaderUsage.PLUGINS, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, taskKeyConverter); + mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, null); + mockTaskConverter(ClassLoaderUsage.PLUGINS, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, taskValueConverter); + mockTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, null); + mockTaskHeaderConverter(ClassLoaderUsage.PLUGINS, taskHeaderConverter); - EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader) - .times(2); - plugins.connectorClass(WorkerTestConnector.class.getName()); - EasyMock.expectLastCall().andReturn(WorkerTestConnector.class); - // Remove on Worker.stop() - workerTask.stop(); - EasyMock.expectLastCall(); + when(executorService.submit(any(WorkerSourceTask.class))).thenReturn(null); - EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andReturn(true); - // Note that in this case we *do not* commit offsets since it's an unclean shutdown - EasyMock.expectLastCall(); + when(plugins.delegatingLoader()).thenReturn(delegatingLoader); + when(delegatingLoader.connectorLoader(WorkerTestConnector.class.getName())).thenReturn(pluginLoader); + doReturn(WorkerTestConnector.class).when(plugins).connectorClass(WorkerTestConnector.class.getName()); - workerTask.removeMetrics(); - EasyMock.expectLastCall(); + pluginsMockedStatic.when(() -> Plugins.compareAndSwapLoaders(pluginLoader)).thenReturn(delegatingLoader); + pluginsMockedStatic.when(() -> Plugins.compareAndSwapLoaders(delegatingLoader)).thenReturn(pluginLoader); - expectStopStorage(); - expectClusterId(); + Map<String, String> origProps = new HashMap<>(); + origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName()); Review comment: We can use `Collections.singletonMap` here ########## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java ########## @@ -1166,11 +1058,13 @@ public void testConsumerConfigsWithOverrides() { expectedConfigs.put("client.id", "consumer-test-id"); expectedConfigs.put("metrics.context.connect.kafka.cluster.id", CLUSTER_ID); - EasyMock.expect(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX)).andReturn(new HashMap<>()); - PowerMock.replayAll(); + when(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX)).thenReturn(new HashMap<>()); + assertEquals(expectedConfigs, Worker.consumerConfigs(new ConnectorTaskId("test", 1), configWithOverrides, connectorConfig, null, noneConnectorClientConfigOverridePolicy, CLUSTER_ID)); + verify(connectorConfig).originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX); + Review comment: We can remove this extra empty line. There's a bunch more of empty extra lines below that we can remove too. ########## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java ########## @@ -1076,22 +949,39 @@ public void testConverterOverrides() throws Exception { assertStatistics(worker, 0, 0); // We've mocked the Plugin.newConverter method, so we don't currently configure the converters + verify(plugins).newTask(TestSourceTask.class); + WorkerSourceTask instantiatedTask = sourceTaskMockedConstruction.constructed().get(0); + verify(instantiatedTask).initialize(taskConfig); + verify(executorService).submit(any(WorkerSourceTask.class)); + verify(plugins).delegatingLoader(); + verify(delegatingLoader).connectorLoader(WorkerTestConnector.class.getName()); + pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(pluginLoader), times(2)); + pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(delegatingLoader), times(2)); + verify(plugins).connectorClass(WorkerTestConnector.class.getName()); + Review comment: We can remove this extra empty line ########## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java ########## @@ -1076,22 +949,39 @@ public void testConverterOverrides() throws Exception { assertStatistics(worker, 0, 0); // We've mocked the Plugin.newConverter method, so we don't currently configure the converters + verify(plugins).newTask(TestSourceTask.class); + WorkerSourceTask instantiatedTask = sourceTaskMockedConstruction.constructed().get(0); + verify(instantiatedTask).initialize(taskConfig); + verify(executorService).submit(any(WorkerSourceTask.class)); + verify(plugins).delegatingLoader(); + verify(delegatingLoader).connectorLoader(WorkerTestConnector.class.getName()); + pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(pluginLoader), times(2)); + pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(delegatingLoader), times(2)); + verify(plugins).connectorClass(WorkerTestConnector.class.getName()); + - PowerMock.verifyAll(); + // Remove + verify(instantiatedTask).stop(); + verify(instantiatedTask).awaitStop(anyLong()); + verify(instantiatedTask).removeMetrics(); + + verify(plugins, times(2)).currentThreadLoader(); + verifyStorage(); } @Test public void testProducerConfigsWithoutOverrides() { - EasyMock.expect(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX)).andReturn( - new HashMap<>()); - PowerMock.replayAll(); + when(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX)).thenReturn(new HashMap<>()); Map<String, String> expectedConfigs = new HashMap<>(defaultProducerConfigs); expectedConfigs.put("client.id", "connector-producer-job-0"); expectedConfigs.put("metrics.context.connect.kafka.cluster.id", CLUSTER_ID); assertEquals(expectedConfigs, Worker.producerConfigs(TASK_ID, "connector-producer-" + TASK_ID, config, connectorConfig, null, noneConnectorClientConfigOverridePolicy, CLUSTER_ID)); + + verify(connectorConfig).originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX); } + Review comment: We can remove this extra empty line -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org