This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 90a7fafa [FLINK-37766] FlinkSessionJob deletion blocked by finalizer
90a7fafa is described below
commit 90a7fafabc8d561bff1f66f8ef69b92ca2e4d71b
Author: Prashant Khanal <[email protected]>
AuthorDate: Tue Apr 28 14:35:43 2026 +0200
[FLINK-37766] FlinkSessionJob deletion blocked by finalizer
---
.../operator/service/AbstractFlinkService.java | 50 ++++++++--
.../operator/service/AbstractFlinkServiceTest.java | 107 +++++++++++++++++++--
2 files changed, 138 insertions(+), 19 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index fbbd871b..543b937f 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -345,9 +345,11 @@ public abstract class AbstractFlinkService implements FlinkService {
}
break;
case CANCEL:
- cancelJobOrError(clusterClient, status, false);
- // This is async we need to return
- return CancelResult.pending();
+ if (!cancelJobOrError(clusterClient, status, false)) {
+ // This is async we need to return
+ return CancelResult.pending();
+ }
+ break;
}
}
if (suspendMode.deleteCluster() || deleteCluster) {
@@ -370,9 +372,12 @@ public abstract class AbstractFlinkService implements FlinkService {
switch (suspendMode) {
case STATELESS:
case CANCEL:
- cancelJobOrError(clusterClient, status, suspendMode == SuspendMode.STATELESS);
- // This is async we need to return and re-observe
- return CancelResult.pending();
+ if (!cancelJobOrError(
+ clusterClient, status, suspendMode == SuspendMode.STATELESS)) {
+ // This is async we need to return and re-observe
+ return CancelResult.pending();
+ }
+ break;
case SAVEPOINT:
savepointPath = savepointJobOrError(clusterClient, status, conf);
break;
@@ -383,14 +388,30 @@ public abstract class AbstractFlinkService implements FlinkService {
return CancelResult.completed(savepointPath);
}
- public void cancelJobOrError(
+ /**
+ * Attempts to cancel a Flink job given its current status. If the job is already in the process
+ * of being cancelled or has been terminated, the method handles these cases accordingly.
+ *
+ * @param clusterClient the {@code RestClusterClient} instance used to interact with the Flink
+ * cluster.
+ * @param status the current status of the job, encapsulated in a {@code CommonStatus} object.
+ * @param ignoreMissing a flag indicating whether the absence of the job should be ignored. If
+ * {@code true}, the method will return {@code true} when the job is missing. If {@code
+ * false}, an exception will be thrown when the job cannot be found.
+ * @return {@code true} if the job was already missing or terminated, and no further action is
+ * needed. {@code false} if cancellation was successfully initiated and is still pending
+ * (the caller should await completion before proceeding).
+ * @throws UpgradeFailureException if the job cannot be cancelled due to an unexpected error, or
+ * if the job is missing and {@code ignoreMissing} is set to {@code false}.
+ */
+ public boolean cancelJobOrError(
RestClusterClient<String> clusterClient,
CommonStatus<?> status,
boolean ignoreMissing) {
var jobID = JobID.fromHexString(status.getJobStatus().getJobId());
if (ReconciliationUtils.isJobCancelling(status)) {
LOG.info("Job already cancelling");
- return;
+ return false;
}
LOG.info("Cancelling job");
try {
@@ -402,6 +423,7 @@ public abstract class AbstractFlinkService implements FlinkService {
if (isJobMissing(e)) {
if (ignoreMissing) {
LOG.info("Job already missing");
+ return true;
} else {
throw new UpgradeFailureException(
"Cannot find job when trying to cancel",
@@ -410,6 +432,7 @@ public abstract class AbstractFlinkService implements FlinkService {
}
} else if (isJobTerminated(e)) {
LOG.info("Job already terminated");
+ return true;
} else {
LOG.warn("Error while cancelling job", e);
throw new UpgradeFailureException(
@@ -417,6 +440,7 @@ public abstract class AbstractFlinkService implements FlinkService {
}
}
status.getJobStatus().setState(JobStatus.CANCELLING);
+ return false;
}
public String savepointJobOrError(
@@ -487,9 +511,16 @@ public abstract class AbstractFlinkService implements FlinkService {
return true;
}
- return findThrowable(e, RestClientException.class)
+ if (findThrowable(e, RestClientException.class)
.map(RestClientException::getHttpResponseStatus)
.map(respCode -> HttpResponseStatus.CONFLICT == respCode)
+ .orElse(false)) {
+ return true;
+ }
+
+ return Optional.ofNullable(ExceptionUtils.getExceptionMessage(e))
+ .map(String::toLowerCase)
+ .map(msg -> msg.contains("already reached another terminal state"))
.orElse(false);
}
@@ -1051,6 +1082,7 @@ public abstract class AbstractFlinkService implements FlinkService {
}
}
+ @Override
public Map<String, String> getMetrics(
Configuration conf, String jobId, List<String> metricNames) throws Exception {
try (var clusterClient = getClusterClient(conf)) {
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
index f77ccba5..19d50587 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
@@ -113,6 +113,7 @@ import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
import lombok.SneakyThrows;
+import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -334,15 +335,7 @@ public class AbstractFlinkServiceTest {
@ValueSource(ints = {404, 409, 500})
public void cancelErrorHandling(int statusCode) throws Exception {
- var testingClusterClient =
- new TestingClusterClient<>(configuration, TestUtils.TEST_DEPLOYMENT_NAME);
- testingClusterClient.setCancelFunction(
- jobID ->
- CompletableFuture.failedFuture(
- new RuntimeException(
- new RestClientException(
- "errrr", HttpResponseStatus.valueOf(statusCode)))));
- var flinkService = new TestingService(testingClusterClient);
+ var flinkService = getTestingService("errrr", HttpResponseStatus.valueOf(statusCode));
JobID jobID = JobID.generate();
var job = TestUtils.buildSessionJob();
@@ -360,10 +353,104 @@ public class AbstractFlinkServiceTest {
assertEquals(RUNNING, jobStatus.getState());
} else {
flinkService.cancelSessionJob(job, SuspendMode.STATELESS, new Configuration());
- assertEquals(CANCELLING, jobStatus.getState());
+ assertEquals(FINISHED, jobStatus.getState());
+ assertNull(jobStatus.getJobId());
}
}
+ @Test
+ public void cancelErrorHandlingWithTerminalStateMessage() throws Exception {
+ var flinkService =
+ getTestingService(
+ "Job cancellation failed because the job has already reached another terminal state (FAILED).",
+ HttpResponseStatus.BAD_REQUEST);
+
+ JobID jobID = JobID.generate();
+ var job = TestUtils.buildSessionJob();
+ var jobStatus = job.getStatus().getJobStatus();
+ jobStatus.setJobId(jobID.toHexString());
+ jobStatus.setState(RUNNING);
+ ReconciliationUtils.updateStatusForDeployedSpec(job, new Configuration());
+
+ flinkService.cancelSessionJob(job, SuspendMode.STATELESS, new Configuration());
+ assertEquals(FINISHED, jobStatus.getState());
+ assertNull(jobStatus.getJobId());
+ }
+
+ /**
+ * Reproduces the operator-upgrade scenario for Session Mode with CANCEL upgrade mode: when a
+ * running session job's JobManager has already moved the job into a terminal state (e.g.
+ * FAILED) and the operator (after a restart/upgrade) tries to cancel it, the cancellation
+ * request comes back with "already reached another terminal state". Previously this caused the
+ * finalizer to never be removed, leaving the CR stuck in Terminating.
+ */
+ @Test
+ public void cancelSessionJobWithCancelModeAndTerminalStateMessage() throws Exception {
+ var flinkService =
+ getTestingService(
+ "Job cancellation failed because the job has already reached another terminal state (FAILED).",
+ HttpResponseStatus.BAD_REQUEST);
+
+ JobID jobID = JobID.generate();
+ var job = TestUtils.buildSessionJob();
+ var jobStatus = job.getStatus().getJobStatus();
+ jobStatus.setJobId(jobID.toHexString());
+ jobStatus.setState(RUNNING);
+ ReconciliationUtils.updateStatusForDeployedSpec(job, new Configuration());
+
+ var result = flinkService.cancelSessionJob(job, SuspendMode.CANCEL, new Configuration());
+ // Must NOT be pending — the CR would otherwise be stuck in Terminating indefinitely
+ assertFalse(result.isPending());
+ assertEquals(FINISHED, jobStatus.getState());
+ assertNull(jobStatus.getJobId());
+ }
+
+ @NotNull
+ private TestingService getTestingService(String message, HttpResponseStatus badRequest)
+ throws Exception {
+ final var testingClusterClient =
+ new TestingClusterClient<>(configuration, TestUtils.TEST_DEPLOYMENT_NAME);
+ testingClusterClient.setCancelFunction(
+ jobID ->
+ CompletableFuture.failedFuture(
+ new RuntimeException(
+ new RestClientException(message, badRequest))));
+ return new TestingService(testingClusterClient);
+ }
+
+ /**
+ * Reproduces FLINK-37766 for Application Mode: when a running application job's JobManager has
+ * moved the job to a terminal state (e.g. FAILED due to HA desync) and the operator tries to
+ * cancel the job with CANCEL suspend mode (used for last-state upgrades), the "already reached
+ * another terminal state" response previously caused the operator to always return
+ * CancelResult.pending(), looping forever without completing the upgrade/deletion.
+ */
+ @Test
+ public void cancelApplicationJobWithCancelModeAndTerminalStateMessage() throws Exception {
+ var flinkService =
+ getTestingService(
+ "Job cancellation failed because the job has already reached another terminal state (FAILED).",
+ HttpResponseStatus.BAD_REQUEST);
+
+ JobID jobID = JobID.generate();
+ FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+ deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+ JobStatus jobStatus = deployment.getStatus().getJobStatus();
+ jobStatus.setJobId(jobID.toHexString());
+ jobStatus.setState(RUNNING);
+ ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration());
+
+ var result =
+ flinkService.cancelJob(
+ deployment,
+ SuspendMode.CANCEL,
+ configManager.getObserveConfig(deployment),
+ false);
+ // Must NOT be pending — the operator would otherwise loop forever on the upgrade
+ assertFalse(result.isPending());
+ assertEquals(FINISHED, jobStatus.getState());
+ }
+
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void cancelJobWithSavepointUpgradeModeTest(boolean
deleteAfterSavepoint)