This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit f0dc3443b91167fddcc0aaace1e912a5093242fb Author: Andrey Yegorov <[email protected]> AuthorDate: Sat Nov 6 13:17:04 2021 -0700 k8s runtime: force deletion to avoid hung function worker during connector restart (#12504) (cherry picked from commit a3f6aba81a7bbd55a8429cd724694cdcee7d3f2e) --- conf/functions_worker.yml | 4 ++++ .../functions/runtime/kubernetes/KubernetesRuntime.java | 11 +++++++---- .../runtime/kubernetes/KubernetesRuntimeFactory.java | 3 +++ .../runtime/kubernetes/KubernetesRuntimeFactoryConfig.java | 6 ++++++ site2/docs/functions-runtime.md | 4 ++++ 5 files changed, 24 insertions(+), 4 deletions(-) diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml index 58dfc69..9ca5f7b 100644 --- a/conf/functions_worker.yml +++ b/conf/functions_worker.yml @@ -216,6 +216,10 @@ functionRuntimeFactoryConfigs: # extraFunctionDependenciesDir: # # Additional memory padding added on top of the memory requested by the function per on a per instance basis # percentMemoryPadding: 10 +# # The duration (in seconds) before the StatefulSet is deleted after a function stops or restarts. +# # Value must be a non-negative integer. 0 indicates the StatefulSet is deleted immediately. +# # Default is 5 seconds. +# gracePeriodSeconds: 5 ## A set of the minimum amount of resources functions must request. ## Support for this depends on function runtime. 
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java index a483bd0..1301596 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java @@ -149,6 +149,7 @@ public class KubernetesRuntime implements Runtime { private int percentMemoryPadding; private double cpuOverCommitRatio; private double memoryOverCommitRatio; + private int gracePeriodSeconds; private final Optional<KubernetesFunctionAuthProvider> functionAuthDataCacheProvider; private final AuthenticationConfig authConfig; private Integer grpcPort; @@ -186,6 +187,7 @@ public class KubernetesRuntime implements Runtime { int percentMemoryPadding, double cpuOverCommitRatio, double memoryOverCommitRatio, + int gracePeriodSeconds, Optional<KubernetesFunctionAuthProvider> functionAuthDataCacheProvider, boolean authenticationEnabled, Integer grpcPort, @@ -212,6 +214,7 @@ public class KubernetesRuntime implements Runtime { this.percentMemoryPadding = percentMemoryPadding; this.cpuOverCommitRatio = cpuOverCommitRatio; this.memoryOverCommitRatio = memoryOverCommitRatio; + this.gracePeriodSeconds = gracePeriodSeconds; this.authenticationEnabled = authenticationEnabled; this.manifestCustomizer = manifestCustomizer; this.functionInstanceClassPath = functionInstanceClassPath; @@ -567,7 +570,7 @@ public class KubernetesRuntime implements Runtime { public void deleteStatefulSet() throws InterruptedException { String statefulSetName = createJobName(instanceConfig.getFunctionDetails(), this.jobName); final V1DeleteOptions options = new V1DeleteOptions(); - options.setGracePeriodSeconds(5L); + options.setGracePeriodSeconds((long)gracePeriodSeconds); 
options.setPropagationPolicy("Foreground"); String fqfn = FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails()); @@ -583,8 +586,8 @@ public class KubernetesRuntime implements Runtime { response = appsClient.deleteNamespacedStatefulSetCall( statefulSetName, jobNamespace, null, null, - 5, null, "Foreground", - null, null) + gracePeriodSeconds, null, "Foreground", + options, null) .execute(); } catch (ApiException e) { // if already deleted @@ -735,7 +738,7 @@ public class KubernetesRuntime implements Runtime { serviceName, jobNamespace, null, null, 0, null, - "Foreground", null, null).execute(); + "Foreground", options, null).execute(); } catch (ApiException e) { // if already deleted if (e.getCode() == HTTP_NOT_FOUND) { diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java index 2a47da6..d98a161 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java @@ -103,6 +103,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory { private String narExtractionDirectory; private String functionInstanceClassPath; private String downloadDirectory; + private int gracePeriodSeconds; @ToString.Exclude @EqualsAndHashCode.Exclude @@ -200,6 +201,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory { this.percentMemoryPadding = factoryConfig.getPercentMemoryPadding(); this.cpuOverCommitRatio = factoryConfig.getCpuOverCommitRatio(); this.memoryOverCommitRatio = factoryConfig.getMemoryOverCommitRatio(); + this.gracePeriodSeconds = factoryConfig.getGracePeriodSeconds(); this.pulsarServiceUrl = StringUtils.isEmpty(factoryConfig.getPulsarServiceUrl()) ? 
workerConfig.getPulsarServiceUrl() : factoryConfig.getPulsarServiceUrl(); this.pulsarAdminUrl = StringUtils.isEmpty(factoryConfig.getPulsarAdminUrl()) @@ -318,6 +320,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory { percentMemoryPadding, cpuOverCommitRatio, memoryOverCommitRatio, + gracePeriodSeconds, authProvider, authenticationEnabled, grpcPort, diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java index a3ef418..e3b758f 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java @@ -161,5 +161,11 @@ public class KubernetesRuntimeFactoryConfig { doc = "The classpath where function instance files stored" ) private String functionInstanceClassPath = ""; + @FieldContext( + doc = "The duration in seconds before the StatefulSet deleted on function stop/restart. " + + "Value must be non-negative integer. The value zero indicates delete immediately. " + + "Default is 5 seconds." + ) + protected int gracePeriodSeconds = 5; } diff --git a/site2/docs/functions-runtime.md b/site2/docs/functions-runtime.md index 0155f17..09eb1ef 100644 --- a/site2/docs/functions-runtime.md +++ b/site2/docs/functions-runtime.md @@ -136,6 +136,10 @@ functionRuntimeFactoryConfigs: extraFunctionDependenciesDir: # Additional memory padding added on top of the memory requested by the function per on a per instance basis percentMemoryPadding: 10 + # The duration (in seconds) before the StatefulSet is deleted after a function stops or restarts. + # Value must be a non-negative integer. 0 indicates the StatefulSet is deleted immediately. + # Default is 5 seconds. 
+ gracePeriodSeconds: 5 ``` If you run functions worker embedded in a broker on Kubernetes, you can use the default settings.
