This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new da83322 Added ability to download dependencies in Kubernetes (#2729) da83322 is described below commit da833223404e6ea4ce2a13f65e9f74bb0db783a1 Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Fri Oct 5 10:14:23 2018 -0700 Added ability to download dependencies in Kubernetes (#2729) * Add a config to allow user code dependencies to be installed at runtime. Enable it only for Kubernetes runtime * Fix bug * Fixed indentation issue * Specify the right cmd line * Install dep only in the temp dir * Queue is not in python3. Thus add a try catch block * Fixed unittest --- pulsar-functions/instance/src/main/python/python_instance.py | 7 +++++-- .../instance/src/main/python/python_instance_main.py | 8 ++++++-- .../org/apache/pulsar/functions/runtime/JavaInstanceMain.java | 3 +++ .../org/apache/pulsar/functions/runtime/KubernetesRuntime.java | 3 ++- .../pulsar/functions/runtime/KubernetesRuntimeFactory.java | 4 ++++ .../org/apache/pulsar/functions/runtime/ProcessRuntime.java | 2 +- .../java/org/apache/pulsar/functions/runtime/RuntimeUtils.java | 7 ++++++- .../apache/pulsar/functions/runtime/KubernetesRuntimeTest.java | 10 +++++----- .../apache/pulsar/functions/worker/FunctionRuntimeManager.java | 1 + .../java/org/apache/pulsar/functions/worker/WorkerConfig.java | 1 + 10 files changed, 34 insertions(+), 12 deletions(-) diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py index f8a14ff..2aae5f6 100644 --- a/pulsar-functions/instance/src/main/python/python_instance.py +++ b/pulsar-functions/instance/src/main/python/python_instance.py @@ -26,7 +26,10 @@ import base64 import os import signal import time -import Queue +try: + import Queue as queue +except: + import queue import threading from functools import partial from collections import namedtuple @@ -117,7 +120,7 @@ class PythonInstance(object): def
__init__(self, instance_id, function_id, function_version, function_details, max_buffered_tuples, expected_healthcheck_interval, user_code, pulsar_client): self.instance_config = InstanceConfig(instance_id, function_id, function_version, function_details, max_buffered_tuples) self.user_code = user_code - self.queue = Queue.Queue(max_buffered_tuples) + self.queue = queue.Queue(max_buffered_tuples) self.log_topic_handler = None if function_details.logTopic is not None and function_details.logTopic != "": self.log_topic_handler = log.LogTopicHandler(str(function_details.logTopic), pulsar_client) diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py index 6ef74c2..5fc899a 100644 --- a/pulsar-functions/instance/src/main/python/python_instance_main.py +++ b/pulsar-functions/instance/src/main/python/python_instance_main.py @@ -71,6 +71,7 @@ def main(): parser.add_argument('--logging_directory', required=True, help='Logging Directory') parser.add_argument('--logging_file', required=True, help='Log file name') parser.add_argument('--expected_healthcheck_interval', required=True, help='Expected time in seconds between health checks', type=int) + parser.add_argument('--install_usercode_dependencies', required=False, help='For packaged python like wheel files, do we need to install all dependencies', type=bool) args = parser.parse_args() function_details = Function_pb2.FunctionDetails() @@ -82,8 +83,11 @@ def main(): json_format.Parse(args.function_details, function_details) if os.path.splitext(str(args.py))[1] == '.whl': - zpfile = zipfile.ZipFile(str(args.py), 'r') - zpfile.extractall(os.path.dirname(str(args.py))) + if args.install_usercode_dependencies: + os.system("pip install -t %s %s" % (os.path.dirname(str(args.py)), str(args.py))) + else: + zpfile = zipfile.ZipFile(str(args.py), 'r') + zpfile.extractall(os.path.dirname(str(args.py))) sys.path.insert(0, 
os.path.dirname(str(args.py))) log_file = os.path.join(args.logging_directory, diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java index 1551f6f..157de9e 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java @@ -95,6 +95,9 @@ public class JavaInstanceMain implements AutoCloseable { @Parameter(names = "--expected_healthcheck_interval", description = "Expected interval in seconds between healtchecks", required = true) protected int expectedHealthCheckInterval; + @Parameter(names = "--install_usercode_dependencies", description = "Do we need to explictly install any user code dependencies(Does not apply to Java", required = false) + protected Boolean installUsercodeDependencies; + private Server server; private RuntimeSpawner runtimeSpawner; private ThreadRuntimeFactory containerFactory; diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java index 43f5a5d..631b910 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java @@ -96,6 +96,7 @@ class KubernetesRuntime implements Runtime { CoreV1Api coreClient, String jobNamespace, Map<String, String> customLabels, + Boolean installUserCodeDependencies, String pulsarDockerImageName, String pulsarRootDir, InstanceConfig instanceConfig, @@ -118,7 +119,7 @@ class KubernetesRuntime implements Runtime { this.originalCodeFileName = pulsarRootDir + "/" + originalCodeFileName; this.pulsarAdminUrl = pulsarAdminUrl; 
this.processArgs = RuntimeUtils.composeArgs(instanceConfig, instanceFile, logDirectory, this.originalCodeFileName, pulsarServiceUrl, stateStorageServiceUrl, - authConfig, "$" + ENV_SHARD_ID, GRPC_PORT, -1l, "conf/log4j2.yaml"); + authConfig, "$" + ENV_SHARD_ID, GRPC_PORT, -1l, "conf/log4j2.yaml", installUserCodeDependencies); running = false; doChecks(instanceConfig.getFunctionDetails()); } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java index cba9ebf..dee265f 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java @@ -45,6 +45,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory { private final String pulsarDockerImageName; private final String pulsarRootDir; private final Boolean submittingInsidePod; + private final Boolean installUserCodeDependencies; private final Map<String, String> customLabels; private final String pulsarAdminUri; private final String pulsarServiceUri; @@ -62,6 +63,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory { String pulsarDockerImageName, String pulsarRootDir, Boolean submittingInsidePod, + Boolean installUserCodeDependencies, Map<String, String> customLabels, String pulsarServiceUri, String pulsarAdminUri, @@ -84,6 +86,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory { this.pulsarRootDir = "/pulsar"; } this.submittingInsidePod = submittingInsidePod; + this.installUserCodeDependencies = installUserCodeDependencies; this.customLabels = customLabels; this.pulsarServiceUri = pulsarServiceUri; this.pulsarAdminUri = pulsarAdminUri; @@ -119,6 +122,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory { coreClient, jobNamespace, 
customLabels, + installUserCodeDependencies, pulsarDockerImageName, pulsarRootDir, instanceConfig, diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java index 2146376..3534915 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java @@ -74,7 +74,7 @@ class ProcessRuntime implements Runtime { this.expectedHealthCheckInterval = expectedHealthCheckInterval; this.processArgs = RuntimeUtils.composeArgs(instanceConfig, instanceFile, logDirectory, codeFile, pulsarServiceUrl, stateStorageServiceUrl, authConfig, instanceConfig.getInstanceName(), instanceConfig.getPort(), expectedHealthCheckInterval, - "java_instance_log4j2.yml"); + "java_instance_log4j2.yml", false); } /** diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java index fe2a88e..3342482 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java @@ -47,7 +47,8 @@ class RuntimeUtils { String shardId, Integer grpcPort, Long expectedHealthCheckInterval, - String javaLog4jFileName) throws Exception { + String javaLog4jFileName, + Boolean installUserCodeDepdendencies) throws Exception { List<String> args = new LinkedList<>(); if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) { args.add("java"); @@ -130,6 +131,10 @@ class RuntimeUtils { } args.add("--expected_healthcheck_interval"); args.add(String.valueOf(expectedHealthCheckInterval)); + if (installUserCodeDepdendencies != null && 
installUserCodeDepdendencies) { + args.add("--install_usercode_dependencies"); + args.add("True"); + } return args; } } diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java index f4aeafc..6d2fc23 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java @@ -72,7 +72,7 @@ public class KubernetesRuntimeTest { this.stateStorageServiceUrl = "bk://localhost:4181"; this.logDirectory = "logs/functions"; this.factory = spy(new KubernetesRuntimeFactory(null, null, null, pulsarRootDir, - false, null, pulsarServiceUrl, pulsarAdminUrl, stateStorageServiceUrl, null)); + false, true, null, pulsarServiceUrl, pulsarAdminUrl, stateStorageServiceUrl, null)); doNothing().when(this.factory).setupClient(); } @@ -121,7 +121,7 @@ public class KubernetesRuntimeTest { KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l); List<String> args = container.getProcessArgs(); - assertEquals(args.size(), 28); + assertEquals(args.size(), 30); String expectedArgs = "java -cp " + javaInstanceJarFile + " -Dpulsar.functions.java.instance.jar=" + javaInstanceJarFile + " -Dlog4j.configurationFile=conf/log4j2.yaml " @@ -135,7 +135,7 @@ public class KubernetesRuntimeTest { + "' --pulsar_serviceurl " + pulsarServiceUrl + " --max_buffered_tuples 1024 --port " + args.get(23) + " --state_storage_serviceurl " + stateStorageServiceUrl - + " --expected_healthcheck_interval -1"; + + " --expected_healthcheck_interval -1 --install_usercode_dependencies True"; assertEquals(String.join(" ", args), expectedArgs); } @@ -145,7 +145,7 @@ public class KubernetesRuntimeTest { KubernetesRuntime container = factory.createContainer(config, userJarFile, 
userJarFile, 30l); List<String> args = container.getProcessArgs(); - assertEquals(args.size(), 24); + assertEquals(args.size(), 26); String expectedArgs = "python " + pythonInstanceFile + " --py " + pulsarRootDir + "/" + userJarFile + " --logging_directory " + logDirectory + " --logging_file " + config.getFunctionDetails().getName() + " --instance_id " @@ -154,7 +154,7 @@ public class KubernetesRuntimeTest { + " --function_details '" + JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails()) + "' --pulsar_serviceurl " + pulsarServiceUrl + " --max_buffered_tuples 1024 --port " + args.get(21) - + " --expected_healthcheck_interval -1"; + + " --expected_healthcheck_interval -1 --install_usercode_dependencies True"; assertEquals(String.join(" ", args), expectedArgs); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index e5b4a6b..1b8a590 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -125,6 +125,7 @@ public class FunctionRuntimeManager implements AutoCloseable{ workerConfig.getKubernetesContainerFactory().getPulsarDockerImageName(), workerConfig.getKubernetesContainerFactory().getPulsarRootDir(), workerConfig.getKubernetesContainerFactory().getSubmittingInsidePod(), + workerConfig.getKubernetesContainerFactory().getInstallUserCodeDependencies(), workerConfig.getKubernetesContainerFactory().getCustomLabels(), StringUtils.isEmpty(workerConfig.getKubernetesContainerFactory().getPulsarServiceUrl()) ? workerConfig.getPulsarServiceUrl() : workerConfig.getKubernetesContainerFactory().getPulsarServiceUrl(), StringUtils.isEmpty(workerConfig.getKubernetesContainerFactory().getPulsarAdminUrl()) ? 
workerConfig.getPulsarWebServiceUrl() : workerConfig.getKubernetesContainerFactory().getPulsarAdminUrl(), diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java index a3c393f..c18f824 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java @@ -139,6 +139,7 @@ public class WorkerConfig implements Serializable, PulsarConfiguration { private Boolean submittingInsidePod; private String pulsarServiceUrl; private String pulsarAdminUrl; + private Boolean installUserCodeDependencies; private Map<String, String> customLabels; } private KubernetesContainerFactory kubernetesContainerFactory;