This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 90a7fafa [FLINK-37766] FlinkSessionJob deletion blocked by finalizer
90a7fafa is described below

commit 90a7fafabc8d561bff1f66f8ef69b92ca2e4d71b
Author: Prashant Khanal <[email protected]>
AuthorDate: Tue Apr 28 14:35:43 2026 +0200

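    [FLINK-37766] FlinkSessionJob deletion blocked by finalizer

    When the JobManager had already moved a job to a terminal state (e.g.
    FAILED), the cancel request came back with "already reached another
    terminal state" and the operator kept returning CancelResult.pending(),
    so the cancellation never completed and the finalizer was never removed.

    cancelJobOrError now returns true when the job is already missing or
    terminated, so the cancel paths proceed instead of returning pending.
    Terminal-state detection also recognizes the "already reached another
    terminal state" error message.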
---
 .../operator/service/AbstractFlinkService.java     |  50 ++++++++--
 .../operator/service/AbstractFlinkServiceTest.java | 107 +++++++++++++++++++--
 2 files changed, 138 insertions(+), 19 deletions(-)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index fbbd871b..543b937f 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -345,9 +345,11 @@ public abstract class AbstractFlinkService implements 
FlinkService {
                     }
                     break;
                 case CANCEL:
-                    cancelJobOrError(clusterClient, status, false);
-                    // This is async we need to return
-                    return CancelResult.pending();
+                    if (!cancelJobOrError(clusterClient, status, false)) {
+                        // This is async we need to return
+                        return CancelResult.pending();
+                    }
+                    break;
             }
         }
         if (suspendMode.deleteCluster() || deleteCluster) {
@@ -370,9 +372,12 @@ public abstract class AbstractFlinkService implements 
FlinkService {
             switch (suspendMode) {
                 case STATELESS:
                 case CANCEL:
-                    cancelJobOrError(clusterClient, status, suspendMode == 
SuspendMode.STATELESS);
-                    // This is async we need to return and re-observe
-                    return CancelResult.pending();
+                    if (!cancelJobOrError(
+                            clusterClient, status, suspendMode == 
SuspendMode.STATELESS)) {
+                        // This is async we need to return and re-observe
+                        return CancelResult.pending();
+                    }
+                    break;
                 case SAVEPOINT:
                     savepointPath = savepointJobOrError(clusterClient, status, 
conf);
                     break;
@@ -383,14 +388,30 @@ public abstract class AbstractFlinkService implements 
FlinkService {
         return CancelResult.completed(savepointPath);
     }
 
-    public void cancelJobOrError(
+    /**
+     * Attempts to cancel a Flink job given its current status. If the job is 
already in the process
+     * of being cancelled or has been terminated, the method handles these 
cases accordingly.
+     *
+     * @param clusterClient the {@code RestClusterClient} instance used to 
interact with the Flink
+     *     cluster.
+     * @param status the current status of the job, encapsulated in a {@code 
CommonStatus} object.
+     * @param ignoreMissing a flag indicating whether the absence of the job 
should be ignored. If
+     *     {@code true}, the method will return {@code true} when the job is 
missing. If {@code
+     *     false}, an exception will be thrown when the job cannot be found.
+     * @return {@code true} if the job was already missing or terminated, and 
no further action is
+     *     needed. {@code false} if cancellation was successfully initiated 
and is still pending
+     *     (the caller should await completion before proceeding).
+     * @throws UpgradeFailureException if the job cannot be cancelled due to 
an unexpected error, or
+     *     if the job is missing and {@code ignoreMissing} is set to {@code 
false}.
+     */
+    public boolean cancelJobOrError(
             RestClusterClient<String> clusterClient,
             CommonStatus<?> status,
             boolean ignoreMissing) {
         var jobID = JobID.fromHexString(status.getJobStatus().getJobId());
         if (ReconciliationUtils.isJobCancelling(status)) {
             LOG.info("Job already cancelling");
-            return;
+            return false;
         }
         LOG.info("Cancelling job");
         try {
@@ -402,6 +423,7 @@ public abstract class AbstractFlinkService implements 
FlinkService {
             if (isJobMissing(e)) {
                 if (ignoreMissing) {
                     LOG.info("Job already missing");
+                    return true;
                 } else {
                     throw new UpgradeFailureException(
                             "Cannot find job when trying to cancel",
@@ -410,6 +432,7 @@ public abstract class AbstractFlinkService implements 
FlinkService {
                 }
             } else if (isJobTerminated(e)) {
                 LOG.info("Job already terminated");
+                return true;
             } else {
                 LOG.warn("Error while cancelling job", e);
                 throw new UpgradeFailureException(
@@ -417,6 +440,7 @@ public abstract class AbstractFlinkService implements 
FlinkService {
             }
         }
         status.getJobStatus().setState(JobStatus.CANCELLING);
+        return false;
     }
 
     public String savepointJobOrError(
@@ -487,9 +511,16 @@ public abstract class AbstractFlinkService implements 
FlinkService {
             return true;
         }
 
-        return findThrowable(e, RestClientException.class)
+        if (findThrowable(e, RestClientException.class)
                 .map(RestClientException::getHttpResponseStatus)
                 .map(respCode -> HttpResponseStatus.CONFLICT == respCode)
+                .orElse(false)) {
+            return true;
+        }
+
+        return Optional.ofNullable(ExceptionUtils.getExceptionMessage(e))
+                .map(String::toLowerCase)
+                .map(msg -> msg.contains("already reached another terminal 
state"))
                 .orElse(false);
     }
 
@@ -1051,6 +1082,7 @@ public abstract class AbstractFlinkService implements 
FlinkService {
         }
     }
 
+    @Override
     public Map<String, String> getMetrics(
             Configuration conf, String jobId, List<String> metricNames) throws 
Exception {
         try (var clusterClient = getClusterClient(conf)) {
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
index f77ccba5..19d50587 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
@@ -113,6 +113,7 @@ import 
io.fabric8.kubernetes.client.KubernetesClientException;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
 import lombok.SneakyThrows;
+import org.jetbrains.annotations.NotNull;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -334,15 +335,7 @@ public class AbstractFlinkServiceTest {
     @ValueSource(ints = {404, 409, 500})
     public void cancelErrorHandling(int statusCode) throws Exception {
 
-        var testingClusterClient =
-                new TestingClusterClient<>(configuration, 
TestUtils.TEST_DEPLOYMENT_NAME);
-        testingClusterClient.setCancelFunction(
-                jobID ->
-                        CompletableFuture.failedFuture(
-                                new RuntimeException(
-                                        new RestClientException(
-                                                "errrr", 
HttpResponseStatus.valueOf(statusCode)))));
-        var flinkService = new TestingService(testingClusterClient);
+        var flinkService = getTestingService("errrr", 
HttpResponseStatus.valueOf(statusCode));
 
         JobID jobID = JobID.generate();
         var job = TestUtils.buildSessionJob();
@@ -360,10 +353,104 @@ public class AbstractFlinkServiceTest {
             assertEquals(RUNNING, jobStatus.getState());
         } else {
             flinkService.cancelSessionJob(job, SuspendMode.STATELESS, new 
Configuration());
-            assertEquals(CANCELLING, jobStatus.getState());
+            assertEquals(FINISHED, jobStatus.getState());
+            assertNull(jobStatus.getJobId());
         }
     }
 
+    @Test
+    public void cancelErrorHandlingWithTerminalStateMessage() throws Exception 
{
+        var flinkService =
+                getTestingService(
+                        "Job cancellation failed because the job has already 
reached another terminal state (FAILED).",
+                        HttpResponseStatus.BAD_REQUEST);
+
+        JobID jobID = JobID.generate();
+        var job = TestUtils.buildSessionJob();
+        var jobStatus = job.getStatus().getJobStatus();
+        jobStatus.setJobId(jobID.toHexString());
+        jobStatus.setState(RUNNING);
+        ReconciliationUtils.updateStatusForDeployedSpec(job, new 
Configuration());
+
+        flinkService.cancelSessionJob(job, SuspendMode.STATELESS, new 
Configuration());
+        assertEquals(FINISHED, jobStatus.getState());
+        assertNull(jobStatus.getJobId());
+    }
+
+    /**
+     * Reproduces the operator-upgrade scenario for Session Mode with CANCEL 
upgrade mode: when a
+     * running session job's JobManager has already moved the job into a 
terminal state (e.g.
+     * FAILED) and the operator (after a restart/upgrade) tries to cancel it, 
the cancellation
+     * request comes back with "already reached another terminal state". 
Previously this caused the
+     * finalizer to never be removed, leaving the CR stuck in Terminating.
+     */
+    @Test
+    public void cancelSessionJobWithCancelModeAndTerminalStateMessage() throws 
Exception {
+        var flinkService =
+                getTestingService(
+                        "Job cancellation failed because the job has already 
reached another terminal state (FAILED).",
+                        HttpResponseStatus.BAD_REQUEST);
+
+        JobID jobID = JobID.generate();
+        var job = TestUtils.buildSessionJob();
+        var jobStatus = job.getStatus().getJobStatus();
+        jobStatus.setJobId(jobID.toHexString());
+        jobStatus.setState(RUNNING);
+        ReconciliationUtils.updateStatusForDeployedSpec(job, new 
Configuration());
+
+        var result = flinkService.cancelSessionJob(job, SuspendMode.CANCEL, 
new Configuration());
+        // Must NOT be pending — the CR would otherwise be stuck in 
Terminating indefinitely
+        assertFalse(result.isPending());
+        assertEquals(FINISHED, jobStatus.getState());
+        assertNull(jobStatus.getJobId());
+    }
+
+    @NotNull
+    private TestingService getTestingService(String message, 
HttpResponseStatus badRequest)
+            throws Exception {
+        final var testingClusterClient =
+                new TestingClusterClient<>(configuration, 
TestUtils.TEST_DEPLOYMENT_NAME);
+        testingClusterClient.setCancelFunction(
+                jobID ->
+                        CompletableFuture.failedFuture(
+                                new RuntimeException(
+                                        new RestClientException(message, 
badRequest))));
+        return new TestingService(testingClusterClient);
+    }
+
+    /**
+     * Reproduces FLINK-37766 for Application Mode: when a running application 
job's JobManager has
+     * moved the job to a terminal state (e.g. FAILED due to HA desync) and 
the operator tries to
+     * cancel the job with CANCEL suspend mode (used for last-state upgrades), 
the "already reached
+     * another terminal state" response previously caused the operator to 
always return
+     * CancelResult.pending(), looping forever without completing the 
upgrade/deletion.
+     */
+    @Test
+    public void cancelApplicationJobWithCancelModeAndTerminalStateMessage() 
throws Exception {
+        var flinkService =
+                getTestingService(
+                        "Job cancellation failed because the job has already 
reached another terminal state (FAILED).",
+                        HttpResponseStatus.BAD_REQUEST);
+
+        JobID jobID = JobID.generate();
+        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+        
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+        JobStatus jobStatus = deployment.getStatus().getJobStatus();
+        jobStatus.setJobId(jobID.toHexString());
+        jobStatus.setState(RUNNING);
+        ReconciliationUtils.updateStatusForDeployedSpec(deployment, new 
Configuration());
+
+        var result =
+                flinkService.cancelJob(
+                        deployment,
+                        SuspendMode.CANCEL,
+                        configManager.getObserveConfig(deployment),
+                        false);
+        // Must NOT be pending — the operator would otherwise loop forever on 
the upgrade
+        assertFalse(result.isPending());
+        assertEquals(FINISHED, jobStatus.getState());
+    }
+
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     public void cancelJobWithSavepointUpgradeModeTest(boolean 
deleteAfterSavepoint)

Reply via email to