This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d1fefad  Support graceful shutdown Kubernetes (#3628)
d1fefad is described below

commit d1fefadb374713e229594ab9874487a5fa0c9572
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Wed Feb 20 09:27:56 2019 -0800

    Support graceful shutdown Kubernetes (#3628)
    
    * allow for graceful shutdown when running functions in kubernetes
    
    * cleaning up
    
    * fix unit tests
---
 .../pulsar/functions/runtime/JavaInstanceMain.java |  6 +-
 .../functions/runtime/KubernetesRuntime.java       | 53 ++++++++------
 .../pulsar/functions/runtime/ProcessRuntime.java   |  6 +-
 .../pulsar/functions/runtime/RuntimeUtils.java     | 83 ++++++++++++++++------
 .../functions/runtime/KubernetesRuntimeTest.java   | 30 ++++----
 5 files changed, 110 insertions(+), 68 deletions(-)

diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index 43467ae..f480b2e 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -46,10 +46,7 @@ import org.apache.pulsar.functions.utils.Reflections;
 import java.lang.reflect.Type;
 import java.net.InetSocketAddress;
 import java.util.Map;
-import java.util.TimerTask;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
@@ -201,8 +198,7 @@ public class JavaInstanceMain implements AutoCloseable {
             public void run() {
                 // Use stderr here since the logger may have been reset by its 
JVM shutdown hook.
                 try {
-                    server.shutdown();
-                    runtimeSpawner.close();
+                    close();
                 } catch (Exception ex) {
                     System.err.println(ex);
                 }
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index 2e2c63d..3fc0c69 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -185,25 +185,33 @@ public class KubernetesRuntime implements Runtime {
                 logConfigFile = pulsarRootDir + 
"/conf/functions-logging/console_logging_config.ini";
                 break;
         }
-        this.processArgs = RuntimeUtils.composeArgs(
-            instanceConfig,
-            instanceFile,
-            extraDependenciesDir,
-            logDirectory,
-            this.originalCodeFileName,
-            pulsarServiceUrl,
-            stateStorageServiceUrl,
-            authConfig,
-            "$" + ENV_SHARD_ID,
-            GRPC_PORT,
-            -1l,
-            logConfigFile,
-            secretsProviderClassName,
-            secretsProviderConfig,
-            installUserCodeDependencies,
-            pythonDependencyRepository,
-            pythonExtraDependencyRepository,
-                METRICS_PORT);
+
+        this.processArgs = new LinkedList<>();
+        this.processArgs.addAll(RuntimeUtils.getArgsBeforeCmd(instanceConfig, 
extraDependenciesDir));
+        // use exec to to launch function so that it gets launched in the 
foreground with the same PID as shell
+        // so that when we kill the pod, the signal will get propagated to the 
function code
+        this.processArgs.add("exec");
+        this.processArgs.addAll(
+                RuntimeUtils.getCmd(
+                        instanceConfig,
+                        instanceFile,
+                        extraDependenciesDir,
+                        logDirectory,
+                        this.originalCodeFileName,
+                        pulsarServiceUrl,
+                        stateStorageServiceUrl,
+                        authConfig,
+                        "$" + ENV_SHARD_ID,
+                        GRPC_PORT,
+                        -1l,
+                        logConfigFile,
+                        secretsProviderClassName,
+                        secretsProviderConfig,
+                        installUserCodeDependencies,
+                        pythonDependencyRepository,
+                        pythonExtraDependencyRepository,
+                        METRICS_PORT));
+
         doChecks(instanceConfig.getFunctionDetails());
     }
 
@@ -467,7 +475,7 @@ public class KubernetesRuntime implements Runtime {
     public void deleteStatefulSet() throws InterruptedException {
         String statefulSetName = 
createJobName(instanceConfig.getFunctionDetails());
         final V1DeleteOptions options = new V1DeleteOptions();
-        options.setGracePeriodSeconds(0L);
+        options.setGracePeriodSeconds(5L);
         options.setPropagationPolicy("Foreground");
 
         String fqfn = 
FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails());
@@ -521,8 +529,9 @@ public class KubernetesRuntime implements Runtime {
 
         RuntimeUtils.Actions.Action waitForStatefulSetDeletion = 
RuntimeUtils.Actions.Action.builder()
                 .actionName(String.format("Waiting for statefulset for 
function %s to complete deletion", fqfn))
-                .numRetries(NUM_RETRIES)
-                .sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS)
+                // set retry period to be about 2x the graceshutdown time
+                .numRetries(NUM_RETRIES * 2)
+                .sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS* 2)
                 .supplier(() -> {
                     V1StatefulSet response;
                     try {
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 87017a6..7cc6efb 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -39,16 +39,12 @@ import 
org.apache.pulsar.functions.proto.InstanceControlGrpc;
 import 
org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
 import org.apache.pulsar.functions.utils.Utils;
 
-import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.List;
-import java.util.TimerTask;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
@@ -108,7 +104,7 @@ class ProcessRuntime implements Runtime {
                 break;
         }
         this.extraDependenciesDir = extraDependenciesDir;
-        this.processArgs = RuntimeUtils.composeArgs(
+        this.processArgs = RuntimeUtils.composeCmd(
             instanceConfig,
             instanceFile,
             // DONT SET extra dependencies here (for python runtime),
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 77e1f8f..9862b0a 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -54,24 +54,69 @@ public class RuntimeUtils {
 
     private static final String FUNCTIONS_EXTRA_DEPS_PROPERTY = 
"pulsar.functions.extra.dependencies.dir";
 
-    public static List<String> composeArgs(InstanceConfig instanceConfig,
-                                           String instanceFile,
-                                           String extraDependenciesDir, /* 
extra dependencies for running instances */
+    public static List<String> composeCmd(InstanceConfig instanceConfig,
+                                          String instanceFile,
+                                          String extraDependenciesDir, /* 
extra dependencies for running instances */
+                                          String logDirectory,
+                                          String originalCodeFileName,
+                                          String pulsarServiceUrl,
+                                          String stateStorageServiceUrl,
+                                          AuthenticationConfig authConfig,
+                                          String shardId,
+                                          Integer grpcPort,
+                                          Long expectedHealthCheckInterval,
+                                          String logConfigFile,
+                                          String secretsProviderClassName,
+                                          String secretsProviderConfig,
+                                          Boolean installUserCodeDependencies,
+                                          String pythonDependencyRepository,
+                                          String 
pythonExtraDependencyRepository,
+                                          int metricsPort) throws Exception {
+
+        final List<String> cmd = getArgsBeforeCmd(instanceConfig, 
extraDependenciesDir);
+
+        cmd.addAll(getCmd(instanceConfig, instanceFile, extraDependenciesDir, 
logDirectory,
+                originalCodeFileName, pulsarServiceUrl, stateStorageServiceUrl,
+                authConfig, shardId, grpcPort, expectedHealthCheckInterval,
+                logConfigFile, secretsProviderClassName, secretsProviderConfig,
+                installUserCodeDependencies, pythonDependencyRepository,
+                pythonExtraDependencyRepository, metricsPort));
+        return cmd;
+    }
+
+    public static List<String> getArgsBeforeCmd(InstanceConfig instanceConfig, 
String extraDependenciesDir) {
+
+        final List<String> args = new LinkedList<>();
+        if (instanceConfig.getFunctionDetails().getRuntime() ==  
Function.FunctionDetails.Runtime.JAVA) {
+            //no-op
+        } else if (instanceConfig.getFunctionDetails().getRuntime() == 
Function.FunctionDetails.Runtime.PYTHON) {
+            // add `extraDependenciesDir` to python package searching path
+            if (StringUtils.isNotEmpty(extraDependenciesDir)) {
+                args.add("PYTHONPATH=${PYTHONPATH}:" + extraDependenciesDir);
+            }
+        }
+
+        return args;
+    }
+
+    public static List<String> getCmd(InstanceConfig instanceConfig,
+                                          String instanceFile,
+                                          String extraDependenciesDir, /* 
extra dependencies for running instances */
                                            String logDirectory,
-                                           String originalCodeFileName,
-                                           String pulsarServiceUrl,
-                                           String stateStorageServiceUrl,
-                                           AuthenticationConfig authConfig,
-                                           String shardId,
-                                           Integer grpcPort,
-                                           Long expectedHealthCheckInterval,
-                                           String logConfigFile,
-                                           String secretsProviderClassName,
-                                           String secretsProviderConfig,
-                                           Boolean installUserCodeDependencies,
-                                           String pythonDependencyRepository,
-                                           String 
pythonExtraDependencyRepository,
-                                           int metricsPort) throws Exception {
+                                          String originalCodeFileName,
+                                          String pulsarServiceUrl,
+                                          String stateStorageServiceUrl,
+                                          AuthenticationConfig authConfig,
+                                          String shardId,
+                                          Integer grpcPort,
+                                          Long expectedHealthCheckInterval,
+                                          String logConfigFile,
+                                          String secretsProviderClassName,
+                                          String secretsProviderConfig,
+                                          Boolean installUserCodeDependencies,
+                                          String pythonDependencyRepository,
+                                          String 
pythonExtraDependencyRepository,
+                                          int metricsPort) throws Exception {
         final List<String> args = new LinkedList<>();
         if (instanceConfig.getFunctionDetails().getRuntime() ==  
Function.FunctionDetails.Runtime.JAVA) {
             args.add("java");
@@ -105,10 +150,6 @@ public class RuntimeUtils {
             args.add("--jar");
             args.add(originalCodeFileName);
         } else if (instanceConfig.getFunctionDetails().getRuntime() == 
Function.FunctionDetails.Runtime.PYTHON) {
-            // add `extraDependenciesDir` to python package searching path
-            if (StringUtils.isNotEmpty(extraDependenciesDir)) {
-                args.add("PYTHONPATH=${PYTHONPATH}:" + extraDependenciesDir);
-            }
             args.add("python");
             args.add(instanceFile);
             args.add("--py");
diff --git 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
index 7b5a879..c7a9832 100644
--- 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
+++ 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
@@ -271,14 +271,14 @@ public class KubernetesRuntimeTest {
         if (null != depsDir) {
             extraDepsEnv = " -Dpulsar.functions.extra.dependencies.dir=" + 
depsDir;
             classpath = classpath + ":" + depsDir + "/*";
-            totalArgs = 34;
-            portArg = 25;
-            metricsPortArg = 27;
+            totalArgs = 35;
+            portArg = 26;
+            metricsPortArg = 28;
         } else {
             extraDepsEnv = "";
-            portArg = 24;
-            metricsPortArg = 26;
-            totalArgs = 33;
+            portArg = 25;
+            metricsPortArg = 27;
+            totalArgs = 34;
         }
         if (secretsAttached) {
             totalArgs += 4;
@@ -287,7 +287,7 @@ public class KubernetesRuntimeTest {
         assertEquals(args.size(), totalArgs,
             "Actual args : " + StringUtils.join(args, " "));
 
-        String expectedArgs = "java -cp " + classpath
+        String expectedArgs = "exec java -cp " + classpath
                 + " -Dpulsar.functions.java.instance.jar=" + 
javaInstanceJarFile
                 + extraDepsEnv
                 + " -Dlog4j.configurationFile=kubernetes_instance_log4j2.yml "
@@ -352,16 +352,16 @@ public class KubernetesRuntimeTest {
         int configArg;
         int metricsPortArg;
         if (null == extraDepsDir) {
-            totalArgs = 36;
-            portArg = 29;
-            configArg = 9;
-            pythonPath = "";
-            metricsPortArg = 31;
-        } else {
-            totalArgs = 39;
+            totalArgs = 37;
             portArg = 30;
             configArg = 10;
+            pythonPath = "";
             metricsPortArg = 32;
+        } else {
+            totalArgs = 40;
+            portArg = 31;
+            configArg = 11;
+            metricsPortArg = 33;
             pythonPath = "PYTHONPATH=${PYTHONPATH}:" + extraDepsDir + " ";
         }
         if (secretsAttached) {
@@ -370,7 +370,7 @@ public class KubernetesRuntimeTest {
 
         assertEquals(args.size(), totalArgs,
             "Actual args : " + StringUtils.join(args, " "));
-        String expectedArgs = pythonPath + "python " + pythonInstanceFile
+        String expectedArgs = pythonPath + "exec python " + pythonInstanceFile
                 + " --py " + pulsarRootDir + "/" + userJarFile
                 + " --logging_directory " + logDirectory
                 + " --logging_file " + config.getFunctionDetails().getName()

Reply via email to