This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 649c6ca Hooked up secrets function api with secret function implementations (#2875) 649c6ca is described below commit 649c6ca4eb83d340edfd4b431e0e4e68099d95f6 Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Tue Oct 30 17:35:44 2018 -0700 Hooked up secrets function api with secret function implementations (#2875) * Hooked up secrets function api with secret function implementations * Fixed unittest * Added more docs * Took feedback into account --- .../python/pulsar/functions/context.py | 5 ++ .../org/apache/pulsar/admin/cli/CmdFunctions.java | 3 +- .../org/apache/pulsar/functions/api/Context.java | 7 +++ pulsar-functions/instance/pom.xml | 6 +++ .../pulsar/functions/instance/ContextImpl.java | 24 ++++++++- .../functions/instance/JavaInstanceRunnable.java | 9 +++- .../instance/src/main/python/contextimpl.py | 11 +++- .../instance/src/main/python/python_instance.py | 5 +- .../src/main/python/python_instance_main.py | 19 ++++++- .../instance/src/main/python/secretsprovider.py | 61 +++++++++++++++++++++ .../pulsar/functions/instance/ContextImplTest.java | 4 +- .../instance/JavaInstanceRunnableTest.java | 2 +- .../src/test/python/test_python_instance.py | 2 +- ..._python_instance.py => test_secretsprovider.py} | 48 ++++++++--------- pulsar-functions/runtime/pom.xml | 6 +++ .../pulsar/functions/runtime/JavaInstanceMain.java | 35 +++++++++++- .../functions/runtime/KubernetesRuntime.java | 24 ++++++++- .../runtime/KubernetesRuntimeFactory.java | 19 ++++++- .../pulsar/functions/runtime/ProcessRuntime.java | 15 +++++- .../functions/runtime/ProcessRuntimeFactory.java | 7 ++- .../pulsar/functions/runtime/RuntimeUtils.java | 10 ++++ .../pulsar/functions/runtime/ThreadRuntime.java | 7 ++- .../functions/runtime/ThreadRuntimeFactory.java | 14 +++-- .../functions/runtime/KubernetesRuntimeTest.java | 15 +++--- .../functions/runtime/ProcessRuntimeTest.java | 63 ++++++++++++++++++++-- .../functions/worker/FunctionRuntimeManager.java | 21 ++++++-- .../pulsar/functions/worker/WorkerConfig.java | 8 ++- .../functions/worker/SchedulerManagerTest.java | 21 ++++---- 28 files changed, 394 insertions(+), 77 deletions(-) diff --git a/pulsar-client-cpp/python/pulsar/functions/context.py b/pulsar-client-cpp/python/pulsar/functions/context.py index 47e86f9..6575f7a 100644 --- a/pulsar-client-cpp/python/pulsar/functions/context.py +++ b/pulsar-client-cpp/python/pulsar/functions/context.py @@ -99,6 +99,11 @@ class Context(object): pass @abstractmethod + def get_secret(self, secret_name): + """Returns the secret value associated with the name. None if nothing was found""" + pass + + @abstractmethod def record_metric(self, metric_name, metric_value): """Records the metric_value. metric_value has to satisfy isinstance(metric_value, numbers.Number)""" pass diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index a75092c..3169e22 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -73,6 +73,7 @@ import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory; import org.apache.pulsar.functions.runtime.RuntimeSpawner; +import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator; import org.apache.pulsar.functions.utils.*; import org.apache.pulsar.functions.windowing.WindowUtils; @@ -1042,7 +1043,7 @@ public class CmdFunctions extends CmdBase { } try (ProcessRuntimeFactory containerFactory = new ProcessRuntimeFactory(serviceUrl, stateStorageServiceUrl, authConfig, null, null, - null)) { + null, new DefaultSecretsProviderConfigurator())) { List<RuntimeSpawner> spawners = new LinkedList<>(); for (int i = 0; i < parallelism; ++i) { InstanceConfig instanceConfig = new InstanceConfig(); diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java index c66ea6e..e9b79c3 100644 --- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java +++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java @@ -160,6 +160,13 @@ public interface Context { Object getUserConfigValueOrDefault(String key, Object defaultValue); /** + * Get the secret associated with this key + * @param secretName The name of the secret + * @return The secret if anything was found or null + */ + String getSecret(String secretName); + + /** * Record a user defined metric * @param metricName The name of the metric * @param value The value of the metric diff --git a/pulsar-functions/instance/pom.xml b/pulsar-functions/instance/pom.xml index 1c8f3ab..8dec8dc 100644 --- a/pulsar-functions/instance/pom.xml +++ b/pulsar-functions/instance/pom.xml @@ -65,6 +65,12 @@ <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.pulsar</groupId> + <artifactId>pulsar-functions-secrets</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>${project.groupId}</groupId> <artifactId>pulsar-client-original</artifactId> diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index 4d47433..406fe13 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -49,6 +49,7 @@ import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.functions.instance.state.StateContextImpl; import org.apache.pulsar.functions.proto.Function.SinkSpec; import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData; +import org.apache.pulsar.functions.secretsprovider.SecretsProvider; import org.apache.pulsar.functions.source.TopicSchema; import org.apache.pulsar.io.core.SinkContext; import org.apache.pulsar.io.core.SourceContext; @@ -101,12 +102,16 @@ class ContextImpl implements Context, SinkContext, SourceContext { private final TopicSchema topicSchema; + private final SecretsProvider secretsProvider; + private final Map<String, Object> secretsMap; + @Getter @Setter private StateContextImpl stateContext; private Map<String, Object> userConfigs; - public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, List<String> inputTopics) { + public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, List<String> inputTopics, + SecretsProvider secretsProvider) { this.config = config; this.logger = logger; this.currentAccumulatedMetrics = new ConcurrentHashMap<>(); @@ -125,6 +130,14 @@ class ContextImpl implements Context, SinkContext, SourceContext { new TypeToken<Map<String, Object>>() { }.getType()); } + this.secretsProvider = secretsProvider; + if (!StringUtils.isEmpty(config.getFunctionDetails().getSecretsMap())) { + secretsMap = new Gson().fromJson(config.getFunctionDetails().getSecretsMap(), + new TypeToken<Map<String, Object>>() { + }.getType()); + } else { + secretsMap = new HashMap<>(); + } } public void setCurrentMessageContext(Record<?> record) { @@ -212,6 +225,15 @@ class ContextImpl implements Context, SinkContext, SourceContext { return userConfigs; } + @Override + public String getSecret(String secretName) { + if (secretsMap.containsKey(secretName)) { + return secretsProvider.provideSecret(secretName, secretsMap.get(secretName)); + } else { + return null; + } + } + private void ensureStateEnabled() { checkState(null != stateContext, "State is not enabled."); } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index ace5efc..f626c10 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -49,6 +49,7 @@ import org.apache.pulsar.functions.proto.Function.SinkSpec; import org.apache.pulsar.functions.proto.Function.SourceSpec; import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData.Builder; +import org.apache.pulsar.functions.secretsprovider.SecretsProvider; import org.apache.pulsar.functions.sink.PulsarSink; import org.apache.pulsar.functions.sink.PulsarSinkConfig; import org.apache.pulsar.functions.sink.PulsarSinkDisable; @@ -110,6 +111,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { private Source source; private Sink sink; + private final SecretsProvider secretsProvider; + public static final String METRICS_TOTAL_PROCESSED = "__total_processed__"; public static final String METRICS_TOTAL_SUCCESS = "__total_successfully_processed__"; public static final String METRICS_TOTAL_SYS_EXCEPTION = "__total_system_exceptions__"; @@ -122,13 +125,15 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { FunctionCacheManager fnCache, String jarFile, PulsarClient pulsarClient, - String stateStorageServiceUrl) { + String stateStorageServiceUrl, + SecretsProvider secretsProvider) { this.instanceConfig = instanceConfig; this.fnCache = fnCache; this.jarFile = jarFile; this.client = (PulsarClientImpl) pulsarClient; this.stateStorageServiceUrl = stateStorageServiceUrl; this.stats = new FunctionStats(); + this.secretsProvider = secretsProvider; } /** @@ -173,7 +178,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { } Logger instanceLog = LoggerFactory.getLogger( "function-" + instanceConfig.getFunctionDetails().getName()); - return new ContextImpl(instanceConfig, instanceLog, client, inputTopics); + return new ContextImpl(instanceConfig, instanceLog, client, inputTopics, secretsProvider); } /** diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py b/pulsar-functions/instance/src/main/python/contextimpl.py index 24246ca..83a63aa 100644 --- a/pulsar-functions/instance/src/main/python/contextimpl.py +++ b/pulsar-functions/instance/src/main/python/contextimpl.py @@ -48,12 +48,13 @@ class AccumulatedMetricDatum(object): self.min = value class ContextImpl(pulsar.Context): - def __init__(self, instance_config, logger, pulsar_client, user_code, consumers): + def __init__(self, instance_config, logger, pulsar_client, user_code, consumers, secrets_provider): self.instance_config = instance_config self.log = logger self.pulsar_client = pulsar_client self.user_code_dir = os.path.dirname(user_code) self.consumers = consumers + self.secrets_provider = secrets_provider self.current_accumulated_metrics = {} self.accumulated_metrics = {} self.publish_producers = {} @@ -64,6 +65,9 @@ class ContextImpl(pulsar.Context): self.user_config = json.loads(instance_config.function_details.userConfig) \ if instance_config.function_details.userConfig \ else [] + self.secrets_map = json.loads(instance_config.function_details.secretsMap) \ + if instance_config.function_details.secretsMap \ + else {} # Called on a per message basis to set the context for the current message def set_current_message_context(self, msgid, topic): @@ -107,6 +111,11 @@ class ContextImpl(pulsar.Context): def get_user_config_map(self): return self.user_config + def get_secret(self, secret_key): + if not secret_key in self.secrets_map: + return None + return self.secrets_provider.provide_secret(secret_key, self.secrets_map[secret_key]) + def record_metric(self, metric_name, metric_value): if not metric_name in self.current_accumulated_metrics: self.current_accumulated_metrics[metric_name] = AccumulatedMetricDatum() diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py index 4b9ae3a..2438338 100644 --- a/pulsar-functions/instance/src/main/python/python_instance.py +++ b/pulsar-functions/instance/src/main/python/python_instance.py @@ -132,7 +132,7 @@ class Stats(object): class PythonInstance(object): - def __init__(self, instance_id, function_id, function_version, function_details, max_buffered_tuples, expected_healthcheck_interval, user_code, pulsar_client): + def __init__(self, instance_id, function_id, function_version, function_details, max_buffered_tuples, expected_healthcheck_interval, user_code, pulsar_client, secrets_provider): self.instance_config = InstanceConfig(instance_id, function_id, function_version, function_details, max_buffered_tuples) self.user_code = user_code self.queue = queue.Queue(max_buffered_tuples) @@ -157,6 +157,7 @@ class PythonInstance(object): self.last_health_check_ts = time.time() self.timeout_ms = function_details.source.timeoutMs if function_details.source.timeoutMs > 0 else None self.expected_healthcheck_interval = expected_healthcheck_interval + self.secrets_provider = secrets_provider def health_check(self): self.last_health_check_ts = time.time() @@ -226,7 +227,7 @@ class PythonInstance(object): except: self.function_purefunction = function_kclass - self.contextimpl = contextimpl.ContextImpl(self.instance_config, Log, self.pulsar_client, self.user_code, self.consumers) + self.contextimpl = contextimpl.ContextImpl(self.instance_config, Log, self.pulsar_client, self.user_code, self.consumers, self.secrets_provider) # Now launch a thread that does execution self.exeuction_thread = threading.Thread(target=self.actual_execution) self.exeuction_thread.start() diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py index 2acfd6e..748923e 100644 --- a/pulsar-functions/instance/src/main/python/python_instance_main.py +++ b/pulsar-functions/instance/src/main/python/python_instance_main.py @@ -29,6 +29,8 @@ import sys import signal import time import zipfile +import json +import inspect import pulsar @@ -72,11 +74,12 @@ def main(): parser.add_argument('--logging_file', required=True, help='Log file name') parser.add_argument('--logging_config_file', required=True, help='Config file for logging') parser.add_argument('--expected_healthcheck_interval', required=True, help='Expected time in seconds between health checks', type=int) + parser.add_argument('--secrets_provider', required=False, help='The classname of the secrets provider') + parser.add_argument('--secrets_provider_config', required=False, help='The config that needs to be passed to secrets provider') parser.add_argument('--install_usercode_dependencies', required=False, help='For packaged python like wheel files, do we need to install all dependencies', type=bool) parser.add_argument('--dependency_repository', required=False, help='For packaged python like wheel files, which repository to pull the dependencies from') parser.add_argument('--extra_dependency_repository', required=False, help='For packaged python like wheel files, any extra repository to pull the dependencies from') - args = parser.parse_args() function_details = Function_pb2.FunctionDetails() args.function_details = str(args.function_details) @@ -120,11 +123,23 @@ def main(): if args.tls_trust_cert_path: tls_trust_cert_path = args.tls_trust_cert_path pulsar_client = pulsar.Client(args.pulsar_serviceurl, authentication, 30, 1, 1, 50000, None, use_tls, tls_trust_cert_path, tls_allow_insecure_connection) + + secrets_provider = None + if args.secrets_provider is not None: + secrets_provider = util.import_class(os.path.dirname(inspect.getfile(inspect.currentframe())), str(args.secrets_provider)) + else: + secrets_provider = util.import_class(os.path.dirname(inspect.getfile(inspect.currentframe())), "secretsprovider.ClearTextSecretsProvider") + secrets_provider = secrets_provider() + secrets_provider_config = None + if args.secrets_provider_config is not None: + secrets_provider_config = json.loads(str(args.secrets_provider_config)) + secrets_provider.init(secrets_provider_config) + pyinstance = python_instance.PythonInstance(str(args.instance_id), str(args.function_id), str(args.function_version), function_details, int(args.max_buffered_tuples), int(args.expected_healthcheck_interval), - str(args.py), pulsar_client) + str(args.py), pulsar_client, secrets_provider) pyinstance.run() server_instance = server.serve(args.port, pyinstance) diff --git a/pulsar-functions/instance/src/main/python/secretsprovider.py b/pulsar-functions/instance/src/main/python/secretsprovider.py new file mode 100644 index 0000000..db8e68c --- /dev/null +++ b/pulsar-functions/instance/src/main/python/secretsprovider.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# -*- encoding: utf-8 -*- + +"""secretsprovider.py: Interfaces and definitions for Secret Providers +""" +from abc import abstractmethod +import os + +class SecretsProvider: + """Interface for providing secrets information runtime""" + @abstractmethod + def init(self, config): + """Do any kind of initialization""" + pass + + @abstractmethod + def provide_secret(self, secret_name, path_to_secret): + """Fetches the secret located at the path""" + pass + + +"""A simple implementation that represents storing secrets in clear text """ +class ClearTextSecretsProvider(SecretsProvider): + def __init__(self): + pass + + def init(self, config): + pass + + def provide_secret(self, secret_name, path_to_secret): + return path_to_secret + +"""Implementation that fetches secrets from environment variables""" +class EnvironmentBasedSecretsProvider(SecretsProvider): + def __init__(self): + pass + + def init(self, config): + pass + + def provide_secret(self, secret_name, path_to_secret): + return os.environ.get(secret_name) diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java index f5108fc..e3e32fd 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java @@ -38,6 +38,7 @@ import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; import org.apache.pulsar.functions.instance.state.StateContextImpl; import org.apache.pulsar.functions.proto.Function.FunctionDetails; +import org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider; import org.junit.Before; import org.junit.Test; import org.mockito.Matchers; @@ -73,7 +74,8 @@ public class ContextImplTest { config, logger, client, - new ArrayList<>() + new ArrayList<>(), + new EnvironmentBasedSecretsProvider() ); } diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java index 12d4f19..80b3b1d 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java @@ -56,7 +56,7 @@ public class JavaInstanceRunnableTest { private JavaInstanceRunnable createRunnable(boolean addCustom, String outputSerde) throws Exception { InstanceConfig config = createInstanceConfig(addCustom, outputSerde); JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable( - config, null, null, null, null); + config, null, null, null, null, null); return javaInstanceRunnable; } diff --git a/pulsar-functions/instance/src/test/python/test_python_instance.py b/pulsar-functions/instance/src/test/python/test_python_instance.py index 0b5355f..748e5d8 100644 --- a/pulsar-functions/instance/src/test/python/test_python_instance.py +++ b/pulsar-functions/instance/src/test/python/test_python_instance.py @@ -48,7 +48,7 @@ class TestContextImpl(unittest.TestCase): pulsar_client.create_producer = Mock(return_value=producer) user_code=__file__ consumers = None - context_impl = ContextImpl(instance_config, logger, pulsar_client, user_code, consumers) + context_impl = ContextImpl(instance_config, logger, pulsar_client, user_code, consumers, None) context_impl.publish("test_topic_name", "test_message") diff --git a/pulsar-functions/instance/src/test/python/test_python_instance.py b/pulsar-functions/instance/src/test/python/test_secretsprovider.py similarity index 50% copy from pulsar-functions/instance/src/test/python/test_python_instance.py copy to pulsar-functions/instance/src/test/python/test_secretsprovider.py index 0b5355f..5d725ee 100644 --- a/pulsar-functions/instance/src/test/python/test_python_instance.py +++ b/pulsar-functions/instance/src/test/python/test_secretsprovider.py @@ -20,11 +20,9 @@ # DEPENDENCIES: unittest2,mock -from contextimpl import ContextImpl -from python_instance import InstanceConfig -from mock import Mock +from secretsprovider import ClearTextSecretsProvider +from secretsprovider import EnvironmentBasedSecretsProvider -import Function_pb2 import log import os import unittest @@ -34,25 +32,23 @@ class TestContextImpl(unittest.TestCase): def setUp(self): log.init_logger("INFO", "foo", os.environ.get("PULSAR_HOME") + "/conf/functions-logging/console_logging_config.ini") - def test_context_publish(self): - instance_id = 'test_instance_id' - function_id = 'test_function_id' - function_version = 'test_function_version' - function_details = Function_pb2.FunctionDetails() - max_buffered_tuples = 100; - instance_config = InstanceConfig(instance_id, function_id, function_version, function_details, max_buffered_tuples) - logger = log.Log - pulsar_client = Mock() - producer = Mock() - producer.send_async = Mock(return_value=None) - pulsar_client.create_producer = Mock(return_value=producer) - user_code=__file__ - consumers = None - context_impl = ContextImpl(instance_config, logger, pulsar_client, user_code, consumers) - - context_impl.publish("test_topic_name", "test_message") - - producer.send_async.assert_called_with("test_message", None, properties=None) - - - + def test_cleartext_secretsprovider(self): + provider = ClearTextSecretsProvider() + secret = provider.provide_secret("secretName", "secretPath") + self.assertEqual(secret, "secretPath") + secret = provider.provide_secret("secretName", "") + self.assertEqual(secret, "") + secret = provider.provide_secret("secretName", None) + self.assertEqual(secret, None) + + def test_environment_secretsprovider(self): + provider = EnvironmentBasedSecretsProvider() + secret = provider.provide_secret("secretName", "secretPath") + self.assertEqual(secret, None) + os.environ["secretName"] = "secretValue" + secret = provider.provide_secret("secretName", "") + self.assertEqual(secret, "secretValue") + secret = provider.provide_secret("secretName", None) + self.assertEqual(secret, "secretValue") + secret = provider.provide_secret("secretName", "somethingelse") + self.assertEqual(secret, "secretValue") \ No newline at end of file diff --git a/pulsar-functions/runtime/pom.xml b/pulsar-functions/runtime/pom.xml index 66d245f..e0b264d 100644 --- a/pulsar-functions/runtime/pom.xml +++ b/pulsar-functions/runtime/pom.xml @@ -46,6 +46,12 @@ </dependency> <dependency> + <groupId>org.apache.pulsar</groupId> + <artifactId>pulsar-functions-secrets</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-all</artifactId> </dependency> diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java index 1551f6f..80503bb 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java @@ -22,18 +22,26 @@ package org.apache.pulsar.functions.runtime; import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; import com.beust.jcommander.converters.StringConverter; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; import com.google.protobuf.Empty; import com.google.protobuf.util.JsonFormat; import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.stub.StreamObserver; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.proto.InstanceControlGrpc; +import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider; +import org.apache.pulsar.functions.secretsprovider.SecretsProvider; +import org.apache.pulsar.functions.utils.Reflections; +import java.lang.reflect.Type; +import java.util.Map; import java.util.TimerTask; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; @@ -95,6 +103,12 @@ public class JavaInstanceMain implements AutoCloseable { @Parameter(names = "--expected_healthcheck_interval", description = "Expected interval in seconds between healtchecks", required = true) protected int expectedHealthCheckInterval; + @Parameter(names = "--secrets_provider", description = "The classname of the secrets provider", required = false) + protected String secretsProviderClassName; + + @Parameter(names = "--secrets_provider_config", description = "The config that needs to be passed to secrets provider", required = false) + protected String secretsProviderConfig; + private Server server; private RuntimeSpawner runtimeSpawner; private ThreadRuntimeFactory containerFactory; @@ -122,13 +136,32 @@ public class JavaInstanceMain implements AutoCloseable { instanceConfig.setFunctionDetails(functionDetails); instanceConfig.setPort(port); + Map<String, String> secretsProviderConfigMap = null; + if (!StringUtils.isEmpty(secretsProviderConfig)) { + Type type = new TypeToken<Map<String, String>>() {}.getType(); + secretsProviderConfigMap = new Gson().fromJson(secretsProviderConfig, type); + } + + if (StringUtils.isEmpty(secretsProviderClassName)) { + secretsProviderClassName = ClearTextSecretsProvider.class.getName(); + } + + SecretsProvider secretsProvider; + try { + secretsProvider = (SecretsProvider) Reflections.createInstance(secretsProviderClassName, ClassLoader.getSystemClassLoader()); + } catch (Exception e) { + throw new RuntimeException(e); + } + secretsProvider.init(secretsProviderConfigMap); + containerFactory = new ThreadRuntimeFactory("LocalRunnerThreadGroup", pulsarServiceUrl, stateStorageServiceUrl, AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthenticationPlugin) .clientAuthenticationParameters(clientAuthenticationParameters).useTls(isTrue(useTls)) .tlsAllowInsecureConnection(isTrue(tlsAllowInsecureConnection)) .tlsHostnameVerificationEnable(isTrue(tlsHostNameVerificationEnabled)) - .tlsTrustCertsFilePath(tlsTrustCertFilePath).build()); + .tlsTrustCertsFilePath(tlsTrustCertFilePath).build(), + secretsProvider); runtimeSpawner = new RuntimeSpawner( instanceConfig, jarFile, diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java index 7f3f72d..152842e 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java @@ -22,6 +22,8 @@ package org.apache.pulsar.functions.runtime; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; import com.google.protobuf.Empty; import com.google.protobuf.util.JsonFormat; import com.squareup.okhttp.Response; @@ -33,6 +35,7 @@ import io.kubernetes.client.custom.Quantity; import io.kubernetes.client.models.*; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang.StringUtils; import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.metrics.PrometheusMetricsServer; @@ -40,7 +43,10 @@ import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; import org.apache.pulsar.functions.proto.InstanceControlGrpc; +import org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider; +import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator; +import java.lang.reflect.Type; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -97,6 +103,7 @@ class KubernetesRuntime implements Runtime { private final String userCodePkgUrl; private final String originalCodeFileName; private final String pulsarAdminUrl; + private final SecretsProviderConfigurator secretsProviderConfigurator; private boolean running; @@ -119,6 +126,7 @@ class KubernetesRuntime implements Runtime { String pulsarAdminUrl, String stateStorageServiceUrl, AuthenticationConfig authConfig, + SecretsProviderConfigurator secretsProviderConfigurator, Integer expectedMetricsInterval) throws Exception { this.appsClient = appsClient; this.coreClient = coreClient; @@ -130,7 +138,13 @@ class KubernetesRuntime implements Runtime { this.userCodePkgUrl = userCodePkgUrl; this.originalCodeFileName = pulsarRootDir + "/" + originalCodeFileName; this.pulsarAdminUrl = pulsarAdminUrl; + this.secretsProviderConfigurator = secretsProviderConfigurator; String logConfigFile = null; + String secretsProviderClassName = secretsProviderConfigurator.getSecretsProviderClassName(instanceConfig.getFunctionDetails()); + String secretsProviderConfig = null; + if (secretsProviderConfigurator.getSecretsProviderConfig(instanceConfig.getFunctionDetails()) != null) { + secretsProviderConfig = new Gson().toJson(secretsProviderConfigurator.getSecretsProviderConfig(instanceConfig.getFunctionDetails())); + } switch (instanceConfig.getFunctionDetails().getRuntime()) { case JAVA: logConfigFile = "kubernetes_instance_log4j2.yml"; @@ -141,7 +155,7 @@ class KubernetesRuntime implements Runtime { } this.processArgs = RuntimeUtils.composeArgs(instanceConfig, instanceFile, logDirectory, this.originalCodeFileName, pulsarServiceUrl, stateStorageServiceUrl, authConfig, "$" + ENV_SHARD_ID, GRPC_PORT, -1l, logConfigFile, - installUserCodeDependencies, pythonDependencyRepository, pythonExtraDependencyRepository); + secretsProviderClassName, secretsProviderConfig, installUserCodeDependencies, pythonDependencyRepository, pythonExtraDependencyRepository); this.prometheusMetricsServerArgs = composePrometheusMetricsServerArgs(prometheusMetricsServerJarFile, expectedMetricsInterval); running = false; doChecks(instanceConfig.getFunctionDetails()); @@ -156,6 +170,10 @@ class KubernetesRuntime implements Runtime { try { submitStatefulSet(); } catch (Exception e) { + log.error("Could not submit statefulset for {}/{}/{}, deleting service as well", + instanceConfig.getFunctionDetails().getTenant(), + instanceConfig.getFunctionDetails().getNamespace(), + instanceConfig.getFunctionDetails().getName(), e); deleteService(); } running = true; @@ -536,8 +554,10 @@ class KubernetesRuntime implements Runtime { .valueFrom(new V1EnvVarSource() .fieldRef(new V1ObjectFieldSelector() .fieldPath("metadata.name"))); - container.setEnv(Arrays.asList(envVarPodName)); + container.addEnvItem(envVarPodName); + // Configure secrets + secretsProviderConfigurator.configureKubernetesRuntimeSecretsProvider(container, instanceConfig.getFunctionDetails()); // set container resources final V1ResourceRequirements resourceRequirements = new V1ResourceRequirements(); diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java index c6d5d02..2adbb5e 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java @@ -20,6 +20,8 @@ package org.apache.pulsar.functions.runtime; import com.google.common.annotations.VisibleForTesting; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; import io.kubernetes.client.ApiClient; import io.kubernetes.client.Configuration; import io.kubernetes.client.apis.AppsV1Api; @@ -30,11 +32,14 @@ import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.proto.Function; +import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator; import java.lang.reflect.Field; +import java.lang.reflect.Type; import java.util.Map; import java.util.Timer; import java.util.TimerTask; @@ -72,6 +77,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory { private final String javaInstanceJarFile; private final String pythonInstanceFile; private final String prometheusMetricsServerJarFile; + private final SecretsProviderConfigurator secretsProviderConfigurator; private final String logDirectory = "logs/functions"; private Timer changeConfigMapTimer; private AppsV1Api appsClient; @@ -93,7 +99,8 @@ public class KubernetesRuntimeFactory implements RuntimeFactory { AuthenticationConfig authConfig, Integer expectedMetricsCollectionInterval, String changeConfigMap, - String changeConfigMapNamespace) { + String changeConfigMapNamespace, + SecretsProviderConfigurator secretsProviderConfigurator) { this.kubernetesInfo = new KubernetesInfo(); this.kubernetesInfo.setK8Uri(k8Uri); if (!isEmpty(jobNamespace)) { @@ -126,6 +133,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory { this.pythonInstanceFile = this.kubernetesInfo.getPulsarRootDir() + "/instances/python-instance/python_instance_main.py"; this.prometheusMetricsServerJarFile = this.kubernetesInfo.getPulsarRootDir() + "/instances/PrometheusMetricsServer.jar"; this.expectedMetricsCollectionInterval = expectedMetricsCollectionInterval == null ? -1 : expectedMetricsCollectionInterval; + this.secretsProviderConfigurator = secretsProviderConfigurator; } @Override @@ -169,7 +177,8 @@ public class KubernetesRuntimeFactory implements RuntimeFactory { this.kubernetesInfo.getPulsarAdminUrl(), stateStorageServiceUri, authConfig, - expectedMetricsCollectionInterval); + secretsProviderConfigurator, + expectedMetricsCollectionInterval); } @Override @@ -179,6 +188,12 @@ public class KubernetesRuntimeFactory implements RuntimeFactory { @Override public void doAdmissionChecks(Function.FunctionDetails functionDetails) { KubernetesRuntime.doChecks(functionDetails); + if (!StringUtils.isEmpty(functionDetails.getSecretsMap())) { + Type type = new TypeToken<Map<String, Object>>() { + }.getType(); + Map<String, Object> secretsMap = new Gson().fromJson(functionDetails.getSecretsMap(), type); + secretsProviderConfigurator.validateSecretMap(secretsMap); + } } @VisibleForTesting diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java index 2fa7f82..76db513 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java @@ -22,6 +22,7 @@ package org.apache.pulsar.functions.runtime; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.gson.Gson; import com.google.protobuf.Empty; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; @@ -29,9 +30,12 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.instance.InstanceConfig; +import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; import org.apache.pulsar.functions.proto.InstanceControlGrpc; +import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider; +import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator; import java.io.InputStream; import java.util.List; @@ -60,6 +64,7 @@ class ProcessRuntime implements Runtime { private ScheduledExecutorService timer; private InstanceConfig instanceConfig; private final Long expectedHealthCheckInterval; + private final SecretsProviderConfigurator secretsProviderConfigurator; private static final long GRPC_TIMEOUT_SECS = 5; ProcessRuntime(InstanceConfig instanceConfig, @@ -69,11 +74,18 @@ class ProcessRuntime implements Runtime { String pulsarServiceUrl, String stateStorageServiceUrl, AuthenticationConfig authConfig, + SecretsProviderConfigurator secretsProviderConfigurator, Long expectedHealthCheckInterval) throws Exception { this.instanceConfig = instanceConfig; this.instancePort = instanceConfig.getPort(); this.expectedHealthCheckInterval = expectedHealthCheckInterval; + this.secretsProviderConfigurator = secretsProviderConfigurator; String logConfigFile = null; + String secretsProviderClassName = secretsProviderConfigurator.getSecretsProviderClassName(instanceConfig.getFunctionDetails()); + String secretsProviderConfig = null; + if (secretsProviderConfigurator.getSecretsProviderConfig(instanceConfig.getFunctionDetails()) != null) { + secretsProviderConfig = new Gson().toJson(secretsProviderConfigurator.getSecretsProviderConfig(instanceConfig.getFunctionDetails())); + } switch (instanceConfig.getFunctionDetails().getRuntime()) { case JAVA: logConfigFile = "java_instance_log4j2.yml"; @@ -84,7 +96,7 @@ class ProcessRuntime implements Runtime { } this.processArgs = RuntimeUtils.composeArgs(instanceConfig, instanceFile, logDirectory, codeFile, pulsarServiceUrl, stateStorageServiceUrl, authConfig, instanceConfig.getInstanceName(), instanceConfig.getPort(), expectedHealthCheckInterval, - logConfigFile, false, null, null); + logConfigFile, secretsProviderClassName, secretsProviderConfig, false, null, null); } /** @@ -258,6 +270,7 @@ class ProcessRuntime implements Runtime { deathException = null; try { ProcessBuilder processBuilder = new ProcessBuilder(processArgs).inheritIO(); + secretsProviderConfigurator.configureProcessRuntimeSecretsProvider(processBuilder, instanceConfig.getFunctionDetails()); log.info("ProcessBuilder starting the process with args {}", String.join(" ", processBuilder.command())); process = processBuilder.start(); } catch (Exception ex) { diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java index 78b069c..41a28fb 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java @@ -24,6 +24,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.instance.InstanceConfig; +import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator; import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry; import java.nio.file.Paths; @@ -37,6 +38,7 @@ public class ProcessRuntimeFactory implements RuntimeFactory { private final String pulsarServiceUrl; private final String stateStorageServiceUrl; private AuthenticationConfig authConfig; + private SecretsProviderConfigurator secretsProviderConfigurator; private String javaInstanceJarFile; private String pythonInstanceFile; private String logDirectory; @@ -47,10 +49,12 @@ public class ProcessRuntimeFactory implements RuntimeFactory { AuthenticationConfig authConfig, String javaInstanceJarFile, String pythonInstanceFile, - String logDirectory) { + String logDirectory, + SecretsProviderConfigurator secretsProviderConfigurator) { this.pulsarServiceUrl = pulsarServiceUrl; this.stateStorageServiceUrl = stateStorageServiceUrl; this.authConfig = authConfig; + this.secretsProviderConfigurator = secretsProviderConfigurator; this.javaInstanceJarFile = javaInstanceJarFile; this.pythonInstanceFile = pythonInstanceFile; this.logDirectory = logDirectory; @@ -113,6 +117,7 @@ public class ProcessRuntimeFactory implements RuntimeFactory { pulsarServiceUrl, stateStorageServiceUrl, authConfig, + secretsProviderConfigurator, expectedHealthCheckInterval); } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java index 2647536..f12222e 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java @@ -21,6 +21,7 @@ package org.apache.pulsar.functions.runtime; import com.google.protobuf.util.JsonFormat; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.proto.Function; @@ -49,6 +50,8 @@ class RuntimeUtils { Integer grpcPort, Long expectedHealthCheckInterval, String logConfigFile, + String secretsProviderClassName, + String secretsProviderConfig, Boolean installUserCodeDepdendencies, String pythonDependencyRepository, String pythonExtraDependencyRepository) throws Exception { @@ -150,6 +153,13 @@ class RuntimeUtils { } args.add("--expected_healthcheck_interval"); args.add(String.valueOf(expectedHealthCheckInterval)); + + args.add("--secrets_provider"); + args.add(secretsProviderClassName); + if (!StringUtils.isEmpty(secretsProviderConfig)) { + args.add("--secrets_provider_config"); + args.add(secretsProviderConfig); + } return args; } } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java index 9dafbe9..5e42c52 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java @@ -28,6 +28,7 @@ import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; +import org.apache.pulsar.functions.secretsprovider.SecretsProvider; import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager; import org.apache.pulsar.functions.instance.JavaInstanceRunnable; import org.apache.pulsar.functions.utils.FunctionDetailsUtils; @@ -51,7 +52,8 @@ class ThreadRuntime implements Runtime { ThreadGroup threadGroup, String jarFile, PulsarClient pulsarClient, - String stateStorageServiceUrl) { + String stateStorageServiceUrl, + SecretsProvider secretsProvider) { this.instanceConfig = instanceConfig; if (instanceConfig.getFunctionDetails().getRuntime() != Function.FunctionDetails.Runtime.JAVA) { throw new RuntimeException("Thread Container only supports Java Runtime"); @@ -61,7 +63,8 @@ class ThreadRuntime implements Runtime { fnCache, jarFile, pulsarClient, - stateStorageServiceUrl); + stateStorageServiceUrl, + secretsProvider); this.threadGroup = threadGroup; } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java index dfbbb64..846028d 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java @@ -28,9 +28,9 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.instance.InstanceConfig; +import org.apache.pulsar.functions.secretsprovider.SecretsProvider; import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager; import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManagerImpl; @@ -44,15 +44,18 @@ public class ThreadRuntimeFactory implements RuntimeFactory { private final FunctionCacheManager fnCache; private final PulsarClient pulsarClient; private final String storageServiceUrl; + private final SecretsProvider secretsProvider; private volatile boolean closed; public ThreadRuntimeFactory(String threadGroupName, String pulsarServiceUrl, String storageServiceUrl, - AuthenticationConfig authConfig) throws Exception { - this(threadGroupName, createPulsarClient(pulsarServiceUrl, authConfig), storageServiceUrl); + AuthenticationConfig authConfig, SecretsProvider secretsProvider) throws Exception { + this(threadGroupName, createPulsarClient(pulsarServiceUrl, authConfig), storageServiceUrl, secretsProvider); } @VisibleForTesting - public ThreadRuntimeFactory(String threadGroupName, PulsarClient pulsarClient, String storageServiceUrl) { + public ThreadRuntimeFactory(String threadGroupName, PulsarClient pulsarClient, String storageServiceUrl, + SecretsProvider secretsProvider) { + this.secretsProvider = secretsProvider; this.fnCache = new FunctionCacheManagerImpl(); this.threadGroup = new ThreadGroup(threadGroupName); this.pulsarClient = pulsarClient; @@ -90,7 +93,8 @@ public class ThreadRuntimeFactory implements RuntimeFactory { threadGroup, jarFile, pulsarClient, - storageServiceUrl); + storageServiceUrl, + secretsProvider); } @Override diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java index 0288660..5c05de2 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java @@ -24,6 +24,7 @@ import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.ConsumerSpec; import org.apache.pulsar.functions.proto.Function.FunctionDetails; +import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator; import org.apache.pulsar.functions.utils.FunctionDetailsUtils; import org.testng.annotations.AfterMethod; import org.testng.annotations.Test; @@ -73,8 +74,8 @@ public class KubernetesRuntimeTest { this.logDirectory = "logs/functions"; this.factory = spy(new KubernetesRuntimeFactory(null, null, null, pulsarRootDir, false, true, "myrepo", "anotherrepo", - null, pulsarServiceUrl, pulsarAdminUrl, stateStorageServiceUrl, null, - null, null, null)); + null, pulsarServiceUrl, pulsarAdminUrl, stateStorageServiceUrl, null, null, + null, null, new DefaultSecretsProviderConfigurator())); doNothing().when(this.factory).setupClient(); } @@ -123,7 +124,7 @@ public class KubernetesRuntimeTest { KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l); List<String> args = container.getProcessArgs(); - assertEquals(args.size(), 28); + assertEquals(args.size(), 30); String expectedArgs = "java -cp " + javaInstanceJarFile + " -Dpulsar.functions.java.instance.jar=" + javaInstanceJarFile + " -Dlog4j.configurationFile=kubernetes_instance_log4j2.yml " @@ -137,7 +138,8 @@ public class KubernetesRuntimeTest { + "' --pulsar_serviceurl " + pulsarServiceUrl + " --max_buffered_tuples 1024 --port " + args.get(23) + " --state_storage_serviceurl " + stateStorageServiceUrl - + " --expected_healthcheck_interval -1"; + + " --expected_healthcheck_interval -1" + + " --secrets_provider org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider"; assertEquals(String.join(" ", args), expectedArgs); } @@ -147,7 +149,7 @@ public class KubernetesRuntimeTest { KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l); List<String> args = container.getProcessArgs(); - assertEquals(args.size(), 32); + assertEquals(args.size(), 34); String expectedArgs = "python " + pythonInstanceFile + " --py " + pulsarRootDir + "/" + userJarFile + " --logging_directory " + logDirectory @@ -162,7 +164,8 @@ public class KubernetesRuntimeTest { + " --function_details '" + JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails()) + "' --pulsar_serviceurl " + pulsarServiceUrl + " --max_buffered_tuples 1024 --port " + args.get(29) - + " --expected_healthcheck_interval -1"; + + " --expected_healthcheck_interval -1" + + " --secrets_provider secretsprovider.ClearTextSecretsProvider"; assertEquals(String.join(" ", args), expectedArgs); } diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java index fe492b8..c2f3638 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java @@ -21,16 +21,22 @@ package org.apache.pulsar.functions.runtime; import static org.testng.Assert.assertEquals; +import com.google.gson.reflect.TypeToken; import com.google.protobuf.util.JsonFormat; +import java.lang.reflect.Type; import java.util.HashMap; import java.util.List; import java.util.Map; +import io.kubernetes.client.models.V1Container; import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.ConsumerSpec; import org.apache.pulsar.functions.proto.Function.FunctionDetails; +import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider; +import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator; +import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator; import org.apache.pulsar.functions.utils.FunctionDetailsUtils; import org.testng.annotations.AfterMethod; import org.testng.annotations.Test; @@ -40,6 +46,48 @@ import org.testng.annotations.Test; */ public class ProcessRuntimeTest { + class TestSecretsProviderConfigurator implements SecretsProviderConfigurator { + + @Override + public void init(Map<String, String> config) { + + } + + @Override + public String getSecretsProviderClassName(FunctionDetails functionDetails) { + if (functionDetails.getRuntime() == FunctionDetails.Runtime.JAVA) { + return ClearTextSecretsProvider.class.getName(); + } else { + return "secretsprovider.ClearTextSecretsProvider"; + } + } + + @Override + public Map<String, String> getSecretsProviderConfig(FunctionDetails functionDetails) { + Map<String, String> config = new HashMap<>(); + config.put("Config", "Value"); + return config; + } + + @Override + public void configureKubernetesRuntimeSecretsProvider(V1Container container, FunctionDetails functionDetails) { + } + + @Override + public void configureProcessRuntimeSecretsProvider(ProcessBuilder processBuilder, FunctionDetails functionDetails) { + } + + @Override + public Type getSecretObjectType() { + return TypeToken.get(String.class).getType(); + } + + @Override + public void validateSecretMap(Map<String, Object> secretMap) { + + } + } + private static final String TEST_TENANT = "test-function-tenant"; private static final String TEST_NAMESPACE = "test-function-namespace"; private static final String TEST_NAME = "test-function-container"; @@ -67,7 +115,8 @@ public class ProcessRuntimeTest { this.stateStorageServiceUrl = "bk://localhost:4181"; this.logDirectory = "Users/user/logs"; this.factory = new ProcessRuntimeFactory( - pulsarServiceUrl, stateStorageServiceUrl, null, javaInstanceJarFile, pythonInstanceFile, logDirectory); + pulsarServiceUrl, stateStorageServiceUrl, null, javaInstanceJarFile, pythonInstanceFile, logDirectory, + new TestSecretsProviderConfigurator()); } @AfterMethod @@ -115,7 +164,7 @@ public class ProcessRuntimeTest { ProcessRuntime container = factory.createContainer(config, userJarFile, null, 30l); List<String> args = container.getProcessArgs(); - assertEquals(args.size(), 28); + assertEquals(args.size(), 32); String expectedArgs = "java -cp " + javaInstanceJarFile + " -Dpulsar.functions.java.instance.jar=" + javaInstanceJarFile + " -Dlog4j.configurationFile=java_instance_log4j2.yml " @@ -129,7 +178,9 @@ public class ProcessRuntimeTest { + "' --pulsar_serviceurl " + pulsarServiceUrl + " --max_buffered_tuples 1024 --port " + args.get(23) + " --state_storage_serviceurl " + stateStorageServiceUrl - + " --expected_healthcheck_interval 30"; + + " --expected_healthcheck_interval 30" + + " --secrets_provider org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider" + + " --secrets_provider_config {\"Config\":\"Value\"}"; assertEquals(String.join(" ", args), expectedArgs); } @@ -139,7 +190,7 @@ public class ProcessRuntimeTest { ProcessRuntime container = factory.createContainer(config, userJarFile, null, 30l); List<String> args = container.getProcessArgs(); - assertEquals(args.size(), 26); + assertEquals(args.size(), 30); String expectedArgs = "python " + pythonInstanceFile + " --py " + userJarFile + " --logging_directory " + logDirectory + "/functions" + " --logging_file " + config.getFunctionDetails().getName() @@ -149,7 +200,9 @@ public class ProcessRuntimeTest { + " --function_details '" + JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails()) + "' --pulsar_serviceurl " + pulsarServiceUrl + " --max_buffered_tuples 1024 --port " + args.get(23) - + " --expected_healthcheck_interval 30"; + + " --expected_healthcheck_interval 30" + + " --secrets_provider secretsprovider.ClearTextSecretsProvider" + + " --secrets_provider_config {\"Config\":\"Value\"}"; assertEquals(String.join(" ", args), expectedArgs); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index d8bf252..100c7c8 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -54,6 +54,10 @@ import com.google.common.annotations.VisibleForTesting; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.functions.runtime.Runtime; +import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider; +import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator; +import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator; +import org.apache.pulsar.functions.utils.Reflections; /** * This class managers all aspects of functions assignments and running of function assignments for this worker @@ -104,6 +108,14 @@ public class FunctionRuntimeManager implements AutoCloseable{ this.workerService = workerService; this.functionAdmin = workerService.getFunctionAdmin(); + SecretsProviderConfigurator secretsProviderConfigurator; + if (!StringUtils.isEmpty(workerConfig.getSecretsProviderConfiguratorClassName())) { + secretsProviderConfigurator = (SecretsProviderConfigurator) Reflections.createInstance(workerConfig.getSecretsProviderConfiguratorClassName(), ClassLoader.getSystemClassLoader()); + } else { + secretsProviderConfigurator = new DefaultSecretsProviderConfigurator(); + } + secretsProviderConfigurator.init(workerConfig.getSecretsProviderConfiguratorConfig()); + AuthenticationConfig authConfig = AuthenticationConfig.builder() .clientAuthenticationPlugin(workerConfig.getClientAuthenticationPlugin()) .clientAuthenticationParameters(workerConfig.getClientAuthenticationParameters()) @@ -116,7 +128,8 @@ public class FunctionRuntimeManager implements AutoCloseable{ workerConfig.getThreadContainerFactory().getThreadGroupName(), workerConfig.getPulsarServiceUrl(), workerConfig.getStateStorageServiceUrl(), - authConfig); + authConfig, + new ClearTextSecretsProvider()); } else if (workerConfig.getProcessContainerFactory() != null) { this.runtimeFactory = new ProcessRuntimeFactory( workerConfig.getPulsarServiceUrl(), @@ -124,7 +137,8 @@ public class FunctionRuntimeManager implements AutoCloseable{ authConfig, workerConfig.getProcessContainerFactory().getJavaInstanceJarLocation(), workerConfig.getProcessContainerFactory().getPythonInstanceLocation(), - workerConfig.getProcessContainerFactory().getLogDirectory()); + workerConfig.getProcessContainerFactory().getLogDirectory(), + secretsProviderConfigurator); } else if (workerConfig.getKubernetesContainerFactory() != null){ this.runtimeFactory = new KubernetesRuntimeFactory( workerConfig.getKubernetesContainerFactory().getK8Uri(), @@ -142,7 +156,8 @@ public class FunctionRuntimeManager implements AutoCloseable{ authConfig, workerConfig.getKubernetesContainerFactory().getExpectedMetricsCollectionInterval() == null ? -1 : workerConfig.getKubernetesContainerFactory().getExpectedMetricsCollectionInterval(), workerConfig.getKubernetesContainerFactory().getChangeConfigMap(), - workerConfig.getKubernetesContainerFactory().getChangeConfigMapNamespace()); + workerConfig.getKubernetesContainerFactory().getChangeConfigMapNamespace(), + secretsProviderConfigurator); } else { throw new RuntimeException("Either Thread, Process or Kubernetes Container Factory need to be set"); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java index 40ae567..5a95a00 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java @@ -152,6 +152,12 @@ public class WorkerConfig implements Serializable, PulsarConfiguration { } private KubernetesContainerFactory kubernetesContainerFactory; + // The classname of the secrets provider configurator. + private String secretsProviderConfiguratorClassName; + // Any config the secret provider configurator might need. This is passed on + // to the init method of the secretproviderconfigurator + private Map<String, String> secretsProviderConfiguratorConfig; + public String getFunctionMetadataTopic() { return String.format("persistent://%s/%s", pulsarFunctionsNamespace, functionMetadataTopicName); } @@ -199,4 +205,4 @@ public class WorkerConfig implements Serializable, PulsarConfiguration { public void setProperties(Properties properties) { this.properties = properties; } -} +} \ No newline at end of file diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java index 0b8d177..945a0cc 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java @@ -32,6 +32,7 @@ import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.Assignment; import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory; +import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider; import org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler; import org.mockito.Mockito; import org.mockito.invocation.Invocation; @@ -80,7 +81,7 @@ public class SchedulerManagerTest { private ScheduledExecutorService executor; @BeforeMethod - public void setup() throws PulsarClientException { + public void setup() { WorkerConfig workerConfig = new WorkerConfig(); workerConfig.setWorkerId("worker-1"); workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test")); @@ -140,7 +141,7 @@ public class SchedulerManagerTest { functionMetaDataList.add(function1); doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData(); - ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy"); + ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider()); doReturn(factory).when(functionRuntimeManager).getRuntimeFactory(); // set assignments @@ -186,7 +187,7 @@ public class SchedulerManagerTest { functionMetaDataList.add(function1); doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData(); - ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy"); + ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider()); doReturn(factory).when(functionRuntimeManager).getRuntimeFactory(); // set assignments @@ -233,7 +234,7 @@ public class SchedulerManagerTest { functionMetaDataList.add(function2); doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData(); - ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy"); + ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider()); doReturn(factory).when(functionRuntimeManager).getRuntimeFactory(); // set assignments @@ -293,7 +294,7 @@ public class SchedulerManagerTest { functionMetaDataList.add(function1); doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData(); - ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy"); + ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider()); doReturn(factory).when(functionRuntimeManager).getRuntimeFactory(); // set assignments @@ -358,7 +359,7 @@ public class SchedulerManagerTest { functionMetaDataList.add(function2); doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData(); - ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy"); + ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider()); doReturn(factory).when(functionRuntimeManager).getRuntimeFactory(); // set assignments @@ -469,7 +470,7 @@ public class SchedulerManagerTest { functionMetaDataList.add(function2); doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData(); - ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy"); + ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider()); doReturn(factory).when(functionRuntimeManager).getRuntimeFactory(); // set assignments @@ -596,7 +597,7 @@ public class SchedulerManagerTest { functionMetaDataList.add(function2); doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData(); - ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy"); + ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider()); doReturn(factory).when(functionRuntimeManager).getRuntimeFactory(); Map<String, Map<String, Function.Assignment>> currentAssignments = new HashMap<>(); @@ -650,7 +651,7 @@ public class SchedulerManagerTest { functionMetaDataList.add(function2); doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData(); - ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy"); + ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider()); doReturn(factory).when(functionRuntimeManager).getRuntimeFactory(); // set assignments @@ -783,7 +784,7 @@ public class SchedulerManagerTest { functionMetaDataList.add(function2); doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData(); - ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy"); + ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider()); doReturn(factory).when(functionRuntimeManager).getRuntimeFactory(); // set assignments