This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 896590b2 [FLINK-39432] Add option to delete with session running 
session jobs
896590b2 is described below

commit 896590b2a073243cf156bdf2abd03555388a86ca
Author: Gyula Fora <[email protected]>
AuthorDate: Tue Apr 14 11:13:26 2026 +0200

    [FLINK-39432] Add option to delete with session running session jobs
---
 .../shortcodes/generated/dynamic_section.html      |  8 ++-
 .../kubernetes_operator_config_configuration.html  |  8 ++-
 .../config/KubernetesOperatorConfigOptions.java    | 11 ++-
 .../reconciler/deployment/SessionReconciler.java   | 12 ++--
 .../deployment/SessionReconcilerTest.java          | 78 ++++++++++++++++++++++
 5 files changed, 110 insertions(+), 7 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/dynamic_section.html 
b/docs/layouts/shortcodes/generated/dynamic_section.html
index f81afa4f..11eafe81 100644
--- a/docs/layouts/shortcodes/generated/dynamic_section.html
+++ b/docs/layouts/shortcodes/generated/dynamic_section.html
@@ -195,7 +195,13 @@
             <td>The interval before a savepoint trigger attempt is marked as 
unsuccessful.</td>
         </tr>
         <tr>
-            
<td><h5>kubernetes.operator.session.block-on-unmanaged-jobs</h5></td>
+            
<td><h5>kubernetes.operator.session.deletion.block-on-session-jobs</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Block FlinkDeployment deletion if managed jobs are running in 
the session cluster.</td>
+        </tr>
+        <tr>
+            
<td><h5>kubernetes.operator.session.deletion.block-on-unmanaged-jobs</h5></td>
             <td style="word-wrap: break-word;">true</td>
             <td>Boolean</td>
             <td>Block FlinkDeployment deletion if unmanaged jobs (jobs not 
managed by FlinkSessionJob resources) are running in the session cluster. 
Example: Jobs submitted via CLI.</td>
diff --git 
a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
 
b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
index e92f04f5..7cac60f1 100644
--- 
a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
+++ 
b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
@@ -429,7 +429,13 @@
             <td>The interval before a savepoint trigger attempt is marked as 
unsuccessful.</td>
         </tr>
         <tr>
-            
<td><h5>kubernetes.operator.session.block-on-unmanaged-jobs</h5></td>
+            
<td><h5>kubernetes.operator.session.deletion.block-on-session-jobs</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Block FlinkDeployment deletion if managed jobs are running in 
the session cluster.</td>
+        </tr>
+        <tr>
+            
<td><h5>kubernetes.operator.session.deletion.block-on-unmanaged-jobs</h5></td>
             <td style="word-wrap: break-word;">true</td>
             <td>Boolean</td>
             <td>Block FlinkDeployment deletion if unmanaged jobs (jobs not 
managed by FlinkSessionJob resources) are running in the session cluster. 
Example: Jobs submitted via CLI.</td>
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
index f0dc9430..8e418c25 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
@@ -657,12 +657,21 @@ public class KubernetesOperatorConfigOptions {
 
     @Documentation.Section(SECTION_DYNAMIC)
     public static final ConfigOption<Boolean> BLOCK_ON_UNMANAGED_JOBS =
-            operatorConfig("session.block-on-unmanaged-jobs")
+            operatorConfig("session.deletion.block-on-unmanaged-jobs")
                     .booleanType()
                     .defaultValue(true)
+                    
.withDeprecatedKeys(operatorConfigKey("session.block-on-unmanaged-jobs"))
                     .withDescription(
                             "Block FlinkDeployment deletion if unmanaged jobs 
(jobs not managed by FlinkSessionJob resources) are running in the session 
cluster. Example: Jobs submitted via CLI.");
 
+    @Documentation.Section(SECTION_DYNAMIC)
+    public static final ConfigOption<Boolean> BLOCK_ON_SESSION_JOBS =
+            operatorConfig("session.deletion.block-on-session-jobs")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Block FlinkDeployment deletion if managed jobs 
are running in the session cluster.");
+
     @Documentation.Section(SECTION_ADVANCED)
     public static final ConfigOption<Duration> REFRESH_CLUSTER_RESOURCE_VIEW =
             operatorConfig("cluster.resource-view.refresh-interval")
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
index 4c998fec..d7ad7492 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
@@ -160,10 +160,14 @@ public class SessionReconciler
 
     @Override
     public DeleteControl cleanupInternal(FlinkResourceContext<FlinkDeployment> 
ctx) {
-        Set<FlinkSessionJob> sessionJobs =
-                
ctx.getJosdkContext().getSecondaryResources(FlinkSessionJob.class);
+        var sessionJobs = 
ctx.getJosdkContext().getSecondaryResources(FlinkSessionJob.class);
         var deployment = ctx.getResource();
-        if (!sessionJobs.isEmpty()) {
+
+        boolean blockOnSessionJobs =
+                ctx.getObserveConfig()
+                        
.getBoolean(KubernetesOperatorConfigOptions.BLOCK_ON_SESSION_JOBS);
+
+        if (blockOnSessionJobs && !sessionJobs.isEmpty()) {
             var error =
                     String.format(
                             "The session jobs %s should be deleted first",
@@ -188,7 +192,7 @@ public class SessionReconciler
         boolean blockOnUnmanagedJobs =
                 ctx.getObserveConfig()
                         
.getBoolean(KubernetesOperatorConfigOptions.BLOCK_ON_UNMANAGED_JOBS);
-        if (blockOnUnmanagedJobs) {
+        if (blockOnSessionJobs && blockOnUnmanagedJobs) {
             Set<JobID> nonTerminalJobs = getNonTerminalJobs(ctx);
             if (!nonTerminalJobs.isEmpty()) {
                 var error =
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
index 930ad9a4..ba159918 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.client.JobStatusMessage;
 import io.fabric8.kubernetes.api.model.ObjectMeta;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
 import lombok.Getter;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -305,4 +306,81 @@ public class SessionReconcilerTest extends 
OperatorTestBase {
                 nonTerminalJobsAfterRemoval.size(),
                 "Should have no non-terminal jobs when only terminated jobs 
exist");
     }
+
+    @Test
+    public void testDeleteSessionWithBlockOnSessionJobsFalse() throws 
Exception {
+        FlinkDeployment deployment = TestUtils.buildSessionCluster();
+        deployment
+                .getSpec()
+                .getFlinkConfiguration()
+                
.put(KubernetesOperatorConfigOptions.BLOCK_ON_SESSION_JOBS.key(), "false");
+
+        reconciler.reconcile(deployment, flinkService.getContext());
+
+        assertEquals(
+                ReconciliationState.DEPLOYED,
+                deployment.getStatus().getReconciliationStatus().getState());
+
+        // Create some running jobs
+        JobID managedJobId1 = new JobID();
+        JobID managedJobId2 = new JobID();
+        JobID unmanagedRunningJobId = new JobID();
+
+        flinkService
+                .listJobs()
+                .add(
+                        Tuple3.of(
+                                null,
+                                new JobStatusMessage(
+                                        managedJobId1,
+                                        "managed-job-1",
+                                        JobStatus.RUNNING,
+                                        System.currentTimeMillis()),
+                                new Configuration()));
+        flinkService
+                .listJobs()
+                .add(
+                        Tuple3.of(
+                                null,
+                                new JobStatusMessage(
+                                        managedJobId2,
+                                        "managed-job-2",
+                                        JobStatus.RUNNING,
+                                        System.currentTimeMillis()),
+                                new Configuration()));
+        flinkService
+                .listJobs()
+                .add(
+                        Tuple3.of(
+                                null,
+                                new JobStatusMessage(
+                                        unmanagedRunningJobId,
+                                        "unmanaged-running-job",
+                                        JobStatus.RUNNING,
+                                        System.currentTimeMillis()),
+                                new Configuration()));
+
+        // Create FlinkSessionJob resources for the managed jobs
+        FlinkSessionJob managedSessionJob1 = TestUtils.buildSessionJob();
+        managedSessionJob1.getMetadata().setName("managed-session-job-1");
+        
managedSessionJob1.getStatus().getJobStatus().setJobId(managedJobId1.toHexString());
+        kubernetesClient.resource(managedSessionJob1).createOrReplace();
+
+        FlinkSessionJob managedSessionJob2 = TestUtils.buildSessionJob();
+        managedSessionJob2.getMetadata().setName("managed-session-job-2");
+        
managedSessionJob2.getStatus().getJobStatus().setJobId(managedJobId2.toHexString());
+        kubernetesClient.resource(managedSessionJob2).createOrReplace();
+
+        // Test cleanup with BLOCK_ON_SESSION_JOBS=false
+        var context = 
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient);
+        var resourceContext = getResourceContext(deployment, context);
+
+        var sessionReconciler = (SessionReconciler) reconciler.getReconciler();
+        DeleteControl deleteControl = 
sessionReconciler.cleanupInternal(resourceContext);
+
+        // Verify that deletion proceeds immediately despite running jobs
+        assertTrue(
+                deleteControl.isRemoveFinalizer(),
+                "Session should be deleted immediately when 
BLOCK_ON_SESSION_JOBS is false");
+    }
 }

Reply via email to