This is an automated email from the ASF dual-hosted git repository. jerrypeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new c27f7a3 Optimize built-in source/sink startup Part 2 (#9500) c27f7a3 is described below commit c27f7a340a3c78a3a5004d08b8ac05ed99672ee3 Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Mon Feb 8 12:33:06 2021 -0800 Optimize built-in source/sink startup Part 2 (#9500) * Optimize built-in source/sink startup by eliminating redundant NAR unpacking and checksum calculation Part 2. Allow ThreadRuntime to used cached built-in connectors instead of unpacking and loading again. Co-authored-by: Jerry Peng <jer...@splunk.com> --- .../functions/instance/JavaInstanceRunnable.java | 54 ++---------- .../instance/JavaInstanceRunnableTest.java | 2 +- .../pulsar/functions/runtime/RuntimeFactory.java | 2 + .../kubernetes/KubernetesRuntimeFactory.java | 2 + .../runtime/process/ProcessRuntimeFactory.java | 2 + .../functions/runtime/thread/ThreadRuntime.java | 95 ++++++++++++++++++---- .../runtime/thread/ThreadRuntimeFactory.java | 15 +++- .../pulsar/functions/worker/ConnectorsManager.java | 4 +- .../kubernetes/KubernetesRuntimeFactoryTest.java | 5 +- .../runtime/kubernetes/KubernetesRuntimeTest.java | 7 +- .../runtime/process/ProcessRuntimeTest.java | 4 +- .../runtime/thread/ThreadRuntimeFactoryTest.java | 2 + .../pulsar/functions/utils/FunctionCommon.java | 22 +++++ .../pulsar/functions/worker/FunctionActioner.java | 4 +- .../functions/worker/FunctionRuntimeManager.java | 4 +- .../pulsar/functions/worker/WorkerUtils.java | 22 ----- .../functions/worker/rest/api/ComponentImpl.java | 8 +- .../functions/worker/rest/api/FunctionsImpl.java | 2 +- .../functions/worker/rest/api/SinksImpl.java | 3 +- .../functions/worker/rest/api/SourcesImpl.java | 2 +- .../functions/worker/rest/api/WorkerImpl.java | 2 +- .../worker/FunctionRuntimeManagerTest.java | 2 + .../worker/rest/api/FunctionsImplTest.java | 4 +- 23 files changed, 155 insertions(+), 114 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index c542f05..02083ed 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -28,6 +28,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -73,6 +74,7 @@ import org.apache.pulsar.functions.source.batch.BatchSourceExecutor; import org.apache.pulsar.functions.utils.CryptoUtils; import org.apache.pulsar.functions.utils.FunctionCommon; import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager; +import org.apache.pulsar.functions.utils.io.Connector; import org.apache.pulsar.io.core.Sink; import org.apache.pulsar.io.core.Source; import org.slf4j.Logger; @@ -85,8 +87,6 @@ import org.slf4j.LoggerFactory; public class JavaInstanceRunnable implements AutoCloseable, Runnable { private final InstanceConfig instanceConfig; - private final FunctionCacheManager fnCache; - private final String jarFile; // input topic consumer & output topic producer private final PulsarClientImpl client; @@ -124,7 +124,6 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { private final ClassLoader instanceClassLoader; private ClassLoader functionClassLoader; - private String narExtractionDirectory; // a flog to determine if member variables have been initialized as part of setup(). // used for out of band API calls like operations involving stats @@ -134,23 +133,19 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { private ReadWriteLock statsLock = new ReentrantReadWriteLock(); public JavaInstanceRunnable(InstanceConfig instanceConfig, - FunctionCacheManager fnCache, - String jarFile, PulsarClient pulsarClient, PulsarAdmin pulsarAdmin, String stateStorageServiceUrl, SecretsProvider secretsProvider, CollectorRegistry collectorRegistry, - String narExtractionDirectory) { + ClassLoader functionClassLoader) { this.instanceConfig = instanceConfig; - this.fnCache = fnCache; - this.jarFile = jarFile; this.client = (PulsarClientImpl) pulsarClient; this.pulsarAdmin = pulsarAdmin; this.stateStorageServiceUrl = stateStorageServiceUrl; this.secretsProvider = secretsProvider; this.collectorRegistry = collectorRegistry; - this.narExtractionDirectory = narExtractionDirectory; + this.functionClassLoader = functionClassLoader; this.metricsLabels = new String[]{ instanceConfig.getFunctionDetails().getTenant(), String.format("%s/%s", instanceConfig.getFunctionDetails().getTenant(), @@ -197,9 +192,6 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { log.info("Starting Java Instance {} : \n Details = {}", instanceConfig.getFunctionDetails().getName(), instanceConfig.getFunctionDetails()); - // start the function thread - functionClassLoader = loadJars(); - Object object; if (instanceConfig.getFunctionDetails().getClassName().equals(org.apache.pulsar.functions.windowing.WindowFunctionExecutor.class.getName())) { object = Reflections.createInstance( @@ -305,35 +297,6 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { } } - private ClassLoader loadJars() throws Exception { - ClassLoader fnClassLoader; - try { - log.info("Load JAR: {}", jarFile); - // Let's first try to treat it as a nar archive - fnCache.registerFunctionInstanceWithArchive( - instanceConfig.getFunctionId(), - instanceConfig.getInstanceName(), - jarFile, narExtractionDirectory); - } catch (FileNotFoundException e) { - // create the function class loader - fnCache.registerFunctionInstance( - instanceConfig.getFunctionId(), - instanceConfig.getInstanceName(), - Arrays.asList(jarFile), - Collections.emptyList()); - } - - log.info("Initialize function class loader for function {} at function cache manager, functionClassLoader: {}", - instanceConfig.getFunctionDetails().getName(), fnCache.getClassLoader(instanceConfig.getFunctionId())); - - fnClassLoader = fnCache.getClassLoader(instanceConfig.getFunctionId()); - if (null == fnClassLoader) { - throw new Exception("No function class loader available."); - } - - return fnClassLoader; - } - private void setupStateStore() throws Exception { this.stateManager = new InstanceStateManager(); @@ -475,14 +438,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { stateStoreProvider.close(); } - if (instanceCache != null) { - // once the thread quits, clean up the instance - fnCache.unregisterFunctionInstance( - instanceConfig.getFunctionId(), - instanceConfig.getInstanceName()); - log.info("Unloading JAR files for function {}", instanceConfig); - instanceCache = null; - } + instanceCache = null; if (logAppender != null) { removeLogTopicAppender(LoggerContext.getContext()); diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java index 9286b94..56d80ae 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java @@ -59,7 +59,7 @@ public class JavaInstanceRunnableTest { private JavaInstanceRunnable createRunnable(String outputSerde) throws Exception { InstanceConfig config = createInstanceConfig(outputSerde); JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable( - config, null, null, null, null, null, null, null, null); + config, null, null, null, null, null, null); return javaInstanceRunnable; } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java index 56fb701..0f034d0 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java @@ -25,6 +25,7 @@ import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator; import org.apache.pulsar.common.util.Reflections; +import org.apache.pulsar.functions.worker.ConnectorsManager; import org.apache.pulsar.functions.worker.WorkerConfig; import java.util.Optional; @@ -37,6 +38,7 @@ public interface RuntimeFactory extends AutoCloseable { void initialize(WorkerConfig workerConfig, AuthenticationConfig authenticationConfig, SecretsProviderConfigurator secretsProviderConfigurator, + ConnectorsManager connectorsManager, Optional<FunctionAuthProvider> authProvider, Optional<RuntimeCustomizer> runtimeCustomizer) throws Exception; diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java index f001af4..b332722 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java @@ -43,6 +43,7 @@ import org.apache.pulsar.functions.runtime.RuntimeCustomizer; import org.apache.pulsar.functions.runtime.RuntimeFactory; import org.apache.pulsar.functions.runtime.RuntimeUtils; import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator; +import org.apache.pulsar.functions.worker.ConnectorsManager; import org.apache.pulsar.functions.worker.WorkerConfig; import java.lang.reflect.Field; @@ -129,6 +130,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory { @Override public void initialize(WorkerConfig workerConfig, AuthenticationConfig authenticationConfig, SecretsProviderConfigurator secretsProviderConfigurator, + ConnectorsManager connectorsManager, Optional<FunctionAuthProvider> functionAuthProvider, Optional<RuntimeCustomizer> runtimeCustomizer) { diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java index 04b871c..aaa0004 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java @@ -34,6 +34,7 @@ import org.apache.pulsar.functions.runtime.RuntimeFactory; import org.apache.pulsar.functions.runtime.RuntimeUtils; import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator; import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry; +import org.apache.pulsar.functions.worker.ConnectorsManager; import org.apache.pulsar.functions.worker.WorkerConfig; import java.nio.file.Paths; @@ -94,6 +95,7 @@ public class ProcessRuntimeFactory implements RuntimeFactory { @Override public void initialize(WorkerConfig workerConfig, AuthenticationConfig authenticationConfig, SecretsProviderConfigurator secretsProviderConfigurator, + ConnectorsManager connectorsManager, Optional<FunctionAuthProvider> authProvider, Optional<RuntimeCustomizer> runtimeCustomizer) { ProcessRuntimeFactoryConfig factoryConfig = RuntimeUtils.getRuntimeFunctionConfig( diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java index f8b0a34..a9614a1 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java @@ -19,7 +19,12 @@ package org.apache.pulsar.functions.runtime.thread; +import java.io.FileNotFoundException; import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import io.prometheus.client.CollectorRegistry; @@ -28,6 +33,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.functions.instance.InstanceConfig; +import org.apache.pulsar.functions.instance.InstanceUtils; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; @@ -36,6 +42,8 @@ import org.apache.pulsar.functions.secretsprovider.SecretsProvider; import org.apache.pulsar.functions.utils.FunctionCommon; import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager; import org.apache.pulsar.functions.instance.JavaInstanceRunnable; +import org.apache.pulsar.functions.utils.io.Connector; +import org.apache.pulsar.functions.worker.ConnectorsManager; /** * A function container implemented using java thread. @@ -60,6 +68,8 @@ public class ThreadRuntime implements Runtime { private SecretsProvider secretsProvider; private CollectorRegistry collectorRegistry; private String narExtractionDirectory; + private final Optional<ConnectorsManager> connectorsManager; + ThreadRuntime(InstanceConfig instanceConfig, FunctionCacheManager fnCache, ThreadGroup threadGroup, @@ -69,7 +79,8 @@ public class ThreadRuntime implements Runtime { String stateStorageServiceUrl, SecretsProvider secretsProvider, CollectorRegistry collectorRegistry, - String narExtractionDirectory) { + String narExtractionDirectory, + Optional<ConnectorsManager> connectorsManager) { this.instanceConfig = instanceConfig; if (instanceConfig.getFunctionDetails().getRuntime() != Function.FunctionDetails.Runtime.JAVA) { throw new RuntimeException("Thread Container only supports Java Runtime"); @@ -84,34 +95,82 @@ public class ThreadRuntime implements Runtime { this.secretsProvider = secretsProvider; this.collectorRegistry = collectorRegistry; this.narExtractionDirectory = narExtractionDirectory; - this.javaInstanceRunnable = new JavaInstanceRunnable( - instanceConfig, - fnCache, - jarFile, - pulsarClient, - pulsarAdmin, - stateStorageServiceUrl, - secretsProvider, - collectorRegistry, - narExtractionDirectory); + this.connectorsManager = connectorsManager; + } + + private static ClassLoader getFunctionClassLoader(InstanceConfig instanceConfig, + String jarFile, + String narExtractionDirectory, + FunctionCacheManager fnCache, + Optional<ConnectorsManager> connectorsManager) throws Exception { + + if (FunctionCommon.isFunctionCodeBuiltin(instanceConfig.getFunctionDetails()) + && connectorsManager.isPresent()) { + switch (InstanceUtils.calculateSubjectType(instanceConfig.getFunctionDetails())) { + case SOURCE: + return connectorsManager.get().getConnector( + instanceConfig.getFunctionDetails().getSource().getBuiltin()).getClassLoader(); + case SINK: + return connectorsManager.get().getConnector( + instanceConfig.getFunctionDetails().getSink().getBuiltin()).getClassLoader(); + default: + return loadJars(jarFile, instanceConfig, narExtractionDirectory, fnCache); + } + } else { + return loadJars(jarFile, instanceConfig, narExtractionDirectory, fnCache); + } + } + + private static ClassLoader loadJars(String jarFile, + InstanceConfig instanceConfig, + String narExtractionDirectory, + FunctionCacheManager fnCache) throws Exception { + ClassLoader fnClassLoader; + try { + log.info("Load JAR: {}", jarFile); + // Let's first try to treat it as a nar archive + fnCache.registerFunctionInstanceWithArchive( + instanceConfig.getFunctionId(), + instanceConfig.getInstanceName(), + jarFile, narExtractionDirectory); + } catch (FileNotFoundException e) { + // create the function class loader + fnCache.registerFunctionInstance( + instanceConfig.getFunctionId(), + instanceConfig.getInstanceName(), + Arrays.asList(jarFile), + Collections.emptyList()); + } + + log.info("Initialize function class loader for function {} at function cache manager, functionClassLoader: {}", + instanceConfig.getFunctionDetails().getName(), fnCache.getClassLoader(instanceConfig.getFunctionId())); + + fnClassLoader = fnCache.getClassLoader(instanceConfig.getFunctionId()); + if (null == fnClassLoader) { + throw new Exception("No function class loader available."); + } + + return fnClassLoader; } /** * The core logic that initialize the thread container and executes the function. */ @Override - public void start() { + public void start() throws Exception { + + // extract class loader for function + ClassLoader functionClassLoader = getFunctionClassLoader(instanceConfig, jarFile, narExtractionDirectory, fnCache, connectorsManager); + // re-initialize JavaInstanceRunnable so that variables in constructor can be re-initialized this.javaInstanceRunnable = new JavaInstanceRunnable( instanceConfig, - fnCache, - jarFile, pulsarClient, pulsarAdmin, stateStorageServiceUrl, secretsProvider, collectorRegistry, - narExtractionDirectory); + functionClassLoader); log.info("ThreadContainer starting function with instance config {}", instanceConfig); this.fnThread = new Thread(threadGroup, javaInstanceRunnable, String.format("%s-%s", @@ -145,6 +204,12 @@ public class ThreadRuntime implements Runtime { } // make sure JavaInstanceRunnable is closed this.javaInstanceRunnable.close(); + + log.info("Unloading JAR files for function {}", instanceConfig); + // once the thread quits, clean up the instance + fnCache.unregisterFunctionInstance( + instanceConfig.getFunctionId(), + instanceConfig.getInstanceName()); } } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java index f5e3736..d44cf1b 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java @@ -41,6 +41,7 @@ import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderCo import org.apache.pulsar.common.util.Reflections; import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager; import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManagerImpl; +import org.apache.pulsar.functions.worker.ConnectorsManager; import org.apache.pulsar.functions.worker.WorkerConfig; import java.util.Optional; @@ -64,6 +65,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory { private volatile boolean closed; private SecretsProviderConfigurator secretsProviderConfigurator; private ClassLoader rootClassLoader; + private Optional<ConnectorsManager> connectorsManager; /** * This constructor is used by other runtimes (e.g. ProcessRuntime and KubernetesRuntime) that rely on ThreadRuntime to actually run an instance of the function. @@ -76,13 +78,15 @@ public class ThreadRuntimeFactory implements RuntimeFactory { String pulsarWebServiceUrl) throws Exception { initialize(threadGroupName, Optional.empty(), pulsarServiceUrl, authConfig, storageServiceUrl, null, secretsProvider, collectorRegistry, narExtractionDirectory, - rootClassLoader, exposePulsarAdminClientEnabled, pulsarWebServiceUrl); + rootClassLoader, exposePulsarAdminClientEnabled, pulsarWebServiceUrl, Optional.empty()); } private void initialize(String threadGroupName, Optional<ThreadRuntimeFactoryConfig.MemoryLimit> memoryLimit, String pulsarServiceUrl, AuthenticationConfig authConfig, String storageServiceUrl, SecretsProviderConfigurator secretsProviderConfigurator, SecretsProvider secretsProvider, CollectorRegistry collectorRegistry, String narExtractionDirectory, - ClassLoader rootClassLoader, boolean exposePulsarAdminClientEnabled, String pulsarWebServiceUrl) throws PulsarClientException { + ClassLoader rootClassLoader, boolean exposePulsarAdminClientEnabled, + String pulsarWebServiceUrl, Optional<ConnectorsManager> connectorsManager) throws PulsarClientException { + if (rootClassLoader == null) { rootClassLoader = Thread.currentThread().getContextClassLoader(); } @@ -97,6 +101,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory { this.storageServiceUrl = storageServiceUrl; this.collectorRegistry = collectorRegistry; this.narExtractionDirectory = narExtractionDirectory; + this.connectorsManager = connectorsManager; } private Optional<Long> calculateClientMemoryLimit(Optional<ThreadRuntimeFactoryConfig.MemoryLimit> memoryLimit) { @@ -134,6 +139,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory { @Override public void initialize(WorkerConfig workerConfig, AuthenticationConfig authenticationConfig, SecretsProviderConfigurator secretsProviderConfigurator, + ConnectorsManager connectorsManager, Optional<FunctionAuthProvider> functionAuthProvider, Optional<RuntimeCustomizer> runtimeCustomizer) throws Exception { ThreadRuntimeFactoryConfig factoryConfig = RuntimeUtils.getRuntimeFunctionConfig( @@ -143,7 +149,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory { workerConfig.getPulsarServiceUrl(), authenticationConfig, workerConfig.getStateStorageServiceUrl(), secretsProviderConfigurator, null, null, workerConfig.getNarExtractionDirectory(), null, - workerConfig.isExposeAdminClientEnabled(), workerConfig.getPulsarWebServiceUrl()); + workerConfig.isExposeAdminClientEnabled(), workerConfig.getPulsarWebServiceUrl(), Optional.of(connectorsManager)); } @Override @@ -169,7 +175,8 @@ public class ThreadRuntimeFactory implements RuntimeFactory { storageServiceUrl, secretsProvider, collectorRegistry, - narExtractionDirectory); + narExtractionDirectory, + connectorsManager); } @Override diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java similarity index 96% rename from pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java rename to pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java index 856c8d6..0afffa5 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.functions.worker; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.common.io.ConfigFieldDefinition; import org.apache.pulsar.common.io.ConnectorDefinition; @@ -33,6 +34,7 @@ import java.util.stream.Collectors; @Slf4j public class ConnectorsManager { + @Getter private volatile TreeMap<String, Connector> connectors; public ConnectorsManager(WorkerConfig workerConfig) throws IOException { @@ -47,7 +49,7 @@ public class ConnectorsManager { return connectors.get(connectorType).getConnectorDefinition(); } - public List<ConnectorDefinition> getConnectors() { + public List<ConnectorDefinition> getConnectorDefinitions() { return connectors.values().stream().map(connector -> connector.getConnectorDefinition()).collect(Collectors.toList()); } diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java index dec1014..9870c3f 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java @@ -39,6 +39,7 @@ import org.apache.pulsar.functions.runtime.RuntimeCustomizer; import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider; import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator; import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator; +import org.apache.pulsar.functions.worker.ConnectorsManager; import org.apache.pulsar.functions.worker.WorkerConfig; import org.mockito.Mockito; import org.testng.annotations.AfterMethod; @@ -181,7 +182,7 @@ public class KubernetesRuntimeFactoryTest { workerConfig.setStateStorageServiceUrl(null); workerConfig.setAuthenticationEnabled(false); - factory.initialize(workerConfig,null, new TestSecretProviderConfigurator(), functionAuthProvider, manifestCustomizer); + factory.initialize(workerConfig,null, new TestSecretProviderConfigurator(), Mockito.mock(ConnectorsManager.class), functionAuthProvider, manifestCustomizer); return factory; } @@ -383,7 +384,7 @@ public class KubernetesRuntimeFactoryTest { workerConfig.setFunctionRuntimeFactoryConfigs( ObjectMapperFactory.getThreadLocal().convertValue(kubernetesRuntimeFactoryConfig, Map.class)); AuthenticationConfig authenticationConfig = AuthenticationConfig.builder().build(); - kubernetesRuntimeFactory.initialize(workerConfig, authenticationConfig, new DefaultSecretsProviderConfigurator(), Optional.empty(), Optional.empty()); + kubernetesRuntimeFactory.initialize(workerConfig, authenticationConfig, new DefaultSecretsProviderConfigurator(), Mockito.mock(ConnectorsManager.class), Optional.empty(), Optional.empty()); return kubernetesRuntimeFactory; } } diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java index 5c1c901..54c72cb 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java @@ -39,7 +39,9 @@ import org.apache.pulsar.functions.runtime.thread.ThreadRuntime; import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider; import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator; import org.apache.pulsar.functions.utils.FunctionCommon; +import org.apache.pulsar.functions.worker.ConnectorsManager; import org.apache.pulsar.functions.worker.WorkerConfig; +import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; @@ -219,7 +221,7 @@ public class KubernetesRuntimeTest { manifestCustomizer.ifPresent(runtimeCustomizer -> runtimeCustomizer.initialize(Optional.ofNullable(workerConfig.getRuntimeCustomizerConfig()).orElse(Collections.emptyMap()))); - factory.initialize(workerConfig, null, new TestSecretProviderConfigurator(), Optional.empty(), manifestCustomizer); + factory.initialize(workerConfig, null, new TestSecretProviderConfigurator(), Mockito.mock(ConnectorsManager.class), Optional.empty(), manifestCustomizer); return factory; } @@ -910,7 +912,8 @@ public class KubernetesRuntimeTest { manifestCustomizer.get().initialize(Optional.ofNullable(workerConfig.getRuntimeCustomizerConfig()).orElse(Collections.emptyMap())); } - factory.initialize(workerConfig, null, new TestSecretProviderConfigurator(), Optional.empty(), manifestCustomizer); + factory.initialize(workerConfig, null, new TestSecretProviderConfigurator(), + Mockito.mock(ConnectorsManager.class), Optional.empty(), manifestCustomizer); return factory; } diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java index 66090f5..69336fa 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java @@ -49,7 +49,9 @@ import org.apache.pulsar.functions.runtime.thread.ThreadRuntime; import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider; import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator; import org.apache.pulsar.functions.utils.FunctionCommon; +import org.apache.pulsar.functions.worker.ConnectorsManager; import org.apache.pulsar.functions.worker.WorkerConfig; +import org.mockito.Mockito; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -170,7 +172,7 @@ public class ProcessRuntimeTest { workerConfig.setFunctionRuntimeFactoryClassName(ProcessRuntimeFactory.class.getName()); workerConfig.setFunctionRuntimeFactoryConfigs( ObjectMapperFactory.getThreadLocal().convertValue(processRuntimeFactoryConfig, Map.class)); - processRuntimeFactory.initialize(workerConfig, null, new TestSecretsProviderConfigurator(), Optional.empty(), Optional.empty()); + processRuntimeFactory.initialize(workerConfig, null, new TestSecretsProviderConfigurator(), Mockito.mock(ConnectorsManager.class), Optional.empty(), Optional.empty()); return processRuntimeFactory; } diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java index d0e2596..8346e9d 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java @@ -29,6 +29,7 @@ import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.common.util.RestException; import org.apache.pulsar.functions.instance.InstanceUtils; import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator; +import org.apache.pulsar.functions.worker.ConnectorsManager; import org.apache.pulsar.functions.worker.WorkerConfig; import org.mockito.Mockito; import org.powermock.api.mockito.PowerMockito; @@ -145,6 +146,7 @@ public class ThreadRuntimeFactoryTest { workerConfig, Mockito.mock(AuthenticationConfig.class), Mockito.mock(SecretsProviderConfigurator.class), + Mockito.mock(ConnectorsManager.class), Optional.empty(), Optional.empty()); return clientBuilder; diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java index 41dbcbd..7858663 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java @@ -483,4 +483,26 @@ public class FunctionCommon { public static String capFirstLetter(Enum en) { return StringUtils.capitalize(en.toString().toLowerCase()); } + + public static boolean isFunctionCodeBuiltin(org.apache.pulsar.functions.proto.Function.FunctionDetailsOrBuilder functionDetails) { + if (functionDetails.hasSource()) { + org.apache.pulsar.functions.proto.Function.SourceSpec sourceSpec = functionDetails.getSource(); + if (!isEmpty(sourceSpec.getBuiltin())) { + return true; + } + } + + if (functionDetails.hasSink()) { + org.apache.pulsar.functions.proto.Function.SinkSpec sinkSpec = functionDetails.getSink(); + if (!isEmpty(sinkSpec.getBuiltin())) { + return true; + } + } + + if (!isEmpty(functionDetails.getBuiltin())) { + return true; + } + + return false; + } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java index e051f12..73f8f95 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java @@ -31,7 +31,6 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.common.io.BatchSourceConfig; import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.util.ObjectMapperFactory; @@ -48,7 +47,6 @@ import org.apache.pulsar.functions.utils.Actions; import org.apache.pulsar.functions.utils.FunctionCommon; import org.apache.pulsar.functions.utils.SourceConfigUtils; import org.apache.pulsar.functions.utils.io.Connector; -import org.apache.pulsar.functions.utils.io.ConnectorUtils; import java.io.File; import java.io.FileNotFoundException; @@ -120,7 +118,7 @@ public class FunctionActioner { URL url = new URL(pkgLocation); File pkgFile = new File(url.toURI()); packageFile = pkgFile.getAbsolutePath(); - } else if (WorkerUtils.isFunctionCodeBuiltin(functionDetails)) { + } else if (FunctionCommon.isFunctionCodeBuiltin(functionDetails)) { File pkgFile = getBuiltinArchive(FunctionDetails.newBuilder(functionMetaData.getFunctionDetails())); packageFile = pkgFile.getAbsolutePath(); } else { diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index 67b72be..c95475a 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -206,7 +206,9 @@ public class FunctionRuntimeManager implements AutoCloseable{ } } // initialize runtime - this.runtimeFactory.initialize(workerConfig, authConfig, secretsProviderConfigurator, functionAuthProvider, runtimeCustomizer); + this.runtimeFactory.initialize(workerConfig, authConfig, + secretsProviderConfigurator, connectorsManager, + functionAuthProvider, runtimeCustomizer); this.functionActioner = new FunctionActioner(this.workerConfig, runtimeFactory, dlogNamespace, connectorsManager, functionsManager, workerService.getBrokerAdmin()); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java index 5e65990..821a8fc 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java @@ -314,28 +314,6 @@ public final class WorkerUtils { } } - public static boolean isFunctionCodeBuiltin(Function.FunctionDetailsOrBuilder functionDetails) { - if (functionDetails.hasSource()) { - Function.SourceSpec sourceSpec = functionDetails.getSource(); - if (!StringUtils.isEmpty(sourceSpec.getBuiltin())) { - return true; - } - } - - if (functionDetails.hasSink()) { - Function.SinkSpec sinkSpec = functionDetails.getSink(); - if (!StringUtils.isEmpty(sinkSpec.getBuiltin())) { - return true; - } - } - - if (!StringUtils.isEmpty(functionDetails.getBuiltin())) { - return true; - } - - return false; - } - public static Reader<byte[]> createReader(ReaderBuilder readerBuilder, String readerName, String topic, diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java index 62747bf..8699ecf 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java @@ -26,7 +26,7 @@ import static org.apache.commons.lang3.StringUtils.isEmpty; import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.functions.utils.FunctionCommon.getStateNamespace; import static org.apache.pulsar.functions.utils.FunctionCommon.getUniquePackageName; -import static org.apache.pulsar.functions.worker.WorkerUtils.isFunctionCodeBuiltin; +import static org.apache.pulsar.functions.utils.FunctionCommon.isFunctionCodeBuiltin; import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException; import io.netty.buffer.ByteBuf; @@ -59,10 +59,8 @@ import org.apache.pulsar.common.functions.WorkerInfo; import org.apache.pulsar.common.io.ConnectorDefinition; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.common.policies.data.FunctionStats; import org.apache.pulsar.common.policies.data.TenantInfo; -import org.apache.pulsar.common.util.ClassLoaderUtils; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.RestException; import org.apache.pulsar.functions.instance.InstanceUtils; @@ -78,7 +76,6 @@ import org.apache.pulsar.functions.utils.ComponentTypeUtils; import org.apache.pulsar.functions.utils.FunctionCommon; import org.apache.pulsar.functions.utils.FunctionConfigUtils; import org.apache.pulsar.functions.utils.FunctionMetaDataUtils; -import org.apache.pulsar.functions.utils.io.ConnectorUtils; import org.apache.pulsar.functions.worker.FunctionMetaDataManager; import org.apache.pulsar.functions.worker.FunctionRuntimeInfo; import org.apache.pulsar.functions.worker.FunctionRuntimeManager; @@ -101,7 +98,6 @@ import java.util.LinkedList; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; @@ -900,7 +896,7 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> { throwUnavailableException(); } - return this.worker().getConnectorsManager().getConnectors(); + return this.worker().getConnectorsManager().getConnectorDefinitions(); } @Override diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index 72253cb..b933c0d 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -64,7 +64,7 @@ import java.util.function.Supplier; import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData; -import static org.apache.pulsar.functions.worker.WorkerUtils.isFunctionCodeBuiltin; +import static org.apache.pulsar.functions.utils.FunctionCommon.isFunctionCodeBuiltin; import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException; @Slf4j diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java index a7909d7..c059e18 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java @@ -53,14 +53,13 @@ import java.io.IOException; import java.io.InputStream; import java.net.URI; import java.nio.file.Files; -import java.nio.file.Path; import java.util.*; import java.util.function.Supplier; import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData; -import static org.apache.pulsar.functions.worker.WorkerUtils.isFunctionCodeBuiltin; +import static org.apache.pulsar.functions.utils.FunctionCommon.isFunctionCodeBuiltin; import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException; @Slf4j diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java index 15b5c9e..089b67c 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java @@ -59,7 +59,7 @@ import java.util.function.Supplier; import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData; -import static org.apache.pulsar.functions.worker.WorkerUtils.isFunctionCodeBuiltin; +import static org.apache.pulsar.functions.utils.FunctionCommon.isFunctionCodeBuiltin; import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException; @Slf4j diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java index a67d179..7d24d8c 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java @@ -208,7 +208,7 @@ public class WorkerImpl implements Workers<PulsarWorkerService> { throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation"); } - return this.worker().getConnectorsManager().getConnectors(); + return this.worker().getConnectorsManager().getConnectorDefinitions(); } public void rebalance(final URI uri, final String clientRole) { diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java index facae5a..4214364 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java @@ -699,6 +699,7 @@ public class FunctionRuntimeManagerTest { any(AuthenticationConfig.class), any(SecretsProviderConfigurator.class), any(), + any(), any() ); doNothing().when(kubernetesRuntimeFactory).setupClient(); @@ -944,6 +945,7 @@ public class FunctionRuntimeManagerTest { any(AuthenticationConfig.class), any(SecretsProviderConfigurator.class), any(), + any(), any() ); doNothing().when(mockedKubernetesRuntimeFactory).setupClient(); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java index 8e026d9..41755f7 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java @@ -163,7 +163,7 @@ public class FunctionsImplTest { instanceConfig.setMaxBufferedTuples(1024); JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable( - instanceConfig, null, null, null, null, null, null, null, null); + instanceConfig, null, null, null, null, null, null); CompletableFuture<InstanceCommunication.MetricsData> metricsDataCompletableFuture = new CompletableFuture<InstanceCommunication.MetricsData>(); metricsDataCompletableFuture.complete(javaInstanceRunnable.getMetrics()); Runtime runtime = mock(Runtime.class); @@ -209,7 +209,7 @@ public class FunctionsImplTest { instanceConfig.setMaxBufferedTuples(1024); JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable( - instanceConfig, null, null, null, null, null, null, null, null); + instanceConfig, null, null, null, null, null, null); CompletableFuture<InstanceCommunication.MetricsData> completableFuture = new CompletableFuture<InstanceCommunication.MetricsData>(); completableFuture.complete(javaInstanceRunnable.getMetrics()); Runtime runtime = mock(Runtime.class);