This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 896590b2 [FLINK-39432] Add option to delete with session running session jobs
896590b2 is described below
commit 896590b2a073243cf156bdf2abd03555388a86ca
Author: Gyula Fora <[email protected]>
AuthorDate: Tue Apr 14 11:13:26 2026 +0200
[FLINK-39432] Add option to delete with session running session jobs
---
.../shortcodes/generated/dynamic_section.html | 8 ++-
.../kubernetes_operator_config_configuration.html | 8 ++-
.../config/KubernetesOperatorConfigOptions.java | 11 ++-
.../reconciler/deployment/SessionReconciler.java | 12 ++--
.../deployment/SessionReconcilerTest.java | 78 ++++++++++++++++++++++
5 files changed, 110 insertions(+), 7 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/dynamic_section.html b/docs/layouts/shortcodes/generated/dynamic_section.html
index f81afa4f..11eafe81 100644
--- a/docs/layouts/shortcodes/generated/dynamic_section.html
+++ b/docs/layouts/shortcodes/generated/dynamic_section.html
@@ -195,7 +195,13 @@
<td>The interval before a savepoint trigger attempt is marked as
unsuccessful.</td>
</tr>
<tr>
-
<td><h5>kubernetes.operator.session.block-on-unmanaged-jobs</h5></td>
+
<td><h5>kubernetes.operator.session.deletion.block-on-session-jobs</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>Block FlinkDeployment deletion if managed jobs are running in
the session cluster.</td>
+ </tr>
+ <tr>
+
<td><h5>kubernetes.operator.session.deletion.block-on-unmanaged-jobs</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Block FlinkDeployment deletion if unmanaged jobs (jobs not
managed by FlinkSessionJob resources) are running in the session cluster.
Example: Jobs submitted via CLI.</td>
diff --git a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
index e92f04f5..7cac60f1 100644
--- a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
@@ -429,7 +429,13 @@
<td>The interval before a savepoint trigger attempt is marked as
unsuccessful.</td>
</tr>
<tr>
-
<td><h5>kubernetes.operator.session.block-on-unmanaged-jobs</h5></td>
+
<td><h5>kubernetes.operator.session.deletion.block-on-session-jobs</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>Block FlinkDeployment deletion if managed jobs are running in
the session cluster.</td>
+ </tr>
+ <tr>
+
<td><h5>kubernetes.operator.session.deletion.block-on-unmanaged-jobs</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Block FlinkDeployment deletion if unmanaged jobs (jobs not
managed by FlinkSessionJob resources) are running in the session cluster.
Example: Jobs submitted via CLI.</td>
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
index f0dc9430..8e418c25 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
@@ -657,12 +657,21 @@ public class KubernetesOperatorConfigOptions {
@Documentation.Section(SECTION_DYNAMIC)
public static final ConfigOption<Boolean> BLOCK_ON_UNMANAGED_JOBS =
- operatorConfig("session.block-on-unmanaged-jobs")
+ operatorConfig("session.deletion.block-on-unmanaged-jobs")
.booleanType()
.defaultValue(true)
+
.withDeprecatedKeys(operatorConfigKey("session.block-on-unmanaged-jobs"))
.withDescription(
"Block FlinkDeployment deletion if unmanaged jobs
(jobs not managed by FlinkSessionJob resources) are running in the session
cluster. Example: Jobs submitted via CLI.");
+ @Documentation.Section(SECTION_DYNAMIC)
+ public static final ConfigOption<Boolean> BLOCK_ON_SESSION_JOBS =
+ operatorConfig("session.deletion.block-on-session-jobs")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Block FlinkDeployment deletion if managed jobs
are running in the session cluster.");
+
@Documentation.Section(SECTION_ADVANCED)
public static final ConfigOption<Duration> REFRESH_CLUSTER_RESOURCE_VIEW =
operatorConfig("cluster.resource-view.refresh-interval")
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
index 4c998fec..d7ad7492 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
@@ -160,10 +160,14 @@ public class SessionReconciler
@Override
public DeleteControl cleanupInternal(FlinkResourceContext<FlinkDeployment>
ctx) {
- Set<FlinkSessionJob> sessionJobs =
-
ctx.getJosdkContext().getSecondaryResources(FlinkSessionJob.class);
+ var sessionJobs =
ctx.getJosdkContext().getSecondaryResources(FlinkSessionJob.class);
var deployment = ctx.getResource();
- if (!sessionJobs.isEmpty()) {
+
+ boolean blockOnSessionJobs =
+ ctx.getObserveConfig()
+
.getBoolean(KubernetesOperatorConfigOptions.BLOCK_ON_SESSION_JOBS);
+
+ if (blockOnSessionJobs && !sessionJobs.isEmpty()) {
var error =
String.format(
"The session jobs %s should be deleted first",
@@ -188,7 +192,7 @@ public class SessionReconciler
boolean blockOnUnmanagedJobs =
ctx.getObserveConfig()
.getBoolean(KubernetesOperatorConfigOptions.BLOCK_ON_UNMANAGED_JOBS);
- if (blockOnUnmanagedJobs) {
+ if (blockOnSessionJobs && blockOnUnmanagedJobs) {
Set<JobID> nonTerminalJobs = getNonTerminalJobs(ctx);
if (!nonTerminalJobs.isEmpty()) {
var error =
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
index 930ad9a4..ba159918 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.client.JobStatusMessage;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
import lombok.Getter;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -305,4 +306,81 @@ public class SessionReconcilerTest extends OperatorTestBase {
nonTerminalJobsAfterRemoval.size(),
"Should have no non-terminal jobs when only terminated jobs
exist");
}
+
+ @Test
+ public void testDeleteSessionWithBlockOnSessionJobsFalse() throws
Exception {
+ FlinkDeployment deployment = TestUtils.buildSessionCluster();
+ deployment
+ .getSpec()
+ .getFlinkConfiguration()
+
.put(KubernetesOperatorConfigOptions.BLOCK_ON_SESSION_JOBS.key(), "false");
+
+ reconciler.reconcile(deployment, flinkService.getContext());
+
+ assertEquals(
+ ReconciliationState.DEPLOYED,
+ deployment.getStatus().getReconciliationStatus().getState());
+
+ // Create some running jobs
+ JobID managedJobId1 = new JobID();
+ JobID managedJobId2 = new JobID();
+ JobID unmanagedRunningJobId = new JobID();
+
+ flinkService
+ .listJobs()
+ .add(
+ Tuple3.of(
+ null,
+ new JobStatusMessage(
+ managedJobId1,
+ "managed-job-1",
+ JobStatus.RUNNING,
+ System.currentTimeMillis()),
+ new Configuration()));
+ flinkService
+ .listJobs()
+ .add(
+ Tuple3.of(
+ null,
+ new JobStatusMessage(
+ managedJobId2,
+ "managed-job-2",
+ JobStatus.RUNNING,
+ System.currentTimeMillis()),
+ new Configuration()));
+ flinkService
+ .listJobs()
+ .add(
+ Tuple3.of(
+ null,
+ new JobStatusMessage(
+ unmanagedRunningJobId,
+ "unmanaged-running-job",
+ JobStatus.RUNNING,
+ System.currentTimeMillis()),
+ new Configuration()));
+
+ // Create FlinkSessionJob resources for the managed jobs
+ FlinkSessionJob managedSessionJob1 = TestUtils.buildSessionJob();
+ managedSessionJob1.getMetadata().setName("managed-session-job-1");
+
managedSessionJob1.getStatus().getJobStatus().setJobId(managedJobId1.toHexString());
+ kubernetesClient.resource(managedSessionJob1).createOrReplace();
+
+ FlinkSessionJob managedSessionJob2 = TestUtils.buildSessionJob();
+ managedSessionJob2.getMetadata().setName("managed-session-job-2");
+
managedSessionJob2.getStatus().getJobStatus().setJobId(managedJobId2.toHexString());
+ kubernetesClient.resource(managedSessionJob2).createOrReplace();
+
+ // Test cleanup with BLOCK_ON_SESSION_JOBS=false
+ var context =
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient);
+ var resourceContext = getResourceContext(deployment, context);
+
+ var sessionReconciler = (SessionReconciler) reconciler.getReconciler();
+ DeleteControl deleteControl =
sessionReconciler.cleanupInternal(resourceContext);
+
+ // Verify that deletion proceeds immediately despite running jobs
+ assertTrue(
+ deleteControl.isRemoveFinalizer(),
+ "Session should be deleted immediately when
BLOCK_ON_SESSION_JOBS is false");
+ }
}