This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
     new 7a76954  Modify ThreadRuntime to allow custom secrets providers (#6971)
7a76954 is described below

commit 7a76954abf2acb8eec9a40dc932b4c95896296f3
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Sun May 17 22:22:03 2020 -0700

    Modify ThreadRuntime to allow custom secrets providers (#6971)

    Co-authored-by: Jerry Peng <jer...@splunk.com>
---
 .../runtime/thread/ThreadRuntimeFactory.java       | 23 +++++++++++++++++-----
 .../functions/worker/FunctionRuntimeManager.java   |  2 ++
 2 files changed, 20 insertions(+), 5 deletions(-)

diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
index c675418..cb82129 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
@@ -37,6 +37,7 @@ import org.apache.pulsar.functions.runtime.RuntimeUtils;
 import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
 import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
 import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
+import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManagerImpl;
 import org.apache.pulsar.functions.worker.WorkerConfig;
@@ -61,13 +62,15 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
     private CollectorRegistry collectorRegistry;
     private String narExtractionDirectory;
     private volatile boolean closed;
+    private SecretsProviderConfigurator secretsProviderConfigurator;
+    private ClassLoader rootClassLoader;
 
     public ThreadRuntimeFactory(String threadGroupName, String pulsarServiceUrl, String storageServiceUrl,
                                 AuthenticationConfig authConfig, SecretsProvider secretsProvider,
                                 CollectorRegistry collectorRegistry, String narExtractionDirectory,
                                 ClassLoader rootClassLoader) throws Exception {
         initialize(threadGroupName, createPulsarClient(pulsarServiceUrl, authConfig),
-                storageServiceUrl, secretsProvider, collectorRegistry, narExtractionDirectory, rootClassLoader);
+                storageServiceUrl, null, secretsProvider, collectorRegistry, narExtractionDirectory, rootClassLoader);
     }
 
     @VisibleForTesting
@@ -76,7 +79,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
                          String narExtractionDirectory, ClassLoader rootClassLoader) {
 
         initialize(threadGroupName, pulsarClient, storageServiceUrl,
-                secretsProvider, collectorRegistry, narExtractionDirectory, rootClassLoader);
+                null, secretsProvider, collectorRegistry, narExtractionDirectory, rootClassLoader);
     }
 
     private static PulsarClient createPulsarClient(String pulsarServiceUrl, AuthenticationConfig authConfig)
@@ -101,12 +104,14 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
     }
 
     private void initialize(String threadGroupName, PulsarClient pulsarClient, String storageServiceUrl,
-                            SecretsProvider secretsProvider, CollectorRegistry collectorRegistry,
-                            String narExtractionDirectory, ClassLoader rootClassLoader) {
+                            SecretsProviderConfigurator secretsProviderConfigurator, SecretsProvider secretsProvider,
+                            CollectorRegistry collectorRegistry, String narExtractionDirectory, ClassLoader rootClassLoader) {
         if (rootClassLoader == null) {
             rootClassLoader = Thread.currentThread().getContextClassLoader();
         }
 
+        this.rootClassLoader = rootClassLoader;
+        this.secretsProviderConfigurator = secretsProviderConfigurator;
         this.secretsProvider = secretsProvider;
         this.fnCache = new FunctionCacheManagerImpl(rootClassLoader);
         this.threadGroup = new ThreadGroup(threadGroupName);
@@ -126,7 +131,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
 
         initialize(factoryConfig.getThreadGroupName(),
                 createPulsarClient(workerConfig.getPulsarServiceUrl(), authenticationConfig),
-                workerConfig.getStateStorageServiceUrl(), new ClearTextSecretsProvider(),
+                workerConfig.getStateStorageServiceUrl(), secretsProviderConfigurator, null,
                 null, workerConfig.getNarExtractionDirectory(), null);
     }
 
@@ -134,6 +139,14 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
     public ThreadRuntime createContainer(InstanceConfig instanceConfig, String jarFile,
                                          String originalCodeFileName,
                                          Long expectedHealthCheckInterval) {
+        if (secretsProvider == null) {
+            String secretsProviderClassName = secretsProviderConfigurator.getSecretsProviderClassName(instanceConfig.getFunctionDetails());
+            secretsProvider = (SecretsProvider) Reflections.createInstance(secretsProviderClassName, this.rootClassLoader);
+            log.info("Initializing secrets provider {} with configs: {}",
+              secretsProvider.getClass().getName(), secretsProviderConfigurator.getSecretsProviderConfig(instanceConfig.getFunctionDetails()));
+            secretsProvider.init(secretsProviderConfigurator.getSecretsProviderConfig(instanceConfig.getFunctionDetails()));
+        }
+
         return new ThreadRuntime(
             instanceConfig,
             fnCache,
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index c66be68..3f522dd 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -143,6 +143,8 @@ public class FunctionRuntimeManager implements AutoCloseable{
         } else {
             secretsProviderConfigurator = new DefaultSecretsProviderConfigurator();
         }
+        log.info("Initializing secrets provider configurator {} with configs: {}",
+          secretsProviderConfigurator.getClass().getName(), workerConfig.getSecretsProviderConfiguratorConfig());
         secretsProviderConfigurator.init(workerConfig.getSecretsProviderConfiguratorConfig());
 
         Optional<FunctionAuthProvider> functionAuthProvider = Optional.empty();