This is an automated email from the ASF dual-hosted git repository. ptoth pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new b89c5cc [SPARK-52915] Support TTL for Spark apps b89c5cc is described below commit b89c5cc5dd880fdb65a93a3f14817ce9141227a6 Author: Zhou JIANG <jiang...@umich.edu> AuthorDate: Thu Aug 14 10:41:18 2025 +0200 [SPARK-52915] Support TTL for Spark apps ### What changes were proposed in this pull request? This PR adds support for configuring a TTL for Spark apps after they stop. Together with `resourceRetainPolicy` and `resourceRetainDurationMillis`, it enhances the garbage collection mechanism at the custom resource level. ### Why are the changes needed? Introducing a TTL helps users configure garbage collection for apps more effectively. ### Does this PR introduce _any_ user-facing change? A new configurable field, spec.applicationTolerations.ttlAfterStopMillis, is added to the SparkApplication CRD. ### How was this patch tested? CIs, including a new unit test and a revised e2e scenario. ### Was this patch authored or co-authored using generative AI tooling? No Closes #290 from jiangzho/resource_ttl. Authored-by: Zhou JIANG <jiang...@umich.edu> Signed-off-by: Peter Toth <p_t...@apple.com> --- .../sparkapplications.spark.apache.org-v1.yaml | 3 + docs/spark_custom_resources.md | 51 +++++- .../k8s/operator/spec/ApplicationTolerations.java | 48 +++++- .../operator/spec/ApplicationTolerationsTest.java | 70 ++++++++ .../reconciler/reconcilesteps/AppCleanUpStep.java | 73 +++++++-- .../reconcilesteps/AppCleanUpStepTest.java | 180 +++++++++++++++++---- .../resource-retain-duration/chainsaw-test.yaml | 10 +- .../spark-example-retain-duration.yaml | 1 + 8 files changed, 382 insertions(+), 54 deletions(-) diff --git a/build-tools/helm/spark-kubernetes-operator/crds/sparkapplications.spark.apache.org-v1.yaml b/build-tools/helm/spark-kubernetes-operator/crds/sparkapplications.spark.apache.org-v1.yaml index 21d3996..6b2ee7b 100644 --- a/build-tools/helm/spark-kubernetes-operator/crds/sparkapplications.spark.apache.org-v1.yaml +++ b/build-tools/helm/spark-kubernetes-operator/crds/sparkapplications.spark.apache.org-v1.yaml @@ -17434,6 +17434,9 @@ spec: - OnInfrastructureFailure type: string type: object + ttlAfterStopMillis: + default: -1 + type: integer type: object configMapSpecs: items: diff --git a/docs/spark_custom_resources.md b/docs/spark_custom_resources.md index ad6f8b9..1f39ba5 100644 --- a/docs/spark_custom_resources.md +++ b/docs/spark_custom_resources.md @@ -293,6 +293,8 @@ applicationTolerations: resourceRetainPolicy: OnFailure # Secondary resources would be garbage collected 10 minutes after app termination resourceRetainDurationMillis: 600000 + # Garbage collect the SparkApplication custom resource itself 30 minutes after termination + ttlAfterStopMillis: 1800000 ``` to avoid operator attempt to delete driver pod and driver resources if app fails. Similarly, @@ -302,7 +304,54 @@ possible to configure `resourceRetainDurationMillis` to define the maximal retai these resources. Note that this applies only to operator-created resources (driver pod, SparkConf configmap .etc). You may also want to tune `spark.kubernetes.driver.service.deleteOnTermination` and `spark.kubernetes.executor.deleteOnTermination` to control the behavior of driver-created -resources. +resources. `ttlAfterStopMillis` controls the garbage collection behavior at the SparkApplication +level after it stops. When set to a non-negative value, the Spark operator would garbage collect the +application (and therefore all its associated resources) after the given timeout.
If the application +is configured to restart, `resourceRetainPolicy`, `resourceRetainDurationMillis` and +`ttlAfterStopMillis` would be applied only to the last attempt. + +For example, if an app with the below configuration: + +```yaml +applicationTolerations: + restartConfig: + restartPolicy: OnFailure + maxRestartAttempts: 1 + resourceRetainPolicy: Always + resourceRetainDurationMillis: 30000 + ttlAfterStopMillis: 60000 +``` + +ends up with a status like: + +```yaml +status: +#... the 1st attempt + "5": + currentStateSummary: Failed + "6": + currentStateSummary: ScheduledToRestart +# ...the 2nd attempt + "11": + currentStateSummary: Succeeded + "12": + currentStateSummary: TerminatedWithoutReleaseResources +``` + +The retain policy takes effect only after the final state `12`. Secondary resources are always +released between attempts (here, between `5` and `6`). The TTL is likewise calculated based on the +last state. + +| Field | Type | Default Value | Description | +|-----------------------------------------------------------|-----------------------------------|---------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| .spec.applicationTolerations.resourceRetainPolicy | `Always` / `OnFailure` / `Never` | Never | Configures whether the operator deletes or retains secondary resources for an app after it terminates. | +| .spec.applicationTolerations.resourceRetainDurationMillis | integer | -1 | Time to wait in milliseconds before releasing **secondary resources** after termination. Setting a negative value disables the retention duration check for secondary resources after termination. | +| .spec.applicationTolerations.ttlAfterStopMillis | integer | -1 | Time-to-live in milliseconds for the SparkApplication and **all its associated secondary resources**. If set to a negative value, the application is retained and not garbage collected by the operator. | + +Note that `ttlAfterStopMillis` applies to the app as well as its secondary resources. If both +`resourceRetainDurationMillis` and `ttlAfterStopMillis` are set to non-negative values and the +latter is smaller, the latter takes precedence: the operator removes all resources related +to this app after `ttlAfterStopMillis`. ## Spark Cluster diff --git a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ApplicationTolerations.java b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ApplicationTolerations.java index 8e47817..20b8122 100644 --- a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ApplicationTolerations.java +++ b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ApplicationTolerations.java @@ -58,6 +58,34 @@ public class ApplicationTolerations { @Builder.Default protected Long resourceRetainDurationMillis = -1L; + /** + * Time-to-live in milliseconds for the SparkApplication and all its associated secondary resources + * after it stops. If set to a negative value, the application is retained and its secondary + * resources follow the retain policy. If the application is configured to restart, this would apply to the last + * attempt only. + */ + @Default("-1") + @Builder.Default + protected Long ttlAfterStopMillis = -1L; + + /** + * @return The effective retain duration for secondary resources, which is the smaller non-negative + * value of `resourceRetainDurationMillis` and `ttlAfterStopMillis`.
Returns -1 if neither is non-negative. + */ + public long computeEffectiveRetainDurationMillis() { + if (resourceRetainDurationMillis < 0 && ttlAfterStopMillis < 0) { + return -1L; + } + if (resourceRetainDurationMillis < 0) { + return ttlAfterStopMillis; + } + if (ttlAfterStopMillis < 0) { + return resourceRetainDurationMillis; + } + return Math.min(resourceRetainDurationMillis, ttlAfterStopMillis); + } + /** * Check whether a terminated application has exceeded the resource retain duration at the * provided instant @@ -68,20 +96,30 @@ public class ApplicationTolerations { */ public boolean exceedRetainDurationAtInstant( ApplicationState lastObservedState, Instant instant) { - return lastObservedState != null + return isRetainDurationEnabled() + && lastObservedState != null && lastObservedState.getCurrentStateSummary().isTerminated() - && resourceRetainDurationMillis > 0L && Instant.parse(lastObservedState.getLastTransitionTime()) - .plusMillis(resourceRetainDurationMillis) + .plusMillis(computeEffectiveRetainDurationMillis()) .isBefore(instant); } /** * Indicates whether the reconciler need to perform retain duration check * - * @return true `resourceRetainDurationMillis` is set to non-negative value + * @return true if `resourceRetainDurationMillis` or `ttlAfterStopMillis` is set to a non-negative + * value */ public boolean isRetainDurationEnabled() { - return resourceRetainDurationMillis >= 0L; + return resourceRetainDurationMillis >= 0L || ttlAfterStopMillis >= 0L; + } + + /** + * Indicates whether the reconciler needs to perform the TTL check + * + * @return true if `ttlAfterStopMillis` is set to a non-negative value + */ + public boolean isTTLEnabled() { + return ttlAfterStopMillis >= 0L; + } } diff --git a/spark-operator-api/src/test/java/org/apache/spark/k8s/operator/spec/ApplicationTolerationsTest.java b/spark-operator-api/src/test/java/org/apache/spark/k8s/operator/spec/ApplicationTolerationsTest.java new file mode 100644 index 0000000..bcc537f --- /dev/null +++ b/spark-operator-api/src/test/java/org/apache/spark/k8s/operator/spec/ApplicationTolerationsTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.spark.k8s.operator.spec; + +import static org.junit.jupiter.api.Assertions.*; + +import org.junit.jupiter.api.Test; + +class ApplicationTolerationsTest { + private final ApplicationTolerations withRetainDurationOnly = + ApplicationTolerations.builder().resourceRetainDurationMillis(10L).build(); + private final ApplicationTolerations withTTLOnly = + ApplicationTolerations.builder().ttlAfterStopMillis(10L).build(); + private final ApplicationTolerations withNeitherRetainDurationNorTtl = + ApplicationTolerations.builder().build(); + private final ApplicationTolerations withRetainDurationGreaterThanTtl = + ApplicationTolerations.builder() + .resourceRetainDurationMillis(20L) + .ttlAfterStopMillis(10L) + .build(); + private final ApplicationTolerations withRetainDurationShorterThanTtl = + ApplicationTolerations.builder() + .resourceRetainDurationMillis(10L) + .ttlAfterStopMillis(20L) + .build(); + + @Test + void computeEffectiveRetainDurationMillis() { + assertEquals(10L, withRetainDurationOnly.computeEffectiveRetainDurationMillis()); + assertEquals(10L, withTTLOnly.computeEffectiveRetainDurationMillis()); + assertEquals(-1, withNeitherRetainDurationNorTtl.computeEffectiveRetainDurationMillis()); + assertEquals(10L, withRetainDurationGreaterThanTtl.computeEffectiveRetainDurationMillis()); + assertEquals(10L, withRetainDurationShorterThanTtl.computeEffectiveRetainDurationMillis()); + } + + @Test + void isRetainDurationEnabled() { + assertTrue(withRetainDurationOnly.isRetainDurationEnabled()); + assertTrue(withTTLOnly.isRetainDurationEnabled()); + assertFalse(withNeitherRetainDurationNorTtl.isRetainDurationEnabled()); + assertTrue(withRetainDurationGreaterThanTtl.isRetainDurationEnabled()); + assertTrue(withRetainDurationShorterThanTtl.isRetainDurationEnabled()); + } + + @Test + void isTTLEnabled() { + assertFalse(withRetainDurationOnly.isTTLEnabled()); + assertTrue(withTTLOnly.isTTLEnabled()); + assertFalse(withNeitherRetainDurationNorTtl.isTTLEnabled()); + assertTrue(withRetainDurationGreaterThanTtl.isTTLEnabled()); + assertTrue(withRetainDurationShorterThanTtl.isTTLEnabled()); + } +} diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStep.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStep.java index c62a506..12b241f 100644 --- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStep.java +++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStep.java @@ -30,6 +30,7 @@ import java.util.function.Supplier; import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.client.KubernetesClient; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -95,7 +96,7 @@ public class AppCleanUpStep extends AppReconcileStep { ApplicationTolerations tolerations = application.getSpec().getApplicationTolerations(); if (currentState.getCurrentStateSummary().isTerminated()) { Optional<ReconcileProgress> terminatedAppProgress = - checkEarlyExitForTerminatedApp(application, statusRecorder); + checkEarlyExitForTerminatedApp(context.getClient(), application, statusRecorder); if (terminatedAppProgress.isPresent()) { return terminatedAppProgress.get(); } @@ -184,14 +185,45 @@ public class AppCleanUpStep extends AppReconcileStep { } } - protected 
Optional<ReconcileProgress> checkEarlyExitForTerminatedApp( + protected Optional<ReconcileProgress> clearCacheAndFinishReconcileForApplication( final SparkApplication application, final SparkAppStatusRecorder statusRecorder) { + log.debug("Cleaning up status cache and stopping reconciliation for application."); + statusRecorder.removeCachedStatus(application); + return Optional.of(ReconcileProgress.completeAndNoRequeue()); + } + + protected Optional<ReconcileProgress> checkEarlyExitForTerminatedApp( + final KubernetesClient client, + final SparkApplication application, + final SparkAppStatusRecorder statusRecorder) { ApplicationStatus currentStatus = application.getStatus(); ApplicationState currentState = currentStatus.getCurrentState(); ApplicationTolerations tolerations = application.getSpec().getApplicationTolerations(); + Instant now = Instant.now(); if (ApplicationStateSummary.ResourceReleased.equals(currentState.getCurrentStateSummary())) { - statusRecorder.removeCachedStatus(application); - return Optional.of(ReconcileProgress.completeAndNoRequeue()); + // Perform TTL check after removing all secondary resources, if enabled + if (isOnDemandCleanup() || !tolerations.isTTLEnabled()) { + // all secondary resources have been released, no more reconciliations needed + return clearCacheAndFinishReconcileForApplication(application, statusRecorder); + } else { + ApplicationState lastObservedStateBeforeTermination = + getLastObservedStateBeforeTermination(currentStatus); + Duration nextCheckDuration = + Duration.between( + now, + Instant.parse(lastObservedStateBeforeTermination.getLastTransitionTime()) + .plusMillis(tolerations.getTtlAfterStopMillis())); + if (nextCheckDuration.isNegative()) { + log.info("Garbage collecting application that exceeded the given TTL."); + ReconcilerUtils.deleteResourceIfExists(client, application, true); + return clearCacheAndFinishReconcileForApplication(application, statusRecorder); + } else { + log.info( + "Application has not yet expired; reconciliation will be resumed in {} millis.", + nextCheckDuration.toMillis()); + return Optional.of(ReconcileProgress.completeAndRequeueAfter(nextCheckDuration)); + } + } } if (isOnDemandCleanup()) { return Optional.empty(); @@ -199,21 +231,24 @@ if (ApplicationStateSummary.TerminatedWithoutReleaseResources.equals( currentState.getCurrentStateSummary())) { if (tolerations.isRetainDurationEnabled()) { - Instant now = Instant.now(); if (tolerations.exceedRetainDurationAtInstant(currentState, now)) { + log.info("Garbage collecting secondary resources for application."); onDemandCleanUpReason = SparkAppStatusUtils::appExceededRetainDuration; return Optional.empty(); } else { Duration nextCheckDuration = Duration.between( - Instant.now(), + now, Instant.parse(currentState.getLastTransitionTime()) - .plusMillis(tolerations.getResourceRetainDurationMillis())); + .plusMillis(tolerations.computeEffectiveRetainDurationMillis())); + log.info( + "Application is within the retention period; reconciliation will be resumed in {} millis.", + nextCheckDuration.toMillis()); return Optional.of(ReconcileProgress.completeAndRequeueAfter(nextCheckDuration)); } } else { - statusRecorder.removeCachedStatus(application); - return Optional.of(ReconcileProgress.completeAndNoRequeue()); + log.info("Retention duration check is not enabled for application."); + return clearCacheAndFinishReconcileForApplication(application, statusRecorder); } } return Optional.empty(); @@ -223,17 +258,25 @@ class AppCleanUpStep extends 
AppReconcileStep { return onDemandCleanUpReason != null; } - protected boolean isReleasingResourcesForSchedulingFailureAttempt( - final ApplicationStatus status) { + /** + * @param status status of the application + * @return The last observed state before termination if the app has terminated. If the app has + * not terminated, return the last observed state + */ + protected ApplicationState getLastObservedStateBeforeTermination(final ApplicationStatus status) { ApplicationState lastObservedState = status.getCurrentState(); - if (ApplicationStateSummary.TerminatedWithoutReleaseResources.equals( - lastObservedState.getCurrentStateSummary())) { - // if the app has already terminated, use the last observed state before termination + if (lastObservedState.getCurrentStateSummary().isTerminated()) { NavigableMap<Long, ApplicationState> navMap = (NavigableMap<Long, ApplicationState>) status.getStateTransitionHistory(); Map.Entry<Long, ApplicationState> terminateState = navMap.lastEntry(); - lastObservedState = navMap.lowerEntry(terminateState.getKey()).getValue(); + return navMap.lowerEntry(terminateState.getKey()).getValue(); } + return lastObservedState; + } + + protected boolean isReleasingResourcesForSchedulingFailureAttempt( + final ApplicationStatus status) { + ApplicationState lastObservedState = getLastObservedStateBeforeTermination(status); return ApplicationStateSummary.SchedulingFailure.equals( lastObservedState.getCurrentStateSummary()); } diff --git a/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStepTest.java b/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStepTest.java index 23ed54b..1fbf563 100644 --- a/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStepTest.java +++ b/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStepTest.java @@ -81,6 +81,14 @@ class AppCleanUpStepTest { .resourceRetainDurationMillis(1L) .build()) .build(); + private final ApplicationSpec exceedRetainDurationFromTtl = + ApplicationSpec.builder() + .applicationTolerations( + ApplicationTolerations.builder() + .resourceRetainPolicy(ResourceRetainPolicy.Always) + .ttlAfterStopMillis(1L) + .build()) + .build(); private final ApplicationSpec notExceedRetainDuration = ApplicationSpec.builder() .applicationTolerations( @@ -89,9 +97,23 @@ class AppCleanUpStepTest { .resourceRetainDurationMillis(24 * 60 * 60 * 1000L) .build()) .build(); + private final ApplicationSpec notExceedTtl = + ApplicationSpec.builder() + .applicationTolerations( + ApplicationTolerations.builder() + .resourceRetainPolicy(ResourceRetainPolicy.Always) + .ttlAfterStopMillis(24 * 60 * 60 * 1000L) + .build()) + .build(); private final List<ApplicationSpec> specs = - List.of(alwaysRetain, neverRetain, exceedRetainDuration, notExceedRetainDuration); + List.of( + alwaysRetain, + neverRetain, + exceedRetainDuration, + exceedRetainDurationFromTtl, + notExceedRetainDuration, + notExceedTtl); @Test void enableForceDelete() { @@ -191,6 +213,7 @@ class AppCleanUpStepTest { verify(mockAppContext, times(1)).getResource(); verify(mockApp, times(2)).getSpec(); verify(mockApp, times(2)).getStatus(); + verify(mockAppContext).getClient(); verify(mockRecorder).removeCachedStatus(mockApp); verifyNoMoreInteractions(mockAppContext, mockRecorder, mockApp); } @@ -214,6 +237,7 @@ class AppCleanUpStepTest { verify(mockApp, times(2)).getSpec(); verify(mockApp, times(2)).getStatus(); 
verify(mockRecorder).removeCachedStatus(mockApp); + verify(mockAppContext).getClient(); verifyNoMoreInteractions(mockAppContext, mockRecorder, mockApp); } @@ -246,7 +270,7 @@ class AppCleanUpStepTest { verify(mockAppContext, times(1)).getResource(); verify(mockApp, times(3)).getSpec(); verify(mockApp, times(3)).getStatus(); - verify(mockAppContext).getClient(); + verify(mockAppContext, times(2)).getClient(); verify(mockAppContext).getDriverPod(); ArgumentCaptor<ApplicationState> captor = ArgumentCaptor.forClass(ApplicationState.class); verify(mockRecorder).appendNewStateAndPersist(eq(mockAppContext), captor.capture()); @@ -337,7 +361,7 @@ class AppCleanUpStepTest { } @Test - void checkEarlyExitForResourceReleasedApp() { + void checkEarlyExitForResourceReleasedAppWithoutTTL() { AppCleanUpStep routineCheck = new AppCleanUpStep(); AppCleanUpStep cleanUpWithReason = new AppCleanUpStep(SparkAppStatusUtils::appCancelled); ApplicationStatus succeeded = @@ -350,27 +374,112 @@ class AppCleanUpStepTest { prepareApplicationStatus( ApplicationStateSummary.ResourceReleased, ApplicationStateSummary.RunningHealthy); List<ApplicationStatus> statusList = List.of(succeeded, failed, cancelled); + KubernetesClient mockClient = mock(KubernetesClient.class); + List<ApplicationSpec> specList = + List.of(alwaysRetain, neverRetain, exceedRetainDuration, notExceedRetainDuration); - for (ApplicationSpec appSpec : specs) { + for (ApplicationSpec appSpec : specList) { for (ApplicationStatus appStatus : statusList) { SparkAppStatusRecorder mockRecorder1 = mock(SparkAppStatusRecorder.class); SparkAppStatusRecorder mockRecorder2 = mock(SparkAppStatusRecorder.class); SparkApplication mockApp = mock(SparkApplication.class); when(mockApp.getStatus()).thenReturn(appStatus); when(mockApp.getSpec()).thenReturn(appSpec); - Optional<ReconcileProgress> routineCheckProgress = - routineCheck.checkEarlyExitForTerminatedApp(mockApp, mockRecorder1); + routineCheck.checkEarlyExitForTerminatedApp(mockClient, mockApp, mockRecorder1); assertTrue(routineCheckProgress.isPresent()); + ReconcileProgress reconcileProgress = routineCheckProgress.get(); + Optional<ReconcileProgress> onDemandProgress = + cleanUpWithReason.checkEarlyExitForTerminatedApp(mockClient, mockApp, mockRecorder2); + Assertions.assertEquals(ReconcileProgress.completeAndNoRequeue(), reconcileProgress); + verify(mockRecorder1).removeCachedStatus(mockApp); + assertTrue(onDemandProgress.isPresent()); Assertions.assertEquals( ReconcileProgress.completeAndNoRequeue(), routineCheckProgress.get()); + verify(mockRecorder2).removeCachedStatus(mockApp); + } + } + } + + @Test + void checkEarlyExitForResourceReleasedAppWithExceededTTL() { + AppCleanUpStep routineCheck = new AppCleanUpStep(); + AppCleanUpStep cleanUpWithReason = new AppCleanUpStep(SparkAppStatusUtils::appCancelled); + ApplicationStatus succeeded = + prepareApplicationStatus( + ApplicationStateSummary.ResourceReleased, ApplicationStateSummary.Succeeded); + ApplicationStatus failed = + prepareApplicationStatus( + ApplicationStateSummary.ResourceReleased, ApplicationStateSummary.SchedulingFailure); + ApplicationStatus cancelled = + prepareApplicationStatus( + ApplicationStateSummary.ResourceReleased, ApplicationStateSummary.Failed); + List<ApplicationStatus> statusList = List.of(succeeded, failed, cancelled); + KubernetesClient mockClient = mock(KubernetesClient.class); + for (ApplicationStatus appStatus : statusList) { + SparkAppStatusRecorder mockRecorder1 = mock(SparkAppStatusRecorder.class); + SparkAppStatusRecorder 
mockRecorder2 = mock(SparkAppStatusRecorder.class); + SparkApplication mockApp = mock(SparkApplication.class); + when(mockApp.getStatus()).thenReturn(appStatus); + when(mockApp.getSpec()).thenReturn(exceedRetainDurationFromTtl); + try (MockedStatic<ReconcilerUtils> utils = Mockito.mockStatic(ReconcilerUtils.class)) { + Optional<ReconcileProgress> routineCheckProgress = + routineCheck.checkEarlyExitForTerminatedApp(mockClient, mockApp, mockRecorder1); + assertTrue(routineCheckProgress.isPresent()); + ReconcileProgress reconcileProgress = routineCheckProgress.get(); + assertTrue(reconcileProgress.isCompleted()); + assertFalse(reconcileProgress.isRequeue()); + utils.verify(() -> ReconcilerUtils.deleteResourceIfExists(mockClient, mockApp, true)); + verify(mockRecorder1).removeCachedStatus(mockApp); + Optional<ReconcileProgress> onDemandProgress = + cleanUpWithReason.checkEarlyExitForTerminatedApp(mockClient, mockApp, mockRecorder2); verify(mockRecorder1).removeCachedStatus(mockApp); + assertTrue(onDemandProgress.isPresent()); + ReconcileProgress reconcileProgressOnDemand = onDemandProgress.get(); + Assertions.assertEquals( + ReconcileProgress.completeAndNoRequeue(), reconcileProgressOnDemand); + verify(mockRecorder2).removeCachedStatus(mockApp); + } + } + } + @Test + void checkEarlyExitForResourceReleasedAppWithinTTL() { + AppCleanUpStep routineCheck = new AppCleanUpStep(); + AppCleanUpStep cleanUpWithReason = new AppCleanUpStep(SparkAppStatusUtils::appCancelled); + ApplicationStatus succeeded = + prepareApplicationStatus( + ApplicationStateSummary.ResourceReleased, ApplicationStateSummary.Succeeded); + ApplicationStatus failed = + prepareApplicationStatus( + ApplicationStateSummary.ResourceReleased, ApplicationStateSummary.SchedulingFailure); + ApplicationStatus cancelled = + prepareApplicationStatus( + ApplicationStateSummary.ResourceReleased, ApplicationStateSummary.Failed); + List<ApplicationStatus> statusList = List.of(succeeded, failed, cancelled); + KubernetesClient mockClient = mock(KubernetesClient.class); + for (ApplicationStatus appStatus : statusList) { + SparkAppStatusRecorder mockRecorder1 = mock(SparkAppStatusRecorder.class); + SparkAppStatusRecorder mockRecorder2 = mock(SparkAppStatusRecorder.class); + SparkApplication mockApp = mock(SparkApplication.class); + when(mockApp.getStatus()).thenReturn(appStatus); + when(mockApp.getSpec()).thenReturn(notExceedTtl); + try (MockedStatic<ReconcilerUtils> utils = Mockito.mockStatic(ReconcilerUtils.class)) { + Optional<ReconcileProgress> routineCheckProgress = + routineCheck.checkEarlyExitForTerminatedApp(mockClient, mockApp, mockRecorder1); + assertTrue(routineCheckProgress.isPresent()); + ReconcileProgress reconcileProgress = routineCheckProgress.get(); + assertTrue(reconcileProgress.isCompleted()); + assertTrue(reconcileProgress.isRequeue()); + assertTrue(reconcileProgress.getRequeueAfterDuration().toMillis() > 0); + utils.verifyNoInteractions(); + verifyNoMoreInteractions(mockRecorder1); Optional<ReconcileProgress> onDemandProgress = - cleanUpWithReason.checkEarlyExitForTerminatedApp(mockApp, mockRecorder2); + cleanUpWithReason.checkEarlyExitForTerminatedApp(mockClient, mockApp, mockRecorder2); assertTrue(onDemandProgress.isPresent()); + ReconcileProgress reconcileProgressOnDemand = onDemandProgress.get(); Assertions.assertEquals( - ReconcileProgress.completeAndNoRequeue(), routineCheckProgress.get()); + ReconcileProgress.completeAndNoRequeue(), reconcileProgressOnDemand); verify(mockRecorder2).removeCachedStatus(mockApp); } } @@ 
-393,6 +502,7 @@ class AppCleanUpStepTest { ApplicationStateSummary.TerminatedWithoutReleaseResources, ApplicationStateSummary.RunningHealthy); List<ApplicationStatus> statusList = List.of(succeeded, failed, cancelled); + KubernetesClient mockClient = mock(KubernetesClient.class); for (ApplicationStatus appStatus : statusList) { SparkAppStatusRecorder mockRecorder1 = mock(SparkAppStatusRecorder.class); @@ -402,13 +512,13 @@ class AppCleanUpStepTest { when(mockApp.getSpec()).thenReturn(alwaysRetain); Optional<ReconcileProgress> routineCheckProgress = - routineCheck.checkEarlyExitForTerminatedApp(mockApp, mockRecorder1); + routineCheck.checkEarlyExitForTerminatedApp(mockClient, mockApp, mockRecorder1); assertTrue(routineCheckProgress.isPresent()); Assertions.assertEquals(ReconcileProgress.completeAndNoRequeue(), routineCheckProgress.get()); verify(mockRecorder1).removeCachedStatus(mockApp); Optional<ReconcileProgress> onDemandProgress = - cleanUpWithReason.checkEarlyExitForTerminatedApp(mockApp, mockRecorder2); + cleanUpWithReason.checkEarlyExitForTerminatedApp(mockClient, mockApp, mockRecorder2); assertFalse(onDemandProgress.isPresent()); verifyNoMoreInteractions(mockRecorder2); } @@ -431,23 +541,27 @@ class AppCleanUpStepTest { ApplicationStateSummary.TerminatedWithoutReleaseResources, ApplicationStateSummary.RunningHealthy); List<ApplicationStatus> statusList = List.of(succeeded, failed, cancelled); + KubernetesClient mockClient = mock(KubernetesClient.class); + List<ApplicationSpec> specs = List.of(exceedRetainDuration, exceedRetainDurationFromTtl); - for (ApplicationStatus appStatus : statusList) { - SparkAppStatusRecorder mockRecorder1 = mock(SparkAppStatusRecorder.class); - SparkAppStatusRecorder mockRecorder2 = mock(SparkAppStatusRecorder.class); - SparkApplication mockApp = mock(SparkApplication.class); - when(mockApp.getStatus()).thenReturn(appStatus); - when(mockApp.getSpec()).thenReturn(exceedRetainDuration); + for (ApplicationSpec spec : specs) { + for (ApplicationStatus appStatus : statusList) { + SparkAppStatusRecorder mockRecorder1 = mock(SparkAppStatusRecorder.class); + SparkAppStatusRecorder mockRecorder2 = mock(SparkAppStatusRecorder.class); + SparkApplication mockApp = mock(SparkApplication.class); + when(mockApp.getStatus()).thenReturn(appStatus); + when(mockApp.getSpec()).thenReturn(spec); - Optional<ReconcileProgress> routineCheckProgress = - routineCheck.checkEarlyExitForTerminatedApp(mockApp, mockRecorder1); - assertFalse(routineCheckProgress.isPresent()); - verifyNoMoreInteractions(mockRecorder1); + Optional<ReconcileProgress> routineCheckProgress = + routineCheck.checkEarlyExitForTerminatedApp(mockClient, mockApp, mockRecorder1); + assertFalse(routineCheckProgress.isPresent()); + verifyNoMoreInteractions(mockRecorder1, mockClient); - Optional<ReconcileProgress> onDemandProgress = - cleanUpWithReason.checkEarlyExitForTerminatedApp(mockApp, mockRecorder2); - assertFalse(onDemandProgress.isPresent()); - verifyNoMoreInteractions(mockRecorder2); + Optional<ReconcileProgress> onDemandProgress = + cleanUpWithReason.checkEarlyExitForTerminatedApp(mockClient, mockApp, mockRecorder2); + assertFalse(onDemandProgress.isPresent()); + verifyNoMoreInteractions(mockRecorder2, mockClient); + } } } @@ -468,6 +582,7 @@ class AppCleanUpStepTest { ApplicationStateSummary.TerminatedWithoutReleaseResources, ApplicationStateSummary.RunningHealthy); List<ApplicationStatus> statusList = List.of(succeeded, failed, cancelled); + KubernetesClient mockClient = mock(KubernetesClient.class); for 
(ApplicationStatus appStatus : statusList) { SparkAppStatusRecorder mockRecorder1 = mock(SparkAppStatusRecorder.class); @@ -477,17 +592,17 @@ class AppCleanUpStepTest { when(mockApp.getSpec()).thenReturn(notExceedRetainDuration); Optional<ReconcileProgress> routineCheckProgress = - routineCheck.checkEarlyExitForTerminatedApp(mockApp, mockRecorder1); + routineCheck.checkEarlyExitForTerminatedApp(mockClient, mockApp, mockRecorder1); assertTrue(routineCheckProgress.isPresent()); ReconcileProgress reconcileProgress = routineCheckProgress.get(); assertTrue(reconcileProgress.isCompleted()); assertTrue(reconcileProgress.isRequeue()); - verifyNoMoreInteractions(mockRecorder2); + verifyNoMoreInteractions(mockRecorder2, mockClient); Optional<ReconcileProgress> onDemandProgress = - cleanUpWithReason.checkEarlyExitForTerminatedApp(mockApp, mockRecorder2); + cleanUpWithReason.checkEarlyExitForTerminatedApp(mockClient, mockApp, mockRecorder2); assertFalse(onDemandProgress.isPresent()); - verifyNoMoreInteractions(mockRecorder2); + verifyNoMoreInteractions(mockRecorder2, mockClient); } } @@ -500,6 +615,7 @@ class AppCleanUpStepTest { continue; } ApplicationStatus status = prepareApplicationStatus(stateSummary); + KubernetesClient mockClient = mock(KubernetesClient.class); for (ApplicationSpec appSpec : specs) { SparkAppStatusRecorder mockRecorder1 = mock(SparkAppStatusRecorder.class); SparkAppStatusRecorder mockRecorder2 = mock(SparkAppStatusRecorder.class); @@ -508,14 +624,14 @@ class AppCleanUpStepTest { when(mockApp.getSpec()).thenReturn(appSpec); Optional<ReconcileProgress> routineCheckProgress = - routineCheck.checkEarlyExitForTerminatedApp(mockApp, mockRecorder1); + routineCheck.checkEarlyExitForTerminatedApp(mockClient, mockApp, mockRecorder1); assertTrue(routineCheckProgress.isEmpty()); - verifyNoMoreInteractions(mockRecorder1); + verifyNoMoreInteractions(mockRecorder1, mockClient); Optional<ReconcileProgress> onDemandProgress = - cleanUpWithReason.checkEarlyExitForTerminatedApp(mockApp, mockRecorder2); + cleanUpWithReason.checkEarlyExitForTerminatedApp(mockClient, mockApp, mockRecorder2); assertTrue(onDemandProgress.isEmpty()); - verifyNoMoreInteractions(mockRecorder2); + verifyNoMoreInteractions(mockRecorder2, mockClient); } } } diff --git a/tests/e2e/resource-retain-duration/chainsaw-test.yaml b/tests/e2e/resource-retain-duration/chainsaw-test.yaml index 2c8b558..57c5dc6 100644 --- a/tests/e2e/resource-retain-duration/chainsaw-test.yaml +++ b/tests/e2e/resource-retain-duration/chainsaw-test.yaml @@ -41,6 +41,14 @@ spec: value: default timeout: 120s file: "../assertions/spark-application/spark-state-transition-with-retain-check.yaml" + - wait: + apiVersion: spark.apache.org/v1 + kind: SparkApplication + namespace: default + name: ($SPARK_APPLICATION_NAME) + for: + deletion: {} + timeout: 60s catch: - describe: apiVersion: spark.apache.org/v1 @@ -53,4 +61,4 @@ spec: value: ($SPARK_APPLICATION_NAME) timeout: 120s content: | - kubectl delete sparkapplication $SPARK_APPLICATION_NAME + kubectl delete sparkapplication $SPARK_APPLICATION_NAME --ignore-not-found=true diff --git a/tests/e2e/resource-retain-duration/spark-example-retain-duration.yaml b/tests/e2e/resource-retain-duration/spark-example-retain-duration.yaml index 022fdd4..952bfff 100644 --- a/tests/e2e/resource-retain-duration/spark-example-retain-duration.yaml +++ b/tests/e2e/resource-retain-duration/spark-example-retain-duration.yaml @@ -26,6 +26,7 @@ spec: applicationTolerations: resourceRetainPolicy: Always resourceRetainDurationMillis: 
10000 + ttlAfterStopMillis: 30000 sparkConf: spark.executor.instances: "1" spark.kubernetes.container.image: "apache/spark:4.0.0-java21-scala"
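
As a closing illustration of the precedence note in the docs above, here is a minimal, hypothetical `applicationTolerations` fragment (the values are arbitrary and all other SparkApplication fields are omitted) in which the TTL is the smaller of the two durations, so it effectively caps the retention of secondary resources as well:

```yaml
# Hypothetical spec fragment; only the retention-related fields are shown.
spec:
  applicationTolerations:
    # Retain secondary resources (driver pod, SparkConf configmap, ...) after termination ...
    resourceRetainPolicy: Always
    # ... for at most 10 minutes.
    resourceRetainDurationMillis: 600000
    # Garbage collect the SparkApplication custom resource (and all its secondary resources)
    # 5 minutes after the app stops; since this is the smaller value, everything is removed
    # after roughly 5 minutes.
    ttlAfterStopMillis: 300000
```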