[FLINK-4754] [checkpoints] Make number of retained checkpoints user configurable

This closes #3374


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b46f5e05
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b46f5e05
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b46f5e05

Branch: refs/heads/master
Commit: b46f5e050bdd77fe6e501bad20466d8777218131
Parents: 6b5e1f6
Author: Tony Wei <tony19920...@gmail.com>
Authored: Mon Feb 20 18:30:24 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 16 14:43:27 2017 +0100

----------------------------------------------------------------------
 docs/setup/config.md                            |   2 +
 .../apache/flink/configuration/CoreOptions.java |   5 +
 .../checkpoint/CheckpointRecoveryFactory.java   |   3 +-
 .../checkpoint/CompletedCheckpointStore.java    |   5 +
 .../StandaloneCheckpointRecoveryFactory.java    |   5 +-
 .../StandaloneCompletedCheckpointStore.java     |   5 +
 .../ZooKeeperCheckpointRecoveryFactory.java     |   4 +-
 .../ZooKeeperCompletedCheckpointStore.java      |   5 +
 .../executiongraph/ExecutionGraphBuilder.java   |  12 ++-
 .../CheckpointCoordinatorFailureTest.java       |   5 +
 .../ExecutionGraphDeploymentTest.java           | 101 +++++++++++++++++++
 .../jobmanager/JobManagerHARecoveryTest.java    |   7 +-
 12 files changed, 151 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 013e56a..048e012 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -182,6 +182,8 @@ will be used under the directory specified by 
jobmanager.web.tmpdir.
 
 - `state.checkpoints.dir`: The target directory for meta data of [externalized 
checkpoints]({{ site.baseurl 
}}/setup/checkpoints.html#externalized-checkpoints).
 
+- `state.checkpoints.max-retained-checkpoints`: The maximum number of 
completed checkpoint instances to retain. This setting defines how many 
completed checkpoint instances can be stored in `CompletedCheckpointStore`. 
(Default: 1)
+
 - `high-availability.zookeeper.storageDir`: Required for HA. Directory for 
storing JobManager metadata; this is persisted in the state backend and only a 
pointer to this state is stored in ZooKeeper. Exactly like the checkpoint 
directory it must be accessible from the JobManager and a local filesystem 
should only be used for local deployments. Previously this key was named 
`recovery.zookeeper.storageDir`.
 
 - `blob.storage.directory`: Directory for storing blobs (such as user JARs) on 
the TaskManagers.

http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index 4e30ceb..1e40569 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -45,4 +45,9 @@ public class CoreOptions {
        public static final ConfigOption<String> STATE_BACKEND = ConfigOptions
                .key("state.backend")
                .noDefaultValue();
+
+       /** The maximum number of completed checkpoint instances to retain.*/
+       public static final ConfigOption<Integer> 
STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS = ConfigOptions
+               .key("state.checkpoints.max-retained-checkpoints")
+               .defaultValue(1);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
index 0c7dfa7..3fb1385 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
@@ -44,10 +44,11 @@ public interface CheckpointRecoveryFactory {
         * Creates a {@link CompletedCheckpointStore} instance for a job.
         *
         * @param jobId           Job ID to recover checkpoints for
+        * @param maxNumberOfCheckpointsToRetain Maximum number of checkpoints 
to retain
         * @param userClassLoader User code class loader of the job
         * @return {@link CompletedCheckpointStore} instance for the job
         */
-       CompletedCheckpointStore createCheckpointStore(JobID jobId, ClassLoader 
userClassLoader)
+       CompletedCheckpointStore createCheckpointStore(JobID jobId, int 
maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader)
                        throws Exception;
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
index e91e038..9c2b199 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
@@ -73,6 +73,11 @@ public interface CompletedCheckpointStore {
        int getNumberOfRetainedCheckpoints();
 
        /**
+        * Returns the max number of retained checkpoints.
+        */
+       int getMaxNumberOfRetainedCheckpoints();
+
+       /**
         * This method returns whether the completed checkpoint store requires 
checkpoints to be
         * externalized. Externalized checkpoints have their meta data 
persisted, which the checkpoint
         * store can exploit (for example by simply pointing the persisted 
metadata).

http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
index 57785ce..2d2cc2a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
@@ -37,11 +37,10 @@ public class StandaloneCheckpointRecoveryFactory implements 
CheckpointRecoveryFa
        }
 
        @Override
-       public CompletedCheckpointStore createCheckpointStore(JobID jobId, 
ClassLoader userClassLoader)
+       public CompletedCheckpointStore createCheckpointStore(JobID jobId, int 
maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader)
                        throws Exception {
 
-               return new StandaloneCompletedCheckpointStore(
-                               
CheckpointRecoveryFactory.NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN);
+               return new 
StandaloneCompletedCheckpointStore(maxNumberOfCheckpointsToRetain);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
index a0248b2..6c752f2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
@@ -84,6 +84,11 @@ public class StandaloneCompletedCheckpointStore implements 
CompletedCheckpointSt
        }
 
        @Override
+       public int getMaxNumberOfRetainedCheckpoints() {
+               return maxNumberOfCheckpointsToRetain;
+       }
+
+       @Override
        public void shutdown(JobStatus jobStatus) throws Exception {
                try {
                        LOG.info("Shutting down");

http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
index 09bfa8c..481559b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
@@ -59,11 +59,11 @@ public class ZooKeeperCheckpointRecoveryFactory implements 
CheckpointRecoveryFac
        }
 
        @Override
-       public CompletedCheckpointStore createCheckpointStore(JobID jobId, 
ClassLoader userClassLoader)
+       public CompletedCheckpointStore createCheckpointStore(JobID jobId, int 
maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader)
                        throws Exception {
 
                return ZooKeeperUtils.createCompletedCheckpoints(client, 
config, jobId,
-                               NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN, 
executor);
+                               maxNumberOfCheckpointsToRetain, executor);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 7a167cb..1319c27 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -252,6 +252,11 @@ public class ZooKeeperCompletedCheckpointStore implements 
CompletedCheckpointSto
        }
 
        @Override
+       public int getMaxNumberOfRetainedCheckpoints() {
+               return maxNumberOfCheckpointsToRetain;
+       }
+
+       @Override
        public void shutdown(JobStatus jobStatus) throws Exception {
                if (jobStatus.isGloballyTerminalState()) {
                        LOG.info("Shutting down");

http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index ec7103c..8a35773 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.executiongraph;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
@@ -176,7 +177,16 @@ public class ExecutionGraphBuilder {
                        CompletedCheckpointStore completedCheckpoints;
                        CheckpointIDCounter checkpointIdCounter;
                        try {
-                               completedCheckpoints = 
recoveryFactory.createCheckpointStore(jobId, classLoader);
+                               int maxNumberOfCheckpointsToRetain = 
jobManagerConfig.getInteger(
+                                       
CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS);
+                               if (maxNumberOfCheckpointsToRetain <= 0) {
+                                       // warning and use 1 as the default 
value if the setting in
+                                       // 
state.checkpoints.max-retained-checkpoints is not greater than 0.
+                                       log.warn("The setting for 
max-retained-checkpoints is not a positive number.");
+                                       maxNumberOfCheckpointsToRetain = 
CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS.defaultValue();
+                               }
+
+                               completedCheckpoints = 
recoveryFactory.createCheckpointStore(jobId, maxNumberOfCheckpointsToRetain, 
classLoader);
                                checkpointIdCounter = 
recoveryFactory.createCheckpointIDCounter(jobId);
                        }
                        catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index 9517257..340e2a7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -136,6 +136,11 @@ public class CheckpointCoordinatorFailureTest extends 
TestLogger {
                }
 
                @Override
+               public int getMaxNumberOfRetainedCheckpoints() {
+                       return 1;
+               }
+
+               @Override
                public boolean requiresExternalizedCheckpoints() {
                        return false;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 30824e0..57b549b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -22,6 +22,7 @@ import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.ge
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
+import static org.junit.Assert.assertNotEquals;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -32,14 +33,20 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -51,8 +58,11 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
 import org.apache.flink.runtime.operators.BatchTask;
@@ -62,6 +72,7 @@ import 
org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.apache.flink.util.SerializedValue;
 
 import org.junit.Test;
+import org.slf4j.LoggerFactory;
 
 public class ExecutionGraphDeploymentTest {
 
@@ -435,6 +446,63 @@ public class ExecutionGraphDeploymentTest {
                assertEquals(JobStatus.FAILED, eg.getState());
        }
 
+       @Test
+       public void testSettingDefaultMaxNumberOfCheckpointsToRetain() {
+               try {
+                       final Configuration jobManagerConfig = new 
Configuration();
+
+                       final ExecutionGraph eg = 
createExecutionGraph(jobManagerConfig);
+
+                       assertEquals((int) 
CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS.defaultValue(),
+                               
eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void testSettingMaxNumberOfCheckpointsToRetain() {
+               try {
+                       final int maxNumberOfCheckpointsToRetain = 10;
+                       final Configuration jobManagerConfig = new 
Configuration();
+                       
jobManagerConfig.setInteger(CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS,
+                               maxNumberOfCheckpointsToRetain);
+
+                       final ExecutionGraph eg = 
createExecutionGraph(jobManagerConfig);
+
+                       assertEquals(maxNumberOfCheckpointsToRetain,
+                               
eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void testSettingIllegalMaxNumberOfCheckpointsToRetain() {
+               try {
+                       final int negativeMaxNumberOfCheckpointsToRetain = -10;
+
+                       final Configuration jobManagerConfig = new 
Configuration();
+                       
jobManagerConfig.setInteger(CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS,
+                               negativeMaxNumberOfCheckpointsToRetain);
+
+                       final ExecutionGraph eg = 
createExecutionGraph(jobManagerConfig);
+
+                       assertNotEquals(negativeMaxNumberOfCheckpointsToRetain,
+                               
eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
+                       assertEquals((int) 
CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS.defaultValue(),
+                               
eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
        private Tuple2<ExecutionGraph, Map<ExecutionAttemptID, Execution>> 
setupExecution(JobVertex v1, int dop1, JobVertex v2, int dop2) throws Exception 
{
                final JobID jobId = new JobID();
 
@@ -497,4 +565,37 @@ public class ExecutionGraphDeploymentTest {
                        throw new Exception();
                }
        }
+
+       private ExecutionGraph createExecutionGraph(Configuration 
configuration) throws Exception {
+               final ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
+
+               final JobID jobId = new JobID();
+               final JobGraph jobGraph = new JobGraph(jobId, "test");
+               jobGraph.setSnapshotSettings(new JobSnapshottingSettings(
+                       new ArrayList<JobVertexID>(1),
+                       new ArrayList<JobVertexID>(1),
+                       new ArrayList<JobVertexID>(1),
+                       100,
+                       10 * 60 * 1000,
+                       0,
+                       1,
+                       ExternalizedCheckpointSettings.none(),
+                       null,
+                       false));
+
+               return ExecutionGraphBuilder.buildGraph(
+                       null,
+                       jobGraph,
+                       configuration,
+                       executor,
+                       executor,
+                       new ProgrammedSlotProvider(1),
+                       getClass().getClassLoader(),
+                       new StandaloneCheckpointRecoveryFactory(),
+                       Time.minutes(10),
+                       new NoRestartStrategy(),
+                       new UnregisteredMetricsGroup(),
+                       1,
+                       LoggerFactory.getLogger(getClass()));
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 115b06c..32358c0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -485,6 +485,11 @@ public class JobManagerHARecoveryTest {
                }
 
                @Override
+               public int getMaxNumberOfRetainedCheckpoints() {
+                       return 1;
+               }
+
+               @Override
                public boolean requiresExternalizedCheckpoints() {
                        return false;
                }
@@ -509,7 +514,7 @@ public class JobManagerHARecoveryTest {
                }
 
                @Override
-               public CompletedCheckpointStore createCheckpointStore(JobID 
jobId, ClassLoader userClassLoader) throws Exception {
+               public CompletedCheckpointStore createCheckpointStore(JobID 
jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader) throws 
Exception {
                        return store;
                }
 

Reply via email to