This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 02fa1abf2d35222fd7988c4aa4d461c221101462
Author: Gyula Fora <[email protected]>
AuthorDate: Tue Mar 3 13:13:23 2026 +0100

    [FLINK-39271] Harden session job deletion flow
---
 .../sessionjob/SessionJobReconciler.java           |  88 +++++++----
 .../flink/kubernetes/operator/TestUtils.java       |  39 +++++
 .../sessionjob/SessionJobReconcilerTest.java       | 172 +++++++++++++++++++++
 3 files changed, 269 insertions(+), 30 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
index f5d10626..974c366b 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
@@ -23,6 +23,7 @@ import org.apache.flink.autoscaler.JobAutoScaler;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
+import 
org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
 import org.apache.flink.kubernetes.operator.api.spec.JobState;
 import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
@@ -35,6 +36,7 @@ import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import 
org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler;
 import org.apache.flink.kubernetes.operator.service.SuspendMode;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 
 import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
@@ -128,7 +130,12 @@ public class SessionJobReconciler
     @Override
     public DeleteControl cleanupInternal(FlinkResourceContext<FlinkSessionJob> 
ctx) {
         var status = ctx.getResource().getStatus();
-        long delay = 
ctx.getOperatorConfig().getProgressCheckInterval().toMillis();
+        var rescheduleDelete =
+                DeleteControl.noFinalizerRemoval()
+                        .rescheduleAfter(
+                                
ctx.getOperatorConfig().getProgressCheckInterval().toMillis());
+        var jobID = ctx.getResource().getStatus().getJobStatus().getJobId();
+
         if (status.getReconciliationStatus().isBeforeFirstDeployment()
                 || ReconciliationUtils.isJobInTerminalState(status)
                 || status.getReconciliationStatus()
@@ -136,45 +143,66 @@ public class SessionJobReconciler
                                 .getJob()
                                 .getState()
                         == JobState.SUSPENDED
-                || 
JobStatusObserver.JOB_NOT_FOUND_ERR.equals(status.getError())) {
+                || 
JobStatusObserver.JOB_NOT_FOUND_ERR.equals(status.getError())
+                || jobID == null) {
             // Job is not running, nothing to do...
             return DeleteControl.defaultDelete();
         }
 
+        var flinkDepOptional = 
ctx.getJosdkContext().getSecondaryResource(FlinkDeployment.class);
+        if (flinkDepOptional.isEmpty()) {
+            LOG.info("Session cluster deployment not available");
+            return DeleteControl.defaultDelete();
+        }
+
+        var flinkDep = flinkDepOptional.get();
+
+        // If the session cluster is being deleted, the job will not survive 
regardless,
+        // so there is no need to explicitly cancel it.
+        var sessionLifecycleState = flinkDep.getStatus().getLifecycleState();
+        if (sessionLifecycleState == ResourceLifecycleState.DELETING
+                || sessionLifecycleState == ResourceLifecycleState.DELETED) {
+            LOG.info("Session cluster is being deleted, skipping job 
cancellation");
+            return DeleteControl.defaultDelete();
+        }
+
+        if (!sessionClusterReady(flinkDepOptional)) {
+            // If the session cluster is not healthy and HA is not enabled, 
the job state
+            // will not survive a cluster restart, so we can safely delete 
without
+            // explicit cancellation.
+            var sessionConf = 
flinkDep.getSpec().getFlinkConfiguration().asConfiguration();
+            if (!FlinkUtils.isKubernetesHAActivated(sessionConf)
+                    && !FlinkUtils.isZookeeperHAActivated(sessionConf)) {
+                LOG.info(
+                        "Session cluster is not healthy and HA is not enabled, 
skipping job cancellation");
+                return DeleteControl.defaultDelete();
+            }
+            LOG.info("Session cluster is not healthy, waiting before 
attempting job cancellation");
+            return rescheduleDelete;
+        }
+
+        // Only check for pending cancellation once the cluster is ready, so 
that other deletion
+        // paths (cluster missing, deleting, unhealthy without HA) are not 
blocked.
         if (ReconciliationUtils.isJobCancelling(status)) {
             LOG.info("Waiting for pending cancellation");
-            return DeleteControl.noFinalizerRemoval().rescheduleAfter(delay);
+            return rescheduleDelete;
         }
 
-        Optional<FlinkDeployment> flinkDepOptional =
-                
ctx.getJosdkContext().getSecondaryResource(FlinkDeployment.class);
-
-        if (flinkDepOptional.isPresent()) {
-            String jobID = 
ctx.getResource().getStatus().getJobStatus().getJobId();
-            if (jobID != null) {
-                try {
-                    var observeConfig = ctx.getObserveConfig();
-                    var suspendMode =
-                            observeConfig.getBoolean(
-                                            
KubernetesOperatorConfigOptions.SAVEPOINT_ON_DELETION)
-                                    ? SuspendMode.SAVEPOINT
-                                    : SuspendMode.STATELESS;
-                    if (cancelJob(ctx, suspendMode)) {
-                        LOG.info("Waiting for pending cancellation");
-                        return 
DeleteControl.noFinalizerRemoval().rescheduleAfter(delay);
-                    }
-                } catch (Exception e) {
-                    LOG.error(
-                            "Failed to cancel job, will reschedule after {} 
milliseconds.",
-                            delay,
-                            e);
-                    return 
DeleteControl.noFinalizerRemoval().rescheduleAfter(delay);
-                }
+        try {
+            var observeConfig = ctx.getObserveConfig();
+            var suspendMode =
+                    
observeConfig.getBoolean(KubernetesOperatorConfigOptions.SAVEPOINT_ON_DELETION)
+                            ? SuspendMode.SAVEPOINT
+                            : SuspendMode.STATELESS;
+            if (cancelJob(ctx, suspendMode)) {
+                LOG.info("Waiting for pending cancellation");
+                return rescheduleDelete;
             }
-        } else {
-            LOG.info("Session cluster deployment not available");
+            return DeleteControl.defaultDelete();
+        } catch (Exception e) {
+            LOG.error("Failed to cancel job, rescheduling deletion.", e);
+            return rescheduleDelete;
         }
-        return DeleteControl.defaultDelete();
     }
 
     public static boolean sessionClusterReady(Optional<FlinkDeployment> 
flinkDeploymentOpt) {
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index a48f41d9..3f6f2fff 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -22,6 +23,7 @@ import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
 import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
+import 
org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.api.spec.JobReference;
 import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
@@ -248,6 +249,44 @@ public class TestUtils extends BaseTestUtils {
         };
     }
 
+    public static <T extends HasMetadata>
+            Context<T> createContextWithFlinkDeploymentInLifecycleState(
+                    ResourceLifecycleState lifecycleState) {
+        return new TestingContext<>() {
+            @Override
+            public Optional<T> getSecondaryResource(Class expectedType, String 
eventSourceName) {
+                var session = buildSessionCluster();
+                session.getStatus().setLifecycleState(lifecycleState);
+                session.getStatus()
+                        
.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
+                return (Optional<T>) Optional.of(session);
+            }
+        };
+    }
+
+    public static <T extends HasMetadata> Context<T> 
createContextWithUnhealthyFlinkDeployment(
+            boolean haEnabled) {
+        return new TestingContext<>() {
+            @Override
+            public Optional<T> getSecondaryResource(Class expectedType, String 
eventSourceName) {
+                var session = buildSessionCluster();
+                session.getStatus()
+                        
.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
+                if (!haEnabled) {
+                    session.getSpec()
+                            .getFlinkConfiguration()
+                            .remove(
+                                    
org.apache.flink.configuration.HighAvailabilityOptions.HA_MODE
+                                            .key(),
+                                    
org.apache.flink.configuration.HighAvailabilityOptions
+                                            .HA_STORAGE_PATH
+                                            .key());
+                }
+                return (Optional<T>) Optional.of(session);
+            }
+        };
+    }
+
     public static final String DEPLOYMENT_ERROR = "test deployment error 
message";
 
     public static <T extends HasMetadata> Context<T> 
createContextWithFailedJobManagerDeployment(
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
index b001ed77..186de7eb 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.kubernetes.operator.OperatorTestBase;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.api.CrdConstants;
 import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
+import 
org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
 import org.apache.flink.kubernetes.operator.api.spec.JobState;
 import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
@@ -861,4 +862,175 @@ public class SessionJobReconcilerTest extends OperatorTestBase {
         // New jobID recorded despite failure
         Assertions.assertNotEquals(jobID, 
sessionJob.getStatus().getJobStatus().getJobId());
     }
+
+    @Test
+    public void testCleanupWithDeletingSessionCluster() throws Exception {
+        FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
+
+        // Submit and set running
+        reconciler.reconcile(
+                sessionJob, 
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
+        assertEquals(1, flinkService.listJobs().size());
+        verifyAndSetRunningJobsToStatus(
+                sessionJob, JobState.RUNNING, RECONCILING, null, 
flinkService.listJobs());
+
+        // Cleanup with session cluster in DELETING state - should proceed 
without cancellation
+        var deleteControl =
+                reconciler.cleanup(
+                        sessionJob,
+                        
TestUtils.createContextWithFlinkDeploymentInLifecycleState(
+                                ResourceLifecycleState.DELETING));
+        assertTrue(deleteControl.isRemoveFinalizer());
+        // No cancellation attempted, job still in RUNNING state
+        assertEquals(RUNNING, flinkService.listJobs().get(0).f1.getJobState());
+    }
+
+    @Test
+    public void testCleanupWithDeletedSessionCluster() throws Exception {
+        FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
+
+        // Submit and set running
+        reconciler.reconcile(
+                sessionJob, 
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
+        assertEquals(1, flinkService.listJobs().size());
+        verifyAndSetRunningJobsToStatus(
+                sessionJob, JobState.RUNNING, RECONCILING, null, 
flinkService.listJobs());
+
+        // Cleanup with session cluster in DELETED state - should proceed 
without cancellation
+        var deleteControl =
+                reconciler.cleanup(
+                        sessionJob,
+                        
TestUtils.createContextWithFlinkDeploymentInLifecycleState(
+                                ResourceLifecycleState.DELETED));
+        assertTrue(deleteControl.isRemoveFinalizer());
+        // No cancellation attempted, job still in RUNNING state
+        assertEquals(RUNNING, flinkService.listJobs().get(0).f1.getJobState());
+    }
+
+    @Test
+    public void testCleanupWithUnhealthySessionClusterNoHa() throws Exception {
+        FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
+
+        // Submit and set running
+        reconciler.reconcile(
+                sessionJob, 
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
+        assertEquals(1, flinkService.listJobs().size());
+        verifyAndSetRunningJobsToStatus(
+                sessionJob, JobState.RUNNING, RECONCILING, null, 
flinkService.listJobs());
+
+        // Cleanup with unhealthy session cluster and no HA - should proceed 
without cancellation
+        var deleteControl =
+                reconciler.cleanup(
+                        sessionJob, 
TestUtils.createContextWithUnhealthyFlinkDeployment(false));
+        assertTrue(deleteControl.isRemoveFinalizer());
+        // No cancellation attempted, job still in RUNNING state
+        assertEquals(RUNNING, flinkService.listJobs().get(0).f1.getJobState());
+    }
+
+    @Test
+    public void testCleanupWithUnhealthySessionClusterWithHa() throws 
Exception {
+        FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
+
+        // Submit and set running
+        reconciler.reconcile(
+                sessionJob, 
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
+        assertEquals(1, flinkService.listJobs().size());
+        verifyAndSetRunningJobsToStatus(
+                sessionJob, JobState.RUNNING, RECONCILING, null, 
flinkService.listJobs());
+
+        // Cleanup with unhealthy session cluster and HA enabled - should 
reschedule and wait
+        var deleteControl =
+                reconciler.cleanup(
+                        sessionJob, 
TestUtils.createContextWithUnhealthyFlinkDeployment(true));
+        assertFalse(deleteControl.isRemoveFinalizer());
+        assertEquals(10_000, deleteControl.getScheduleDelay().get());
+        // No cancellation attempted, job still in RUNNING state
+        assertEquals(RUNNING, flinkService.listJobs().get(0).f1.getJobState());
+    }
+
+    @Test
+    public void testCleanupWithPendingCancellationAndClusterNotFound() throws 
Exception {
+        FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
+
+        // Submit and set running
+        reconciler.reconcile(
+                sessionJob, 
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
+        assertEquals(1, flinkService.listJobs().size());
+        verifyAndSetRunningJobsToStatus(
+                sessionJob, JobState.RUNNING, RECONCILING, null, 
flinkService.listJobs());
+
+        // Simulate a pending cancellation (e.g. cancel was initiated in a 
prior reconciliation)
+        sessionJob.getStatus().getJobStatus().setState(CANCELLING);
+
+        // Cluster is no longer available - should proceed with deletion 
rather than being blocked
+        var deleteControl = reconciler.cleanup(sessionJob, 
TestUtils.createEmptyContext());
+        assertTrue(deleteControl.isRemoveFinalizer());
+    }
+
+    @Test
+    public void testCleanupWithPendingCancellationAndDeletingCluster() throws 
Exception {
+        FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
+
+        // Submit and set running
+        reconciler.reconcile(
+                sessionJob, 
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
+        assertEquals(1, flinkService.listJobs().size());
+        verifyAndSetRunningJobsToStatus(
+                sessionJob, JobState.RUNNING, RECONCILING, null, 
flinkService.listJobs());
+
+        // Simulate a pending cancellation
+        sessionJob.getStatus().getJobStatus().setState(CANCELLING);
+
+        // Cluster is being deleted - should proceed with deletion rather than 
being blocked
+        var deleteControl =
+                reconciler.cleanup(
+                        sessionJob,
+                        
TestUtils.createContextWithFlinkDeploymentInLifecycleState(
+                                ResourceLifecycleState.DELETING));
+        assertTrue(deleteControl.isRemoveFinalizer());
+    }
+
+    @Test
+    public void testCleanupWithPendingCancellationAndUnhealthyClusterNoHa() 
throws Exception {
+        FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
+
+        // Submit and set running
+        reconciler.reconcile(
+                sessionJob, 
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
+        assertEquals(1, flinkService.listJobs().size());
+        verifyAndSetRunningJobsToStatus(
+                sessionJob, JobState.RUNNING, RECONCILING, null, 
flinkService.listJobs());
+
+        // Simulate a pending cancellation
+        sessionJob.getStatus().getJobStatus().setState(CANCELLING);
+
+        // Cluster is unhealthy with no HA - should proceed with deletion 
rather than being blocked
+        var deleteControl =
+                reconciler.cleanup(
+                        sessionJob, 
TestUtils.createContextWithUnhealthyFlinkDeployment(false));
+        assertTrue(deleteControl.isRemoveFinalizer());
+    }
+
+    @Test
+    public void testCleanupWithPendingCancellationAndReadyCluster() throws 
Exception {
+        FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
+
+        // Submit and set running
+        reconciler.reconcile(
+                sessionJob, 
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
+        assertEquals(1, flinkService.listJobs().size());
+        verifyAndSetRunningJobsToStatus(
+                sessionJob, JobState.RUNNING, RECONCILING, null, 
flinkService.listJobs());
+
+        // Simulate a pending cancellation
+        sessionJob.getStatus().getJobStatus().setState(CANCELLING);
+
+        // Cluster is ready - should still wait for the pending cancellation 
to complete
+        var deleteControl =
+                reconciler.cleanup(
+                        sessionJob,
+                        
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
+        assertFalse(deleteControl.isRemoveFinalizer());
+        assertEquals(10_000, deleteControl.getScheduleDelay().get());
+    }
 }

Reply via email to