[FLINK-4754] [checkpoints] Make number of retained checkpoints user configurable
This closes #3374 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b46f5e05 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b46f5e05 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b46f5e05 Branch: refs/heads/master Commit: b46f5e050bdd77fe6e501bad20466d8777218131 Parents: 6b5e1f6 Author: Tony Wei <tony19920...@gmail.com> Authored: Mon Feb 20 18:30:24 2017 +0800 Committer: Stephan Ewen <se...@apache.org> Committed: Thu Mar 16 14:43:27 2017 +0100 ---------------------------------------------------------------------- docs/setup/config.md | 2 + .../apache/flink/configuration/CoreOptions.java | 5 + .../checkpoint/CheckpointRecoveryFactory.java | 3 +- .../checkpoint/CompletedCheckpointStore.java | 5 + .../StandaloneCheckpointRecoveryFactory.java | 5 +- .../StandaloneCompletedCheckpointStore.java | 5 + .../ZooKeeperCheckpointRecoveryFactory.java | 4 +- .../ZooKeeperCompletedCheckpointStore.java | 5 + .../executiongraph/ExecutionGraphBuilder.java | 12 ++- .../CheckpointCoordinatorFailureTest.java | 5 + .../ExecutionGraphDeploymentTest.java | 101 +++++++++++++++++++ .../jobmanager/JobManagerHARecoveryTest.java | 7 +- 12 files changed, 151 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/docs/setup/config.md ---------------------------------------------------------------------- diff --git a/docs/setup/config.md b/docs/setup/config.md index 013e56a..048e012 100644 --- a/docs/setup/config.md +++ b/docs/setup/config.md @@ -182,6 +182,8 @@ will be used under the directory specified by jobmanager.web.tmpdir. - `state.checkpoints.dir`: The target directory for meta data of [externalized checkpoints]({{ site.baseurl }}/setup/checkpoints.html#externalized-checkpoints). +- `state.checkpoints.max-retained-checkpoints`: The maximum number of completed checkpoint instances to retain. This setting defines how many completed checkpoint instances can be stored in `CompletedCheckpointStore`. (Default: 1) + - `high-availability.zookeeper.storageDir`: Required for HA. Directory for storing JobManager metadata; this is persisted in the state backend and only a pointer to this state is stored in ZooKeeper. Exactly like the checkpoint directory it must be accessible from the JobManager and a local filesystem should only be used for local deployments. Previously this key was named `recovery.zookeeper.storageDir`. - `blob.storage.directory`: Directory for storing blobs (such as user JARs) on the TaskManagers. http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java index 4e30ceb..1e40569 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java @@ -45,4 +45,9 @@ public class CoreOptions { public static final ConfigOption<String> STATE_BACKEND = ConfigOptions .key("state.backend") .noDefaultValue(); + + /** The maximum number of completed checkpoint instances to retain.*/ + public static final ConfigOption<Integer> STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS = ConfigOptions + .key("state.checkpoints.max-retained-checkpoints") + .defaultValue(1); } http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java index 0c7dfa7..3fb1385 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java @@ -44,10 +44,11 @@ public interface CheckpointRecoveryFactory { * Creates a {@link CompletedCheckpointStore} instance for a job. * * @param jobId Job ID to recover checkpoints for + * @param maxNumberOfCheckpointsToRetain Maximum number of checkpoints to retain * @param userClassLoader User code class loader of the job * @return {@link CompletedCheckpointStore} instance for the job */ - CompletedCheckpointStore createCheckpointStore(JobID jobId, ClassLoader userClassLoader) + CompletedCheckpointStore createCheckpointStore(JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader) throws Exception; /** http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java index e91e038..9c2b199 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java @@ -73,6 +73,11 @@ public interface CompletedCheckpointStore { int getNumberOfRetainedCheckpoints(); /** + * Returns the max number of retained checkpoints. + */ + int getMaxNumberOfRetainedCheckpoints(); + + /** * This method returns whether the completed checkpoint store requires checkpoints to be * externalized. Externalized checkpoints have their meta data persisted, which the checkpoint * store can exploit (for example by simply pointing the persisted metadata). http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java index 57785ce..2d2cc2a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java @@ -37,11 +37,10 @@ public class StandaloneCheckpointRecoveryFactory implements CheckpointRecoveryFa } @Override - public CompletedCheckpointStore createCheckpointStore(JobID jobId, ClassLoader userClassLoader) + public CompletedCheckpointStore createCheckpointStore(JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader) throws Exception { - return new StandaloneCompletedCheckpointStore( - CheckpointRecoveryFactory.NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN); + return new StandaloneCompletedCheckpointStore(maxNumberOfCheckpointsToRetain); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java index a0248b2..6c752f2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java @@ -84,6 +84,11 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointSt } @Override + public int getMaxNumberOfRetainedCheckpoints() { + return maxNumberOfCheckpointsToRetain; + } + + @Override public void shutdown(JobStatus jobStatus) throws Exception { try { LOG.info("Shutting down"); http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java index 09bfa8c..481559b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java @@ -59,11 +59,11 @@ public class ZooKeeperCheckpointRecoveryFactory implements CheckpointRecoveryFac } @Override - public CompletedCheckpointStore createCheckpointStore(JobID jobId, ClassLoader userClassLoader) + public CompletedCheckpointStore createCheckpointStore(JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader) throws Exception { return ZooKeeperUtils.createCompletedCheckpoints(client, config, jobId, - NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN, executor); + maxNumberOfCheckpointsToRetain, executor); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java index 7a167cb..1319c27 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java @@ -252,6 +252,11 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto } @Override + public int getMaxNumberOfRetainedCheckpoints() { + return maxNumberOfCheckpointsToRetain; + } + + @Override public void shutdown(JobStatus jobStatus) throws Exception { if (jobStatus.isGloballyTerminalState()) { LOG.info("Shutting down"); http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java index ec7103c..8a35773 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.executiongraph; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.IllegalConfigurationException; @@ -176,7 +177,16 @@ public class ExecutionGraphBuilder { CompletedCheckpointStore completedCheckpoints; CheckpointIDCounter checkpointIdCounter; try { - completedCheckpoints = recoveryFactory.createCheckpointStore(jobId, classLoader); + int maxNumberOfCheckpointsToRetain = jobManagerConfig.getInteger( + CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS); + if (maxNumberOfCheckpointsToRetain <= 0) { + // warning and use 1 as the default value if the setting in + // state.checkpoints.max-retained-checkpoints is not greater than 0. + log.warn("The setting for max-retained-checkpoints is not a positive number."); + maxNumberOfCheckpointsToRetain = CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS.defaultValue(); + } + + completedCheckpoints = recoveryFactory.createCheckpointStore(jobId, maxNumberOfCheckpointsToRetain, classLoader); checkpointIdCounter = recoveryFactory.createCheckpointIDCounter(jobId); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java index 9517257..340e2a7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java @@ -136,6 +136,11 @@ public class CheckpointCoordinatorFailureTest extends TestLogger { } @Override + public int getMaxNumberOfRetainedCheckpoints() { + return 1; + } + + @Override public boolean requiresExternalizedCheckpoints() { return false; } http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java index 30824e0..57b549b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java @@ -22,6 +22,7 @@ import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.ge import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; +import static org.junit.Assert.assertNotEquals; import java.util.ArrayList; import java.util.Arrays; @@ -32,14 +33,20 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.IntCounter; +import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; @@ -51,8 +58,11 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; +import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway; import org.apache.flink.runtime.operators.BatchTask; @@ -62,6 +72,7 @@ import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; import org.apache.flink.util.SerializedValue; import org.junit.Test; +import org.slf4j.LoggerFactory; public class ExecutionGraphDeploymentTest { @@ -435,6 +446,63 @@ public class ExecutionGraphDeploymentTest { assertEquals(JobStatus.FAILED, eg.getState()); } + @Test + public void testSettingDefaultMaxNumberOfCheckpointsToRetain() { + try { + final Configuration jobManagerConfig = new Configuration(); + + final ExecutionGraph eg = createExecutionGraph(jobManagerConfig); + + assertEquals((int) CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS.defaultValue(), + eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testSettingMaxNumberOfCheckpointsToRetain() { + try { + final int maxNumberOfCheckpointsToRetain = 10; + final Configuration jobManagerConfig = new Configuration(); + jobManagerConfig.setInteger(CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS, + maxNumberOfCheckpointsToRetain); + + final ExecutionGraph eg = createExecutionGraph(jobManagerConfig); + + assertEquals(maxNumberOfCheckpointsToRetain, + eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testSettingIllegalMaxNumberOfCheckpointsToRetain() { + try { + final int negativeMaxNumberOfCheckpointsToRetain = -10; + + final Configuration jobManagerConfig = new Configuration(); + jobManagerConfig.setInteger(CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS, + negativeMaxNumberOfCheckpointsToRetain); + + final ExecutionGraph eg = createExecutionGraph(jobManagerConfig); + + assertNotEquals(negativeMaxNumberOfCheckpointsToRetain, + eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints()); + assertEquals((int) CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS.defaultValue(), + eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + private Tuple2<ExecutionGraph, Map<ExecutionAttemptID, Execution>> setupExecution(JobVertex v1, int dop1, JobVertex v2, int dop2) throws Exception { final JobID jobId = new JobID(); @@ -497,4 +565,37 @@ public class ExecutionGraphDeploymentTest { throw new Exception(); } } + + private ExecutionGraph createExecutionGraph(Configuration configuration) throws Exception { + final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + + final JobID jobId = new JobID(); + final JobGraph jobGraph = new JobGraph(jobId, "test"); + jobGraph.setSnapshotSettings(new JobSnapshottingSettings( + new ArrayList<JobVertexID>(1), + new ArrayList<JobVertexID>(1), + new ArrayList<JobVertexID>(1), + 100, + 10 * 60 * 1000, + 0, + 1, + ExternalizedCheckpointSettings.none(), + null, + false)); + + return ExecutionGraphBuilder.buildGraph( + null, + jobGraph, + configuration, + executor, + executor, + new ProgrammedSlotProvider(1), + getClass().getClassLoader(), + new StandaloneCheckpointRecoveryFactory(), + Time.minutes(10), + new NoRestartStrategy(), + new UnregisteredMetricsGroup(), + 1, + LoggerFactory.getLogger(getClass())); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java index 115b06c..32358c0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java @@ -485,6 +485,11 @@ public class JobManagerHARecoveryTest { } @Override + public int getMaxNumberOfRetainedCheckpoints() { + return 1; + } + + @Override public boolean requiresExternalizedCheckpoints() { return false; } @@ -509,7 +514,7 @@ public class JobManagerHARecoveryTest { } @Override - public CompletedCheckpointStore createCheckpointStore(JobID jobId, ClassLoader userClassLoader) throws Exception { + public CompletedCheckpointStore createCheckpointStore(JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader) throws Exception { return store; }