This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit 02fa1abf2d35222fd7988c4aa4d461c221101462 Author: Gyula Fora <[email protected]> AuthorDate: Tue Mar 3 13:13:23 2026 +0100 [FLINK-39271] Harden session job deletion flow --- .../sessionjob/SessionJobReconciler.java | 88 +++++++---- .../flink/kubernetes/operator/TestUtils.java | 39 +++++ .../sessionjob/SessionJobReconcilerTest.java | 172 +++++++++++++++++++++ 3 files changed, 269 insertions(+), 30 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java index f5d10626..974c366b 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java @@ -23,6 +23,7 @@ import org.apache.flink.autoscaler.JobAutoScaler; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; import org.apache.flink.kubernetes.operator.api.FlinkSessionJob; +import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState; import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec; import org.apache.flink.kubernetes.operator.api.spec.JobState; import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus; @@ -35,6 +36,7 @@ import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; import org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler; import org.apache.flink.kubernetes.operator.service.SuspendMode; import org.apache.flink.kubernetes.operator.utils.EventRecorder; +import org.apache.flink.kubernetes.operator.utils.FlinkUtils; import org.apache.flink.kubernetes.operator.utils.StatusRecorder; import io.javaoperatorsdk.operator.api.reconciler.DeleteControl; @@ -128,7 +130,12 @@ public class SessionJobReconciler @Override public DeleteControl cleanupInternal(FlinkResourceContext<FlinkSessionJob> ctx) { var status = ctx.getResource().getStatus(); - long delay = ctx.getOperatorConfig().getProgressCheckInterval().toMillis(); + var rescheduleDelete = + DeleteControl.noFinalizerRemoval() + .rescheduleAfter( + ctx.getOperatorConfig().getProgressCheckInterval().toMillis()); + var jobID = ctx.getResource().getStatus().getJobStatus().getJobId(); + if (status.getReconciliationStatus().isBeforeFirstDeployment() || ReconciliationUtils.isJobInTerminalState(status) || status.getReconciliationStatus() @@ -136,45 +143,66 @@ public class SessionJobReconciler .getJob() .getState() == JobState.SUSPENDED - || JobStatusObserver.JOB_NOT_FOUND_ERR.equals(status.getError())) { + || JobStatusObserver.JOB_NOT_FOUND_ERR.equals(status.getError()) + || jobID == null) { // Job is not running, nothing to do... return DeleteControl.defaultDelete(); } + var flinkDepOptional = ctx.getJosdkContext().getSecondaryResource(FlinkDeployment.class); + if (flinkDepOptional.isEmpty()) { + LOG.info("Session cluster deployment not available"); + return DeleteControl.defaultDelete(); + } + + var flinkDep = flinkDepOptional.get(); + + // If the session cluster is being deleted, the job will not survive regardless, + // so there is no need to explicitly cancel it. + var sessionLifecycleState = flinkDep.getStatus().getLifecycleState(); + if (sessionLifecycleState == ResourceLifecycleState.DELETING + || sessionLifecycleState == ResourceLifecycleState.DELETED) { + LOG.info("Session cluster is being deleted, skipping job cancellation"); + return DeleteControl.defaultDelete(); + } + + if (!sessionClusterReady(flinkDepOptional)) { + // If the session cluster is not healthy and HA is not enabled, the job state + // will not survive a cluster restart, so we can safely delete without + // explicit cancellation. + var sessionConf = flinkDep.getSpec().getFlinkConfiguration().asConfiguration(); + if (!FlinkUtils.isKubernetesHAActivated(sessionConf) + && !FlinkUtils.isZookeeperHAActivated(sessionConf)) { + LOG.info( + "Session cluster is not healthy and HA is not enabled, skipping job cancellation"); + return DeleteControl.defaultDelete(); + } + LOG.info("Session cluster is not healthy, waiting before attempting job cancellation"); + return rescheduleDelete; + } + + // Only check for pending cancellation once the cluster is ready, so that other deletion + // paths (cluster missing, deleting, unhealthy without HA) are not blocked. if (ReconciliationUtils.isJobCancelling(status)) { LOG.info("Waiting for pending cancellation"); - return DeleteControl.noFinalizerRemoval().rescheduleAfter(delay); + return rescheduleDelete; } - Optional<FlinkDeployment> flinkDepOptional = - ctx.getJosdkContext().getSecondaryResource(FlinkDeployment.class); - - if (flinkDepOptional.isPresent()) { - String jobID = ctx.getResource().getStatus().getJobStatus().getJobId(); - if (jobID != null) { - try { - var observeConfig = ctx.getObserveConfig(); - var suspendMode = - observeConfig.getBoolean( - KubernetesOperatorConfigOptions.SAVEPOINT_ON_DELETION) - ? SuspendMode.SAVEPOINT - : SuspendMode.STATELESS; - if (cancelJob(ctx, suspendMode)) { - LOG.info("Waiting for pending cancellation"); - return DeleteControl.noFinalizerRemoval().rescheduleAfter(delay); - } - } catch (Exception e) { - LOG.error( - "Failed to cancel job, will reschedule after {} milliseconds.", - delay, - e); - return DeleteControl.noFinalizerRemoval().rescheduleAfter(delay); - } + try { + var observeConfig = ctx.getObserveConfig(); + var suspendMode = + observeConfig.getBoolean(KubernetesOperatorConfigOptions.SAVEPOINT_ON_DELETION) + ? SuspendMode.SAVEPOINT + : SuspendMode.STATELESS; + if (cancelJob(ctx, suspendMode)) { + LOG.info("Waiting for pending cancellation"); + return rescheduleDelete; } - } else { - LOG.info("Session cluster deployment not available"); + return DeleteControl.defaultDelete(); + } catch (Exception e) { + LOG.error("Failed to cancel job, rescheduling deletion.", e); + return rescheduleDelete; } - return DeleteControl.defaultDelete(); } public static boolean sessionClusterReady(Optional<FlinkDeployment> flinkDeploymentOpt) { diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java index a48f41d9..3f6f2fff 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java @@ -22,6 +22,7 @@ import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; import org.apache.flink.kubernetes.operator.api.FlinkSessionJob; import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot; +import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState; import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion; import org.apache.flink.kubernetes.operator.api.spec.JobReference; import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode; @@ -248,6 +249,44 @@ public class TestUtils extends BaseTestUtils { }; } + public static <T extends HasMetadata> + Context<T> createContextWithFlinkDeploymentInLifecycleState( + ResourceLifecycleState lifecycleState) { + return new TestingContext<>() { + @Override + public Optional<T> getSecondaryResource(Class expectedType, String eventSourceName) { + var session = buildSessionCluster(); + session.getStatus().setLifecycleState(lifecycleState); + session.getStatus() + .setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING); + return (Optional<T>) Optional.of(session); + } + }; + } + + public static <T extends HasMetadata> Context<T> createContextWithUnhealthyFlinkDeployment( + boolean haEnabled) { + return new TestingContext<>() { + @Override + public Optional<T> getSecondaryResource(Class expectedType, String eventSourceName) { + var session = buildSessionCluster(); + session.getStatus() + .setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING); + if (!haEnabled) { + session.getSpec() + .getFlinkConfiguration() + .remove( + org.apache.flink.configuration.HighAvailabilityOptions.HA_MODE + .key(), + org.apache.flink.configuration.HighAvailabilityOptions + .HA_STORAGE_PATH + .key()); + } + return (Optional<T>) Optional.of(session); + } + }; + } + public static final String DEPLOYMENT_ERROR = "test deployment error message"; public static <T extends HasMetadata> Context<T> createContextWithFailedJobManagerDeployment( diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java index b001ed77..186de7eb 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java @@ -25,6 +25,7 @@ import org.apache.flink.kubernetes.operator.OperatorTestBase; import org.apache.flink.kubernetes.operator.TestUtils; import org.apache.flink.kubernetes.operator.api.CrdConstants; import org.apache.flink.kubernetes.operator.api.FlinkSessionJob; +import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState; import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec; import org.apache.flink.kubernetes.operator.api.spec.JobState; import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode; @@ -861,4 +862,175 @@ public class SessionJobReconcilerTest extends OperatorTestBase { // New jobID recorded despite failure Assertions.assertNotEquals(jobID, sessionJob.getStatus().getJobStatus().getJobId()); } + + @Test + public void testCleanupWithDeletingSessionCluster() throws Exception { + FlinkSessionJob sessionJob = TestUtils.buildSessionJob(); + + // Submit and set running + reconciler.reconcile( + sessionJob, TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient)); + assertEquals(1, flinkService.listJobs().size()); + verifyAndSetRunningJobsToStatus( + sessionJob, JobState.RUNNING, RECONCILING, null, flinkService.listJobs()); + + // Cleanup with session cluster in DELETING state - should proceed without cancellation + var deleteControl = + reconciler.cleanup( + sessionJob, + TestUtils.createContextWithFlinkDeploymentInLifecycleState( + ResourceLifecycleState.DELETING)); + assertTrue(deleteControl.isRemoveFinalizer()); + // No cancellation attempted, job still in RUNNING state + assertEquals(RUNNING, flinkService.listJobs().get(0).f1.getJobState()); + } + + @Test + public void testCleanupWithDeletedSessionCluster() throws Exception { + FlinkSessionJob sessionJob = TestUtils.buildSessionJob(); + + // Submit and set running + reconciler.reconcile( + sessionJob, TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient)); + assertEquals(1, flinkService.listJobs().size()); + verifyAndSetRunningJobsToStatus( + sessionJob, JobState.RUNNING, RECONCILING, null, flinkService.listJobs()); + + // Cleanup with session cluster in DELETED state - should proceed without cancellation + var deleteControl = + reconciler.cleanup( + sessionJob, + TestUtils.createContextWithFlinkDeploymentInLifecycleState( + ResourceLifecycleState.DELETED)); + assertTrue(deleteControl.isRemoveFinalizer()); + // No cancellation attempted, job still in RUNNING state + assertEquals(RUNNING, flinkService.listJobs().get(0).f1.getJobState()); + } + + @Test + public void testCleanupWithUnhealthySessionClusterNoHa() throws Exception { + FlinkSessionJob sessionJob = TestUtils.buildSessionJob(); + + // Submit and set running + reconciler.reconcile( + sessionJob, TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient)); + assertEquals(1, flinkService.listJobs().size()); + verifyAndSetRunningJobsToStatus( + sessionJob, JobState.RUNNING, RECONCILING, null, flinkService.listJobs()); + + // Cleanup with unhealthy session cluster and no HA - should proceed without cancellation + var deleteControl = + reconciler.cleanup( + sessionJob, TestUtils.createContextWithUnhealthyFlinkDeployment(false)); + assertTrue(deleteControl.isRemoveFinalizer()); + // No cancellation attempted, job still in RUNNING state + assertEquals(RUNNING, flinkService.listJobs().get(0).f1.getJobState()); + } + + @Test + public void testCleanupWithUnhealthySessionClusterWithHa() throws Exception { + FlinkSessionJob sessionJob = TestUtils.buildSessionJob(); + + // Submit and set running + reconciler.reconcile( + sessionJob, TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient)); + assertEquals(1, flinkService.listJobs().size()); + verifyAndSetRunningJobsToStatus( + sessionJob, JobState.RUNNING, RECONCILING, null, flinkService.listJobs()); + + // Cleanup with unhealthy session cluster and HA enabled - should reschedule and wait + var deleteControl = + reconciler.cleanup( + sessionJob, TestUtils.createContextWithUnhealthyFlinkDeployment(true)); + assertFalse(deleteControl.isRemoveFinalizer()); + assertEquals(10_000, deleteControl.getScheduleDelay().get()); + // No cancellation attempted, job still in RUNNING state + assertEquals(RUNNING, flinkService.listJobs().get(0).f1.getJobState()); + } + + @Test + public void testCleanupWithPendingCancellationAndClusterNotFound() throws Exception { + FlinkSessionJob sessionJob = TestUtils.buildSessionJob(); + + // Submit and set running + reconciler.reconcile( + sessionJob, TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient)); + assertEquals(1, flinkService.listJobs().size()); + verifyAndSetRunningJobsToStatus( + sessionJob, JobState.RUNNING, RECONCILING, null, flinkService.listJobs()); + + // Simulate a pending cancellation (e.g. cancel was initiated in a prior reconciliation) + sessionJob.getStatus().getJobStatus().setState(CANCELLING); + + // Cluster is no longer available - should proceed with deletion rather than being blocked + var deleteControl = reconciler.cleanup(sessionJob, TestUtils.createEmptyContext()); + assertTrue(deleteControl.isRemoveFinalizer()); + } + + @Test + public void testCleanupWithPendingCancellationAndDeletingCluster() throws Exception { + FlinkSessionJob sessionJob = TestUtils.buildSessionJob(); + + // Submit and set running + reconciler.reconcile( + sessionJob, TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient)); + assertEquals(1, flinkService.listJobs().size()); + verifyAndSetRunningJobsToStatus( + sessionJob, JobState.RUNNING, RECONCILING, null, flinkService.listJobs()); + + // Simulate a pending cancellation + sessionJob.getStatus().getJobStatus().setState(CANCELLING); + + // Cluster is being deleted - should proceed with deletion rather than being blocked + var deleteControl = + reconciler.cleanup( + sessionJob, + TestUtils.createContextWithFlinkDeploymentInLifecycleState( + ResourceLifecycleState.DELETING)); + assertTrue(deleteControl.isRemoveFinalizer()); + } + + @Test + public void testCleanupWithPendingCancellationAndUnhealthyClusterNoHa() throws Exception { + FlinkSessionJob sessionJob = TestUtils.buildSessionJob(); + + // Submit and set running + reconciler.reconcile( + sessionJob, TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient)); + assertEquals(1, flinkService.listJobs().size()); + verifyAndSetRunningJobsToStatus( + sessionJob, JobState.RUNNING, RECONCILING, null, flinkService.listJobs()); + + // Simulate a pending cancellation + sessionJob.getStatus().getJobStatus().setState(CANCELLING); + + // Cluster is unhealthy with no HA - should proceed with deletion rather than being blocked + var deleteControl = + reconciler.cleanup( + sessionJob, TestUtils.createContextWithUnhealthyFlinkDeployment(false)); + assertTrue(deleteControl.isRemoveFinalizer()); + } + + @Test + public void testCleanupWithPendingCancellationAndReadyCluster() throws Exception { + FlinkSessionJob sessionJob = TestUtils.buildSessionJob(); + + // Submit and set running + reconciler.reconcile( + sessionJob, TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient)); + assertEquals(1, flinkService.listJobs().size()); + verifyAndSetRunningJobsToStatus( + sessionJob, JobState.RUNNING, RECONCILING, null, flinkService.listJobs()); + + // Simulate a pending cancellation + sessionJob.getStatus().getJobStatus().setState(CANCELLING); + + // Cluster is ready - should still wait for the pending cancellation to complete + var deleteControl = + reconciler.cleanup( + sessionJob, + TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient)); + assertFalse(deleteControl.isRemoveFinalizer()); + assertEquals(10_000, deleteControl.getScheduleDelay().get()); + } }
