This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 32466300 [FLINK-39507] Terminal jobs should never be restarted by cluster/job health check
32466300 is described below
commit 324663004226b1635ff6248c2e60267ba64e4a57
Author: Gyula Fora <[email protected]>
AuthorDate: Mon Apr 27 17:35:49 2026 +0200
[FLINK-39507] Terminal jobs should never be restarted by cluster/job health check
---
.../observer/deployment/ApplicationObserver.java | 4 +-
.../deployment/ApplicationReconciler.java | 20 ++++--
.../controller/UnhealthyDeploymentRestartTest.java | 80 ++++++++++++++++++++++
3 files changed, 97 insertions(+), 7 deletions(-)
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
index 8f12183d..d228e72e 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
@@ -23,6 +23,7 @@ import
org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.observer.ClusterHealthObserver;
import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
import org.apache.flink.kubernetes.operator.observer.SnapshotObserver;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import static
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED;
@@ -50,7 +51,8 @@ public class ApplicationObserver extends
AbstractFlinkDeploymentObserver {
var observeConfig = ctx.getObserveConfig();
savepointObserver.observeSavepointStatus(ctx);
savepointObserver.observeCheckpointStatus(ctx);
- if
(observeConfig.getBoolean(OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED)) {
+ if (observeConfig.getBoolean(OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED)
+ &&
!ReconciliationUtils.isJobInTerminalState(ctx.getResource().getStatus())) {
clusterHealthObserver.observe(ctx);
}
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index ce8baca5..6f0f0e85 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -279,7 +279,7 @@ public class ApplicationReconciler
ClusterHealthEvaluator.getLastValidClusterHealthInfo(
deployment.getStatus().getClusterInfo());
boolean shouldRestartJobBecauseUnhealthy =
- shouldRestartJobBecauseUnhealthy(deployment, observeConfig);
+ shouldRestartJobBecauseUnhealthy(ctx, observeConfig);
boolean shouldRecoverDeployment =
shouldRecoverDeployment(observeConfig, deployment);
if (shouldRestartJobBecauseUnhealthy || shouldRecoverDeployment) {
if (shouldRecoverDeployment) {
@@ -316,7 +316,15 @@ public class ApplicationReconciler
}
private boolean shouldRestartJobBecauseUnhealthy(
- FlinkDeployment deployment, Configuration observeConfig) {
+ FlinkResourceContext<FlinkDeployment> ctx, Configuration
observeConfig) {
+ var deployment = ctx.getResource();
+ // Terminal jobs (FAILED, FINISHED, CANCELED) must not be restarted via the cluster health
+ // check. Restarting a FAILED job is controlled exclusively by OPERATOR_JOB_RESTART_FAILED;
+ // a terminal job also has no HA metadata, so a health-based restart would always fail.
+ if (ReconciliationUtils.isJobInTerminalState(deployment.getStatus())) {
+ return false;
+ }
+
boolean restartNeeded = false;
if (observeConfig.getBoolean(OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED)) {
@@ -329,13 +337,13 @@ public class ApplicationReconciler
if (deployment.getSpec().getJob().getUpgradeMode() ==
UpgradeMode.STATELESS) {
LOG.debug("Stateless job, recovering unhealthy
jobmanager deployment");
restartNeeded = true;
- } else if
(HighAvailabilityMode.isHighAvailabilityModeActivated(
- observeConfig)) {
- LOG.debug("HA is enabled, recovering unhealthy
jobmanager deployment");
+ } else if
(ctx.getFlinkService().isHaMetadataAvailable(observeConfig)) {
+ LOG.debug(
+ "HA metadata available, recovering unhealthy
jobmanager deployment");
restartNeeded = true;
} else {
LOG.warn(
- "Could not recover unhealthy jobmanager
deployment without HA enabled");
+ "Could not recover unhealthy jobmanager
deployment, HA metadata not available");
}
if (restartNeeded) {
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
index e19089c5..5f706b61 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
@@ -32,10 +32,12 @@ import
io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import java.time.Duration;
+import static org.apache.flink.api.common.JobStatus.FAILED;
import static org.apache.flink.api.common.JobStatus.RUNNING;
import static
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED;
import static
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW;
@@ -156,4 +158,82 @@ public class UnhealthyDeploymentRestartTest {
appCluster.getStatus().getJobManagerDeploymentStatus());
assertEquals(RUNNING,
appCluster.getStatus().getJobStatus().getState());
}
+
+ @ParameterizedTest
+
@MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersionsAndUpgradeModes")
+ public void verifyTerminallyFailedJobNotRestartedByHealthCheck(
+ FlinkVersion flinkVersion, UpgradeMode upgradeMode) throws
Exception {
+ FlinkDeployment appCluster =
TestUtils.buildApplicationCluster(flinkVersion);
+ appCluster.getSpec().getJob().setUpgradeMode(upgradeMode);
+
+ // Start a healthy deployment
+ flinkService.setMetricValue(NUM_RESTARTS_METRIC_NAME, "0");
+
flinkService.setMetricValue(NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC_NAME, "1");
+ testController.reconcile(appCluster, context);
+ testController.reconcile(appCluster, context);
+ testController.reconcile(appCluster, context);
+ assertEquals(
+ JobManagerDeploymentStatus.READY,
+ appCluster.getStatus().getJobManagerDeploymentStatus());
+ assertEquals(RUNNING,
appCluster.getStatus().getJobStatus().getState());
+
+ // Mark job as terminally FAILED
+ flinkService.markApplicationJobFailedWithError(
+ flinkService.listJobs().get(0).f1.getJobId(), "Terminal
failure");
+
+ // Age the checkpoint health data to simulate an unhealthy evaluation
+ // (no checkpoint progress within the window), which would normally trigger a restart
+ ClusterHealthInfo clusterHealthInfo =
+
getLastValidClusterHealthInfo(appCluster.getStatus().getClusterInfo());
+ clusterHealthInfo.setNumCompletedCheckpointsIncreasedTimeStamp(
+
clusterHealthInfo.getNumCompletedCheckpointsIncreasedTimeStamp() - 1200000);
+ setLastValidClusterHealthInfo(appCluster.getStatus().getClusterInfo(),
clusterHealthInfo);
+ testController.getStatusRecorder().patchAndCacheStatus(appCluster,
kubernetesClient);
+
+ // Reconcile - FAILED terminal job must NOT be restarted via the health-check codepath.
+ // The health-based restart path requires HA metadata which a terminated job does not have,
+ // and restarting a terminal job is controlled exclusively by OPERATOR_JOB_RESTART_FAILED.
+ testController.reconcile(appCluster, context);
+
+ assertEquals(FAILED, appCluster.getStatus().getJobStatus().getState());
+ assertEquals(
+ JobManagerDeploymentStatus.READY,
+ appCluster.getStatus().getJobManagerDeploymentStatus());
+ }
+
+ /**
+ * For stateful (LAST_STATE / SAVEPOINT) upgrade modes, a health-based restart must NOT be
+ * triggered when HA metadata is absent. Without HA metadata the restart would immediately fail
+ * with an UpgradeFailureException, so the check must be skipped entirely.
+ */
+ @ParameterizedTest
+ @EnumSource(
+ value = UpgradeMode.class,
+ names = {"LAST_STATE", "SAVEPOINT"})
+ public void verifyUnhealthyRestartSkippedWhenHaMetadataAbsent(UpgradeMode
upgradeMode)
+ throws Exception {
+ FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
+ appCluster.getSpec().getJob().setUpgradeMode(upgradeMode);
+
+ // Start a healthy deployment (HA metadata available by default)
+ testController.reconcile(appCluster, context);
+ testController.reconcile(appCluster, context);
+ testController.reconcile(appCluster, context);
+ assertEquals(
+ JobManagerDeploymentStatus.READY,
+ appCluster.getStatus().getJobManagerDeploymentStatus());
+ assertEquals(RUNNING,
appCluster.getStatus().getJobStatus().getState());
+
+ // Simulate unhealthy cluster (restart count exceeds threshold) while HA metadata is absent
+ flinkService.setMetricValue(NUM_RESTARTS_METRIC_NAME, "100");
+ flinkService.setHaDataAvailable(false);
+
+ testController.reconcile(appCluster, context);
+
+ // Health-based restart must NOT be triggered when HA metadata is absent
+ assertEquals(RUNNING,
appCluster.getStatus().getJobStatus().getState());
+ assertEquals(
+ JobManagerDeploymentStatus.READY,
+ appCluster.getStatus().getJobManagerDeploymentStatus());
+ }
}