This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c27f7a3  Optimize built-in source/sink startup Part 2 (#9500)
c27f7a3 is described below

commit c27f7a340a3c78a3a5004d08b8ac05ed99672ee3
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Mon Feb 8 12:33:06 2021 -0800

    Optimize built-in source/sink startup Part 2 (#9500)
    
    * Optimize built-in source/sink startup by eliminating redundant NAR 
unpacking and checksum calculation Part 2. Allow ThreadRuntime to used cached 
built-in connectors instead of unpacking and loading again.
    
    Co-authored-by: Jerry Peng <jer...@splunk.com>
---
 .../functions/instance/JavaInstanceRunnable.java   | 54 ++----------
 .../instance/JavaInstanceRunnableTest.java         |  2 +-
 .../pulsar/functions/runtime/RuntimeFactory.java   |  2 +
 .../kubernetes/KubernetesRuntimeFactory.java       |  2 +
 .../runtime/process/ProcessRuntimeFactory.java     |  2 +
 .../functions/runtime/thread/ThreadRuntime.java    | 95 ++++++++++++++++++----
 .../runtime/thread/ThreadRuntimeFactory.java       | 15 +++-
 .../pulsar/functions/worker/ConnectorsManager.java |  4 +-
 .../kubernetes/KubernetesRuntimeFactoryTest.java   |  5 +-
 .../runtime/kubernetes/KubernetesRuntimeTest.java  |  7 +-
 .../runtime/process/ProcessRuntimeTest.java        |  4 +-
 .../runtime/thread/ThreadRuntimeFactoryTest.java   |  2 +
 .../pulsar/functions/utils/FunctionCommon.java     | 22 +++++
 .../pulsar/functions/worker/FunctionActioner.java  |  4 +-
 .../functions/worker/FunctionRuntimeManager.java   |  4 +-
 .../pulsar/functions/worker/WorkerUtils.java       | 22 -----
 .../functions/worker/rest/api/ComponentImpl.java   |  8 +-
 .../functions/worker/rest/api/FunctionsImpl.java   |  2 +-
 .../functions/worker/rest/api/SinksImpl.java       |  3 +-
 .../functions/worker/rest/api/SourcesImpl.java     |  2 +-
 .../functions/worker/rest/api/WorkerImpl.java      |  2 +-
 .../worker/FunctionRuntimeManagerTest.java         |  2 +
 .../worker/rest/api/FunctionsImplTest.java         |  4 +-
 23 files changed, 155 insertions(+), 114 deletions(-)

diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index c542f05..02083ed 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -28,6 +28,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -73,6 +74,7 @@ import 
org.apache.pulsar.functions.source.batch.BatchSourceExecutor;
 import org.apache.pulsar.functions.utils.CryptoUtils;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
+import org.apache.pulsar.functions.utils.io.Connector;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.Source;
 import org.slf4j.Logger;
@@ -85,8 +87,6 @@ import org.slf4j.LoggerFactory;
 public class JavaInstanceRunnable implements AutoCloseable, Runnable {
 
     private final InstanceConfig instanceConfig;
-    private final FunctionCacheManager fnCache;
-    private final String jarFile;
 
     // input topic consumer & output topic producer
     private final PulsarClientImpl client;
@@ -124,7 +124,6 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
 
     private final ClassLoader instanceClassLoader;
     private ClassLoader functionClassLoader;
-    private String narExtractionDirectory;
 
     // a flog to determine if member variables have been initialized as part 
of setup().
     // used for out of band API calls like operations involving stats
@@ -134,23 +133,19 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
     private ReadWriteLock statsLock = new ReentrantReadWriteLock();
 
     public JavaInstanceRunnable(InstanceConfig instanceConfig,
-                                FunctionCacheManager fnCache,
-                                String jarFile,
                                 PulsarClient pulsarClient,
                                 PulsarAdmin pulsarAdmin,
                                 String stateStorageServiceUrl,
                                 SecretsProvider secretsProvider,
                                 CollectorRegistry collectorRegistry,
-                                String narExtractionDirectory) {
+                                ClassLoader functionClassLoader) {
         this.instanceConfig = instanceConfig;
-        this.fnCache = fnCache;
-        this.jarFile = jarFile;
         this.client = (PulsarClientImpl) pulsarClient;
         this.pulsarAdmin = pulsarAdmin;
         this.stateStorageServiceUrl = stateStorageServiceUrl;
         this.secretsProvider = secretsProvider;
         this.collectorRegistry = collectorRegistry;
-        this.narExtractionDirectory = narExtractionDirectory;
+        this.functionClassLoader = functionClassLoader;
         this.metricsLabels = new String[]{
                 instanceConfig.getFunctionDetails().getTenant(),
                 String.format("%s/%s", 
instanceConfig.getFunctionDetails().getTenant(),
@@ -197,9 +192,6 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
         log.info("Starting Java Instance {} : \n Details = {}",
             instanceConfig.getFunctionDetails().getName(), 
instanceConfig.getFunctionDetails());
 
-        // start the function thread
-        functionClassLoader = loadJars();
-
         Object object;
         if 
(instanceConfig.getFunctionDetails().getClassName().equals(org.apache.pulsar.functions.windowing.WindowFunctionExecutor.class.getName()))
 {
             object = Reflections.createInstance(
@@ -305,35 +297,6 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
         }
     }
 
-    private ClassLoader loadJars() throws Exception {
-        ClassLoader fnClassLoader;
-        try {
-            log.info("Load JAR: {}", jarFile);
-            // Let's first try to treat it as a nar archive
-            fnCache.registerFunctionInstanceWithArchive(
-                instanceConfig.getFunctionId(),
-                instanceConfig.getInstanceName(),
-                jarFile, narExtractionDirectory);
-        } catch (FileNotFoundException e) {
-            // create the function class loader
-            fnCache.registerFunctionInstance(
-                    instanceConfig.getFunctionId(),
-                    instanceConfig.getInstanceName(),
-                    Arrays.asList(jarFile),
-                    Collections.emptyList());
-        }
-
-        log.info("Initialize function class loader for function {} at function 
cache manager, functionClassLoader: {}",
-                instanceConfig.getFunctionDetails().getName(), 
fnCache.getClassLoader(instanceConfig.getFunctionId()));
-
-        fnClassLoader = fnCache.getClassLoader(instanceConfig.getFunctionId());
-        if (null == fnClassLoader) {
-            throw new Exception("No function class loader available.");
-        }
-
-        return fnClassLoader;
-    }
-
     private void setupStateStore() throws Exception {
         this.stateManager = new InstanceStateManager();
 
@@ -475,14 +438,7 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
             stateStoreProvider.close();
         }
 
-        if (instanceCache != null) {
-            // once the thread quits, clean up the instance
-            fnCache.unregisterFunctionInstance(
-                    instanceConfig.getFunctionId(),
-                    instanceConfig.getInstanceName());
-            log.info("Unloading JAR files for function {}", instanceConfig);
-            instanceCache = null;
-        }
+        instanceCache = null;
 
         if (logAppender != null) {
             removeLogTopicAppender(LoggerContext.getContext());
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
index 9286b94..56d80ae 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
@@ -59,7 +59,7 @@ public class JavaInstanceRunnableTest {
     private JavaInstanceRunnable createRunnable(String outputSerde) throws 
Exception {
         InstanceConfig config = createInstanceConfig(outputSerde);
         JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable(
-                config, null, null, null, null, null, null, null, null);
+                config, null, null, null, null, null, null);
         return javaInstanceRunnable;
     }
 
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
index 56fb701..0f034d0 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
@@ -25,6 +25,7 @@ import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
 import 
org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
 import org.apache.pulsar.common.util.Reflections;
+import org.apache.pulsar.functions.worker.ConnectorsManager;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 
 import java.util.Optional;
@@ -37,6 +38,7 @@ public interface RuntimeFactory extends AutoCloseable {
     void initialize(WorkerConfig workerConfig,
                     AuthenticationConfig authenticationConfig,
                     SecretsProviderConfigurator secretsProviderConfigurator,
+                    ConnectorsManager connectorsManager,
                     Optional<FunctionAuthProvider> authProvider,
                     Optional<RuntimeCustomizer> runtimeCustomizer) throws 
Exception;
 
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
index f001af4..b332722 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
@@ -43,6 +43,7 @@ import org.apache.pulsar.functions.runtime.RuntimeCustomizer;
 import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.runtime.RuntimeUtils;
 import 
org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
+import org.apache.pulsar.functions.worker.ConnectorsManager;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 
 import java.lang.reflect.Field;
@@ -129,6 +130,7 @@ public class KubernetesRuntimeFactory implements 
RuntimeFactory {
     @Override
     public void initialize(WorkerConfig workerConfig, AuthenticationConfig 
authenticationConfig,
                            SecretsProviderConfigurator 
secretsProviderConfigurator,
+                           ConnectorsManager connectorsManager,
                            Optional<FunctionAuthProvider> functionAuthProvider,
                            Optional<RuntimeCustomizer> runtimeCustomizer) {
 
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java
index 04b871c..aaa0004 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.runtime.RuntimeUtils;
 import 
org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;
+import org.apache.pulsar.functions.worker.ConnectorsManager;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 
 import java.nio.file.Paths;
@@ -94,6 +95,7 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
     @Override
     public void initialize(WorkerConfig workerConfig, AuthenticationConfig 
authenticationConfig,
                            SecretsProviderConfigurator 
secretsProviderConfigurator,
+                           ConnectorsManager connectorsManager,
                            Optional<FunctionAuthProvider> authProvider,
                            Optional<RuntimeCustomizer> runtimeCustomizer) {
         ProcessRuntimeFactoryConfig factoryConfig = 
RuntimeUtils.getRuntimeFunctionConfig(
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
index f8b0a34..a9614a1 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
@@ -19,7 +19,12 @@
 
 package org.apache.pulsar.functions.runtime.thread;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 import io.prometheus.client.CollectorRegistry;
@@ -28,6 +33,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.instance.InstanceUtils;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
@@ -36,6 +42,8 @@ import 
org.apache.pulsar.functions.secretsprovider.SecretsProvider;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
 import org.apache.pulsar.functions.instance.JavaInstanceRunnable;
+import org.apache.pulsar.functions.utils.io.Connector;
+import org.apache.pulsar.functions.worker.ConnectorsManager;
 
 /**
  * A function container implemented using java thread.
@@ -60,6 +68,8 @@ public class ThreadRuntime implements Runtime {
     private SecretsProvider secretsProvider;
     private CollectorRegistry collectorRegistry;
     private String narExtractionDirectory;
+    private final Optional<ConnectorsManager> connectorsManager;
+
     ThreadRuntime(InstanceConfig instanceConfig,
                   FunctionCacheManager fnCache,
                   ThreadGroup threadGroup,
@@ -69,7 +79,8 @@ public class ThreadRuntime implements Runtime {
                   String stateStorageServiceUrl,
                   SecretsProvider secretsProvider,
                   CollectorRegistry collectorRegistry,
-                  String narExtractionDirectory) {
+                  String narExtractionDirectory,
+                  Optional<ConnectorsManager> connectorsManager) {
         this.instanceConfig = instanceConfig;
         if (instanceConfig.getFunctionDetails().getRuntime() != 
Function.FunctionDetails.Runtime.JAVA) {
             throw new RuntimeException("Thread Container only supports Java 
Runtime");
@@ -84,34 +95,82 @@ public class ThreadRuntime implements Runtime {
         this.secretsProvider = secretsProvider;
         this.collectorRegistry = collectorRegistry;
         this.narExtractionDirectory = narExtractionDirectory;
-        this.javaInstanceRunnable = new JavaInstanceRunnable(
-                instanceConfig,
-                fnCache,
-                jarFile,
-                pulsarClient,
-                pulsarAdmin,
-                stateStorageServiceUrl,
-                secretsProvider,
-                collectorRegistry,
-                narExtractionDirectory);
+        this.connectorsManager = connectorsManager;
+    }
+
+    private static ClassLoader getFunctionClassLoader(InstanceConfig 
instanceConfig,
+                                                      String jarFile,
+                                                      String 
narExtractionDirectory,
+                                                      FunctionCacheManager 
fnCache,
+                                                      
Optional<ConnectorsManager> connectorsManager) throws Exception {
+
+        if 
(FunctionCommon.isFunctionCodeBuiltin(instanceConfig.getFunctionDetails())
+                && connectorsManager.isPresent()) {
+            switch 
(InstanceUtils.calculateSubjectType(instanceConfig.getFunctionDetails())) {
+                case SOURCE:
+                    return connectorsManager.get().getConnector(
+                            
instanceConfig.getFunctionDetails().getSource().getBuiltin()).getClassLoader();
+                case SINK:
+                    return connectorsManager.get().getConnector(
+                            
instanceConfig.getFunctionDetails().getSink().getBuiltin()).getClassLoader();
+                default:
+                    return loadJars(jarFile, instanceConfig, 
narExtractionDirectory, fnCache);
+            }
+        } else {
+            return loadJars(jarFile, instanceConfig, narExtractionDirectory, 
fnCache);
+        }
+    }
+
+    private static ClassLoader loadJars(String jarFile,
+                                 InstanceConfig instanceConfig,
+                                 String narExtractionDirectory,
+                                 FunctionCacheManager fnCache) throws 
Exception {
+        ClassLoader fnClassLoader;
+        try {
+            log.info("Load JAR: {}", jarFile);
+            // Let's first try to treat it as a nar archive
+            fnCache.registerFunctionInstanceWithArchive(
+                    instanceConfig.getFunctionId(),
+                    instanceConfig.getInstanceName(),
+                    jarFile, narExtractionDirectory);
+        } catch (FileNotFoundException e) {
+            // create the function class loader
+            fnCache.registerFunctionInstance(
+                    instanceConfig.getFunctionId(),
+                    instanceConfig.getInstanceName(),
+                    Arrays.asList(jarFile),
+                    Collections.emptyList());
+        }
+
+        log.info("Initialize function class loader for function {} at function 
cache manager, functionClassLoader: {}",
+                instanceConfig.getFunctionDetails().getName(), 
fnCache.getClassLoader(instanceConfig.getFunctionId()));
+
+        fnClassLoader = fnCache.getClassLoader(instanceConfig.getFunctionId());
+        if (null == fnClassLoader) {
+            throw new Exception("No function class loader available.");
+        }
+
+        return fnClassLoader;
     }
 
     /**
      * The core logic that initialize the thread container and executes the 
function.
      */
     @Override
-    public void start() {
+    public void start() throws Exception {
+
+        // extract class loader for function
+        ClassLoader functionClassLoader = 
getFunctionClassLoader(instanceConfig, jarFile, narExtractionDirectory, 
fnCache, connectorsManager);
+
         // re-initialize JavaInstanceRunnable so that variables in constructor 
can be re-initialized
         this.javaInstanceRunnable = new JavaInstanceRunnable(
                 instanceConfig,
-                fnCache,
-                jarFile,
                 pulsarClient,
                 pulsarAdmin,
                 stateStorageServiceUrl,
                 secretsProvider,
                 collectorRegistry,
-                narExtractionDirectory);
+                functionClassLoader);
         log.info("ThreadContainer starting function with instance config {}", 
instanceConfig);
         this.fnThread = new Thread(threadGroup, javaInstanceRunnable,
                 String.format("%s-%s",
@@ -145,6 +204,12 @@ public class ThreadRuntime implements Runtime {
             }
             // make sure JavaInstanceRunnable is closed
             this.javaInstanceRunnable.close();
+
+            log.info("Unloading JAR files for function {}", instanceConfig);
+            // once the thread quits, clean up the instance
+            fnCache.unregisterFunctionInstance(
+                    instanceConfig.getFunctionId(),
+                    instanceConfig.getInstanceName());
         }
     }
 
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
index f5e3736..d44cf1b 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
@@ -41,6 +41,7 @@ import 
org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderCo
 import org.apache.pulsar.common.util.Reflections;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
 import 
org.apache.pulsar.functions.utils.functioncache.FunctionCacheManagerImpl;
+import org.apache.pulsar.functions.worker.ConnectorsManager;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 
 import java.util.Optional;
@@ -64,6 +65,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
     private volatile boolean closed;
     private SecretsProviderConfigurator secretsProviderConfigurator;
     private ClassLoader rootClassLoader;
+    private Optional<ConnectorsManager> connectorsManager;
 
     /**
      * This constructor is used by other runtimes (e.g. ProcessRuntime and 
KubernetesRuntime) that rely on ThreadRuntime to actually run an instance of 
the function.
@@ -76,13 +78,15 @@ public class ThreadRuntimeFactory implements RuntimeFactory 
{
                                 String pulsarWebServiceUrl) throws Exception {
         initialize(threadGroupName, Optional.empty(), pulsarServiceUrl, 
authConfig,
                 storageServiceUrl, null, secretsProvider, collectorRegistry, 
narExtractionDirectory,
-                rootClassLoader, exposePulsarAdminClientEnabled, 
pulsarWebServiceUrl);
+                rootClassLoader, exposePulsarAdminClientEnabled, 
pulsarWebServiceUrl, Optional.empty());
     }
 
     private void initialize(String threadGroupName, 
Optional<ThreadRuntimeFactoryConfig.MemoryLimit> memoryLimit, String 
pulsarServiceUrl, AuthenticationConfig authConfig, String storageServiceUrl,
                             SecretsProviderConfigurator 
secretsProviderConfigurator, SecretsProvider secretsProvider,
                             CollectorRegistry collectorRegistry, String 
narExtractionDirectory,
-                            ClassLoader rootClassLoader, boolean 
exposePulsarAdminClientEnabled, String pulsarWebServiceUrl) throws 
PulsarClientException {
+                            ClassLoader rootClassLoader, boolean 
exposePulsarAdminClientEnabled,
+                            String pulsarWebServiceUrl, 
Optional<ConnectorsManager> connectorsManager) throws PulsarClientException {
+
         if (rootClassLoader == null) {
             rootClassLoader = Thread.currentThread().getContextClassLoader();
         }
@@ -97,6 +101,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
         this.storageServiceUrl = storageServiceUrl;
         this.collectorRegistry = collectorRegistry;
         this.narExtractionDirectory = narExtractionDirectory;
+        this.connectorsManager = connectorsManager;
     }
 
     private Optional<Long> 
calculateClientMemoryLimit(Optional<ThreadRuntimeFactoryConfig.MemoryLimit> 
memoryLimit) {
@@ -134,6 +139,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory 
{
     @Override
     public void initialize(WorkerConfig workerConfig, AuthenticationConfig 
authenticationConfig,
                            SecretsProviderConfigurator 
secretsProviderConfigurator,
+                           ConnectorsManager connectorsManager,
                            Optional<FunctionAuthProvider> functionAuthProvider,
                            Optional<RuntimeCustomizer> runtimeCustomizer) 
throws Exception {
         ThreadRuntimeFactoryConfig factoryConfig = 
RuntimeUtils.getRuntimeFunctionConfig(
@@ -143,7 +149,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory 
{
                 workerConfig.getPulsarServiceUrl(), authenticationConfig,
                 workerConfig.getStateStorageServiceUrl(), 
secretsProviderConfigurator, null,
                 null, workerConfig.getNarExtractionDirectory(), null,
-                workerConfig.isExposeAdminClientEnabled(), 
workerConfig.getPulsarWebServiceUrl());
+                workerConfig.isExposeAdminClientEnabled(), 
workerConfig.getPulsarWebServiceUrl(), Optional.of(connectorsManager));
     }
 
     @Override
@@ -169,7 +175,8 @@ public class ThreadRuntimeFactory implements RuntimeFactory 
{
             storageServiceUrl,
             secretsProvider,
             collectorRegistry,
-            narExtractionDirectory);
+            narExtractionDirectory,
+            connectorsManager);
     }
 
     @Override
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java
similarity index 96%
rename from 
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java
rename to 
pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java
index 856c8d6..0afffa5 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.functions.worker;
 
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.common.io.ConfigFieldDefinition;
 import org.apache.pulsar.common.io.ConnectorDefinition;
@@ -33,6 +34,7 @@ import java.util.stream.Collectors;
 @Slf4j
 public class ConnectorsManager {
 
+    @Getter
     private volatile TreeMap<String, Connector> connectors;
 
     public ConnectorsManager(WorkerConfig workerConfig) throws IOException {
@@ -47,7 +49,7 @@ public class ConnectorsManager {
         return connectors.get(connectorType).getConnectorDefinition();
     }
 
-    public List<ConnectorDefinition> getConnectors() {
+    public List<ConnectorDefinition> getConnectorDefinitions() {
         return connectors.values().stream().map(connector -> 
connector.getConnectorDefinition()).collect(Collectors.toList());
     }
 
diff --git 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java
 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java
index dec1014..9870c3f 100644
--- 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java
+++ 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java
@@ -39,6 +39,7 @@ import org.apache.pulsar.functions.runtime.RuntimeCustomizer;
 import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
 import 
org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
 import 
org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
+import org.apache.pulsar.functions.worker.ConnectorsManager;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.mockito.Mockito;
 import org.testng.annotations.AfterMethod;
@@ -181,7 +182,7 @@ public class KubernetesRuntimeFactoryTest {
         workerConfig.setStateStorageServiceUrl(null);
         workerConfig.setAuthenticationEnabled(false);
 
-        factory.initialize(workerConfig,null, new 
TestSecretProviderConfigurator(), functionAuthProvider, manifestCustomizer);
+        factory.initialize(workerConfig,null, new 
TestSecretProviderConfigurator(), Mockito.mock(ConnectorsManager.class), 
functionAuthProvider, manifestCustomizer);
         return factory;
     }
 
@@ -383,7 +384,7 @@ public class KubernetesRuntimeFactoryTest {
         workerConfig.setFunctionRuntimeFactoryConfigs(
                 
ObjectMapperFactory.getThreadLocal().convertValue(kubernetesRuntimeFactoryConfig,
 Map.class));
         AuthenticationConfig authenticationConfig = 
AuthenticationConfig.builder().build();
-        kubernetesRuntimeFactory.initialize(workerConfig, 
authenticationConfig, new DefaultSecretsProviderConfigurator(), 
Optional.empty(), Optional.empty());
+        kubernetesRuntimeFactory.initialize(workerConfig, 
authenticationConfig, new DefaultSecretsProviderConfigurator(), 
Mockito.mock(ConnectorsManager.class), Optional.empty(), Optional.empty());
         return kubernetesRuntimeFactory;
     }
 }
diff --git 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
index 5c1c901..54c72cb 100644
--- 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
+++ 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
@@ -39,7 +39,9 @@ import 
org.apache.pulsar.functions.runtime.thread.ThreadRuntime;
 import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
 import 
org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
 import org.apache.pulsar.functions.utils.FunctionCommon;
+import org.apache.pulsar.functions.worker.ConnectorsManager;
 import org.apache.pulsar.functions.worker.WorkerConfig;
+import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
@@ -219,7 +221,7 @@ public class KubernetesRuntimeTest {
 
         manifestCustomizer.ifPresent(runtimeCustomizer -> 
runtimeCustomizer.initialize(Optional.ofNullable(workerConfig.getRuntimeCustomizerConfig()).orElse(Collections.emptyMap())));
 
-        factory.initialize(workerConfig, null, new 
TestSecretProviderConfigurator(), Optional.empty(), manifestCustomizer);
+        factory.initialize(workerConfig, null, new 
TestSecretProviderConfigurator(), Mockito.mock(ConnectorsManager.class), 
Optional.empty(), manifestCustomizer);
         return factory;
     }
 
@@ -910,7 +912,8 @@ public class KubernetesRuntimeTest {
             
manifestCustomizer.get().initialize(Optional.ofNullable(workerConfig.getRuntimeCustomizerConfig()).orElse(Collections.emptyMap()));
         }
 
-        factory.initialize(workerConfig, null, new 
TestSecretProviderConfigurator(), Optional.empty(), manifestCustomizer);
+        factory.initialize(workerConfig, null, new 
TestSecretProviderConfigurator(),
+                Mockito.mock(ConnectorsManager.class), Optional.empty(), 
manifestCustomizer);
         return factory;
     }
 
diff --git 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
index 66090f5..69336fa 100644
--- 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
+++ 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
@@ -49,7 +49,9 @@ import 
org.apache.pulsar.functions.runtime.thread.ThreadRuntime;
 import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
 import 
org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
 import org.apache.pulsar.functions.utils.FunctionCommon;
+import org.apache.pulsar.functions.worker.ConnectorsManager;
 import org.apache.pulsar.functions.worker.WorkerConfig;
+import org.mockito.Mockito;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -170,7 +172,7 @@ public class ProcessRuntimeTest {
         
workerConfig.setFunctionRuntimeFactoryClassName(ProcessRuntimeFactory.class.getName());
         workerConfig.setFunctionRuntimeFactoryConfigs(
                 
ObjectMapperFactory.getThreadLocal().convertValue(processRuntimeFactoryConfig, 
Map.class));
-        processRuntimeFactory.initialize(workerConfig, null, new 
TestSecretsProviderConfigurator(), Optional.empty(), Optional.empty());
+        processRuntimeFactory.initialize(workerConfig, null, new 
TestSecretsProviderConfigurator(), Mockito.mock(ConnectorsManager.class), 
Optional.empty(), Optional.empty());
 
         return processRuntimeFactory;
     }
diff --git 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java
 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java
index d0e2596..8346e9d 100644
--- 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java
+++ 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.common.util.RestException;
 import org.apache.pulsar.functions.instance.InstanceUtils;
 import 
org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
+import org.apache.pulsar.functions.worker.ConnectorsManager;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.mockito.Mockito;
 import org.powermock.api.mockito.PowerMockito;
@@ -145,6 +146,7 @@ public class ThreadRuntimeFactoryTest {
                 workerConfig,
                 Mockito.mock(AuthenticationConfig.class),
                 Mockito.mock(SecretsProviderConfigurator.class),
+                Mockito.mock(ConnectorsManager.class),
                 Optional.empty(), Optional.empty());
 
         return clientBuilder;
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
index 41dbcbd..7858663 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
@@ -483,4 +483,26 @@ public class FunctionCommon {
     public static String capFirstLetter(Enum en) {
         return StringUtils.capitalize(en.toString().toLowerCase());
     }
+
+    public static boolean 
isFunctionCodeBuiltin(org.apache.pulsar.functions.proto.Function.FunctionDetailsOrBuilder
 functionDetails) {
+        if (functionDetails.hasSource()) {
+            org.apache.pulsar.functions.proto.Function.SourceSpec sourceSpec = 
functionDetails.getSource();
+            if (!isEmpty(sourceSpec.getBuiltin())) {
+                return true;
+            }
+        }
+
+        if (functionDetails.hasSink()) {
+            org.apache.pulsar.functions.proto.Function.SinkSpec sinkSpec = 
functionDetails.getSink();
+            if (!isEmpty(sinkSpec.getBuiltin())) {
+                return true;
+            }
+        }
+
+        if (!isEmpty(functionDetails.getBuiltin())) {
+            return true;
+        }
+
+        return false;
+    }
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index e051f12..73f8f95 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -31,7 +31,6 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.common.io.BatchSourceConfig;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -48,7 +47,6 @@ import org.apache.pulsar.functions.utils.Actions;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.utils.SourceConfigUtils;
 import org.apache.pulsar.functions.utils.io.Connector;
-import org.apache.pulsar.functions.utils.io.ConnectorUtils;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -120,7 +118,7 @@ public class FunctionActioner {
                     URL url = new URL(pkgLocation);
                     File pkgFile = new File(url.toURI());
                     packageFile = pkgFile.getAbsolutePath();
-                } else if (WorkerUtils.isFunctionCodeBuiltin(functionDetails)) 
{
+                } else if 
(FunctionCommon.isFunctionCodeBuiltin(functionDetails)) {
                     File pkgFile = 
getBuiltinArchive(FunctionDetails.newBuilder(functionMetaData.getFunctionDetails()));
                     packageFile = pkgFile.getAbsolutePath();
                 } else {
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 67b72be..c95475a 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -206,7 +206,9 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
             }
         }
         // initialize runtime
-        this.runtimeFactory.initialize(workerConfig, authConfig, 
secretsProviderConfigurator, functionAuthProvider, runtimeCustomizer);
+        this.runtimeFactory.initialize(workerConfig, authConfig,
+                secretsProviderConfigurator, connectorsManager,
+                functionAuthProvider, runtimeCustomizer);
 
         this.functionActioner = new FunctionActioner(this.workerConfig, 
runtimeFactory,
                 dlogNamespace, connectorsManager, functionsManager, 
workerService.getBrokerAdmin());
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
index 5e65990..821a8fc 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
@@ -314,28 +314,6 @@ public final class WorkerUtils {
         }
     }
 
-    public static boolean 
isFunctionCodeBuiltin(Function.FunctionDetailsOrBuilder functionDetails) {
-        if (functionDetails.hasSource()) {
-            Function.SourceSpec sourceSpec = functionDetails.getSource();
-            if (!StringUtils.isEmpty(sourceSpec.getBuiltin())) {
-                return true;
-            }
-        }
-
-        if (functionDetails.hasSink()) {
-            Function.SinkSpec sinkSpec = functionDetails.getSink();
-            if (!StringUtils.isEmpty(sinkSpec.getBuiltin())) {
-                return true;
-            }
-        }
-
-        if (!StringUtils.isEmpty(functionDetails.getBuiltin())) {
-            return true;
-        }
-
-        return false;
-    }
-
     public static Reader<byte[]> createReader(ReaderBuilder readerBuilder,
                                               String readerName,
                                               String topic,
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 62747bf..8699ecf 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -26,7 +26,7 @@ import static org.apache.commons.lang3.StringUtils.isEmpty;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static 
org.apache.pulsar.functions.utils.FunctionCommon.getStateNamespace;
 import static 
org.apache.pulsar.functions.utils.FunctionCommon.getUniquePackageName;
-import static 
org.apache.pulsar.functions.worker.WorkerUtils.isFunctionCodeBuiltin;
+import static 
org.apache.pulsar.functions.utils.FunctionCommon.isFunctionCodeBuiltin;
 import static 
org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
 
 import io.netty.buffer.ByteBuf;
@@ -59,10 +59,8 @@ import org.apache.pulsar.common.functions.WorkerInfo;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.common.policies.data.TenantInfo;
-import org.apache.pulsar.common.util.ClassLoaderUtils;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.RestException;
 import org.apache.pulsar.functions.instance.InstanceUtils;
@@ -78,7 +76,6 @@ import org.apache.pulsar.functions.utils.ComponentTypeUtils;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.utils.FunctionConfigUtils;
 import org.apache.pulsar.functions.utils.FunctionMetaDataUtils;
-import org.apache.pulsar.functions.utils.io.ConnectorUtils;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
@@ -101,7 +98,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 
@@ -900,7 +896,7 @@ public abstract class ComponentImpl implements 
Component<PulsarWorkerService> {
             throwUnavailableException();
         }
 
-        return this.worker().getConnectorsManager().getConnectors();
+        return this.worker().getConnectorsManager().getConnectorDefinitions();
     }
 
     @Override
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 72253cb..b933c0d 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -64,7 +64,7 @@ import java.util.function.Supplier;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static 
org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
-import static 
org.apache.pulsar.functions.worker.WorkerUtils.isFunctionCodeBuiltin;
+import static 
org.apache.pulsar.functions.utils.FunctionCommon.isFunctionCodeBuiltin;
 import static 
org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
 
 @Slf4j
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
index a7909d7..c059e18 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
@@ -53,14 +53,13 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
 import java.nio.file.Files;
-import java.nio.file.Path;
 import java.util.*;
 import java.util.function.Supplier;
 
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static 
org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
-import static 
org.apache.pulsar.functions.worker.WorkerUtils.isFunctionCodeBuiltin;
+import static 
org.apache.pulsar.functions.utils.FunctionCommon.isFunctionCodeBuiltin;
 import static 
org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
 
 @Slf4j
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
index 15b5c9e..089b67c 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
@@ -59,7 +59,7 @@ import java.util.function.Supplier;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static 
org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
-import static 
org.apache.pulsar.functions.worker.WorkerUtils.isFunctionCodeBuiltin;
+import static 
org.apache.pulsar.functions.utils.FunctionCommon.isFunctionCodeBuiltin;
 import static 
org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
 
 @Slf4j
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
index a67d179..7d24d8c 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
@@ -208,7 +208,7 @@ public class WorkerImpl implements 
Workers<PulsarWorkerService> {
             throw new RestException(Status.UNAUTHORIZED, "client is not 
authorize to perform operation");
         }
 
-        return this.worker().getConnectorsManager().getConnectors();
+        return this.worker().getConnectorsManager().getConnectorDefinitions();
     }
 
     public void rebalance(final URI uri, final String clientRole) {
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index facae5a..4214364 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -699,6 +699,7 @@ public class FunctionRuntimeManagerTest {
             any(AuthenticationConfig.class),
             any(SecretsProviderConfigurator.class),
             any(),
+            any(),
             any()
         );
         doNothing().when(kubernetesRuntimeFactory).setupClient();
@@ -944,6 +945,7 @@ public class FunctionRuntimeManagerTest {
             any(AuthenticationConfig.class),
             any(SecretsProviderConfigurator.class),
             any(),
+            any(),
             any()
         );
         doNothing().when(mockedKubernetesRuntimeFactory).setupClient();
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
index 8e026d9..41755f7 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
@@ -163,7 +163,7 @@ public class FunctionsImplTest {
         instanceConfig.setMaxBufferedTuples(1024);
 
         JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable(
-                instanceConfig, null, null, null, null, null, null, null, 
null);
+                instanceConfig, null, null, null, null, null, null);
         CompletableFuture<InstanceCommunication.MetricsData> 
metricsDataCompletableFuture = new 
CompletableFuture<InstanceCommunication.MetricsData>();
         
metricsDataCompletableFuture.complete(javaInstanceRunnable.getMetrics());
         Runtime runtime = mock(Runtime.class);
@@ -209,7 +209,7 @@ public class FunctionsImplTest {
         instanceConfig.setMaxBufferedTuples(1024);
 
         JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable(
-                instanceConfig, null, null, null, null, null, null, null, 
null);
+                instanceConfig, null, null, null, null, null, null);
         CompletableFuture<InstanceCommunication.MetricsData> completableFuture 
= new CompletableFuture<InstanceCommunication.MetricsData>();
         completableFuture.complete(javaInstanceRunnable.getMetrics());
         Runtime runtime = mock(Runtime.class);

Reply via email to