This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 32466300 [FLINK-39507] Terminal jobs should never be restarted by 
cluster/job health check
32466300 is described below

commit 324663004226b1635ff6248c2e60267ba64e4a57
Author: Gyula Fora <[email protected]>
AuthorDate: Mon Apr 27 17:35:49 2026 +0200

    [FLINK-39507] Terminal jobs should never be restarted by cluster/job health 
check
---
 .../observer/deployment/ApplicationObserver.java   |  4 +-
 .../deployment/ApplicationReconciler.java          | 20 ++++--
 .../controller/UnhealthyDeploymentRestartTest.java | 80 ++++++++++++++++++++++
 3 files changed, 97 insertions(+), 7 deletions(-)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
index 8f12183d..d228e72e 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
 import org.apache.flink.kubernetes.operator.observer.ClusterHealthObserver;
 import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
 import org.apache.flink.kubernetes.operator.observer.SnapshotObserver;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 
 import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED;
@@ -50,7 +51,8 @@ public class ApplicationObserver extends 
AbstractFlinkDeploymentObserver {
             var observeConfig = ctx.getObserveConfig();
             savepointObserver.observeSavepointStatus(ctx);
             savepointObserver.observeCheckpointStatus(ctx);
-            if 
(observeConfig.getBoolean(OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED)) {
+            if (observeConfig.getBoolean(OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED)
+                    && 
!ReconciliationUtils.isJobInTerminalState(ctx.getResource().getStatus())) {
                 clusterHealthObserver.observe(ctx);
             }
         }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index ce8baca5..6f0f0e85 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -279,7 +279,7 @@ public class ApplicationReconciler
                 ClusterHealthEvaluator.getLastValidClusterHealthInfo(
                         deployment.getStatus().getClusterInfo());
         boolean shouldRestartJobBecauseUnhealthy =
-                shouldRestartJobBecauseUnhealthy(deployment, observeConfig);
+                shouldRestartJobBecauseUnhealthy(ctx, observeConfig);
         boolean shouldRecoverDeployment = 
shouldRecoverDeployment(observeConfig, deployment);
         if (shouldRestartJobBecauseUnhealthy || shouldRecoverDeployment) {
             if (shouldRecoverDeployment) {
@@ -316,7 +316,15 @@ public class ApplicationReconciler
     }
 
     private boolean shouldRestartJobBecauseUnhealthy(
-            FlinkDeployment deployment, Configuration observeConfig) {
+            FlinkResourceContext<FlinkDeployment> ctx, Configuration 
observeConfig) {
+        var deployment = ctx.getResource();
+        // Terminal jobs (FAILED, FINISHED, CANCELED) must not be restarted 
via the cluster health
+        // check. Restarting a FAILED job is controlled exclusively by 
OPERATOR_JOB_RESTART_FAILED;
+        // a terminal job also has no HA metadata, so a health-based restart 
would always fail.
+        if (ReconciliationUtils.isJobInTerminalState(deployment.getStatus())) {
+            return false;
+        }
+
         boolean restartNeeded = false;
 
         if (observeConfig.getBoolean(OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED)) {
@@ -329,13 +337,13 @@ public class ApplicationReconciler
                     if (deployment.getSpec().getJob().getUpgradeMode() == 
UpgradeMode.STATELESS) {
                         LOG.debug("Stateless job, recovering unhealthy 
jobmanager deployment");
                         restartNeeded = true;
-                    } else if 
(HighAvailabilityMode.isHighAvailabilityModeActivated(
-                            observeConfig)) {
-                        LOG.debug("HA is enabled, recovering unhealthy 
jobmanager deployment");
+                    } else if 
(ctx.getFlinkService().isHaMetadataAvailable(observeConfig)) {
+                        LOG.debug(
+                                "HA metadata available, recovering unhealthy 
jobmanager deployment");
                         restartNeeded = true;
                     } else {
                         LOG.warn(
-                                "Could not recover unhealthy jobmanager 
deployment without HA enabled");
+                                "Could not recover unhealthy jobmanager 
deployment, HA metadata not available");
                     }
 
                     if (restartNeeded) {
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
index e19089c5..5f706b61 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
@@ -32,10 +32,12 @@ import 
io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import java.time.Duration;
 
+import static org.apache.flink.api.common.JobStatus.FAILED;
 import static org.apache.flink.api.common.JobStatus.RUNNING;
 import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED;
 import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW;
@@ -156,4 +158,82 @@ public class UnhealthyDeploymentRestartTest {
                 appCluster.getStatus().getJobManagerDeploymentStatus());
         assertEquals(RUNNING, 
appCluster.getStatus().getJobStatus().getState());
     }
+
+    @ParameterizedTest
+    
@MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersionsAndUpgradeModes")
+    public void verifyTerminallyFailedJobNotRestartedByHealthCheck(
+            FlinkVersion flinkVersion, UpgradeMode upgradeMode) throws 
Exception {
+        FlinkDeployment appCluster = 
TestUtils.buildApplicationCluster(flinkVersion);
+        appCluster.getSpec().getJob().setUpgradeMode(upgradeMode);
+
+        // Start a healthy deployment
+        flinkService.setMetricValue(NUM_RESTARTS_METRIC_NAME, "0");
+        
flinkService.setMetricValue(NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC_NAME, "1");
+        testController.reconcile(appCluster, context);
+        testController.reconcile(appCluster, context);
+        testController.reconcile(appCluster, context);
+        assertEquals(
+                JobManagerDeploymentStatus.READY,
+                appCluster.getStatus().getJobManagerDeploymentStatus());
+        assertEquals(RUNNING, 
appCluster.getStatus().getJobStatus().getState());
+
+        // Mark job as terminally FAILED
+        flinkService.markApplicationJobFailedWithError(
+                flinkService.listJobs().get(0).f1.getJobId(), "Terminal 
failure");
+
+        // Age the checkpoint health data to simulate an unhealthy evaluation
+        // (no checkpoint progress within the window), which would normally 
trigger a restart
+        ClusterHealthInfo clusterHealthInfo =
+                
getLastValidClusterHealthInfo(appCluster.getStatus().getClusterInfo());
+        clusterHealthInfo.setNumCompletedCheckpointsIncreasedTimeStamp(
+                
clusterHealthInfo.getNumCompletedCheckpointsIncreasedTimeStamp() - 1200000);
+        setLastValidClusterHealthInfo(appCluster.getStatus().getClusterInfo(), 
clusterHealthInfo);
+        testController.getStatusRecorder().patchAndCacheStatus(appCluster, 
kubernetesClient);
+
+        // Reconcile - FAILED terminal job must NOT be restarted via the 
health-check codepath.
+        // The health-based restart path requires HA metadata which a 
terminated job does not have,
+        // and restarting a terminal job is controlled exclusively by 
OPERATOR_JOB_RESTART_FAILED.
+        testController.reconcile(appCluster, context);
+
+        assertEquals(FAILED, appCluster.getStatus().getJobStatus().getState());
+        assertEquals(
+                JobManagerDeploymentStatus.READY,
+                appCluster.getStatus().getJobManagerDeploymentStatus());
+    }
+
+    /**
+     * For stateful (LAST_STATE / SAVEPOINT) upgrade modes, a health-based 
restart must NOT be
+     * triggered when HA metadata is absent. Without HA metadata the restart 
would immediately fail
+     * with an UpgradeFailureException, so the check must be skipped entirely.
+     */
+    @ParameterizedTest
+    @EnumSource(
+            value = UpgradeMode.class,
+            names = {"LAST_STATE", "SAVEPOINT"})
+    public void verifyUnhealthyRestartSkippedWhenHaMetadataAbsent(UpgradeMode 
upgradeMode)
+            throws Exception {
+        FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
+        appCluster.getSpec().getJob().setUpgradeMode(upgradeMode);
+
+        // Start a healthy deployment (HA metadata available by default)
+        testController.reconcile(appCluster, context);
+        testController.reconcile(appCluster, context);
+        testController.reconcile(appCluster, context);
+        assertEquals(
+                JobManagerDeploymentStatus.READY,
+                appCluster.getStatus().getJobManagerDeploymentStatus());
+        assertEquals(RUNNING, 
appCluster.getStatus().getJobStatus().getState());
+
+        // Simulate unhealthy cluster (restart count exceeds threshold) while 
HA metadata is absent
+        flinkService.setMetricValue(NUM_RESTARTS_METRIC_NAME, "100");
+        flinkService.setHaDataAvailable(false);
+
+        testController.reconcile(appCluster, context);
+
+        // Health-based restart must NOT be triggered when HA metadata is 
absent
+        assertEquals(RUNNING, 
appCluster.getStatus().getJobStatus().getState());
+        assertEquals(
+                JobManagerDeploymentStatus.READY,
+                appCluster.getStatus().getJobManagerDeploymentStatus());
+    }
 }

Reply via email to