This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit cdc568c63ac67511ad405398150e29f360675557 Author: Gyula Fora <[email protected]> AuthorDate: Mon Feb 16 16:39:41 2026 +0100 [FLINK-39271] Fix session job cleanup with unaccessible session cluster --- .../operator/controller/FlinkResourceContext.java | 6 ++- .../flink/kubernetes/operator/TestUtils.java | 9 ++--- .../controller/FlinkSessionJobControllerTest.java | 44 ++++++++++++++++++++++ 3 files changed, 52 insertions(+), 7 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java index f62df74f..959d6cb1 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java @@ -67,7 +67,11 @@ public abstract class FlinkResourceContext<CR extends AbstractFlinkResource<?, ? } private KubernetesJobAutoScalerContext createJobAutoScalerContext() { - Configuration conf = new Configuration(getDeployConfig(resource.getSpec())); + Configuration conf = new Configuration(); + var deployConf = getDeployConfig(resource.getSpec()); + if (deployConf != null) { + conf.addAll(deployConf); + } conf.set( AutoScalerOptions.FLINK_CLIENT_TIMEOUT, getOperatorConfig().getFlinkClientTimeout()); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java index 3f6f2fff..e80f2eba 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java @@ -85,6 +85,8 @@ import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.flink.configuration.HighAvailabilityOptions.HA_MODE; +import static org.apache.flink.configuration.HighAvailabilityOptions.HA_STORAGE_PATH; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.params.provider.Arguments.arguments; @@ -275,12 +277,7 @@ public class TestUtils extends BaseTestUtils { if (!haEnabled) { session.getSpec() .getFlinkConfiguration() - .remove( - org.apache.flink.configuration.HighAvailabilityOptions.HA_MODE - .key(), - org.apache.flink.configuration.HighAvailabilityOptions - .HA_STORAGE_PATH - .key()); + .remove(HA_MODE.key(), HA_STORAGE_PATH.key()); } return (Optional<T>) Optional.of(session); } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java index 7dee2c83..49edb5df 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java @@ -18,6 +18,7 @@ package org.apache.flink.kubernetes.operator.controller; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.TestUtils; import org.apache.flink.kubernetes.operator.TestingFlinkService; @@ -715,6 +716,49 @@ class FlinkSessionJobControllerTest { assertEquals(ResourceLifecycleState.DELETED, sessionJob.getStatus().getLifecycleState()); } + @Test + public void testCleanupTerminalStateJobWithUnavailableSessionCluster() throws Exception { + // Submit job with ready session cluster + testController.reconcile(sessionJob, context); + testController.reconcile(sessionJob, context); + + assertEquals(RUNNING, sessionJob.getStatus().getJobStatus().getState()); + assertEquals(1, flinkService.listJobs().size()); + + // Simulate job reaching terminal state (CANCELLED) in the Flink cluster + var jobs = flinkService.listJobs(); + var job = jobs.get(0); + var cancelledStatus = + new JobStatusMessage( + job.f1.getJobId(), + job.f1.getJobName(), + org.apache.flink.api.common.JobStatus.CANCELED, + job.f1.getStartTime()); + jobs.set(0, Tuple3.of(job.f0, cancelledStatus, job.f2)); + + // Manually update sessionJob status to reflect terminal state + sessionJob + .getStatus() + .getJobStatus() + .setState(org.apache.flink.api.common.JobStatus.CANCELED); + + // Now simulate session cluster becoming unavailable by using empty context + Context<FlinkSessionJob> emptyContext = + TestUtils.createEmptyContextWithClient(kubernetesClient); + + // Cleanup should still succeed and remove finalizer even without session cluster + var deleteControl = testController.cleanup(sessionJob, emptyContext); + assertTrue(deleteControl.isRemoveFinalizer()); + assertEquals( + ResourceLifecycleState.DELETED, + testController + .getStatusUpdateCounter() + .currentResource + .getStatus() + .getLifecycleState()); + assertEquals(ResourceLifecycleState.DELETED, sessionJob.getStatus().getLifecycleState()); + } + private void verifyReconcileInitialSuspendedDeployment(FlinkSessionJob sessionJob) throws Exception { UpdateControl<FlinkDeployment> updateControl =
