This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new a5215d7 [SPARK-55352] Use K8s Garbage Collection to delete executor pods
a5215d7 is described below
commit a5215d7016ca34ced6b65e08ba1442282ca8e0a4
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Tue Feb 3 22:54:32 2026 -0800
[SPARK-55352] Use K8s Garbage Collection to delete executor pods
### What changes were proposed in this pull request?
This PR aims to use [K8s Garbage
Collection](https://kubernetes.io/docs/concepts/architecture/garbage-collection/)
to delete executor pods.
### Why are the changes needed?
To avoid massive API invocation overhead during explicit executor deletions.
### Does this PR introduce _any_ user-facing change?
There is a timing difference for executor pod deletions. However,
eventually, all executor pods are garbage collected after driver pod is deleted.
### How was this patch tested?
Pass the CIs with newly added test case.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #484 from dongjoon-hyun/SPARK-55352.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../k8s/operator/SparkAppSubmissionWorker.java | 6 ++++
.../k8s/operator/SparkAppSubmissionWorkerTest.java | 33 ++++++++++++++++++++++
2 files changed, 39 insertions(+)
diff --git a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java
index 7b13e92..6f38f90 100644
--- a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java
+++ b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java
@@ -41,6 +41,7 @@ import org.apache.spark.deploy.k8s.submit.RMainAppResource;
import org.apache.spark.k8s.operator.spec.ApplicationSpec;
import org.apache.spark.k8s.operator.spec.ConfigMapSpec;
import org.apache.spark.k8s.operator.spec.DriverServiceIngressSpec;
+import org.apache.spark.k8s.operator.spec.ResourceRetainPolicy;
import org.apache.spark.k8s.operator.spec.RuntimeVersions;
import org.apache.spark.k8s.operator.utils.ModelUtils;
import org.apache.spark.k8s.operator.utils.StringUtils;
@@ -167,6 +168,11 @@ public class SparkAppSubmissionWorker {
effectiveSparkConf.setIfMissing("spark.app.id", appId);
effectiveSparkConf.setIfMissing("spark.authenticate", "true");
effectiveSparkConf.setIfMissing("spark.io.encryption.enabled", "true");
+ // Use K8s Garbage Collection instead of explicit API invocations
+ if (applicationSpec.getApplicationTolerations().getResourceRetainPolicy() != ResourceRetainPolicy.Always) {
+   effectiveSparkConf.setIfMissing("spark.kubernetes.executor.deleteOnTermination", "false");
+ }
return SparkAppDriverConf.create(
effectiveSparkConf,
sparkVersion,
diff --git a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java
index 71351cf..b1a350c 100644
--- a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java
+++ b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java
@@ -43,6 +43,8 @@ import org.apache.spark.deploy.k8s.submit.JavaMainAppResource;
import org.apache.spark.deploy.k8s.submit.PythonMainAppResource;
import org.apache.spark.deploy.k8s.submit.RMainAppResource;
import org.apache.spark.k8s.operator.spec.ApplicationSpec;
+import org.apache.spark.k8s.operator.spec.ApplicationTolerations;
+import org.apache.spark.k8s.operator.spec.ResourceRetainPolicy;
import org.apache.spark.k8s.operator.spec.RuntimeVersions;
import org.apache.spark.k8s.operator.status.ApplicationAttemptSummary;
import org.apache.spark.k8s.operator.status.ApplicationStatus;
@@ -73,6 +75,7 @@ class SparkAppSubmissionWorkerTest {
when(mockSpec.getProxyUser()).thenReturn("foo-user");
when(mockSpec.getMainClass()).thenReturn("foo-class");
when(mockSpec.getDriverArgs()).thenReturn(List.of("a", "b"));
+ when(mockSpec.getApplicationTolerations()).thenReturn(new ApplicationTolerations());
SparkAppSubmissionWorker submissionWorker = new SparkAppSubmissionWorker();
SparkAppDriverConf conf = submissionWorker.buildDriverConf(mockApp, overrides);
@@ -116,6 +119,7 @@ class SparkAppSubmissionWorkerTest {
ObjectMeta appMeta = new ObjectMetaBuilder().withName("app1").withNamespace("ns1").build();
when(mockApp.getSpec()).thenReturn(mockSpec);
when(mockApp.getMetadata()).thenReturn(appMeta);
+ when(mockSpec.getApplicationTolerations()).thenReturn(new ApplicationTolerations());
when(mockSpec.getPyFiles()).thenReturn("foo");
SparkAppSubmissionWorker submissionWorker = new SparkAppSubmissionWorker();
@@ -141,6 +145,7 @@ class SparkAppSubmissionWorkerTest {
ObjectMeta appMeta = new ObjectMetaBuilder().withName("app1").withNamespace("ns1").build();
when(mockApp.getSpec()).thenReturn(mockSpec);
when(mockApp.getMetadata()).thenReturn(appMeta);
+ when(mockSpec.getApplicationTolerations()).thenReturn(new ApplicationTolerations());
when(mockSpec.getMainClass()).thenReturn("org.apache.spark.deploy.PythonRunner");
when(mockSpec.getPyFiles()).thenReturn("main.py,lib.py");
@@ -169,6 +174,7 @@ class SparkAppSubmissionWorkerTest {
ObjectMeta appMeta = new ObjectMetaBuilder().withName("app1").withNamespace("ns1").build();
when(mockApp.getSpec()).thenReturn(mockSpec);
when(mockApp.getMetadata()).thenReturn(appMeta);
+ when(mockSpec.getApplicationTolerations()).thenReturn(new ApplicationTolerations());
when(mockSpec.getSparkRFiles()).thenReturn("foo");
SparkAppSubmissionWorker submissionWorker = new SparkAppSubmissionWorker();
@@ -257,6 +263,7 @@ class SparkAppSubmissionWorkerTest {
when(mockSpec.getSparkConf()).thenReturn(appProps);
when(mockApp.getSpec()).thenReturn(mockSpec);
when(mockApp.getMetadata()).thenReturn(appMeta);
+ when(mockSpec.getApplicationTolerations()).thenReturn(new ApplicationTolerations());
SparkAppSubmissionWorker submissionWorker = new SparkAppSubmissionWorker();
SparkAppDriverConf conf = submissionWorker.buildDriverConf(mockApp, Map.of());
@@ -274,6 +281,7 @@ class SparkAppSubmissionWorkerTest {
appProps.put("spark.kubernetes.executor.container.image", "apache/spark:{{SPARK_VERSION}}");
appProps.put("spark.kubernetes.key", "apache/spark:{{SPARK_VERSION}}");
ObjectMeta appMeta = new ObjectMetaBuilder().withName("app1").withNamespace("ns1").build();
+ when(mockSpec.getApplicationTolerations()).thenReturn(new ApplicationTolerations());
when(mockSpec.getSparkConf()).thenReturn(appProps);
when(mockApp.getSpec()).thenReturn(mockSpec);
when(mockApp.getMetadata()).thenReturn(appMeta);
@@ -287,4 +295,29 @@ class SparkAppSubmissionWorkerTest {
assertEquals("apache/spark:dev", conf.get("spark.kubernetes.executor.container.image"));
assertEquals("apache/spark:{{SPARK_VERSION}}", conf.get("spark.kubernetes.key"));
}
+
+ @Test
+ void useGarbageCollectionToDeleteExecutors() {
+   SparkApplication mockApp = mock(SparkApplication.class);
+   ApplicationSpec mockSpec = mock(ApplicationSpec.class);
+   ApplicationTolerations mockTolerations = mock(ApplicationTolerations.class);
+   when(mockSpec.getApplicationTolerations()).thenReturn(mockTolerations);
+   when(mockSpec.getSparkConf()).thenReturn(Map.of());
+   ObjectMeta appMeta = new ObjectMetaBuilder().withName("app1").withNamespace("ns1").build();
+   when(mockApp.getSpec()).thenReturn(mockSpec);
+   when(mockApp.getMetadata()).thenReturn(appMeta);
+
+   SparkAppSubmissionWorker submissionWorker = new SparkAppSubmissionWorker();
+   when(mockTolerations.getResourceRetainPolicy()).thenReturn(ResourceRetainPolicy.Always);
+   SparkAppDriverConf conf1 = submissionWorker.buildDriverConf(mockApp, Map.of());
+   assertEquals("true", conf1.get("spark.kubernetes.executor.deleteOnTermination", "true"));
+
+   when(mockTolerations.getResourceRetainPolicy()).thenReturn(ResourceRetainPolicy.Never);
+   SparkAppDriverConf conf2 = submissionWorker.buildDriverConf(mockApp, Map.of());
+   assertEquals("false", conf2.get("spark.kubernetes.executor.deleteOnTermination", "true"));
+
+   when(mockTolerations.getResourceRetainPolicy()).thenReturn(ResourceRetainPolicy.OnFailure);
+   SparkAppDriverConf conf3 = submissionWorker.buildDriverConf(mockApp, Map.of());
+   assertEquals("false", conf3.get("spark.kubernetes.executor.deleteOnTermination", "true"));
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]