This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new a5215d7  [SPARK-55352] Use K8s Garbage Collection to delete executor 
pods
a5215d7 is described below

commit a5215d7016ca34ced6b65e08ba1442282ca8e0a4
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Tue Feb 3 22:54:32 2026 -0800

    [SPARK-55352] Use K8s Garbage Collection to delete executor pods
    
    ### What changes were proposed in this pull request?
    
    This PR aims to use [K8s Garbage 
Collection](https://kubernetes.io/docs/concepts/architecture/garbage-collection/)
 to delete executor pods.
    
    ### Why are the changes needed?
    
    To avoid massive API invocation overhead during explicit executor deletions.
    
    ### Does this PR introduce _any_ user-facing change?
    
    There is a timing difference for executor pod deletions. However, 
eventually, all executor pods are garbage collected after the driver pod is deleted.
    
    ### How was this patch tested?
    
    Pass the CIs with newly added test case.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #484 from dongjoon-hyun/SPARK-55352.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../k8s/operator/SparkAppSubmissionWorker.java     |  6 ++++
 .../k8s/operator/SparkAppSubmissionWorkerTest.java | 33 ++++++++++++++++++++++
 2 files changed, 39 insertions(+)

diff --git 
a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java
 
b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java
index 7b13e92..6f38f90 100644
--- 
a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java
+++ 
b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java
@@ -41,6 +41,7 @@ import org.apache.spark.deploy.k8s.submit.RMainAppResource;
 import org.apache.spark.k8s.operator.spec.ApplicationSpec;
 import org.apache.spark.k8s.operator.spec.ConfigMapSpec;
 import org.apache.spark.k8s.operator.spec.DriverServiceIngressSpec;
+import org.apache.spark.k8s.operator.spec.ResourceRetainPolicy;
 import org.apache.spark.k8s.operator.spec.RuntimeVersions;
 import org.apache.spark.k8s.operator.utils.ModelUtils;
 import org.apache.spark.k8s.operator.utils.StringUtils;
@@ -167,6 +168,11 @@ public class SparkAppSubmissionWorker {
     effectiveSparkConf.setIfMissing("spark.app.id", appId);
     effectiveSparkConf.setIfMissing("spark.authenticate", "true");
     effectiveSparkConf.setIfMissing("spark.io.encryption.enabled", "true");
+    // Use K8s Garbage Collection instead of explicit API invocations
+    if (applicationSpec.getApplicationTolerations().getResourceRetainPolicy() 
!=
+        ResourceRetainPolicy.Always) {
+      
effectiveSparkConf.setIfMissing("spark.kubernetes.executor.deleteOnTermination",
 "false");
+    }
     return SparkAppDriverConf.create(
         effectiveSparkConf,
         sparkVersion,
diff --git 
a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java
 
b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java
index 71351cf..b1a350c 100644
--- 
a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java
+++ 
b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java
@@ -43,6 +43,8 @@ import org.apache.spark.deploy.k8s.submit.JavaMainAppResource;
 import org.apache.spark.deploy.k8s.submit.PythonMainAppResource;
 import org.apache.spark.deploy.k8s.submit.RMainAppResource;
 import org.apache.spark.k8s.operator.spec.ApplicationSpec;
+import org.apache.spark.k8s.operator.spec.ApplicationTolerations;
+import org.apache.spark.k8s.operator.spec.ResourceRetainPolicy;
 import org.apache.spark.k8s.operator.spec.RuntimeVersions;
 import org.apache.spark.k8s.operator.status.ApplicationAttemptSummary;
 import org.apache.spark.k8s.operator.status.ApplicationStatus;
@@ -73,6 +75,7 @@ class SparkAppSubmissionWorkerTest {
       when(mockSpec.getProxyUser()).thenReturn("foo-user");
       when(mockSpec.getMainClass()).thenReturn("foo-class");
       when(mockSpec.getDriverArgs()).thenReturn(List.of("a", "b"));
+      when(mockSpec.getApplicationTolerations()).thenReturn(new 
ApplicationTolerations());
 
       SparkAppSubmissionWorker submissionWorker = new 
SparkAppSubmissionWorker();
       SparkAppDriverConf conf = submissionWorker.buildDriverConf(mockApp, 
overrides);
@@ -116,6 +119,7 @@ class SparkAppSubmissionWorkerTest {
       ObjectMeta appMeta = new 
ObjectMetaBuilder().withName("app1").withNamespace("ns1").build();
       when(mockApp.getSpec()).thenReturn(mockSpec);
       when(mockApp.getMetadata()).thenReturn(appMeta);
+      when(mockSpec.getApplicationTolerations()).thenReturn(new 
ApplicationTolerations());
       when(mockSpec.getPyFiles()).thenReturn("foo");
 
       SparkAppSubmissionWorker submissionWorker = new 
SparkAppSubmissionWorker();
@@ -141,6 +145,7 @@ class SparkAppSubmissionWorkerTest {
       ObjectMeta appMeta = new 
ObjectMetaBuilder().withName("app1").withNamespace("ns1").build();
       when(mockApp.getSpec()).thenReturn(mockSpec);
       when(mockApp.getMetadata()).thenReturn(appMeta);
+      when(mockSpec.getApplicationTolerations()).thenReturn(new 
ApplicationTolerations());
       
when(mockSpec.getMainClass()).thenReturn("org.apache.spark.deploy.PythonRunner");
       when(mockSpec.getPyFiles()).thenReturn("main.py,lib.py");
 
@@ -169,6 +174,7 @@ class SparkAppSubmissionWorkerTest {
       ObjectMeta appMeta = new 
ObjectMetaBuilder().withName("app1").withNamespace("ns1").build();
       when(mockApp.getSpec()).thenReturn(mockSpec);
       when(mockApp.getMetadata()).thenReturn(appMeta);
+      when(mockSpec.getApplicationTolerations()).thenReturn(new 
ApplicationTolerations());
       when(mockSpec.getSparkRFiles()).thenReturn("foo");
 
       SparkAppSubmissionWorker submissionWorker = new 
SparkAppSubmissionWorker();
@@ -257,6 +263,7 @@ class SparkAppSubmissionWorkerTest {
     when(mockSpec.getSparkConf()).thenReturn(appProps);
     when(mockApp.getSpec()).thenReturn(mockSpec);
     when(mockApp.getMetadata()).thenReturn(appMeta);
+    when(mockSpec.getApplicationTolerations()).thenReturn(new 
ApplicationTolerations());
 
     SparkAppSubmissionWorker submissionWorker = new SparkAppSubmissionWorker();
     SparkAppDriverConf conf = submissionWorker.buildDriverConf(mockApp, 
Map.of());
@@ -274,6 +281,7 @@ class SparkAppSubmissionWorkerTest {
     appProps.put("spark.kubernetes.executor.container.image", 
"apache/spark:{{SPARK_VERSION}}");
     appProps.put("spark.kubernetes.key", "apache/spark:{{SPARK_VERSION}}");
     ObjectMeta appMeta = new 
ObjectMetaBuilder().withName("app1").withNamespace("ns1").build();
+    when(mockSpec.getApplicationTolerations()).thenReturn(new 
ApplicationTolerations());
     when(mockSpec.getSparkConf()).thenReturn(appProps);
     when(mockApp.getSpec()).thenReturn(mockSpec);
     when(mockApp.getMetadata()).thenReturn(appMeta);
@@ -287,4 +295,29 @@ class SparkAppSubmissionWorkerTest {
     assertEquals("apache/spark:dev", 
conf.get("spark.kubernetes.executor.container.image"));
     assertEquals("apache/spark:{{SPARK_VERSION}}", 
conf.get("spark.kubernetes.key"));
   }
+
+  @Test
+  void useGarbageCollectionToDeleteExecutors() {
+    SparkApplication mockApp = mock(SparkApplication.class);
+    ApplicationSpec mockSpec = mock(ApplicationSpec.class);
+    ApplicationTolerations mockTolerations = 
mock(ApplicationTolerations.class);
+    when(mockSpec.getApplicationTolerations()).thenReturn(mockTolerations);
+    when(mockSpec.getSparkConf()).thenReturn(Map.of());
+    ObjectMeta appMeta = new 
ObjectMetaBuilder().withName("app1").withNamespace("ns1").build();
+    when(mockApp.getSpec()).thenReturn(mockSpec);
+    when(mockApp.getMetadata()).thenReturn(appMeta);
+
+    SparkAppSubmissionWorker submissionWorker = new SparkAppSubmissionWorker();
+    
when(mockTolerations.getResourceRetainPolicy()).thenReturn(ResourceRetainPolicy.Always);
+    SparkAppDriverConf conf1 = submissionWorker.buildDriverConf(mockApp, 
Map.of());
+    assertEquals("true", 
conf1.get("spark.kubernetes.executor.deleteOnTermination", "true"));
+
+    
when(mockTolerations.getResourceRetainPolicy()).thenReturn(ResourceRetainPolicy.Never);
+    SparkAppDriverConf conf2 = submissionWorker.buildDriverConf(mockApp, 
Map.of());
+    assertEquals("false", 
conf2.get("spark.kubernetes.executor.deleteOnTermination", "true"));
+
+    
when(mockTolerations.getResourceRetainPolicy()).thenReturn(ResourceRetainPolicy.OnFailure);
+    SparkAppDriverConf conf3 = submissionWorker.buildDriverConf(mockApp, 
Map.of());
+    assertEquals("false", 
conf3.get("spark.kubernetes.executor.deleteOnTermination", "true"));
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to