This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a76954  Modify ThreadRuntime to allow custom secrets providers (#6971)
7a76954 is described below

commit 7a76954abf2acb8eec9a40dc932b4c95896296f3
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Sun May 17 22:22:03 2020 -0700

    Modify ThreadRuntime to allow custom secrets providers (#6971)
    
    Co-authored-by: Jerry Peng <jer...@splunk.com>
---
 .../runtime/thread/ThreadRuntimeFactory.java       | 23 +++++++++++++++++-----
 .../functions/worker/FunctionRuntimeManager.java   |  2 ++
 2 files changed, 20 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
index c675418..cb82129 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
@@ -37,6 +37,7 @@ import org.apache.pulsar.functions.runtime.RuntimeUtils;
 import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
 import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
 import 
org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
+import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
 import 
org.apache.pulsar.functions.utils.functioncache.FunctionCacheManagerImpl;
 import org.apache.pulsar.functions.worker.WorkerConfig;
@@ -61,13 +62,15 @@ public class ThreadRuntimeFactory implements RuntimeFactory 
{
     private CollectorRegistry collectorRegistry;
     private String narExtractionDirectory;
     private volatile boolean closed;
+    private SecretsProviderConfigurator secretsProviderConfigurator;
+    private ClassLoader rootClassLoader;
 
     public ThreadRuntimeFactory(String threadGroupName, String 
pulsarServiceUrl, String storageServiceUrl,
                                 AuthenticationConfig authConfig, 
SecretsProvider secretsProvider,
                                 CollectorRegistry collectorRegistry, String 
narExtractionDirectory,
                                 ClassLoader rootClassLoader) throws Exception {
         initialize(threadGroupName, createPulsarClient(pulsarServiceUrl, 
authConfig),
-                storageServiceUrl, secretsProvider, collectorRegistry, 
narExtractionDirectory, rootClassLoader);
+                storageServiceUrl, null, secretsProvider, collectorRegistry, 
narExtractionDirectory, rootClassLoader);
     }
 
     @VisibleForTesting
@@ -76,7 +79,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
                                 String narExtractionDirectory, ClassLoader 
rootClassLoader) {
 
         initialize(threadGroupName, pulsarClient, storageServiceUrl,
-                secretsProvider, collectorRegistry, narExtractionDirectory, 
rootClassLoader);
+                null, secretsProvider, collectorRegistry, 
narExtractionDirectory, rootClassLoader);
     }
 
     private static PulsarClient createPulsarClient(String pulsarServiceUrl, 
AuthenticationConfig authConfig)
@@ -101,12 +104,14 @@ public class ThreadRuntimeFactory implements 
RuntimeFactory {
     }
 
     private void initialize(String threadGroupName, PulsarClient pulsarClient, 
String storageServiceUrl,
-                            SecretsProvider secretsProvider, CollectorRegistry 
collectorRegistry,
-                            String narExtractionDirectory, ClassLoader 
rootClassLoader) {
+                            SecretsProviderConfigurator 
secretsProviderConfigurator, SecretsProvider secretsProvider,
+                            CollectorRegistry collectorRegistry,  String 
narExtractionDirectory, ClassLoader rootClassLoader) {
         if (rootClassLoader == null) {
             rootClassLoader = Thread.currentThread().getContextClassLoader();
         }
 
+        this.rootClassLoader = rootClassLoader;
+        this.secretsProviderConfigurator = secretsProviderConfigurator;
         this.secretsProvider = secretsProvider;
         this.fnCache = new FunctionCacheManagerImpl(rootClassLoader);
         this.threadGroup = new ThreadGroup(threadGroupName);
@@ -126,7 +131,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory 
{
 
         initialize(factoryConfig.getThreadGroupName(),
                 createPulsarClient(workerConfig.getPulsarServiceUrl(), 
authenticationConfig),
-                workerConfig.getStateStorageServiceUrl(), new 
ClearTextSecretsProvider(),
+                workerConfig.getStateStorageServiceUrl(), 
secretsProviderConfigurator, null,
                 null, workerConfig.getNarExtractionDirectory(), null);
     }
 
@@ -134,6 +139,14 @@ public class ThreadRuntimeFactory implements 
RuntimeFactory {
     public ThreadRuntime createContainer(InstanceConfig instanceConfig, String 
jarFile,
                                          String originalCodeFileName,
                                          Long expectedHealthCheckInterval) {
+        if (secretsProvider == null) {
+            String secretsProviderClassName = 
secretsProviderConfigurator.getSecretsProviderClassName(instanceConfig.getFunctionDetails());
+            secretsProvider = (SecretsProvider) 
Reflections.createInstance(secretsProviderClassName, this.rootClassLoader);
+            log.info("Initializing secrets provider {} with configs: {}",
+              secretsProvider.getClass().getName(), 
secretsProviderConfigurator.getSecretsProviderConfig(instanceConfig.getFunctionDetails()));
+            
secretsProvider.init(secretsProviderConfigurator.getSecretsProviderConfig(instanceConfig.getFunctionDetails()));
+        }
+
         return new ThreadRuntime(
             instanceConfig,
             fnCache,
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index c66be68..3f522dd 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -143,6 +143,8 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
         } else {
             secretsProviderConfigurator = new 
DefaultSecretsProviderConfigurator();
         }
+        log.info("Initializing secrets provider configurator {} with configs: 
{}",
+          secretsProviderConfigurator.getClass().getName(), 
workerConfig.getSecretsProviderConfiguratorConfig());
         
secretsProviderConfigurator.init(workerConfig.getSecretsProviderConfiguratorConfig());
 
         Optional<FunctionAuthProvider> functionAuthProvider = Optional.empty();

Reply via email to