This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 4c944403 [FLINK-31630] Limit max checkpoint age for last-state upgrade
4c944403 is described below

commit 4c944403f466e36c151c356c9f30d8dd6074fa30
Author: Gyula Fora <g_f...@apple.com>
AuthorDate: Mon Mar 27 15:20:38 2023 +0200

    [FLINK-31630] Limit max checkpoint age for last-state upgrade
---
 .../shortcodes/generated/dynamic_section.html      |   6 ++
 .../kubernetes_operator_config_configuration.html  |   6 ++
 .../kubernetes/operator/api/status/Savepoint.java  |   4 +
 .../config/KubernetesOperatorConfigOptions.java    |   8 ++
 .../deployment/AbstractJobReconciler.java          | 104 +++++++++++++++---
 .../deployment/ApplicationReconciler.java          |  11 +-
 .../operator/service/AbstractFlinkService.java     |  41 ++++---
 .../operator/service/CheckpointHistoryWrapper.java |  72 +++++++++++--
 .../kubernetes/operator/service/FlinkService.java  |   6 ++
 .../operator/validation/DefaultValidator.java      |   8 ++
 .../kubernetes/operator/TestingFlinkService.java   | 119 +++++++++------------
 .../controller/FlinkDeploymentControllerTest.java  |   4 +-
 .../deployment/ApplicationObserverTest.java        |   2 +-
 .../ApplicationReconcilerUpgradeModeTest.java      | 105 ++++++++++++++++++
 .../operator/validation/DefaultValidatorTest.java  |  13 +++
 15 files changed, 401 insertions(+), 108 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/dynamic_section.html 
b/docs/layouts/shortcodes/generated/dynamic_section.html
index ec44c375..3179183f 100644
--- a/docs/layouts/shortcodes/generated/dynamic_section.html
+++ b/docs/layouts/shortcodes/generated/dynamic_section.html
@@ -80,6 +80,12 @@
             <td>Boolean</td>
             <td>Enables last-state fallback for savepoint upgrade mode. When 
the job is not running thus savepoint cannot be triggered but HA metadata is 
available for last state restore the operator can initiate the upgrade process 
when the flag is enabled.</td>
         </tr>
+        <tr>
+            
<td><h5>kubernetes.operator.job.upgrade.last-state.max.allowed.checkpoint.age</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Duration</td>
+            <td>Max allowed checkpoint age for initiating last-state upgrades 
on running jobs. If a checkpoint is not available within the desired age (and 
nothing in progress) a savepoint will be triggered.</td>
+        </tr>
         <tr>
             <td><h5>kubernetes.operator.periodic.savepoint.interval</h5></td>
             <td style="word-wrap: break-word;">0 ms</td>
diff --git 
a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
 
b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
index 6b335b8d..1b792d02 100644
--- 
a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
+++ 
b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
@@ -164,6 +164,12 @@
             <td>Boolean</td>
             <td>Enables last-state fallback for savepoint upgrade mode. When 
the job is not running thus savepoint cannot be triggered but HA metadata is 
available for last state restore the operator can initiate the upgrade process 
when the flag is enabled.</td>
         </tr>
+        <tr>
+            
<td><h5>kubernetes.operator.job.upgrade.last-state.max.allowed.checkpoint.age</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Duration</td>
+            <td>Max allowed checkpoint age for initiating last-state upgrades 
on running jobs. If a checkpoint is not available within the desired age (and 
nothing in progress) a savepoint will be triggered.</td>
+        </tr>
         <tr>
             <td><h5>kubernetes.operator.label.selector</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/Savepoint.java
 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/Savepoint.java
index e754a1cf..58eca4e2 100644
--- 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/Savepoint.java
+++ 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/Savepoint.java
@@ -64,6 +64,10 @@ public class Savepoint {
         this.triggerNonce = triggerNonce;
     }
 
+    public static Savepoint of(String location, long timeStamp, 
SavepointTriggerType triggerType) {
+        return new Savepoint(timeStamp, location, triggerType, 
SavepointFormatType.UNKNOWN, null);
+    }
+
     public static Savepoint of(String location, SavepointTriggerType 
triggerType) {
         return new Savepoint(
                 System.currentTimeMillis(),
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
index f361310b..820eb221 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
@@ -326,6 +326,14 @@ public class KubernetesOperatorConfigOptions {
                     .withDescription(
                             "Enables last-state fallback for savepoint upgrade 
mode. When the job is not running thus savepoint cannot be triggered but HA 
metadata is available for last state restore the operator can initiate the 
upgrade process when the flag is enabled.");
 
+    @Documentation.Section(SECTION_DYNAMIC)
+    public static final ConfigOption<Duration> 
OPERATOR_JOB_UPGRADE_LAST_STATE_CHECKPOINT_MAX_AGE =
+            operatorConfig("job.upgrade.last-state.max.allowed.checkpoint.age")
+                    .durationType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Max allowed checkpoint age for initiating 
last-state upgrades on running jobs. If a checkpoint is not available within 
the desired age (and nothing in progress) a savepoint will be triggered.");
+
     @Documentation.Section(SECTION_DYNAMIC)
     public static final ConfigOption<SavepointFormatType> 
OPERATOR_SAVEPOINT_FORMAT_TYPE =
             operatorConfig("savepoint.format.type")
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
index 445fcb40..b20f0074 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.kubernetes.operator.reconciler.deployment;
 
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
@@ -28,17 +30,22 @@ import 
org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
 import 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
 import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.service.CheckpointHistoryWrapper;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
+import lombok.Value;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Instant;
 import java.util.Optional;
+import java.util.function.Predicate;
 
 import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_JOB_RESTART_FAILED;
+import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_JOB_UPGRADE_LAST_STATE_CHECKPOINT_MAX_AGE;
 
 /**
  * Reconciler responsible for handling the job lifecycle according to the 
desired and current
@@ -95,8 +102,8 @@ public abstract class AbstractJobReconciler<
             if (desiredJobState == JobState.RUNNING) {
                 LOG.info("Upgrading/Restarting running job, suspending 
first...");
             }
-            Optional<UpgradeMode> availableUpgradeMode = 
getAvailableUpgradeMode(ctx, deployConfig);
-            if (availableUpgradeMode.isEmpty()) {
+            AvailableUpgradeMode availableUpgradeMode = 
getAvailableUpgradeMode(ctx, deployConfig);
+            if (!availableUpgradeMode.isAvailable()) {
                 return false;
             }
 
@@ -107,8 +114,8 @@ public abstract class AbstractJobReconciler<
                     EventRecorder.Component.JobManagerDeployment,
                     MSG_SUSPENDED);
             // We must record the upgrade mode used to the status later
-            
currentDeploySpec.getJob().setUpgradeMode(availableUpgradeMode.get());
-            cancelJob(ctx, availableUpgradeMode.get());
+            
currentDeploySpec.getJob().setUpgradeMode(availableUpgradeMode.getUpgradeMode().get());
+            cancelJob(ctx, availableUpgradeMode.getUpgradeMode().get());
             if (desiredJobState == JobState.RUNNING) {
                 ReconciliationUtils.updateStatusBeforeDeploymentAttempt(
                         resource, deployConfig, clock);
@@ -140,15 +147,15 @@ public abstract class AbstractJobReconciler<
         return true;
     }
 
-    protected Optional<UpgradeMode> getAvailableUpgradeMode(
-            FlinkResourceContext<CR> ctx, Configuration deployConfig) {
+    protected AvailableUpgradeMode getAvailableUpgradeMode(
+            FlinkResourceContext<CR> ctx, Configuration deployConfig) throws 
Exception {
         var resource = ctx.getResource();
         var status = resource.getStatus();
         var upgradeMode = resource.getSpec().getJob().getUpgradeMode();
 
         if (upgradeMode == UpgradeMode.STATELESS) {
             LOG.info("Stateless job, ready for upgrade");
-            return Optional.of(UpgradeMode.STATELESS);
+            return AvailableUpgradeMode.of(UpgradeMode.STATELESS);
         }
 
         var flinkService = ctx.getFlinkService();
@@ -156,7 +163,7 @@ public abstract class AbstractJobReconciler<
                 && 
!flinkService.isHaMetadataAvailable(ctx.getObserveConfig())) {
             LOG.info(
                     "Job is in terminal state, ready for upgrade from observed 
latest checkpoint/savepoint");
-            return Optional.of(UpgradeMode.SAVEPOINT);
+            return AvailableUpgradeMode.of(UpgradeMode.SAVEPOINT);
         }
 
         if (ReconciliationUtils.isJobRunning(status)) {
@@ -167,19 +174,69 @@ public abstract class AbstractJobReconciler<
             if (changedToLastStateWithoutHa) {
                 LOG.info(
                         "Using savepoint upgrade mode when switching to 
last-state without HA previously enabled");
-                return Optional.of(UpgradeMode.SAVEPOINT);
+                return AvailableUpgradeMode.of(UpgradeMode.SAVEPOINT);
             }
 
             if (flinkVersionChanged(
                     ReconciliationUtils.getDeployedSpec(resource), 
resource.getSpec())) {
                 LOG.info("Using savepoint upgrade mode when upgrading Flink 
version");
-                return Optional.of(UpgradeMode.SAVEPOINT);
+                return AvailableUpgradeMode.of(UpgradeMode.SAVEPOINT);
             }
 
-            return Optional.of(upgradeMode);
+            if (upgradeMode == UpgradeMode.LAST_STATE) {
+                return changeLastStateIfCheckpointTooOld(ctx, deployConfig);
+            }
+
+            return AvailableUpgradeMode.of(UpgradeMode.SAVEPOINT);
+        }
+
+        return AvailableUpgradeMode.unavailable();
+    }
+
+    @VisibleForTesting
+    protected AvailableUpgradeMode changeLastStateIfCheckpointTooOld(
+            FlinkResourceContext<CR> ctx, Configuration deployConfig) throws 
Exception {
+
+        var maxAge = 
deployConfig.get(OPERATOR_JOB_UPGRADE_LAST_STATE_CHECKPOINT_MAX_AGE);
+        if (maxAge == null) {
+            return AvailableUpgradeMode.of(UpgradeMode.LAST_STATE);
+        }
+
+        var jobStatus = ctx.getResource().getStatus().getJobStatus();
+        var jobId = JobID.fromHexString(jobStatus.getJobId());
+        var startTime = 
Instant.ofEpochMilli(Long.parseLong(jobStatus.getStartTime()));
+        var now = clock.instant();
+
+        Predicate<Instant> withinMaxAge = ts -> now.minus(maxAge).isBefore(ts);
+
+        // If job started recently, no need to query checkpoint
+        if (withinMaxAge.test(startTime)) {
+            return AvailableUpgradeMode.of(UpgradeMode.LAST_STATE);
         }
 
-        return Optional.empty();
+        var chkInfo = ctx.getFlinkService().getCheckpointInfo(jobId, 
ctx.getObserveConfig());
+        var completedTs =
+                chkInfo.f0
+                        
.map(CheckpointHistoryWrapper.CompletedCheckpointInfo::getTimestamp)
+                        .map(Instant::ofEpochMilli)
+                        .orElse(Instant.MIN);
+        var pendingTs =
+                chkInfo.f1
+                        
.map(CheckpointHistoryWrapper.PendingCheckpointInfo::getTimestamp)
+                        .map(Instant::ofEpochMilli)
+                        .orElse(Instant.MIN);
+
+        if (withinMaxAge.test(completedTs)) {
+            // We have a recent enough checkpoint
+            return AvailableUpgradeMode.of(UpgradeMode.LAST_STATE);
+        } else if (withinMaxAge.test(pendingTs)) {
+            LOG.info("Waiting for pending checkpoint to complete before 
upgrading.");
+            return AvailableUpgradeMode.pendingUpgrade();
+        } else {
+            LOG.info(
+                    "Using savepoint upgrade mode because latest checkpoint is 
too old for last-state upgrade");
+            return AvailableUpgradeMode.of(UpgradeMode.SAVEPOINT);
+        }
     }
 
     protected void restoreJob(
@@ -273,4 +330,27 @@ public abstract class AbstractJobReconciler<
      * @throws Exception Error during cancellation.
      */
     protected abstract void cleanupAfterFailedJob(FlinkResourceContext<CR> 
ctx) throws Exception;
+
+    /** Object to capture available upgrade mode. */
+    @Value
+    public static class AvailableUpgradeMode {
+        Optional<UpgradeMode> upgradeMode;
+        boolean allowFallback;
+
+        public boolean isAvailable() {
+            return upgradeMode.isPresent();
+        }
+
+        static AvailableUpgradeMode of(UpgradeMode upgradeMode) {
+            return new AvailableUpgradeMode(Optional.of(upgradeMode), false);
+        }
+
+        static AvailableUpgradeMode unavailable() {
+            return new AvailableUpgradeMode(Optional.empty(), true);
+        }
+
+        static AvailableUpgradeMode pendingUpgrade() {
+            return new AvailableUpgradeMode(Optional.empty(), false);
+        }
+    }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index c6111c8b..346accaa 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -72,14 +72,15 @@ public class ApplicationReconciler
     }
 
     @Override
-    protected Optional<UpgradeMode> getAvailableUpgradeMode(
-            FlinkResourceContext<FlinkDeployment> ctx, Configuration 
deployConfig) {
+    protected AvailableUpgradeMode getAvailableUpgradeMode(
+            FlinkResourceContext<FlinkDeployment> ctx, Configuration 
deployConfig)
+            throws Exception {
 
         var deployment = ctx.getResource();
         var status = deployment.getStatus();
         var availableUpgradeMode = super.getAvailableUpgradeMode(ctx, 
deployConfig);
 
-        if (availableUpgradeMode.isPresent()) {
+        if (availableUpgradeMode.isAvailable() || 
!availableUpgradeMode.isAllowFallback()) {
             return availableUpgradeMode;
         }
         var flinkService = ctx.getFlinkService();
@@ -95,7 +96,7 @@ public class ApplicationReconciler
             if (flinkService.isHaMetadataAvailable(deployConfig)) {
                 LOG.info(
                         "Job is not running but HA metadata is available for 
last state restore, ready for upgrade");
-                return Optional.of(UpgradeMode.LAST_STATE);
+                return AvailableUpgradeMode.of(UpgradeMode.LAST_STATE);
             }
         }
 
@@ -123,7 +124,7 @@ public class ApplicationReconciler
 
         LOG.info(
                 "Job is not running and HA metadata is not available or usable 
for executing the upgrade, waiting for upgradeable state");
-        return Optional.empty();
+        return AvailableUpgradeMode.unavailable();
     }
 
     private void deleteJmThatNeverStarted(
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index d71996af..d0f2b16d 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -513,6 +513,30 @@ public abstract class AbstractFlinkService implements 
FlinkService {
 
     @Override
     public Optional<Savepoint> getLastCheckpoint(JobID jobId, Configuration 
conf) throws Exception {
+        var latestCheckpointOpt = getCheckpointInfo(jobId, conf).f0;
+
+        if (latestCheckpointOpt.isPresent()
+                && latestCheckpointOpt
+                        .get()
+                        .getExternalPointer()
+                        
.equals(NonPersistentMetadataCheckpointStorageLocation.EXTERNAL_POINTER)) {
+            throw new RecoveryFailureException(
+                    "Latest checkpoint not externally addressable, manual 
recovery required.",
+                    "CheckpointNotFound");
+        }
+        return latestCheckpointOpt.map(
+                pointer ->
+                        Savepoint.of(
+                                pointer.getExternalPointer(),
+                                pointer.getTimestamp(),
+                                SavepointTriggerType.UNKNOWN));
+    }
+
+    @Override
+    public Tuple2<
+                    Optional<CheckpointHistoryWrapper.CompletedCheckpointInfo>,
+                    Optional<CheckpointHistoryWrapper.PendingCheckpointInfo>>
+            getCheckpointInfo(JobID jobId, Configuration conf) throws 
Exception {
         try (RestClusterClient<String> clusterClient =
                 (RestClusterClient<String>) getClusterClient(conf)) {
 
@@ -531,20 +555,9 @@ public abstract class AbstractFlinkService implements 
FlinkService {
                                     .getSeconds(),
                             TimeUnit.SECONDS);
 
-            var latestCheckpointOpt = checkpoints.getLatestCheckpointPath();
-
-            if (latestCheckpointOpt.isPresent()
-                    && latestCheckpointOpt
-                            .get()
-                            .equals(
-                                    
NonPersistentMetadataCheckpointStorageLocation
-                                            .EXTERNAL_POINTER)) {
-                throw new RecoveryFailureException(
-                        "Latest checkpoint not externally addressable, manual 
recovery required.",
-                        "CheckpointNotFound");
-            }
-            return latestCheckpointOpt.map(
-                    pointer -> Savepoint.of(pointer, 
SavepointTriggerType.UNKNOWN));
+            return Tuple2.of(
+                    checkpoints.getLatestCompletedCheckpoint(),
+                    checkpoints.getInProgressCheckpoint());
         }
     }
 
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/CheckpointHistoryWrapper.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/CheckpointHistoryWrapper.java
index fc9c7e5a..f53d0742 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/CheckpointHistoryWrapper.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/CheckpointHistoryWrapper.java
@@ -18,18 +18,25 @@
 
 package org.apache.flink.kubernetes.operator.service;
 
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics;
+import 
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 
 import lombok.Data;
 import lombok.NoArgsConstructor;
+import lombok.Value;
 
 import java.util.Optional;
 
+import static 
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics.FIELD_NAME_TRIGGER_TIMESTAMP;
+import static 
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics.FIELD_NAME_HISTORY;
 import static 
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics.FIELD_NAME_LATEST_CHECKPOINTS;
 import static 
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics.LatestCheckpoints.FIELD_NAME_COMPLETED;
 import static 
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics.LatestCheckpoints.FIELD_NAME_RESTORED;
@@ -46,7 +53,28 @@ public class CheckpointHistoryWrapper implements 
ResponseBody {
     @JsonProperty(FIELD_NAME_LATEST_CHECKPOINTS)
     private ObjectNode latestCheckpoints;
 
-    public Optional<String> getLatestCheckpointPath() {
+    @JsonProperty(FIELD_NAME_HISTORY)
+    private ArrayNode history;
+
+    public Optional<PendingCheckpointInfo> getInProgressCheckpoint() {
+        if (history.isEmpty()) {
+            return Optional.empty();
+        }
+
+        var lastCp = history.get(0);
+        var status =
+                CheckpointStatsStatus.valueOf(
+                        
lastCp.get(CheckpointStatistics.FIELD_NAME_STATUS).asText());
+        if (status.isInProgress()) {
+            return Optional.of(
+                    new PendingCheckpointInfo(
+                            lastCp.get(FIELD_NAME_ID).asLong(),
+                            
lastCp.get(FIELD_NAME_TRIGGER_TIMESTAMP).asLong()));
+        }
+        return Optional.empty();
+    }
+
+    public Optional<CompletedCheckpointInfo> getLatestCompletedCheckpoint() {
         if (latestCheckpoints == null) {
             return Optional.empty();
         }
@@ -54,18 +82,18 @@ public class CheckpointHistoryWrapper implements 
ResponseBody {
         var latestCheckpoint = 
getCheckpointInfo(FIELD_NAME_RESTORED).orElse(null);
 
         var completed = getCheckpointInfo(FIELD_NAME_COMPLETED).orElse(null);
-        if (latestCheckpoint == null || (completed != null && completed.f0 > 
latestCheckpoint.f0)) {
+        if (latestCheckpoint == null || (completed != null && completed.id > 
latestCheckpoint.id)) {
             latestCheckpoint = completed;
         }
         var savepoint = getCheckpointInfo(FIELD_NAME_SAVEPOINT).orElse(null);
-        if (latestCheckpoint == null || (savepoint != null && savepoint.f0 > 
latestCheckpoint.f0)) {
+        if (latestCheckpoint == null || (savepoint != null && savepoint.id > 
latestCheckpoint.id)) {
             latestCheckpoint = savepoint;
         }
 
-        return Optional.ofNullable(latestCheckpoint).map(t -> t.f1);
+        return Optional.ofNullable(latestCheckpoint);
     }
 
-    private Optional<Tuple2<Long, String>> getCheckpointInfo(String field) {
+    private Optional<CompletedCheckpointInfo> getCheckpointInfo(String field) {
         return Optional.ofNullable(latestCheckpoints.get(field))
                 .filter(
                         checkpoint ->
@@ -73,8 +101,36 @@ public class CheckpointHistoryWrapper implements 
ResponseBody {
                                         && 
checkpoint.has(FIELD_NAME_EXTERNAL_PATH))
                 .map(
                         checkpoint ->
-                                Tuple2.of(
+                                new CompletedCheckpointInfo(
                                         checkpoint.get(FIELD_NAME_ID).asLong(),
-                                        
checkpoint.get(FIELD_NAME_EXTERNAL_PATH).asText()));
+                                        
checkpoint.get(FIELD_NAME_EXTERNAL_PATH).asText(),
+                                        getCheckpointTimestamp(checkpoint)));
+    }
+
+    private long getCheckpointTimestamp(JsonNode checkpoint) {
+        if (checkpoint.has(FIELD_NAME_TRIGGER_TIMESTAMP)) {
+            return checkpoint.get(FIELD_NAME_TRIGGER_TIMESTAMP).asLong();
+        } else {
+            return checkpoint
+                    .get(
+                            
CheckpointingStatistics.RestoredCheckpointStatistics
+                                    .FIELD_NAME_RESTORE_TIMESTAMP)
+                    .asLong();
+        }
+    }
+
+    /** Information about the latest completed checkpoint/savepoint. */
+    @Value
+    public static class CompletedCheckpointInfo {
+        long id;
+        String externalPointer;
+        long timestamp;
+    }
+
+    /** Information about the currently pending checkpoint/savepoint. */
+    @Value
+    public static class PendingCheckpointInfo {
+        long id;
+        long timestamp;
     }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index 7583ba1f..4914b761 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -18,6 +18,7 @@
 package org.apache.flink.kubernetes.operator.service;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
@@ -92,6 +93,11 @@ public interface FlinkService {
 
     SavepointFetchResult fetchSavepointInfo(String triggerId, String jobId, 
Configuration conf);
 
+    Tuple2<
+                    Optional<CheckpointHistoryWrapper.CompletedCheckpointInfo>,
+                    Optional<CheckpointHistoryWrapper.PendingCheckpointInfo>>
+            getCheckpointInfo(JobID jobId, Configuration conf) throws 
Exception;
+
     void disposeSavepoint(String savepointPath, Configuration conf) throws 
Exception;
 
     Map<String, String> getClusterInfo(Configuration conf) throws Exception;
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
index 4af79bc0..d41903fe 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
@@ -248,6 +248,14 @@ public class DefaultValidator implements 
FlinkResourceValidator {
                         String.format(
                                 "Periodic savepoints cannot be enabled when 
config key[%s] is not set",
                                 
CheckpointingOptions.SAVEPOINT_DIRECTORY.key()));
+            } else if (configuration.get(
+                            KubernetesOperatorConfigOptions
+                                    
.OPERATOR_JOB_UPGRADE_LAST_STATE_CHECKPOINT_MAX_AGE)
+                    != null) {
+                return Optional.of(
+                        String.format(
+                                "In order to use max-checkpoint age 
functionality config key[%s] must be set to allow triggering savepoint 
upgrades.",
+                                
CheckpointingOptions.SAVEPOINT_DIRECTORY.key()));
             }
         }
 
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index 3aea5daf..d6965b04 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
@@ -42,6 +43,7 @@ import 
org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.exception.RecoveryFailureException;
 import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
 import org.apache.flink.kubernetes.operator.service.AbstractFlinkService;
+import org.apache.flink.kubernetes.operator.service.CheckpointHistoryWrapper;
 import 
org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
@@ -66,6 +68,8 @@ import io.fabric8.kubernetes.api.model.ObjectMeta;
 import io.fabric8.kubernetes.api.model.PodList;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
+import lombok.Getter;
+import lombok.Setter;
 
 import javax.annotation.Nullable;
 
@@ -97,20 +101,27 @@ public class TestingFlinkService extends 
AbstractFlinkService {
 
     private final List<Tuple3<String, JobStatusMessage, Configuration>> jobs = 
new ArrayList<>();
     private final Map<JobID, String> jobErrors = new HashMap<>();
-    private final Set<String> sessions = new HashSet<>();
-    private boolean isPortReady = true;
-    private boolean isFlinkJobNotFound = false;
-    private boolean isFlinkJobTerminatedWithoutCancellation = false;
-    private boolean haDataAvailable = true;
-    private boolean jobManagerReady = true;
-    private boolean deployFailure = false;
-    private Runnable sessionJobSubmittedCallback;
-    private PodList podList = new PodList();
-    private Consumer<Configuration> listJobConsumer = conf -> {};
+    @Getter private final Set<String> sessions = new HashSet<>();
+    @Setter private boolean isFlinkJobNotFound = false;
+    @Setter private boolean isFlinkJobTerminatedWithoutCancellation = false;
+    @Setter private boolean isPortReady = true;
+    @Setter private boolean haDataAvailable = true;
+    @Setter private boolean jobManagerReady = true;
+    @Setter private boolean deployFailure = false;
+    @Setter private Runnable sessionJobSubmittedCallback;
+    @Setter private PodList podList = new PodList();
+    @Setter private Consumer<Configuration> listJobConsumer = conf -> {};
     private final List<String> disposedSavepoints = new ArrayList<>();
     private final Map<String, Boolean> savepointTriggers = new HashMap<>();
-    private int desiredReplicas = 0;
-    private int cancelJobCallCount = 0;
+
+    @Getter private int desiredReplicas = 0;
+    @Getter private int cancelJobCallCount = 0;
+
+    @Setter
+    private Tuple2<
+                    Optional<CheckpointHistoryWrapper.CompletedCheckpointInfo>,
+                    Optional<CheckpointHistoryWrapper.PendingCheckpointInfo>>
+            checkpointInfo;
 
     private Map<String, String> metricsValues = new HashMap<>();
 
@@ -146,10 +157,6 @@ public class TestingFlinkService extends 
AbstractFlinkService {
         jobs.removeIf(job -> job.f1.getJobState().isTerminalState());
     }
 
-    public Set<String> getSessions() {
-        return sessions;
-    }
-
     @Override
     public void submitApplicationCluster(
             JobSpec jobSpec, Configuration conf, boolean requireHaMetadata) 
throws Exception {
@@ -197,22 +204,6 @@ public class TestingFlinkService extends 
AbstractFlinkService {
         return HighAvailabilityMode.isHighAvailabilityModeActivated(conf) && 
haDataAvailable;
     }
 
-    public void setHaDataAvailable(boolean haDataAvailable) {
-        this.haDataAvailable = haDataAvailable;
-    }
-
-    public void setJobManagerReady(boolean jmReady) {
-        this.jobManagerReady = jmReady;
-    }
-
-    public void setDeployFailure(boolean deployFailure) {
-        this.deployFailure = deployFailure;
-    }
-
-    public void setSessionJobSubmittedCallback(Runnable 
sessionJobSubmittedCallback) {
-        this.sessionJobSubmittedCallback = sessionJobSubmittedCallback;
-    }
-
     @Override
     public void submitSessionCluster(Configuration conf) throws Exception {
         if (deployFailure) {
@@ -255,10 +246,6 @@ public class TestingFlinkService extends 
AbstractFlinkService {
         return super.listJobs(conf);
     }
 
-    public void setListJobConsumer(Consumer<Configuration> listJobConsumer) {
-        this.listJobConsumer = listJobConsumer;
-    }
-
     public List<Tuple3<String, JobStatusMessage, Configuration>> listJobs() {
         return jobs;
     }
@@ -452,6 +439,30 @@ public class TestingFlinkService extends 
AbstractFlinkService {
 
     @Override
     public Optional<Savepoint> getLastCheckpoint(JobID jobId, Configuration 
conf) throws Exception {
+        jobs.stream()
+                .filter(js -> js.f1.getJobId().equals(jobId))
+                .findAny()
+                .ifPresent(
+                        t -> {
+                            if (!t.f1.getJobState().isGloballyTerminalState()) 
{
+                                throw new RuntimeException(
+                                        "Checkpoint should not be queried if job is not in terminal state");
+                            }
+                        });
+
+        return super.getLastCheckpoint(jobId, conf);
+    }
+
+    @Override
+    public Tuple2<
+                    Optional<CheckpointHistoryWrapper.CompletedCheckpointInfo>,
+                    Optional<CheckpointHistoryWrapper.PendingCheckpointInfo>>
+            getCheckpointInfo(JobID jobId, Configuration conf) throws 
Exception {
+
+        if (checkpointInfo != null) {
+            return checkpointInfo;
+        }
+
         var jobOpt = jobs.stream().filter(js -> 
js.f1.getJobId().equals(jobId)).findAny();
 
         if (jobOpt.isEmpty()) {
@@ -459,14 +470,15 @@ public class TestingFlinkService extends 
AbstractFlinkService {
         }
 
         var t = jobOpt.get();
-        if (!t.f1.getJobState().isGloballyTerminalState()) {
-            throw new Exception("Checkpoint should not be queried if job is 
not in terminal state");
-        }
 
         if (t.f0 != null) {
-            return Optional.of(Savepoint.of(t.f0, 
SavepointTriggerType.UNKNOWN));
+            return Tuple2.of(
+                    Optional.of(
+                            new 
CheckpointHistoryWrapper.CompletedCheckpointInfo(
+                                    0L, t.f0, System.currentTimeMillis())),
+                    Optional.empty());
         } else {
-            return Optional.empty();
+            return Tuple2.of(Optional.empty(), Optional.empty());
         }
     }
 
@@ -475,19 +487,6 @@ public class TestingFlinkService extends 
AbstractFlinkService {
         return isPortReady;
     }
 
-    public void setPortReady(boolean isPortReady) {
-        this.isPortReady = isPortReady;
-    }
-
-    public void setFlinkJobTerminatedWithoutCancellation(
-            boolean isFlinkJobTerminatedWithoutCancellation) {
-        this.isFlinkJobTerminatedWithoutCancellation = 
isFlinkJobTerminatedWithoutCancellation;
-    }
-
-    public void setFlinkJobNotFound(boolean isFlinkJobNotFound) {
-        this.isFlinkJobNotFound = isFlinkJobNotFound;
-    }
-
     @Override
     public PodList getJmPodList(FlinkDeployment deployment, Configuration 
conf) {
         return podList;
@@ -498,10 +497,6 @@ public class TestingFlinkService extends 
AbstractFlinkService {
         return podList;
     }
 
-    public void setJmPodList(PodList podList) {
-        this.podList = podList;
-    }
-
     public void markApplicationJobFailedWithError(JobID jobID, String error) 
throws Exception {
         var job = jobs.stream().filter(tuple -> 
tuple.f1.getJobId().equals(jobID)).findFirst();
         if (job.isEmpty()) {
@@ -549,10 +544,6 @@ public class TestingFlinkService extends 
AbstractFlinkService {
         return true;
     }
 
-    public int getDesiredReplicas() {
-        return desiredReplicas;
-    }
-
     public void setMetricValue(String name, String value) {
         metricsValues.put(name, value);
     }
@@ -562,8 +553,4 @@ public class TestingFlinkService extends 
AbstractFlinkService {
             Configuration conf, String jobId, List<String> metricNames) {
         return metricsValues;
     }
-
-    public int getCancelJobCallCount() {
-        return cancelJobCallCount;
-    }
 }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 8a4cb50c..2103b182 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -331,7 +331,7 @@ public class FlinkDeploymentControllerTest {
                 .andReply(validatingResponseProvider)
                 .once();
 
-        
flinkService.setJmPodList(TestUtils.createFailedPodList(crashLoopMessage, 
reason));
+        
flinkService.setPodList(TestUtils.createFailedPodList(crashLoopMessage, 
reason));
 
         FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
         UpdateControl<FlinkDeployment> updateControl;
@@ -908,7 +908,7 @@ public class FlinkDeploymentControllerTest {
     @Test
     public void testSuccessfulObservationShouldClearErrors() throws Exception {
         final String crashLoopMessage = "deploy errors";
-        flinkService.setJmPodList(
+        flinkService.setPodList(
                 TestUtils.createFailedPodList(
                         crashLoopMessage, 
DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF));
 
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
index 032bac2d..3399e1f6 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
@@ -621,7 +621,7 @@ public class ApplicationObserverTest extends 
OperatorTestBase {
                 deployment.getStatus().getJobManagerDeploymentStatus());
         // simulate deployment failure
         String podFailedMessage = "list jobs error";
-        flinkService.setJmPodList(
+        flinkService.setPodList(
                 TestUtils.createFailedPodList(
                         podFailedMessage, 
DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF));
         flinkService.setPortReady(false);
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
index 6d16a5ed..20440fbf 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.kubernetes.operator.reconciler.deployment;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
@@ -34,9 +36,11 @@ import 
org.apache.flink.kubernetes.operator.api.status.JobStatus;
 import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
 import org.apache.flink.kubernetes.operator.api.status.Savepoint;
 import org.apache.flink.kubernetes.operator.api.status.SavepointTriggerType;
+import 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
 import org.apache.flink.kubernetes.operator.exception.RecoveryFailureException;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.reconciler.TestReconcilerAdapter;
+import org.apache.flink.kubernetes.operator.service.CheckpointHistoryWrapper;
 import org.apache.flink.runtime.client.JobStatusMessage;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
@@ -48,9 +52,11 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -417,6 +423,104 @@ public class ApplicationReconcilerUpgradeModeTest extends 
OperatorTestBase {
                 flinkService.listJobs().get(0).f0);
     }
 
+    @Test
+    public void testLastStateMaxCheckpointAge() throws Exception {
+        var deployment = TestUtils.buildApplicationCluster();
+        deployment.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
+        ReconciliationUtils.updateStatusForDeployedSpec(deployment, new 
Configuration());
+
+        // Set job status to running
+        var jobStatus = deployment.getStatus().getJobStatus();
+        long now = System.currentTimeMillis();
+
+        jobStatus.setState("RUNNING");
+        jobStatus.setStartTime(Long.toString(now));
+        jobStatus.setJobId(new JobID().toString());
+
+        var jobReconciler = (ApplicationReconciler) 
this.reconciler.getReconciler();
+        var ctx = getResourceContext(deployment);
+        var deployConf = ctx.getDeployConfig(deployment.getSpec());
+
+        assertEquals(
+                
AbstractJobReconciler.AvailableUpgradeMode.of(UpgradeMode.LAST_STATE),
+                jobReconciler.getAvailableUpgradeMode(ctx, deployConf));
+
+        deployConf.set(
+                
KubernetesOperatorConfigOptions.OPERATOR_JOB_UPGRADE_LAST_STATE_CHECKPOINT_MAX_AGE,
+                Duration.ofMinutes(1));
+
+        // Test without available checkpoints
+        flinkService.setCheckpointInfo(Tuple2.of(Optional.empty(), 
Optional.empty()));
+
+        // Job started just now
+        jobStatus.setStartTime(Long.toString(now));
+        assertEquals(
+                
AbstractJobReconciler.AvailableUpgradeMode.of(UpgradeMode.LAST_STATE),
+                jobReconciler.getAvailableUpgradeMode(ctx, deployConf));
+
+        // Job started more than a minute ago
+        jobStatus.setStartTime(Long.toString(now - 61000));
+        assertEquals(
+                
AbstractJobReconciler.AvailableUpgradeMode.of(UpgradeMode.SAVEPOINT),
+                jobReconciler.getAvailableUpgradeMode(ctx, deployConf));
+
+        // If we have a pending savepoint within the max age, wait
+        flinkService.setCheckpointInfo(
+                Tuple2.of(
+                        Optional.empty(),
+                        Optional.of(
+                                new 
CheckpointHistoryWrapper.PendingCheckpointInfo(
+                                        0, now - 30000))));
+        assertEquals(
+                AbstractJobReconciler.AvailableUpgradeMode.pendingUpgrade(),
+                jobReconciler.getAvailableUpgradeMode(ctx, deployConf));
+
+        // If pending savepoint triggered before max age, use savepoint
+        flinkService.setCheckpointInfo(
+                Tuple2.of(
+                        Optional.empty(),
+                        Optional.of(
+                                new 
CheckpointHistoryWrapper.PendingCheckpointInfo(
+                                        0, now - 61000))));
+        assertEquals(
+                
AbstractJobReconciler.AvailableUpgradeMode.of(UpgradeMode.SAVEPOINT),
+                jobReconciler.getAvailableUpgradeMode(ctx, deployConf));
+
+        // Allow fallback to job start even with pending savepoint
+        jobStatus.setStartTime(Long.toString(now - 30000));
+        assertEquals(
+                
AbstractJobReconciler.AvailableUpgradeMode.of(UpgradeMode.LAST_STATE),
+                jobReconciler.getAvailableUpgradeMode(ctx, deployConf));
+
+        // Recent completed checkpoint
+        jobStatus.setStartTime(Long.toString(now - 61000));
+        flinkService.setCheckpointInfo(
+                Tuple2.of(
+                        Optional.of(
+                                new 
CheckpointHistoryWrapper.CompletedCheckpointInfo(
+                                        0, "s", now - 30000)),
+                        Optional.of(
+                                new 
CheckpointHistoryWrapper.PendingCheckpointInfo(
+                                        0, now - 61000))));
+        assertEquals(
+                
AbstractJobReconciler.AvailableUpgradeMode.of(UpgradeMode.LAST_STATE),
+                jobReconciler.getAvailableUpgradeMode(ctx, deployConf));
+
+        // Job start and checkpoint too old, trigger savepoint
+        jobStatus.setStartTime(Long.toString(now - 61000));
+        flinkService.setCheckpointInfo(
+                Tuple2.of(
+                        Optional.of(
+                                new 
CheckpointHistoryWrapper.CompletedCheckpointInfo(
+                                        0, "s", now - 61000)),
+                        Optional.of(
+                                new 
CheckpointHistoryWrapper.PendingCheckpointInfo(
+                                        0, now - 61000))));
+        assertEquals(
+                
AbstractJobReconciler.AvailableUpgradeMode.of(UpgradeMode.SAVEPOINT),
+                jobReconciler.getAvailableUpgradeMode(ctx, deployConf));
+    }
+
     private static Stream<Arguments> testInitialJmDeployCannotStartParams() {
         return Stream.of(
                 Arguments.of(UpgradeMode.LAST_STATE, true),
@@ -623,6 +727,7 @@ public class ApplicationReconcilerUpgradeModeTest extends 
OperatorTestBase {
                                 .toBuilder()
                                 
.jobId(runningJobs.get(0).f1.getJobId().toHexString())
                                 .jobName(runningJobs.get(0).f1.getJobName())
+                                
.startTime(Long.toString(System.currentTimeMillis()))
                                 .state("RUNNING")
                                 .build());
         
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
index a5198637..0687fe77 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
@@ -172,6 +172,19 @@ public class DefaultValidatorTest {
                         "Periodic savepoints cannot be enabled when config 
key[%s] is not set",
                         CheckpointingOptions.SAVEPOINT_DIRECTORY.key()));
 
+        testError(
+                dep ->
+                        dep.getSpec()
+                                .setFlinkConfiguration(
+                                        Map.of(
+                                                KubernetesOperatorConfigOptions
+                                                        
.OPERATOR_JOB_UPGRADE_LAST_STATE_CHECKPOINT_MAX_AGE
+                                                        .key(),
+                                                "1m")),
+                String.format(
+                        "In order to use max-checkpoint age functionality config key[%s] must be set to allow triggering savepoint upgrades.",
+                        CheckpointingOptions.SAVEPOINT_DIRECTORY.key()));
+
         // Test conf validation
         testSuccess(
                 dep ->

Reply via email to