This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 824d96fa [FLINK-38077] Make sure jobmanager is accessible when trying
to cancel for suspend/upgrade
824d96fa is described below
commit 824d96facaafd61d15b92ae1bf80ab8e76b80a85
Author: Gyula Fora <[email protected]>
AuthorDate: Thu Jul 10 12:46:06 2025 +0200
[FLINK-38077] Make sure jobmanager is accessible when trying to cancel for
suspend/upgrade
---
.../operator/api/status/CommonStatus.java | 12 ++++++
.../operator/api/status/FlinkDeploymentStatus.java | 7 ++++
.../api/status/JobManagerDeploymentStatus.java | 4 ++
.../operator/reconciler/ReconciliationUtils.java | 4 --
.../deployment/AbstractJobReconciler.java | 4 +-
.../deployment/ApplicationReconciler.java | 7 ++--
.../operator/service/AbstractFlinkService.java | 2 +-
.../ApplicationReconcilerUpgradeModeTest.java | 45 ++++++++++++++++------
8 files changed, 63 insertions(+), 22 deletions(-)
diff --git
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java
index 30649e89..0de792da 100644
---
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java
+++
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java
@@ -22,6 +22,7 @@ import
org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import io.fabric8.crd.generator.annotation.PrinterColumn;
import lombok.AllArgsConstructor;
import lombok.Data;
@@ -29,6 +30,8 @@ import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import org.apache.commons.lang3.StringUtils;
+import static org.apache.flink.api.common.JobStatus.RECONCILING;
+
/** Last observed common status of the Flink deployment/Flink SessionJob. */
@Experimental
@Data
@@ -121,4 +124,13 @@ public abstract class CommonStatus<SPEC extends
AbstractFlinkSpec> {
return ResourceLifecycleState.DEPLOYED;
}
+
+    /**
+     * Returns whether the job is in a state where a cancel request may be attempted.
+     *
+     * <p>The job is considered not cancellable when its job status or job state is unknown, when
+     * it is RECONCILING, or when its state is globally terminal.
+     */
+    @JsonIgnore
+    public boolean isJobCancellable() {
+        // Defensive: jobStatus is a settable field, and other status helpers
+        // (e.g. ReconciliationUtils#isJobCancelling) already treat a null job status as possible.
+        if (jobStatus == null) {
+            return false;
+        }
+        var jobState = jobStatus.getState();
+        if (jobState == null) {
+            return false;
+        }
+        return RECONCILING != jobState && !jobState.isGloballyTerminalState();
+    }
}
diff --git
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java
index 136d3415..9a341299 100644
---
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java
+++
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java
@@ -20,6 +20,7 @@ package org.apache.flink.kubernetes.operator.api.status;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.AllArgsConstructor;
import lombok.Data;
@@ -55,4 +56,10 @@ public class FlinkDeploymentStatus extends
CommonStatus<FlinkDeploymentSpec> {
/** Information about the TaskManagers for the scale subresource. */
private TaskManagerInfo taskManager;
+
+    /**
+     * Returns whether the job can be cancelled right now.
+     *
+     * <p>In addition to the job-state checks of {@link CommonStatus}, a Flink deployment requires
+     * the JobManager REST API to be available, since cancellation has to reach a JobManager.
+     */
+    @JsonIgnore
+    @Override
+    public boolean isJobCancellable() {
+        // Null guard: the deployment status is settable and may be absent after deserialization.
+        return super.isJobCancellable()
+                && jobManagerDeploymentStatus != null
+                && jobManagerDeploymentStatus.isRestApiAvailable();
+    }
}
diff --git
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobManagerDeploymentStatus.java
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobManagerDeploymentStatus.java
index 54a0181b..2c86c49a 100644
---
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobManagerDeploymentStatus.java
+++
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobManagerDeploymentStatus.java
@@ -35,4 +35,8 @@ public enum JobManagerDeploymentStatus {
/** Deployment in terminal error, requires spec change for reconciliation
to continue. */
ERROR;
+
+    /**
+     * Tells whether the JobManager REST API is treated as reachable in this deployment state.
+     *
+     * @return {@code true} only for {@link #READY}; every other state, including {@link #ERROR},
+     *     yields {@code false}
+     */
+    public boolean isRestApiAvailable() {
+        return READY.equals(this);
+    }
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
index 2936501a..94140c3f 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -379,10 +379,6 @@ public class ReconciliationUtils {
return CANCELED == status.getJobStatus().getState();
}
- public static boolean isJobCancellable(CommonStatus<?> status) {
- return RECONCILING != status.getJobStatus().getState();
- }
-
public static boolean isJobCancelling(CommonStatus<?> status) {
return status.getJobStatus() != null && CANCELLING ==
status.getJobStatus().getState();
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
index 3fcf8e46..9a4480d0 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
@@ -277,7 +277,7 @@ public abstract class AbstractJobReconciler<
if (running && savepointPossible) {
LOG.info("Using savepoint to upgrade Flink version");
return JobUpgrade.savepoint(false);
- } else if
(ReconciliationUtils.isJobCancellable(resource.getStatus())) {
+ } else if (resource.getStatus().isJobCancellable()) {
LOG.info("Using last-state upgrade with cancellation to
upgrade Flink version");
return JobUpgrade.lastStateUsingCancel();
} else {
@@ -354,7 +354,7 @@ public abstract class AbstractJobReconciler<
private boolean allowLastStateCancel(FlinkResourceContext<CR> ctx) {
var resource = ctx.getResource();
- if (!ReconciliationUtils.isJobCancellable(resource.getStatus())) {
+ if (!resource.getStatus().isJobCancellable()) {
return false;
}
if (resource instanceof FlinkSessionJob) {
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index 180fb128..ce8baca5 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -88,12 +88,13 @@ public class ApplicationReconciler
var status = deployment.getStatus();
var availableUpgradeMode = super.getJobUpgrade(ctx, deployConfig);
- if (availableUpgradeMode.isAvailable() ||
!availableUpgradeMode.isAllowFallback()) {
+ if (availableUpgradeMode.isAvailable()) {
return availableUpgradeMode;
}
var flinkService = ctx.getFlinkService();
- if (HighAvailabilityMode.isHighAvailabilityModeActivated(deployConfig)
+ if (availableUpgradeMode.isAllowFallback()
+ &&
HighAvailabilityMode.isHighAvailabilityModeActivated(deployConfig)
&&
HighAvailabilityMode.isHighAvailabilityModeActivated(ctx.getObserveConfig())
&& flinkService.isHaMetadataAvailable(deployConfig)) {
LOG.info(
@@ -125,7 +126,7 @@ public class ApplicationReconciler
"UpgradeFailed");
}
- return JobUpgrade.unavailable();
+ return availableUpgradeMode;
}
private void deleteJmThatNeverStarted(
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index f0a7b25b..18fc3690 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -336,7 +336,7 @@ public abstract class AbstractFlinkService implements
FlinkService {
savepointPath = savepointJobOrError(clusterClient, status,
conf);
break;
case STATELESS:
- if (ReconciliationUtils.isJobCancellable(status)) {
+ if (status.isJobCancellable()) {
try {
cancelJobOrError(clusterClient, status, true);
} catch (Exception ex) {
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
index 3b50af2a..c9f70586 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
@@ -72,6 +72,7 @@ import static
org.apache.flink.api.common.JobStatus.RESTARTING;
import static org.apache.flink.api.common.JobStatus.RUNNING;
import static
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.SNAPSHOT_RESOURCE_ENABLED;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -501,6 +502,7 @@ public class ApplicationReconcilerUpgradeModeTest extends
OperatorTestBase {
@ValueSource(booleans = {true, false})
public void testLastStateMaxCheckpointAge(boolean cancellable) throws
Exception {
var deployment = TestUtils.buildApplicationCluster();
+
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
deployment
.getSpec()
.getFlinkConfiguration()
@@ -643,6 +645,7 @@ public class ApplicationReconcilerUpgradeModeTest extends
OperatorTestBase {
jobStatus.setJobId(new JobID().toString());
// Running state, savepoint if possible
+
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
jobStatus.setState(RUNNING);
var ctx = getResourceContext(deployment);
var deployConf = ctx.getDeployConfig(deployment.getSpec());
@@ -654,6 +657,7 @@ public class ApplicationReconcilerUpgradeModeTest extends
OperatorTestBase {
jobReconciler.getJobUpgrade(ctx, deployConf));
// Not running (but cancellable)
+
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
jobStatus.setState(RESTARTING);
assertEquals(
AbstractJobReconciler.JobUpgrade.lastStateUsingCancel(),
@@ -667,17 +671,26 @@ public class ApplicationReconcilerUpgradeModeTest extends
OperatorTestBase {
}
private static Stream<Arguments> testLastStateCancelParams() {
- return Stream.of(
- Arguments.of(UpgradeMode.LAST_STATE, true),
- Arguments.of(UpgradeMode.LAST_STATE, false),
- Arguments.of(UpgradeMode.SAVEPOINT, true),
- Arguments.of(UpgradeMode.SAVEPOINT, false));
+        // Cartesian product: upgrade mode x allowFallback x every JobManager deployment status.
+        var out = new ArrayList<Arguments>();
+        for (var upgradeMode : List.of(UpgradeMode.SAVEPOINT, UpgradeMode.LAST_STATE)) {
+            for (boolean allowFallback : List.of(true, false)) {
+                if (upgradeMode == UpgradeMode.LAST_STATE && !allowFallback) {
+                    // testLastStateNoHaMeta treats this combination as impossible and returns
+                    // early; generating it would only produce vacuously passing cases.
+                    continue;
+                }
+                for (var jmStatus : JobManagerDeploymentStatus.values()) {
+                    out.add(Arguments.of(upgradeMode, allowFallback, jmStatus));
+                }
+            }
+        }
+        return out.stream();
}
@ParameterizedTest
@MethodSource("testLastStateCancelParams")
- public void testLastStateNoHaMeta(UpgradeMode upgradeMode, boolean
allowFallback)
+ public void testLastStateNoHaMeta(
+ UpgradeMode upgradeMode, boolean allowFallback,
JobManagerDeploymentStatus jmStatus)
throws Exception {
+ if (upgradeMode == UpgradeMode.LAST_STATE && !allowFallback) {
+ // This cannot happen
+ return;
+ }
var jobReconciler = (ApplicationReconciler)
this.reconciler.getReconciler();
var deployment = TestUtils.buildApplicationCluster();
deployment
@@ -702,6 +715,8 @@ public class ApplicationReconcilerUpgradeModeTest extends
OperatorTestBase {
// Set job status to running
var jobStatus = deployment.getStatus().getJobStatus();
+ deployment.getStatus().setJobManagerDeploymentStatus(jmStatus);
+
long now = System.currentTimeMillis();
jobStatus.setStartTime(Long.toString(now));
@@ -712,15 +727,21 @@ public class ApplicationReconcilerUpgradeModeTest extends
OperatorTestBase {
var ctx = getResourceContext(deployment);
var deployConf = ctx.getDeployConfig(deployment.getSpec());
- if (upgradeMode == UpgradeMode.LAST_STATE) {
- assertEquals(
- AbstractJobReconciler.JobUpgrade.lastStateUsingCancel(),
- jobReconciler.getJobUpgrade(ctx, deployConf));
+ if (List.of(JobManagerDeploymentStatus.ERROR,
JobManagerDeploymentStatus.MISSING)
+ .contains(jmStatus)) {
+ assertThatThrownBy(() -> jobReconciler.getJobUpgrade(ctx,
deployConf))
+ .isInstanceOf(UpgradeFailureException.class)
+ .hasMessageContaining(
+ "JobManager deployment is missing and HA metadata
is not available");
} else {
+ boolean immediatelyCancellable =
+ (upgradeMode == UpgradeMode.LAST_STATE || allowFallback)
+ && jmStatus == JobManagerDeploymentStatus.READY;
assertEquals(
- allowFallback
+ immediatelyCancellable
?
AbstractJobReconciler.JobUpgrade.lastStateUsingCancel()
- :
AbstractJobReconciler.JobUpgrade.pendingUpgrade(),
+ : new AbstractJobReconciler.JobUpgrade(
+ null, null, false, allowFallback, true),
jobReconciler.getJobUpgrade(ctx, deployConf));
}
}