This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f0dc3443b91167fddcc0aaace1e912a5093242fb
Author: Andrey Yegorov <[email protected]>
AuthorDate: Sat Nov 6 13:17:04 2021 -0700

    k8s runtime: force deletion to avoid hung function worker during connector 
restart (#12504)
    
    (cherry picked from commit a3f6aba81a7bbd55a8429cd724694cdcee7d3f2e)
---
 conf/functions_worker.yml                                     |  4 ++++
 .../functions/runtime/kubernetes/KubernetesRuntime.java       | 11 +++++++----
 .../runtime/kubernetes/KubernetesRuntimeFactory.java          |  3 +++
 .../runtime/kubernetes/KubernetesRuntimeFactoryConfig.java    |  6 ++++++
 site2/docs/functions-runtime.md                               |  4 ++++
 5 files changed, 24 insertions(+), 4 deletions(-)

diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index 58dfc69..9ca5f7b 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -216,6 +216,10 @@ functionRuntimeFactoryConfigs:
 #    extraFunctionDependenciesDir:
 #    # Additional memory padding added on top of the memory requested by the 
function per on a per instance basis
 #    percentMemoryPadding: 10
+#    # The duration in seconds before the StatefulSet is deleted on function 
stop/restart.
+#    # Value must be a non-negative integer. The value zero means delete 
immediately.
+#    # Default is 5 seconds.
+#    gracePeriodSeconds: 5
 
 ## A set of the minimum amount of resources functions must request.
 ## Support for this depends on function runtime.
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
index a483bd0..1301596 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
@@ -149,6 +149,7 @@ public class KubernetesRuntime implements Runtime {
     private int percentMemoryPadding;
     private double cpuOverCommitRatio;
     private double memoryOverCommitRatio;
+    private int gracePeriodSeconds;
     private final Optional<KubernetesFunctionAuthProvider> 
functionAuthDataCacheProvider;
     private final AuthenticationConfig authConfig;
     private Integer grpcPort;
@@ -186,6 +187,7 @@ public class KubernetesRuntime implements Runtime {
                       int percentMemoryPadding,
                       double cpuOverCommitRatio,
                       double memoryOverCommitRatio,
+                      int gracePeriodSeconds,
                       Optional<KubernetesFunctionAuthProvider> 
functionAuthDataCacheProvider,
                       boolean authenticationEnabled,
                       Integer grpcPort,
@@ -212,6 +214,7 @@ public class KubernetesRuntime implements Runtime {
         this.percentMemoryPadding = percentMemoryPadding;
         this.cpuOverCommitRatio = cpuOverCommitRatio;
         this.memoryOverCommitRatio = memoryOverCommitRatio;
+        this.gracePeriodSeconds = gracePeriodSeconds;
         this.authenticationEnabled = authenticationEnabled;
         this.manifestCustomizer = manifestCustomizer;
         this.functionInstanceClassPath = functionInstanceClassPath;
@@ -567,7 +570,7 @@ public class KubernetesRuntime implements Runtime {
     public void deleteStatefulSet() throws InterruptedException {
         String statefulSetName = 
createJobName(instanceConfig.getFunctionDetails(), this.jobName);
         final V1DeleteOptions options = new V1DeleteOptions();
-        options.setGracePeriodSeconds(5L);
+        options.setGracePeriodSeconds((long)gracePeriodSeconds);
         options.setPropagationPolicy("Foreground");
 
         String fqfn = 
FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails());
@@ -583,8 +586,8 @@ public class KubernetesRuntime implements Runtime {
                         response = appsClient.deleteNamespacedStatefulSetCall(
                                 statefulSetName,
                                 jobNamespace, null, null,
-                                5, null, "Foreground",
-                                null, null)
+                                gracePeriodSeconds, null, "Foreground",
+                                options, null)
                                 .execute();
                     } catch (ApiException e) {
                         // if already deleted
@@ -735,7 +738,7 @@ public class KubernetesRuntime implements Runtime {
                                 serviceName,
                                 jobNamespace, null, null,
                                 0, null,
-                                "Foreground", null, null).execute();
+                                "Foreground", options, null).execute();
                     } catch (ApiException e) {
                         // if already deleted
                         if (e.getCode() == HTTP_NOT_FOUND) {
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
index 2a47da6..d98a161 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
@@ -103,6 +103,7 @@ public class KubernetesRuntimeFactory implements 
RuntimeFactory {
     private String narExtractionDirectory;
     private String functionInstanceClassPath;
     private String downloadDirectory;
+    private int gracePeriodSeconds;
 
     @ToString.Exclude
     @EqualsAndHashCode.Exclude
@@ -200,6 +201,7 @@ public class KubernetesRuntimeFactory implements 
RuntimeFactory {
         this.percentMemoryPadding = factoryConfig.getPercentMemoryPadding();
         this.cpuOverCommitRatio = factoryConfig.getCpuOverCommitRatio();
         this.memoryOverCommitRatio = factoryConfig.getMemoryOverCommitRatio();
+        this.gracePeriodSeconds = factoryConfig.getGracePeriodSeconds();
         this.pulsarServiceUrl = 
StringUtils.isEmpty(factoryConfig.getPulsarServiceUrl())
                 ? workerConfig.getPulsarServiceUrl() : 
factoryConfig.getPulsarServiceUrl();
         this.pulsarAdminUrl = 
StringUtils.isEmpty(factoryConfig.getPulsarAdminUrl())
@@ -318,6 +320,7 @@ public class KubernetesRuntimeFactory implements 
RuntimeFactory {
             percentMemoryPadding,
             cpuOverCommitRatio,
             memoryOverCommitRatio,
+            gracePeriodSeconds,
             authProvider,
             authenticationEnabled,
             grpcPort,
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java
index a3ef418..e3b758f 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java
@@ -161,5 +161,11 @@ public class KubernetesRuntimeFactoryConfig {
             doc = "The classpath where function instance files stored"
     )
     private String functionInstanceClassPath = "";
+    @FieldContext(
+            doc = "The duration in seconds before the StatefulSet deleted on 
function stop/restart. " +
+                    "Value must be non-negative integer. The value zero 
indicates delete immediately. " +
+                    "Default is 5 seconds."
+    )
+    protected int gracePeriodSeconds = 5;
 
 }
diff --git a/site2/docs/functions-runtime.md b/site2/docs/functions-runtime.md
index 0155f17..09eb1ef 100644
--- a/site2/docs/functions-runtime.md
+++ b/site2/docs/functions-runtime.md
@@ -136,6 +136,10 @@ functionRuntimeFactoryConfigs:
   extraFunctionDependenciesDir:
   # Additional memory padding added on top of the memory requested by the 
function per on a per instance basis
   percentMemoryPadding: 10
+  # The duration (in seconds) before the StatefulSet is deleted after a 
function stops or restarts.
+  # Value must be a non-negative integer. 0 indicates the StatefulSet is 
deleted immediately.
+  # Default is 5 seconds.
+  gracePeriodSeconds: 5
 ```
 
 If you run functions worker embedded in a broker on Kubernetes, you can use 
the default settings. 

Reply via email to