This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit cdc568c63ac67511ad405398150e29f360675557
Author: Gyula Fora <[email protected]>
AuthorDate: Mon Feb 16 16:39:41 2026 +0100

    [FLINK-39271] Fix session job cleanup with unaccessible session cluster
---
 .../operator/controller/FlinkResourceContext.java  |  6 ++-
 .../flink/kubernetes/operator/TestUtils.java       |  9 ++---
 .../controller/FlinkSessionJobControllerTest.java  | 44 ++++++++++++++++++++++
 3 files changed, 52 insertions(+), 7 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java
index f62df74f..959d6cb1 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java
@@ -67,7 +67,11 @@ public abstract class FlinkResourceContext<CR extends AbstractFlinkResource<?, ?
     }
 
     private KubernetesJobAutoScalerContext createJobAutoScalerContext() {
-        Configuration conf = new Configuration(getDeployConfig(resource.getSpec()));
+        Configuration conf = new Configuration();
+        var deployConf = getDeployConfig(resource.getSpec());
+        if (deployConf != null) {
+            conf.addAll(deployConf);
+        }
         conf.set(
                 AutoScalerOptions.FLINK_CLIENT_TIMEOUT,
                 getOperatorConfig().getFlinkClientTimeout());
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index 3f6f2fff..e80f2eba 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -85,6 +85,8 @@ import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.flink.configuration.HighAvailabilityOptions.HA_MODE;
+import static org.apache.flink.configuration.HighAvailabilityOptions.HA_STORAGE_PATH;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.params.provider.Arguments.arguments;
 
@@ -275,12 +277,7 @@ public class TestUtils extends BaseTestUtils {
                 if (!haEnabled) {
                     session.getSpec()
                             .getFlinkConfiguration()
-                            .remove(
-                                    org.apache.flink.configuration.HighAvailabilityOptions.HA_MODE
-                                            .key(),
-                                    org.apache.flink.configuration.HighAvailabilityOptions
-                                            .HA_STORAGE_PATH
-                                            .key());
+                            .remove(HA_MODE.key(), HA_STORAGE_PATH.key());
                 }
                 return (Optional<T>) Optional.of(session);
             }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
index 7dee2c83..49edb5df 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
@@ -18,6 +18,7 @@
 package org.apache.flink.kubernetes.operator.controller;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
@@ -715,6 +716,49 @@ class FlinkSessionJobControllerTest {
         assertEquals(ResourceLifecycleState.DELETED, sessionJob.getStatus().getLifecycleState());
     }
 
+    @Test
+    public void testCleanupTerminalStateJobWithUnavailableSessionCluster() throws Exception {
+        // Submit job with ready session cluster
+        testController.reconcile(sessionJob, context);
+        testController.reconcile(sessionJob, context);
+
+        assertEquals(RUNNING, sessionJob.getStatus().getJobStatus().getState());
+        assertEquals(1, flinkService.listJobs().size());
+
+        // Simulate job reaching terminal state (CANCELLED) in the Flink cluster
+        var jobs = flinkService.listJobs();
+        var job = jobs.get(0);
+        var cancelledStatus =
+                new JobStatusMessage(
+                        job.f1.getJobId(),
+                        job.f1.getJobName(),
+                        org.apache.flink.api.common.JobStatus.CANCELED,
+                        job.f1.getStartTime());
+        jobs.set(0, Tuple3.of(job.f0, cancelledStatus, job.f2));
+
+        // Manually update sessionJob status to reflect terminal state
+        sessionJob
+                .getStatus()
+                .getJobStatus()
+                .setState(org.apache.flink.api.common.JobStatus.CANCELED);
+
+        // Now simulate session cluster becoming unavailable by using empty context
+        Context<FlinkSessionJob> emptyContext =
+                TestUtils.createEmptyContextWithClient(kubernetesClient);
+
+        // Cleanup should still succeed and remove finalizer even without session cluster
+        var deleteControl = testController.cleanup(sessionJob, emptyContext);
+        assertTrue(deleteControl.isRemoveFinalizer());
+        assertEquals(
+                ResourceLifecycleState.DELETED,
+                testController
+                        .getStatusUpdateCounter()
+                        .currentResource
+                        .getStatus()
+                        .getLifecycleState());
+        assertEquals(ResourceLifecycleState.DELETED, sessionJob.getStatus().getLifecycleState());
+    }
+
     private void verifyReconcileInitialSuspendedDeployment(FlinkSessionJob sessionJob)
             throws Exception {
         UpdateControl<FlinkDeployment> updateControl =

Reply via email to