This is an automated email from the ASF dual-hosted git repository.

ptoth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new b89c5cc  [SPARK-52915] Support TTL for Spark apps
b89c5cc is described below

commit b89c5cc5dd880fdb65a93a3f14817ce9141227a6
Author: Zhou JIANG <jiang...@umich.edu>
AuthorDate: Thu Aug 14 10:41:18 2025 +0200

    [SPARK-52915] Support TTL for Spark apps
    
    ### What changes were proposed in this pull request?
    
    This PR adds support for configuring a TTL for Spark apps after they stop. Together with
    `resourceRetainPolicy` and `resourceRetainDurationMillis`, it enhances the garbage collection
    mechanism at the custom resource level.
    
    ### Why are the changes needed?
    
    Introducing a TTL helps users configure garbage collection for apps more effectively.
    
    ### Does this PR introduce _any_ user-facing change?
    
    A new configurable field, `spec.applicationTolerations.ttlAfterStopMillis`, is added to the
    SparkApplication CRD.
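    
    For illustration only, a minimal sketch of where the new field sits in the spec (the field path
    and its `-1` default come from this change; the app name and the 30-minute value are just
    example placeholders):
    
    ```yaml
    apiVersion: spark.apache.org/v1
    kind: SparkApplication
    metadata:
      name: spark-example        # hypothetical name
    spec:
      applicationTolerations:
        # Garbage collect this SparkApplication (and its remaining secondary resources)
        # 30 minutes after it stops; the default of -1 disables TTL-based garbage collection.
        ttlAfterStopMillis: 1800000
    ```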
    
    ### How was this patch tested?
    
    CIs, including a new unit test and a revised e2e scenario.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #290 from jiangzho/resource_ttl.
    
    Authored-by: Zhou JIANG <jiang...@umich.edu>
    Signed-off-by: Peter Toth <p_t...@apple.com>
---
 .../sparkapplications.spark.apache.org-v1.yaml     |   3 +
 docs/spark_custom_resources.md                     |  51 +++++-
 .../k8s/operator/spec/ApplicationTolerations.java  |  48 +++++-
 .../operator/spec/ApplicationTolerationsTest.java  |  70 ++++++++
 .../reconciler/reconcilesteps/AppCleanUpStep.java  |  73 +++++++--
 .../reconcilesteps/AppCleanUpStepTest.java         | 180 +++++++++++++++++----
 .../resource-retain-duration/chainsaw-test.yaml    |  10 +-
 .../spark-example-retain-duration.yaml             |   1 +
 8 files changed, 382 insertions(+), 54 deletions(-)

diff --git a/build-tools/helm/spark-kubernetes-operator/crds/sparkapplications.spark.apache.org-v1.yaml b/build-tools/helm/spark-kubernetes-operator/crds/sparkapplications.spark.apache.org-v1.yaml
index 21d3996..6b2ee7b 100644
--- a/build-tools/helm/spark-kubernetes-operator/crds/sparkapplications.spark.apache.org-v1.yaml
+++ b/build-tools/helm/spark-kubernetes-operator/crds/sparkapplications.spark.apache.org-v1.yaml
@@ -17434,6 +17434,9 @@ spec:
                             - OnInfrastructureFailure
                           type: string
                       type: object
+                    ttlAfterStopMillis:
+                      default: -1
+                      type: integer
                   type: object
                 configMapSpecs:
                   items:
diff --git a/docs/spark_custom_resources.md b/docs/spark_custom_resources.md
index ad6f8b9..1f39ba5 100644
--- a/docs/spark_custom_resources.md
+++ b/docs/spark_custom_resources.md
@@ -293,6 +293,8 @@ applicationTolerations:
   resourceRetainPolicy: OnFailure
   # Secondary resources would be garbage collected 10 minutes after app termination
   resourceRetainDurationMillis: 600000
+  # Garbage collect the SparkApplication custom resource itself 30 minutes after termination
+  ttlAfterStopMillis: 1800000
 ```
 
 to avoid operator attempt to delete driver pod and driver resources if app fails. Similarly,
@@ -302,7 +304,54 @@ possible to configure `resourceRetainDurationMillis` to define the maximal retai
 these resources. Note that this applies only to operator-created resources (driver pod, SparkConf
 configmap .etc). You may also want to tune `spark.kubernetes.driver.service.deleteOnTermination`
 and `spark.kubernetes.executor.deleteOnTermination` to control the behavior of driver-created
-resources.
+resources. `ttlAfterStopMillis` controls the garbage collection behavior at the SparkApplication
+level after it stops. When set to a non-negative value, the Spark operator garbage collects the
+application (and therefore all its associated resources) after the given timeout. If the
+application is configured to restart, `resourceRetainPolicy`, `resourceRetainDurationMillis` and
+`ttlAfterStopMillis` apply only to the last attempt.
+
+For example, suppose an app with the configuration below:
+
+```yaml
+applicationTolerations:
+  restartConfig:
+    restartPolicy: OnFailure
+  maxRestartAttempts: 1
+  resourceRetainPolicy: Always
+  resourceRetainDurationMillis: 30000
+  ttlAfterStopMillis: 60000
+```
+
+ends up with a status like:
+
+```yaml
+status:
+#... the 1st attempt
+      "5":
+        currentStateSummary: Failed
+      "6":
+        currentStateSummary: ScheduledToRestart
+# ...the 2nd attempt
+      "11":
+        currentStateSummary: Succeeded
+      "12":
+        currentStateSummary: TerminatedWithoutReleaseResources
+```
+
+The retain policy only takes effect after the final state `12`. Secondary resources are always
+released between attempts (between states `5` and `6`). The TTL is likewise calculated based on
+the last state.
+
+| Field                                                     | Type                             | Default Value | Description                                                                                                                                                                                              |
+|-----------------------------------------------------------|----------------------------------|---------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| .spec.applicationTolerations.resourceRetainPolicy         | `Always` / `OnFailure` / `Never` | Never         | Configures the operator to delete or retain secondary resources for an app after it terminates.                                                                                                           |
+| .spec.applicationTolerations.resourceRetainDurationMillis | integer                          | -1            | Time to wait in milliseconds for releasing **secondary resources** after termination. Setting this to a negative value disables the retention duration check for secondary resources after termination.   |
+| .spec.applicationTolerations.ttlAfterStopMillis           | integer                          | -1            | Time-to-live in milliseconds for the SparkApplication and **all its associated secondary resources**. If set to a negative value, the application is retained and not garbage collected by the operator.  |
+
+Note that `ttlAfterStopMillis` applies to the app as well as its secondary resources. If both
+`resourceRetainDurationMillis` and `ttlAfterStopMillis` are set to non-negative values and the
+latter is smaller, the latter takes precedence: the operator removes all resources related to the
+app after `ttlAfterStopMillis`.
 
 ## Spark Cluster
 
diff --git a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ApplicationTolerations.java b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ApplicationTolerations.java
index 8e47817..20b8122 100644
--- a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ApplicationTolerations.java
+++ b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ApplicationTolerations.java
@@ -58,6 +58,34 @@ public class ApplicationTolerations {
   @Builder.Default
   protected Long resourceRetainDurationMillis = -1L;
 
+  /**
+   * Time-to-live in milliseconds for SparkApplication and all its associated secondary resources
+   * after stop. If set to a negative value, the application could be retained according to the
+   * retain policy. If the application is configured to restart, this would apply to the last
+   * attempt only.
+   */
+  @Default("-1")
+  @Builder.Default
+  protected Long ttlAfterStopMillis = -1L;
+
+  /**
+   * @return The effective retain duration for secondary resources, which is the smaller of
+   *     `resourceRetainDurationMillis` and `ttlAfterStopMillis` among those set to a non-negative
+   *     value. Returns -1 if neither is set.
+   */
+  public long computeEffectiveRetainDurationMillis() {
+    if (resourceRetainDurationMillis < 0 && ttlAfterStopMillis < 0) {
+      return -1L;
+    }
+    if (resourceRetainDurationMillis < 0) {
+      return ttlAfterStopMillis;
+    }
+    if (ttlAfterStopMillis < 0) {
+      return resourceRetainDurationMillis;
+    }
+    return Math.min(resourceRetainDurationMillis, ttlAfterStopMillis);
+  }
+
   /**
    * Check whether a terminated application has exceeded the resource retain duration at the
    * provided instant
@@ -68,20 +96,30 @@ public class ApplicationTolerations {
    */
   public boolean exceedRetainDurationAtInstant(
       ApplicationState lastObservedState, Instant instant) {
-    return lastObservedState != null
+    return isRetainDurationEnabled()
+        && lastObservedState != null
         && lastObservedState.getCurrentStateSummary().isTerminated()
-        && resourceRetainDurationMillis > 0L
         && Instant.parse(lastObservedState.getLastTransitionTime())
-            .plusMillis(resourceRetainDurationMillis)
+            .plusMillis(computeEffectiveRetainDurationMillis())
             .isBefore(instant);
   }
 
   /**
    * Indicates whether the reconciler need to perform retain duration check
    *
-   * @return true `resourceRetainDurationMillis` is set to non-negative value
+   * @return true if `resourceRetainDurationMillis` or `ttlAfterStopMillis` is set to non-negative
+   *     value
    */
   public boolean isRetainDurationEnabled() {
-    return resourceRetainDurationMillis >= 0L;
+    return resourceRetainDurationMillis >= 0L || ttlAfterStopMillis >= 0L;
+  }
+
+  /**
+   * Indicates whether the reconciler needs to perform the TTL check
+   *
+   * @return true if `ttlAfterStopMillis` is set to a non-negative value
+   */
+  public boolean isTTLEnabled() {
+    return ttlAfterStopMillis >= 0L;
   }
 }
diff --git a/spark-operator-api/src/test/java/org/apache/spark/k8s/operator/spec/ApplicationTolerationsTest.java b/spark-operator-api/src/test/java/org/apache/spark/k8s/operator/spec/ApplicationTolerationsTest.java
new file mode 100644
index 0000000..bcc537f
--- /dev/null
+++ b/spark-operator-api/src/test/java/org/apache/spark/k8s/operator/spec/ApplicationTolerationsTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.spec;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import org.junit.jupiter.api.Test;
+
+class ApplicationTolerationsTest {
+  private final ApplicationTolerations withRetainDurationOnly =
+      ApplicationTolerations.builder().resourceRetainDurationMillis(10L).build();
+  private final ApplicationTolerations withTTLOnly =
+      ApplicationTolerations.builder().ttlAfterStopMillis(10L).build();
+  private final ApplicationTolerations withNeitherRetainDurationNorTtl =
+      ApplicationTolerations.builder().build();
+  private final ApplicationTolerations withRetainDurationGreaterThanTtl =
+      ApplicationTolerations.builder()
+          .resourceRetainDurationMillis(20L)
+          .ttlAfterStopMillis(10L)
+          .build();
+  private final ApplicationTolerations withRetainDurationShorterThanTtl =
+      ApplicationTolerations.builder()
+          .resourceRetainDurationMillis(10L)
+          .ttlAfterStopMillis(20L)
+          .build();
+
+  @Test
+  void computeEffectiveRetainDurationMillis() {
+    assertEquals(10L, withRetainDurationOnly.computeEffectiveRetainDurationMillis());
+    assertEquals(10L, withTTLOnly.computeEffectiveRetainDurationMillis());
+    assertEquals(-1, withNeitherRetainDurationNorTtl.computeEffectiveRetainDurationMillis());
+    assertEquals(10L, withRetainDurationGreaterThanTtl.computeEffectiveRetainDurationMillis());
+    assertEquals(10L, withRetainDurationShorterThanTtl.computeEffectiveRetainDurationMillis());
+  }
+
+  @Test
+  void isRetainDurationEnabled() {
+    assertTrue(withRetainDurationOnly.isRetainDurationEnabled());
+    assertTrue(withTTLOnly.isRetainDurationEnabled());
+    assertFalse(withNeitherRetainDurationNorTtl.isRetainDurationEnabled());
+    assertTrue(withRetainDurationGreaterThanTtl.isRetainDurationEnabled());
+    assertTrue(withRetainDurationShorterThanTtl.isRetainDurationEnabled());
+  }
+
+  @Test
+  void isTTLEnabled() {
+    assertFalse(withRetainDurationOnly.isTTLEnabled());
+    assertTrue(withTTLOnly.isTTLEnabled());
+    assertFalse(withNeitherRetainDurationNorTtl.isTTLEnabled());
+    assertTrue(withRetainDurationGreaterThanTtl.isTTLEnabled());
+    assertTrue(withRetainDurationShorterThanTtl.isTTLEnabled());
+  }
+}
diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStep.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStep.java
index c62a506..12b241f 100644
--- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStep.java
+++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStep.java
@@ -30,6 +30,7 @@ import java.util.function.Supplier;
 
 import io.fabric8.kubernetes.api.model.HasMetadata;
 import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.client.KubernetesClient;
 import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
@@ -95,7 +96,7 @@ public class AppCleanUpStep extends AppReconcileStep {
     ApplicationTolerations tolerations = application.getSpec().getApplicationTolerations();
     if (currentState.getCurrentStateSummary().isTerminated()) {
       Optional<ReconcileProgress> terminatedAppProgress =
-          checkEarlyExitForTerminatedApp(application, statusRecorder);
+          checkEarlyExitForTerminatedApp(context.getClient(), application, statusRecorder);
       if (terminatedAppProgress.isPresent()) {
         return terminatedAppProgress.get();
       }
@@ -184,14 +185,45 @@ public class AppCleanUpStep extends AppReconcileStep {
     }
   }
 
-  protected Optional<ReconcileProgress> checkEarlyExitForTerminatedApp(
+  protected Optional<ReconcileProgress> clearCacheAndFinishReconcileForApplication(
       final SparkApplication application, final SparkAppStatusRecorder statusRecorder) {
+    log.debug("Cleaning up status cache and stop reconciling for application.");
+    statusRecorder.removeCachedStatus(application);
+    return Optional.of(ReconcileProgress.completeAndNoRequeue());
+  }
+
+  protected Optional<ReconcileProgress> checkEarlyExitForTerminatedApp(
+      final KubernetesClient client,
+      final SparkApplication application,
+      final SparkAppStatusRecorder statusRecorder) {
     ApplicationStatus currentStatus = application.getStatus();
     ApplicationState currentState = currentStatus.getCurrentState();
     ApplicationTolerations tolerations = application.getSpec().getApplicationTolerations();
+    Instant now = Instant.now();
     if (ApplicationStateSummary.ResourceReleased.equals(currentState.getCurrentStateSummary())) {
-      statusRecorder.removeCachedStatus(application);
-      return Optional.of(ReconcileProgress.completeAndNoRequeue());
+      // Perform TTL check after removing all secondary resources, if enabled
+      if (isOnDemandCleanup() || !tolerations.isTTLEnabled()) {
+        // all secondary resources have been released, no more reconciliations needed
+        return clearCacheAndFinishReconcileForApplication(application, statusRecorder);
+      } else {
+        ApplicationState lastObservedStateBeforeTermination =
+            getLastObservedStateBeforeTermination(currentStatus);
+        Duration nextCheckDuration =
+            Duration.between(
+                now,
+                Instant.parse(lastObservedStateBeforeTermination.getLastTransitionTime())
+                    .plusMillis(tolerations.getTtlAfterStopMillis()));
+        if (nextCheckDuration.isNegative()) {
+          log.info("Garbage collecting application exceeded given ttl.");
+          ReconcilerUtils.deleteResourceIfExists(client, application, true);
+          return clearCacheAndFinishReconcileForApplication(application, statusRecorder);
+        } else {
+          log.info(
+              "Application has yet expired, reconciliation would be resumed in 
{} millis.",
+              nextCheckDuration.toMillis());
+          return Optional.of(ReconcileProgress.completeAndRequeueAfter(nextCheckDuration));
+        }
+      }
     }
     if (isOnDemandCleanup()) {
       return Optional.empty();
@@ -199,21 +231,24 @@ public class AppCleanUpStep extends AppReconcileStep {
     if (ApplicationStateSummary.TerminatedWithoutReleaseResources.equals(
         currentState.getCurrentStateSummary())) {
       if (tolerations.isRetainDurationEnabled()) {
-        Instant now = Instant.now();
         if (tolerations.exceedRetainDurationAtInstant(currentState, now)) {
+          log.info("Garbage collecting secondary resources for application");
+          onDemandCleanUpReason = SparkAppStatusUtils::appExceededRetainDuration;
           return Optional.empty();
         } else {
           Duration nextCheckDuration =
               Duration.between(
-                  Instant.now(),
+                  now,
                   Instant.parse(currentState.getLastTransitionTime())
-                      .plusMillis(tolerations.getResourceRetainDurationMillis()));
+                      .plusMillis(tolerations.computeEffectiveRetainDurationMillis()));
+          log.info(
+              "Application is within retention, reconciliation would be 
resumed in {} millis.",
+              nextCheckDuration.toMillis());
           return Optional.of(ReconcileProgress.completeAndRequeueAfter(nextCheckDuration));
         }
       } else {
-        statusRecorder.removeCachedStatus(application);
-        return Optional.of(ReconcileProgress.completeAndNoRequeue());
+        log.info("Retention duration check is not enabled for application.");
+        return clearCacheAndFinishReconcileForApplication(application, statusRecorder);
       }
     }
     return Optional.empty();
@@ -223,17 +258,25 @@ public class AppCleanUpStep extends AppReconcileStep {
     return onDemandCleanUpReason != null;
   }
 
-  protected boolean isReleasingResourcesForSchedulingFailureAttempt(
-      final ApplicationStatus status) {
+  /**
+   * @param status status of the application
+   * @return The last observed state before termination if the app has terminated. If the app has
+   *     not terminated, return the last observed state
+   */
+  protected ApplicationState getLastObservedStateBeforeTermination(final ApplicationStatus status) {
     ApplicationState lastObservedState = status.getCurrentState();
-    if (ApplicationStateSummary.TerminatedWithoutReleaseResources.equals(
-        lastObservedState.getCurrentStateSummary())) {
-      // if the app has already terminated, use the last observed state before termination
+    if (lastObservedState.getCurrentStateSummary().isTerminated()) {
       NavigableMap<Long, ApplicationState> navMap =
           (NavigableMap<Long, ApplicationState>) status.getStateTransitionHistory();
       Map.Entry<Long, ApplicationState> terminateState = navMap.lastEntry();
-      lastObservedState = navMap.lowerEntry(terminateState.getKey()).getValue();
+      return navMap.lowerEntry(terminateState.getKey()).getValue();
     }
+    return lastObservedState;
+  }
+
+  protected boolean isReleasingResourcesForSchedulingFailureAttempt(
+      final ApplicationStatus status) {
+    ApplicationState lastObservedState = getLastObservedStateBeforeTermination(status);
     return ApplicationStateSummary.SchedulingFailure.equals(
         lastObservedState.getCurrentStateSummary());
   }
diff --git a/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStepTest.java b/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStepTest.java
index 23ed54b..1fbf563 100644
--- a/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStepTest.java
+++ b/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStepTest.java
@@ -81,6 +81,14 @@ class AppCleanUpStepTest {
                   .resourceRetainDurationMillis(1L)
                   .build())
           .build();
+  private final ApplicationSpec exceedRetainDurationFromTtl =
+      ApplicationSpec.builder()
+          .applicationTolerations(
+              ApplicationTolerations.builder()
+                  .resourceRetainPolicy(ResourceRetainPolicy.Always)
+                  .ttlAfterStopMillis(1L)
+                  .build())
+          .build();
   private final ApplicationSpec notExceedRetainDuration =
       ApplicationSpec.builder()
           .applicationTolerations(
@@ -89,9 +97,23 @@ class AppCleanUpStepTest {
                   .resourceRetainDurationMillis(24 * 60 * 60 * 1000L)
                   .build())
           .build();
+  private final ApplicationSpec notExceedTtl =
+      ApplicationSpec.builder()
+          .applicationTolerations(
+              ApplicationTolerations.builder()
+                  .resourceRetainPolicy(ResourceRetainPolicy.Always)
+                  .ttlAfterStopMillis(24 * 60 * 60 * 1000L)
+                  .build())
+          .build();
 
   private final List<ApplicationSpec> specs =
-      List.of(alwaysRetain, neverRetain, exceedRetainDuration, notExceedRetainDuration);
+      List.of(
+          alwaysRetain,
+          neverRetain,
+          exceedRetainDuration,
+          exceedRetainDurationFromTtl,
+          notExceedRetainDuration,
+          notExceedTtl);
 
   @Test
   void enableForceDelete() {
@@ -191,6 +213,7 @@ class AppCleanUpStepTest {
         verify(mockAppContext, times(1)).getResource();
         verify(mockApp, times(2)).getSpec();
         verify(mockApp, times(2)).getStatus();
+        verify(mockAppContext).getClient();
         verify(mockRecorder).removeCachedStatus(mockApp);
         verifyNoMoreInteractions(mockAppContext, mockRecorder, mockApp);
       }
@@ -214,6 +237,7 @@ class AppCleanUpStepTest {
     verify(mockApp, times(2)).getSpec();
     verify(mockApp, times(2)).getStatus();
     verify(mockRecorder).removeCachedStatus(mockApp);
+    verify(mockAppContext).getClient();
     verifyNoMoreInteractions(mockAppContext, mockRecorder, mockApp);
   }
 
@@ -246,7 +270,7 @@ class AppCleanUpStepTest {
     verify(mockAppContext, times(1)).getResource();
     verify(mockApp, times(3)).getSpec();
     verify(mockApp, times(3)).getStatus();
-    verify(mockAppContext).getClient();
+    verify(mockAppContext, times(2)).getClient();
     verify(mockAppContext).getDriverPod();
    ArgumentCaptor<ApplicationState> captor = ArgumentCaptor.forClass(ApplicationState.class);
    verify(mockRecorder).appendNewStateAndPersist(eq(mockAppContext), captor.capture());
@@ -337,7 +361,7 @@ class AppCleanUpStepTest {
   }
 
   @Test
-  void checkEarlyExitForResourceReleasedApp() {
+  void checkEarlyExitForResourceReleasedAppWithoutTTL() {
     AppCleanUpStep routineCheck = new AppCleanUpStep();
    AppCleanUpStep cleanUpWithReason = new AppCleanUpStep(SparkAppStatusUtils::appCancelled);
     ApplicationStatus succeeded =
@@ -350,27 +374,112 @@ class AppCleanUpStepTest {
         prepareApplicationStatus(
            ApplicationStateSummary.ResourceReleased, ApplicationStateSummary.RunningHealthy);
     List<ApplicationStatus> statusList = List.of(succeeded, failed, cancelled);
+    KubernetesClient mockClient = mock(KubernetesClient.class);
+    List<ApplicationSpec> specList =
+        List.of(alwaysRetain, neverRetain, exceedRetainDuration, notExceedRetainDuration);
 
-    for (ApplicationSpec appSpec : specs) {
+    for (ApplicationSpec appSpec : specList) {
       for (ApplicationStatus appStatus : statusList) {
        SparkAppStatusRecorder mockRecorder1 = mock(SparkAppStatusRecorder.class);
        SparkAppStatusRecorder mockRecorder2 = mock(SparkAppStatusRecorder.class);
         SparkApplication mockApp = mock(SparkApplication.class);
         when(mockApp.getStatus()).thenReturn(appStatus);
         when(mockApp.getSpec()).thenReturn(appSpec);
-
         Optional<ReconcileProgress> routineCheckProgress =
-            routineCheck.checkEarlyExitForTerminatedApp(mockApp, mockRecorder1);
+            routineCheck.checkEarlyExitForTerminatedApp(mockClient, mockApp, mockRecorder1);
         assertTrue(routineCheckProgress.isPresent());
+        ReconcileProgress reconcileProgress = routineCheckProgress.get();
+        Optional<ReconcileProgress> onDemandProgress =
+            cleanUpWithReason.checkEarlyExitForTerminatedApp(mockClient, mockApp, mockRecorder2);
+        Assertions.assertEquals(ReconcileProgress.completeAndNoRequeue(), reconcileProgress);
+        verify(mockRecorder1).removeCachedStatus(mockApp);
+        assertTrue(onDemandProgress.isPresent());
         Assertions.assertEquals(
            ReconcileProgress.completeAndNoRequeue(), routineCheckProgress.get());
+        verify(mockRecorder2).removeCachedStatus(mockApp);
+      }
+    }
+  }
+
+  @Test
+  void checkEarlyExitForResourceReleasedAppWithExceededTTL() {
+    AppCleanUpStep routineCheck = new AppCleanUpStep();
+    AppCleanUpStep cleanUpWithReason = new AppCleanUpStep(SparkAppStatusUtils::appCancelled);
+    ApplicationStatus succeeded =
+        prepareApplicationStatus(
+            ApplicationStateSummary.ResourceReleased, ApplicationStateSummary.Succeeded);
+    ApplicationStatus failed =
+        prepareApplicationStatus(
+            ApplicationStateSummary.ResourceReleased, ApplicationStateSummary.SchedulingFailure);
+    ApplicationStatus cancelled =
+        prepareApplicationStatus(
+            ApplicationStateSummary.ResourceReleased, ApplicationStateSummary.Failed);
+    List<ApplicationStatus> statusList = List.of(succeeded, failed, cancelled);
+    KubernetesClient mockClient = mock(KubernetesClient.class);
+    for (ApplicationStatus appStatus : statusList) {
+      SparkAppStatusRecorder mockRecorder1 = mock(SparkAppStatusRecorder.class);
+      SparkAppStatusRecorder mockRecorder2 = mock(SparkAppStatusRecorder.class);
+      SparkApplication mockApp = mock(SparkApplication.class);
+      when(mockApp.getStatus()).thenReturn(appStatus);
+      when(mockApp.getSpec()).thenReturn(exceedRetainDurationFromTtl);
+      try (MockedStatic<ReconcilerUtils> utils = Mockito.mockStatic(ReconcilerUtils.class)) {
+        Optional<ReconcileProgress> routineCheckProgress =
+            routineCheck.checkEarlyExitForTerminatedApp(mockClient, mockApp, mockRecorder1);
+        assertTrue(routineCheckProgress.isPresent());
+        ReconcileProgress reconcileProgress = routineCheckProgress.get();
+        assertTrue(reconcileProgress.isCompleted());
+        assertFalse(reconcileProgress.isRequeue());
+        utils.verify(() -> ReconcilerUtils.deleteResourceIfExists(mockClient, mockApp, true));
+        verify(mockRecorder1).removeCachedStatus(mockApp);
+        Optional<ReconcileProgress> onDemandProgress =
+            cleanUpWithReason.checkEarlyExitForTerminatedApp(mockClient, mockApp, mockRecorder2);
         verify(mockRecorder1).removeCachedStatus(mockApp);
+        assertTrue(onDemandProgress.isPresent());
+        ReconcileProgress reconcileProgressOnDemand = onDemandProgress.get();
+        Assertions.assertEquals(
+            ReconcileProgress.completeAndNoRequeue(), reconcileProgressOnDemand);
+        verify(mockRecorder2).removeCachedStatus(mockApp);
+      }
+    }
+  }
 
+  @Test
+  void checkEarlyExitForResourceReleasedAppWithinTTL() {
+    AppCleanUpStep routineCheck = new AppCleanUpStep();
+    AppCleanUpStep cleanUpWithReason = new AppCleanUpStep(SparkAppStatusUtils::appCancelled);
+    ApplicationStatus succeeded =
+        prepareApplicationStatus(
+            ApplicationStateSummary.ResourceReleased, ApplicationStateSummary.Succeeded);
+    ApplicationStatus failed =
+        prepareApplicationStatus(
+            ApplicationStateSummary.ResourceReleased, ApplicationStateSummary.SchedulingFailure);
+    ApplicationStatus cancelled =
+        prepareApplicationStatus(
+            ApplicationStateSummary.ResourceReleased, ApplicationStateSummary.Failed);
+    List<ApplicationStatus> statusList = List.of(succeeded, failed, cancelled);
+    KubernetesClient mockClient = mock(KubernetesClient.class);
+    for (ApplicationStatus appStatus : statusList) {
+      SparkAppStatusRecorder mockRecorder1 = mock(SparkAppStatusRecorder.class);
+      SparkAppStatusRecorder mockRecorder2 = mock(SparkAppStatusRecorder.class);
+      SparkApplication mockApp = mock(SparkApplication.class);
+      when(mockApp.getStatus()).thenReturn(appStatus);
+      when(mockApp.getSpec()).thenReturn(notExceedTtl);
+      try (MockedStatic<ReconcilerUtils> utils = Mockito.mockStatic(ReconcilerUtils.class)) {
+        Optional<ReconcileProgress> routineCheckProgress =
+            routineCheck.checkEarlyExitForTerminatedApp(mockClient, mockApp, mockRecorder1);
+        assertTrue(routineCheckProgress.isPresent());
+        ReconcileProgress reconcileProgress = routineCheckProgress.get();
+        assertTrue(reconcileProgress.isCompleted());
+        assertTrue(reconcileProgress.isRequeue());
+        assertTrue(reconcileProgress.getRequeueAfterDuration().toMillis() > 0);
+        utils.verifyNoInteractions();
+        verifyNoMoreInteractions(mockRecorder1);
         Optional<ReconcileProgress> onDemandProgress =
-            cleanUpWithReason.checkEarlyExitForTerminatedApp(mockApp, mockRecorder2);
+            cleanUpWithReason.checkEarlyExitForTerminatedApp(mockClient, mockApp, mockRecorder2);
         assertTrue(onDemandProgress.isPresent());
+        ReconcileProgress reconcileProgressOnDemand = onDemandProgress.get();
         Assertions.assertEquals(
-            ReconcileProgress.completeAndNoRequeue(), routineCheckProgress.get());
+            ReconcileProgress.completeAndNoRequeue(), reconcileProgressOnDemand);
         verify(mockRecorder2).removeCachedStatus(mockApp);
       }
     }
@@ -393,6 +502,7 @@ class AppCleanUpStepTest {
             ApplicationStateSummary.TerminatedWithoutReleaseResources,
             ApplicationStateSummary.RunningHealthy);
     List<ApplicationStatus> statusList = List.of(succeeded, failed, cancelled);
+    KubernetesClient mockClient = mock(KubernetesClient.class);
 
     for (ApplicationStatus appStatus : statusList) {
      SparkAppStatusRecorder mockRecorder1 = mock(SparkAppStatusRecorder.class);
@@ -402,13 +512,13 @@ class AppCleanUpStepTest {
       when(mockApp.getSpec()).thenReturn(alwaysRetain);
 
       Optional<ReconcileProgress> routineCheckProgress =
-          routineCheck.checkEarlyExitForTerminatedApp(mockApp, mockRecorder1);
+          routineCheck.checkEarlyExitForTerminatedApp(mockClient, mockApp, mockRecorder1);
       assertTrue(routineCheckProgress.isPresent());
      Assertions.assertEquals(ReconcileProgress.completeAndNoRequeue(), routineCheckProgress.get());
       verify(mockRecorder1).removeCachedStatus(mockApp);
 
       Optional<ReconcileProgress> onDemandProgress =
-          cleanUpWithReason.checkEarlyExitForTerminatedApp(mockApp, mockRecorder2);
+          cleanUpWithReason.checkEarlyExitForTerminatedApp(mockClient, mockApp, mockRecorder2);
       assertFalse(onDemandProgress.isPresent());
       verifyNoMoreInteractions(mockRecorder2);
     }
@@ -431,23 +541,27 @@ class AppCleanUpStepTest {
             ApplicationStateSummary.TerminatedWithoutReleaseResources,
             ApplicationStateSummary.RunningHealthy);
     List<ApplicationStatus> statusList = List.of(succeeded, failed, cancelled);
+    KubernetesClient mockClient = mock(KubernetesClient.class);
+    List<ApplicationSpec> specs = List.of(exceedRetainDuration, exceedRetainDurationFromTtl);
 
-    for (ApplicationStatus appStatus : statusList) {
-      SparkAppStatusRecorder mockRecorder1 = mock(SparkAppStatusRecorder.class);
-      SparkAppStatusRecorder mockRecorder2 = mock(SparkAppStatusRecorder.class);
-      SparkApplication mockApp = mock(SparkApplication.class);
-      when(mockApp.getStatus()).thenReturn(appStatus);
-      when(mockApp.getSpec()).thenReturn(exceedRetainDuration);
+    for (ApplicationSpec spec : specs) {
+      for (ApplicationStatus appStatus : statusList) {
+        SparkAppStatusRecorder mockRecorder1 = mock(SparkAppStatusRecorder.class);
+        SparkAppStatusRecorder mockRecorder2 = mock(SparkAppStatusRecorder.class);
+        SparkApplication mockApp = mock(SparkApplication.class);
+        when(mockApp.getStatus()).thenReturn(appStatus);
+        when(mockApp.getSpec()).thenReturn(spec);
 
-      Optional<ReconcileProgress> routineCheckProgress =
-          routineCheck.checkEarlyExitForTerminatedApp(mockApp, mockRecorder1);
-      assertFalse(routineCheckProgress.isPresent());
-      verifyNoMoreInteractions(mockRecorder1);
+        Optional<ReconcileProgress> routineCheckProgress =
+            routineCheck.checkEarlyExitForTerminatedApp(mockClient, mockApp, mockRecorder1);
+        assertFalse(routineCheckProgress.isPresent());
+        verifyNoMoreInteractions(mockRecorder1, mockClient);
 
-      Optional<ReconcileProgress> onDemandProgress =
-          cleanUpWithReason.checkEarlyExitForTerminatedApp(mockApp, mockRecorder2);
-      assertFalse(onDemandProgress.isPresent());
-      verifyNoMoreInteractions(mockRecorder2);
+        Optional<ReconcileProgress> onDemandProgress =
+            cleanUpWithReason.checkEarlyExitForTerminatedApp(mockClient, mockApp, mockRecorder2);
+        assertFalse(onDemandProgress.isPresent());
+        verifyNoMoreInteractions(mockRecorder2, mockClient);
+      }
     }
   }
 
@@ -468,6 +582,7 @@ class AppCleanUpStepTest {
             ApplicationStateSummary.TerminatedWithoutReleaseResources,
             ApplicationStateSummary.RunningHealthy);
     List<ApplicationStatus> statusList = List.of(succeeded, failed, cancelled);
+    KubernetesClient mockClient = mock(KubernetesClient.class);
 
     for (ApplicationStatus appStatus : statusList) {
      SparkAppStatusRecorder mockRecorder1 = mock(SparkAppStatusRecorder.class);
@@ -477,17 +592,17 @@ class AppCleanUpStepTest {
       when(mockApp.getSpec()).thenReturn(notExceedRetainDuration);
 
       Optional<ReconcileProgress> routineCheckProgress =
-          routineCheck.checkEarlyExitForTerminatedApp(mockApp, mockRecorder1);
+          routineCheck.checkEarlyExitForTerminatedApp(mockClient, mockApp, mockRecorder1);
       assertTrue(routineCheckProgress.isPresent());
       ReconcileProgress reconcileProgress = routineCheckProgress.get();
       assertTrue(reconcileProgress.isCompleted());
       assertTrue(reconcileProgress.isRequeue());
-      verifyNoMoreInteractions(mockRecorder2);
+      verifyNoMoreInteractions(mockRecorder2, mockClient);
 
       Optional<ReconcileProgress> onDemandProgress =
-          cleanUpWithReason.checkEarlyExitForTerminatedApp(mockApp, mockRecorder2);
+          cleanUpWithReason.checkEarlyExitForTerminatedApp(mockClient, mockApp, mockRecorder2);
       assertFalse(onDemandProgress.isPresent());
-      verifyNoMoreInteractions(mockRecorder2);
+      verifyNoMoreInteractions(mockRecorder2, mockClient);
     }
   }
 
@@ -500,6 +615,7 @@ class AppCleanUpStepTest {
         continue;
       }
       ApplicationStatus status = prepareApplicationStatus(stateSummary);
+      KubernetesClient mockClient = mock(KubernetesClient.class);
       for (ApplicationSpec appSpec : specs) {
        SparkAppStatusRecorder mockRecorder1 = mock(SparkAppStatusRecorder.class);
        SparkAppStatusRecorder mockRecorder2 = mock(SparkAppStatusRecorder.class);
@@ -508,14 +624,14 @@ class AppCleanUpStepTest {
         when(mockApp.getSpec()).thenReturn(appSpec);
 
         Optional<ReconcileProgress> routineCheckProgress =
-            routineCheck.checkEarlyExitForTerminatedApp(mockApp, mockRecorder1);
+            routineCheck.checkEarlyExitForTerminatedApp(mockClient, mockApp, mockRecorder1);
         assertTrue(routineCheckProgress.isEmpty());
-        verifyNoMoreInteractions(mockRecorder1);
+        verifyNoMoreInteractions(mockRecorder1, mockClient);
 
         Optional<ReconcileProgress> onDemandProgress =
-            cleanUpWithReason.checkEarlyExitForTerminatedApp(mockApp, mockRecorder2);
+            cleanUpWithReason.checkEarlyExitForTerminatedApp(mockClient, mockApp, mockRecorder2);
         assertTrue(onDemandProgress.isEmpty());
-        verifyNoMoreInteractions(mockRecorder2);
+        verifyNoMoreInteractions(mockRecorder2, mockClient);
       }
     }
   }
diff --git a/tests/e2e/resource-retain-duration/chainsaw-test.yaml b/tests/e2e/resource-retain-duration/chainsaw-test.yaml
index 2c8b558..57c5dc6 100644
--- a/tests/e2e/resource-retain-duration/chainsaw-test.yaml
+++ b/tests/e2e/resource-retain-duration/chainsaw-test.yaml
@@ -41,6 +41,14 @@ spec:
             value: default
         timeout: 120s
        file: "../assertions/spark-application/spark-state-transition-with-retain-check.yaml"
+    - wait:
+        apiVersion: spark.apache.org/v1
+        kind: SparkApplication
+        namespace: default
+        name: ($SPARK_APPLICATION_NAME)
+        for:
+          deletion: {}
+        timeout: 60s
     catch:
     - describe:
         apiVersion: spark.apache.org/v1
@@ -53,4 +61,4 @@ spec:
             value: ($SPARK_APPLICATION_NAME)
         timeout: 120s
         content: |
-          kubectl delete sparkapplication $SPARK_APPLICATION_NAME
+          kubectl delete sparkapplication $SPARK_APPLICATION_NAME --ignore-not-found=true
diff --git a/tests/e2e/resource-retain-duration/spark-example-retain-duration.yaml b/tests/e2e/resource-retain-duration/spark-example-retain-duration.yaml
index 022fdd4..952bfff 100644
--- a/tests/e2e/resource-retain-duration/spark-example-retain-duration.yaml
+++ b/tests/e2e/resource-retain-duration/spark-example-retain-duration.yaml
@@ -26,6 +26,7 @@ spec:
   applicationTolerations:
     resourceRetainPolicy: Always
     resourceRetainDurationMillis: 10000
+    ttlAfterStopMillis: 30000
   sparkConf:
     spark.executor.instances: "1"
     spark.kubernetes.container.image: "apache/spark:4.0.0-java21-scala"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

