[GitHub] flink pull request #2608: [FLINK-4512] [FLIP-10] Add option to persist perio...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2608 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/2608#discussion_r82978898 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java --- @@ -256,29 +306,51 @@ public boolean acknowledgeTask( * Aborts a checkpoint because it expired (took too long). */ public void abortExpired() throws Exception { --- End diff -- I would like to do this as a follow up
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/2608#discussion_r82977266 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java --- @@ -18,23 +18,105 @@ package org.apache.flink.runtime.checkpoint.savepoint; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +import static org.apache.flink.util.Preconditions.checkNotNull; + /** - * Savepoint store used to persist {@link Savepoint} instances. + * A file system based savepoint store. * - * The main implementation is the {@link FsSavepointStore}. We also have the - * {@link HeapSavepointStore} for historical reasons (introduced in Flink 1.0). + * Stored savepoints have the following format: + * + * MagicNumber SavepointVersion Savepoint + * - MagicNumber => int + * - SavepointVersion => int (returned by Savepoint#getVersion()) + * - Savepoint => bytes (serialized via version-specific SavepointSerializer) + * */ -public interface SavepointStore { +public class SavepointStore { + + private static final Logger LOG = LoggerFactory.getLogger(SavepointStore.class); + + /** Magic number for sanity checks against stored savepoints. */ + private static final int MAGIC_NUMBER = 0x4960672d; + + /** Prefix for savepoint files. */ + private static final String prefix = "savepoint-"; /** * Stores the savepoint. 
* +* @param targetDirectory Target directory to store savepoint in * @param savepoint Savepoint to be stored * @param <T> Savepoint type * @return Path of stored savepoint * @throws Exception Failures during store are forwarded */ - <T extends Savepoint> String storeSavepoint(T savepoint) throws Exception; + public static <T extends Savepoint> String storeSavepoint( + String targetDirectory, + T savepoint) throws IOException { + + checkNotNull(targetDirectory, "Target directory"); + checkNotNull(savepoint, "Savepoint"); + + Exception latestException = null; + Path path = null; + FSDataOutputStream fdos = null; + + FileSystem fs = null; + + // Try to create a FS output stream + for (int attempt = 0; attempt < 10; attempt++) { + path = new Path(targetDirectory, FileUtils.getRandomFilename(prefix)); + + if (fs == null) { + fs = FileSystem.get(path.toUri()); + } + + try { + fdos = fs.create(path, false); + break; + } catch (Exception e) { + latestException = e; + } + } + + if (fdos == null) { + throw new IOException("Failed to create file output stream at " + path, latestException); + } + + boolean success = false; + try (DataOutputStream dos = new DataOutputStream(fdos)) { + // Write header + dos.writeInt(MAGIC_NUMBER); + dos.writeInt(savepoint.getVersion()); + + // Write savepoint + SavepointSerializer<T> serializer = SavepointSerializers.getSerializer(savepoint); + serializer.serialize(savepoint, dos); + success = true; + } finally { + if (!success && fs.exists(path)) { + if (!fs.delete(path, true)) { + LOG.warn("Failed to delete file " + path + " after failed write."); --- End diff -- Replaced here and below
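The write path in this diff retries file creation under fresh random names before giving up, remembering the last failure for the final error message. A stand-alone sketch of that pattern with plain `java.nio` (the attempt count, prefix handling, and class name are illustrative; Flink's version goes through its own `FileSystem` abstraction):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.ThreadLocalRandom;

public class RetryingCreate {

    // Try up to maxAttempts random file names; keep the latest failure so the
    // final IOException can explain why every attempt failed.
    public static Path createWithRetries(Path targetDirectory, String prefix, int maxAttempts) throws IOException {
        IOException latestException = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            Path candidate = targetDirectory.resolve(
                    prefix + Long.toHexString(ThreadLocalRandom.current().nextLong()));
            try {
                // Fails with FileAlreadyExistsException if the name is taken,
                // mirroring fs.create(path, false) in the diff.
                return Files.createFile(candidate);
            } catch (IOException e) {
                latestException = e;
            }
        }
        throw new IOException("Failed to create file in " + targetDirectory, latestException);
    }
}
```

On a name collision the loop simply draws a new random name; only after all attempts fail does the caller see an exception.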
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/2608#discussion_r82977296 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java --- @@ -43,24 +125,62 @@ * @return The loaded savepoint * @throws Exception Failures during load are forwarded */ - Savepoint loadSavepoint(String path) throws Exception; + public static Savepoint loadSavepoint(String path) throws IOException { + Preconditions.checkNotNull(path, "Path"); - /** -* Disposes the savepoint at the specified path. -* -* @param path Path of savepoint to dispose -* @throws Exception Failures during disposal are forwarded -*/ - void disposeSavepoint(String path) throws Exception; + try (DataInputStream dis = new DataInputViewStreamWrapper(createFsInputStream(new Path(path)))) { + int magicNumber = dis.readInt(); + + if (magicNumber == MAGIC_NUMBER) { + int version = dis.readInt(); + + SavepointSerializer<?> serializer = SavepointSerializers.getSerializer(version); + return serializer.deserialize(dis); + } else { + throw new RuntimeException("Unexpected magic number. This is most likely " + + "caused by trying to load a Flink 1.0 savepoint. You cannot load a " + + "savepoint triggered by Flink 1.0 with this version of Flink. If it is " + + "_not_ a Flink 1.0 savepoint, this error indicates that the specified " + + "file is not a proper savepoint or the file has been corrupted."); + } + } + } /** -* Shuts down the savepoint store. +* Removes the savepoint meta data w/o loading and disposing it. * -* Only necessary for implementations where the savepoint life-cycle is -* bound to the cluster life-cycle. 
-* -* @throws Exception Failures during shut down are forwarded +* @param path Path of savepoint to remove +* @throws Exception Failures during disposal are forwarded */ - void shutdown() throws Exception; + public static void removeSavepoint(String path) throws Exception { --- End diff -- Changed
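The load path above checks a small framing header (magic number, then a version int that selects the serializer) before touching the payload. A self-contained sketch of that framing over an in-memory byte array; the magic number is the one from the diff, the rest is illustrative:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class SavepointFraming {

    // Magic number from the diff, used as a sanity check on load.
    static final int MAGIC_NUMBER = 0x4960672d;

    // Frame a serialized savepoint: magic, version, then the payload bytes.
    public static byte[] frame(int version, byte[] payload) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(bos)) {
            dos.writeInt(MAGIC_NUMBER);
            dos.writeInt(version);
            dos.write(payload);
        }
        return bos.toByteArray();
    }

    // Validate the magic number and return the version so the caller can pick
    // the matching version-specific serializer for the remaining bytes.
    public static int readVersion(byte[] stored) throws IOException {
        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(stored))) {
            if (dis.readInt() != MAGIC_NUMBER) {
                throw new IOException("Unexpected magic number: not a savepoint, or the file is corrupted");
            }
            return dis.readInt();
        }
    }
}
```

The version indirection is what lets newer Flink releases keep loading older savepoints while rejecting Flink 1.0 files outright.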
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/2608#discussion_r82976869 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java --- @@ -48,13 +48,12 @@ public static CompletedCheckpoint loadAndValidateSavepoint( JobID jobId, Map<JobVertexID, ExecutionJobVertex> tasks, - SavepointStore savepointStore, String savepointPath) throws Exception { // (1) load the savepoint - Savepoint savepoint = savepointStore.loadSavepoint(savepointPath); + Savepoint savepoint = SavepointStore.loadSavepoint(savepointPath); final Map<JobVertexID, TaskState> taskStates = new HashMap<>(savepoint.getTaskStates().size()); - + // (2) validate it (parallelism, etc) for (TaskState taskState : savepoint.getTaskStates()) { ExecutionJobVertex executionJobVertex = tasks.get(taskState.getJobVertexID()); --- End diff -- Yes, updated.
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/2608#discussion_r82976634 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java --- @@ -233,68 +229,90 @@ public int getNumberOfRetainedCheckpoints() { } @Override - public void shutdown() throws Exception { - LOG.info("Shutting down"); + public void shutdown(JobStatus jobStatus) throws Exception { --- End diff -- Our only option would be to wrap in our own Exception, because Curator is throwing the general `Exception`.
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/2608#discussion_r82976383 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java --- @@ -172,7 +168,7 @@ public void recover() throws Exception { for (int i = 0; i < numberOfInitialCheckpoints - 1; i++) { try { - removeFromZooKeeperAndDiscardCheckpoint(initialCheckpoints.get(i)); + removeSubsumed(initialCheckpoints.get(i)); --- End diff -- Yes. Even more, I think this is generally dangerous. What if a checkpoint is recovered but cannot be restored? Then we will have lost all the others. Since we currently only keep a single one anyway, it is not a problem yet.
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/2608#discussion_r82972407 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java --- @@ -64,9 +62,9 @@ public void recover() throws Exception { @Override public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception { - checkpoints.addLast(checkpoint); + checkpoints.add(checkpoint); if (checkpoints.size() > maxNumberOfCheckpointsToRetain) { - checkpoints.removeFirst().discardState(); + checkpoints.remove().subsume(); --- End diff -- Manually triggered savepoints, for example, are not discarded when they are subsumed. The CheckpointProperties constructor is package-private (for testing) and only the static creator methods (for persistent checkpoints, regular checkpoints, and manually triggered savepoints) are publicly accessible. Let me add a check to the properties that only allows manual discard if the checkpoint is persisted.
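The retention behavior under discussion can be sketched without Flink's types: a bounded FIFO of completed checkpoints where subsuming the oldest entry consults its properties before discarding any state. `Checkpoint` here is a stand-in for `CompletedCheckpoint` plus its `CheckpointProperties`, not Flink's API:

```java
import java.util.ArrayDeque;

public class RetentionSketch {

    public interface Checkpoint {
        boolean discardOnSubsume(); // e.g. false for manually triggered savepoints
        void discard();
    }

    public static final class Store {
        private final ArrayDeque<Checkpoint> checkpoints = new ArrayDeque<>();
        private final int maxRetained;

        public Store(int maxRetained) { this.maxRetained = maxRetained; }

        // Add the newest checkpoint; once over the limit, subsume the oldest.
        // Whether subsumption discards state depends on the checkpoint's
        // properties, so persisted savepoints can outlive the store.
        public void add(Checkpoint cp) {
            checkpoints.addLast(cp);
            if (checkpoints.size() > maxRetained) {
                Checkpoint subsumed = checkpoints.removeFirst();
                if (subsumed.discardOnSubsume()) {
                    subsumed.discard();
                }
            }
        }

        public int size() { return checkpoints.size(); }
    }
}
```

The review concern maps directly onto this sketch: a checkpoint with `discardOnSubsume() == false` that is not externally persisted would leak its files, hence the extra check on the properties.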
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/2608#discussion_r82971700 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java --- @@ -18,44 +18,243 @@ package org.apache.flink.runtime.checkpoint; +import org.apache.flink.runtime.jobgraph.JobStatus; + +import java.io.Serializable; + /** * The configuration of a checkpoint, such as whether * - * The checkpoint is a savepoint + * The checkpoint should be persisted * The checkpoint must be full, or may be incremental * The checkpoint format must be the common (cross backend) format, or may be state-backend specific --- End diff -- Added a note `(not yet implemented)` after those.
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/2608#discussion_r82971361 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java --- @@ -282,29 +279,71 @@ public boolean isShutdown() { // Handling checkpoints and messages // - public Future<CompletedCheckpoint> triggerSavepoint(long timestamp) throws Exception { - CheckpointTriggerResult result = triggerCheckpoint(timestamp, CheckpointProperties.forStandardSavepoint()); + /** +* Triggers a savepoint with the default savepoint directory as a target. +* +* @param timestamp The timestamp for the savepoint. +* @return A future to the completed checkpoint +* @throws IllegalStateException If no default savepoint directory has been configured +* @throws Exception Failures during triggering are forwarded +*/ + public Future<CompletedCheckpoint> triggerSavepoint(long timestamp) throws Exception { + return triggerSavepoint(timestamp, null); + } + + /** +* Triggers a savepoint with the given savepoint directory as a target. +* +* @param timestamp The timestamp for the savepoint. +* @param savepointDirectory Target directory for the savepoint. +* @return A future to the completed checkpoint +* @throws IllegalStateException If no savepoint directory has been +* specified and no default savepoint directory has been +* configured +* @throws Exception Failures during triggering are forwarded +*/ + public Future<CompletedCheckpoint> triggerSavepoint(long timestamp, String savepointDirectory) throws Exception { + String targetDirectory; + if (savepointDirectory != null) { + targetDirectory = savepointDirectory; + } else if (this.savepointDirectory != null) { + targetDirectory = this.savepointDirectory; + } else { + throw new IllegalStateException("No savepoint directory configured. 
" + + "You can either specify a directory when triggering this savepoint or " + + "configure a cluster-wide default via key '" + + ConfigConstants.SAVEPOINT_DIRECTORY_KEY + "'."); + } + + CheckpointProperties props = CheckpointProperties.forStandardSavepoint(); + CheckpointTriggerResult result = triggerCheckpoint(timestamp, props, targetDirectory); if (result.isSuccess()) { - PendingSavepoint savepoint = (PendingSavepoint) result.getPendingCheckpoint(); - return savepoint.getCompletionFuture(); - } - else { - return Futures.failed(new Exception("Failed to trigger savepoint: " + result.getFailureReason().message())); + return result.getPendingCheckpoint().getCompletionFuture(); + } else { + CompletableFuture<CompletedCheckpoint> failed = new FlinkCompletableFuture<>(); + failed.completeExceptionally(new Exception("Failed to trigger savepoint: " + result.getFailureReason().message())); --- End diff -- `CheckpointDeclineReason` is not an `Exception`, but an enum of decline reasons.
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/2608#discussion_r82971228 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java --- @@ -282,29 +279,71 @@ public boolean isShutdown() { // Handling checkpoints and messages // - public Future triggerSavepoint(long timestamp) throws Exception { - CheckpointTriggerResult result = triggerCheckpoint(timestamp, CheckpointProperties.forStandardSavepoint()); + /** +* Triggers a savepoint with the default savepoint directory as a target. +* +* @param timestamp The timestamp for the savepoint. +* @return A future to the completed checkpoint +* @throws IllegalStateException If no default savepoint directory has been configured +* @throws Exception Failures during triggering are forwarded +*/ + public Future triggerSavepoint(long timestamp) throws Exception { + return triggerSavepoint(timestamp, null); + } + + /** +* Triggers a savepoint with the given savepoint directory as a target. +* +* @param timestamp The timestamp for the savepoint. +* @param savepointDirectory Target directory for the savepoint. +* @return A future to the completed checkpoint +* @throws IllegalStateException If no savepoint directory has been +* specified and no default savepoint directory has been +* configured +* @throws Exception Failures during triggering are forwarded +*/ + public Future triggerSavepoint(long timestamp, String savepointDirectory) throws Exception { + String targetDirectory; + if (savepointDirectory != null) { + targetDirectory = savepointDirectory; + } else if (this.savepointDirectory != null) { + targetDirectory = this.savepointDirectory; + } else { + throw new IllegalStateException("No savepoint directory configured. 
" + + "You can either specify a directory when triggering this savepoint or " + + "configure a cluster-wide default via key '" + + ConfigConstants.SAVEPOINT_DIRECTORY_KEY + "'."); + } + + CheckpointProperties props = CheckpointProperties.forStandardSavepoint(); + CheckpointTriggerResult result = triggerCheckpoint(timestamp, props, targetDirectory); if (result.isSuccess()) { - PendingSavepoint savepoint = (PendingSavepoint) result.getPendingCheckpoint(); - return savepoint.getCompletionFuture(); - } - else { - return Futures.failed(new Exception("Failed to trigger savepoint: " + result.getFailureReason().message())); + return result.getPendingCheckpoint().getCompletionFuture(); + } else { + CompletableFuture<CompletedCheckpoint> failed = new FlinkCompletableFuture<>(); --- End diff -- I was looking for that one, but didn't find it. Thanks!
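The helper discussed here, `FlinkCompletableFuture.completedExceptionally(Throwable)`, has a direct analogue with plain JDK futures (JDK 9 later added `CompletableFuture.failedFuture` for the same purpose); a sketch:

```java
import java.util.concurrent.CompletableFuture;

public class FailedFutures {

    // Build a future that is already completed with a failure, so the trigger
    // path can return it directly instead of throwing.
    public static <T> CompletableFuture<T> completedExceptionally(Throwable t) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(t);
        return future;
    }
}
```

Returning an already-failed future keeps the trigger API uniform: callers always get a future and handle success and failure through the same channel.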
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/2608#discussion_r82970716 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java --- @@ -219,33 +245,9 @@ public CheckpointCoordinator( * Shuts down the checkpoint coordinator. * * After this method has been called, the coordinator does not accept -* and further messages and cannot trigger any further checkpoints. All -* checkpoint state is discarded. -*/ - public void shutdown() throws Exception { - shutdown(true); - } - - /** -* Suspends the checkpoint coordinator. -* -* After this method has been called, the coordinator does not accept * and further messages and cannot trigger any further checkpoints. -* -* The difference to shutdown is that checkpoint state in the store -* and counter is kept around if possible to recover later. -*/ - public void suspend() throws Exception { - shutdown(false); - } - - /** -* Shuts down the checkpoint coordinator. -* -* @param shutdownStoreAndCounter Depending on this flag the checkpoint -* state services are shut down or suspended. */ - private void shutdown(boolean shutdownStoreAndCounter) throws Exception { + public void shutdown(JobStatus jobStatus) throws Exception { --- End diff -- True, but I thought we kept it `shutdown` for consistency reasons.
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/2608#discussion_r82969876 --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java --- @@ -637,20 +637,25 @@ protected int savepoint(String[] args) { "Specify a Job ID to trigger a savepoint.")); } - return triggerSavepoint(options, jobId); + String savepointDirectory = null; + if (cleanedArgs.length == 2) { --- End diff -- Changed the check to `>= 2` and printed a message that some arguments are unneeded.
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2608#discussion_r82786136 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java --- @@ -233,68 +229,90 @@ public int getNumberOfRetainedCheckpoints() { } @Override - public void shutdown() throws Exception { - LOG.info("Shutting down"); + public void shutdown(JobStatus jobStatus) throws Exception { --- End diff -- Maybe we can narrow the `Exception` a little bit down here or wrap the `InterruptedException` and `KeeperException` in our own exception.
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2608#discussion_r82799818 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala --- @@ -467,8 +467,11 @@ object JobManagerMessages { * of triggering and acknowledging checkpoints. * * @param jobId The JobID of the job to trigger the savepoint for. +* @param savepointDirectory Optional target directory */ - case class TriggerSavepoint(jobId: JobID) extends RequiresLeaderSessionID + case class TriggerSavepoint( + jobId: JobID, + savepointDirectory : String = null) extends RequiresLeaderSessionID --- End diff -- Scalaesque would be to use `Option`.
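The same null-vs-`Option` tradeoff exists on the Java side of this change with `java.util.Optional`. A sketch of the message plus the directory-resolution logic from `triggerSavepoint` (class name and fields are illustrative stand-ins for `TriggerSavepoint` and the coordinator's fallback, not Flink's API):

```java
import java.util.Optional;

public class TriggerSavepointRequest {

    private final String jobId;
    private final Optional<String> savepointDirectory; // empty => fall back to the cluster default

    public TriggerSavepointRequest(String jobId, Optional<String> savepointDirectory) {
        this.jobId = jobId;
        this.savepointDirectory = savepointDirectory;
    }

    // Explicit directory wins; otherwise use the configured default; otherwise
    // fail, mirroring the IllegalStateException in the coordinator diff.
    public String resolveTargetDirectory(Optional<String> configuredDefault) {
        if (savepointDirectory.isPresent()) {
            return savepointDirectory.get();
        }
        return configuredDefault.orElseThrow(() ->
                new IllegalStateException("No savepoint directory configured."));
    }
}
```

Making the absence explicit in the type means callers cannot forget the null check that the `String = null` default quietly requires.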
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2608#discussion_r82797030 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java --- @@ -43,24 +125,62 @@ * @return The loaded savepoint * @throws Exception Failures during load are forwarded */ - Savepoint loadSavepoint(String path) throws Exception; + public static Savepoint loadSavepoint(String path) throws IOException { + Preconditions.checkNotNull(path, "Path"); - /** -* Disposes the savepoint at the specified path. -* -* @param path Path of savepoint to dispose -* @throws Exception Failures during disposal are forwarded -*/ - void disposeSavepoint(String path) throws Exception; + try (DataInputStream dis = new DataInputViewStreamWrapper(createFsInputStream(new Path(path)))) { + int magicNumber = dis.readInt(); + + if (magicNumber == MAGIC_NUMBER) { + int version = dis.readInt(); + + SavepointSerializer<?> serializer = SavepointSerializers.getSerializer(version); + return serializer.deserialize(dis); + } else { + throw new RuntimeException("Unexpected magic number. This is most likely " + + "caused by trying to load a Flink 1.0 savepoint. You cannot load a " + + "savepoint triggered by Flink 1.0 with this version of Flink. If it is " + + "_not_ a Flink 1.0 savepoint, this error indicates that the specified " + + "file is not a proper savepoint or the file has been corrupted."); + } + } + } /** -* Shuts down the savepoint store. +* Removes the savepoint meta data w/o loading and disposing it. * -* Only necessary for implementations where the savepoint life-cycle is -* bound to the cluster life-cycle. 
-* -* @throws Exception Failures during shut down are forwarded +* @param path Path of savepoint to remove +* @throws Exception Failures during disposal are forwarded */ - void shutdown() throws Exception; + public static void removeSavepoint(String path) throws Exception { --- End diff -- Could be `IOException`
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2608#discussion_r82776861 --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java --- @@ -637,20 +637,25 @@ protected int savepoint(String[] args) { "Specify a Job ID to trigger a savepoint.")); } - return triggerSavepoint(options, jobId); + String savepointDirectory = null; + if (cleanedArgs.length == 2) { --- End diff -- Not sure, but what happens if the user types `bin/flink savepoint savepointDirectory foobar`. This will ignore the savepointDirectory, right? Maybe we could print a corresponding message if something like that happens.
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2608#discussion_r82784524 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java --- @@ -64,9 +62,9 @@ public void recover() throws Exception { @Override public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception { - checkpoints.addLast(checkpoint); + checkpoints.add(checkpoint); if (checkpoints.size() > maxNumberOfCheckpointsToRetain) { - checkpoints.removeFirst().discardState(); + checkpoints.remove().subsume(); --- End diff -- What happens if the `CheckpointProperties.discardOnSubsumed() == false`. This could lead to files in your nfs which are not cleaned up even though they are not useful for anything else (given that it's not a persistent checkpoint), right?
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2608#discussion_r82783495 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java --- @@ -256,29 +306,51 @@ public boolean acknowledgeTask( * Aborts a checkpoint because it expired (took too long). */ public void abortExpired() throws Exception { --- End diff -- Would it make sense to narrow the `Exception` a little bit down? Maybe introducing a `CheckpointException` or `PendingCheckpointException`?
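Narrowing the blanket `throws Exception` to the suggested dedicated type could look like the sketch below; the name `CheckpointException` comes from the review comment, everything else is hypothetical:

```java
// A dedicated checked exception for checkpoint/savepoint failures, so callers
// of methods like abortExpired() no longer have to catch the general Exception.
public class CheckpointException extends Exception {

    public CheckpointException(String message) {
        super(message);
    }

    public CheckpointException(String message, Throwable cause) {
        super(message, cause);
    }
}
```

Keeping the original failure as the cause preserves the stack trace while giving callers a single, specific type to catch.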
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2608#discussion_r82778433 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java --- @@ -282,29 +279,71 @@ public boolean isShutdown() { // Handling checkpoints and messages // - public Future<CompletedCheckpoint> triggerSavepoint(long timestamp) throws Exception { - CheckpointTriggerResult result = triggerCheckpoint(timestamp, CheckpointProperties.forStandardSavepoint()); + /** +* Triggers a savepoint with the default savepoint directory as a target. +* +* @param timestamp The timestamp for the savepoint. +* @return A future to the completed checkpoint +* @throws IllegalStateException If no default savepoint directory has been configured +* @throws Exception Failures during triggering are forwarded +*/ + public Future<CompletedCheckpoint> triggerSavepoint(long timestamp) throws Exception { + return triggerSavepoint(timestamp, null); + } + + /** +* Triggers a savepoint with the given savepoint directory as a target. +* +* @param timestamp The timestamp for the savepoint. +* @param savepointDirectory Target directory for the savepoint. +* @return A future to the completed checkpoint +* @throws IllegalStateException If no savepoint directory has been +* specified and no default savepoint directory has been +* configured +* @throws Exception Failures during triggering are forwarded +*/ + public Future<CompletedCheckpoint> triggerSavepoint(long timestamp, String savepointDirectory) throws Exception { + String targetDirectory; + if (savepointDirectory != null) { + targetDirectory = savepointDirectory; + } else if (this.savepointDirectory != null) { + targetDirectory = this.savepointDirectory; + } else { + throw new IllegalStateException("No savepoint directory configured. 
" + + "You can either specify a directory when triggering this savepoint or " + + "configure a cluster-wide default via key '" + + ConfigConstants.SAVEPOINT_DIRECTORY_KEY + "'."); + } + + CheckpointProperties props = CheckpointProperties.forStandardSavepoint(); + CheckpointTriggerResult result = triggerCheckpoint(timestamp, props, targetDirectory); if (result.isSuccess()) { - PendingSavepoint savepoint = (PendingSavepoint) result.getPendingCheckpoint(); - return savepoint.getCompletionFuture(); - } - else { - return Futures.failed(new Exception("Failed to trigger savepoint: " + result.getFailureReason().message())); + return result.getPendingCheckpoint().getCompletionFuture(); + } else { + CompletableFuture failed = new FlinkCompletableFuture<>(); --- End diff -- In order to create a completed future you can write `FlinkCompletableFuture.completedExceptionally(Throwable t)`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2608#discussion_r82781804

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java ---

    @@ -51,9 +57,11 @@

        /** States of the different task groups belonging to this checkpoint */
        private final Map<JobVertexID, TaskState> taskStates;

    -   /** Flag to indicate whether the completed checkpoint data should be deleted when this
    -    * handle to the checkpoint is disposed */
    -   private final boolean deleteStateWhenDisposed;
    +   /** Properties for this checkpoint. */
    +   private final CheckpointProperties props;
    +
    +   /** External path if persisted checkpoint; null otherwise. */
    +   private final String externalPath;

--- End diff --

Could the external path be part of the `CheckpointProperties`? The external path could be an `Option`. Then we would get around the null field.
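The reviewer's suggestion to avoid a nullable field could look roughly like this; using `java.util.Optional` (rather than Scala's `Option`) and these class/method names is an assumption for illustration only:

```java
import java.util.Optional;

// Sketch of making the "persisted or not" state explicit instead of
// encoding it as a nullable String field. Names are illustrative only,
// not Flink's actual API.
class ExternalCheckpointInfo {

    private final Optional<String> externalPath;

    ExternalCheckpointInfo(String externalPathOrNull) {
        this.externalPath = Optional.ofNullable(externalPathOrNull);
    }

    // Present path <=> the checkpoint was persisted externally.
    boolean isPersisted() {
        return externalPath.isPresent();
    }

    String externalPathOr(String fallback) {
        return externalPath.orElse(fallback);
    }
}
```

Callers then never see `null` and cannot forget the "not persisted" case, since every access goes through `Optional`.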
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2608#discussion_r82778557

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---

    @@ -282,29 +279,71 @@ public boolean isShutdown() {
    [... same hunk as quoted in the previous comment ...]
            if (result.isSuccess()) {
    -           return Futures.failed(new Exception("Failed to trigger savepoint: " + result.getFailureReason().message()));
    +           return result.getPendingCheckpoint().getCompletionFuture();
    +       } else {
    +           CompletableFuture<CompletedCheckpoint> failed = new FlinkCompletableFuture<>();
    +           failed.completeExceptionally(new Exception("Failed to trigger savepoint: " + result.getFailureReason().message()));

--- End diff --

Not adding the complete stack trace is on purpose, right? I'm wondering whether it could help to debug problems later.
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2608#discussion_r82796870

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java ---

    @@ -43,24 +125,62 @@
         * @return The loaded savepoint
         * @throws Exception Failures during load are forwarded
         */
    -   Savepoint loadSavepoint(String path) throws Exception;
    +   public static Savepoint loadSavepoint(String path) throws IOException {
    +       Preconditions.checkNotNull(path, "Path");

    -   /**
    -    * Disposes the savepoint at the specified path.
    -    *
    -    * @param path Path of savepoint to dispose
    -    * @throws Exception Failures during disposal are forwarded
    -    */
    -   void disposeSavepoint(String path) throws Exception;
    +       try (DataInputStream dis = new DataInputViewStreamWrapper(createFsInputStream(new Path(path)))) {
    +           int magicNumber = dis.readInt();
    +
    +           if (magicNumber == MAGIC_NUMBER) {
    +               int version = dis.readInt();
    +
    +               SavepointSerializer<?> serializer = SavepointSerializers.getSerializer(version);
    +               return serializer.deserialize(dis);
    +           } else {
    +               throw new RuntimeException("Unexpected magic number. This is most likely " +
    +                   "caused by trying to load a Flink 1.0 savepoint. You cannot load a " +
    +                   "savepoint triggered by Flink 1.0 with this version of Flink. If it is " +
    +                   "_not_ a Flink 1.0 savepoint, this error indicates that the specified " +
    +                   "file is not a proper savepoint or the file has been corrupted.");
    +           }
    +       }
    +   }

        /**
    -    * Shut downs the savepoint store.
    +    * Removes the savepoint meta data w/o loading and disposing it.
         *
    -    * Only necessary for implementations where the savepoint life-cycle is
    -    * bound to the cluster life-cycle.
    -    *
    -    * @throws Exception Failures during shut down are forwarded
    +    * @param path Path of savepoint to remove
    +    * @throws Exception Failures during disposal are forwarded
         */
    -   void shutdown() throws Exception;
    +   public static void removeSavepoint(String path) throws Exception {
    +       Preconditions.checkNotNull(path, "Path");
    +
    +       try {
    +           LOG.info("Removing savepoint: " + path);

--- End diff --

Use an SLF4J placeholder (`{}`) here instead of string concatenation.
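The `{}` shorthand refers to SLF4J's parameterized logging, which defers message formatting until the log statement is actually enabled. The real fix is simply `LOG.info("Removing savepoint: {}", path)`; the tiny helper below only demonstrates what the substitution does, without requiring SLF4J on the classpath (real SLF4J also handles escaping and multiple arguments):

```java
// Minimal stand-in for SLF4J's "{}" substitution, to show what the
// reviewer's shorthand expands to. Production code would call
// org.slf4j.Logger#info(String, Object) instead.
class PlaceholderExample {

    // Replaces the first "{}" in the pattern with the argument.
    static String format(String pattern, Object arg) {
        return pattern.replaceFirst("\\{\\}", String.valueOf(arg));
    }

    public static void main(String[] args) {
        // Equivalent intent to: LOG.info("Removing savepoint: {}", path);
        System.out.println(format("Removing savepoint: {}", "hdfs:///savepoints/savepoint-abc"));
    }
}
```

Besides readability, the parameterized form avoids building the concatenated string when the log level is disabled.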
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2608#discussion_r82781688

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java ---

    @@ -18,44 +18,243 @@
        package org.apache.flink.runtime.checkpoint;

    +   import org.apache.flink.runtime.jobgraph.JobStatus;
    +
    +   import java.io.Serializable;
    +
        /**
         * The configuration of a checkpoint, such as whether
         * <ul>
    -    * <li>The checkpoint is a savepoint</li>
    +    * <li>The checkpoint should be persisted</li>
         * <li>The checkpoint must be full, or may be incremental</li>
         * <li>The checkpoint format must be the common (cross backend) format, or may be state-backend specific</li>
         * </ul>

--- End diff --

The second and third are not yet implemented, are they?
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2608#discussion_r82777602

--- Diff: flink-runtime-web/web-dashboard/app/scripts/index.coffee ---

    @@ -29,8 +29,8 @@ angular.module('flinkApp', ['ui.router', 'angularMoment'])

        # --
        .value 'flinkConfig', {
    -     jobServer: ''
    -   # jobServer: 'http://localhost:8081/'
    +   # jobServer: ''
    +     jobServer: 'http://localhost:8081/'

--- End diff --

What's the reason for this change?
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2608#discussion_r82796670

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java ---

    @@ -18,23 +18,105 @@
        package org.apache.flink.runtime.checkpoint.savepoint;

    +   import org.apache.flink.core.fs.FSDataInputStream;
    +   import org.apache.flink.core.fs.FSDataOutputStream;
    +   import org.apache.flink.core.fs.FileSystem;
    +   import org.apache.flink.core.fs.Path;
    +   import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +   import org.apache.flink.util.FileUtils;
    +   import org.apache.flink.util.Preconditions;
    +   import org.slf4j.Logger;
    +   import org.slf4j.LoggerFactory;
    +
    +   import java.io.DataInputStream;
    +   import java.io.DataOutputStream;
    +   import java.io.IOException;
    +
    +   import static org.apache.flink.util.Preconditions.checkNotNull;
    +
        /**
    -    * Savepoint store used to persist {@link Savepoint} instances.
    +    * A file system based savepoint store.
         *
    -    * The main implementation is the {@link FsSavepointStore}. We also have the
    -    * {@link HeapSavepointStore} for historical reasons (introduced in Flink 1.0).
    +    * Stored savepoints have the following format:
    +    *
    +    *   MagicNumber SavepointVersion Savepoint
    +    *     - MagicNumber => int
    +    *     - SavepointVersion => int (returned by Savepoint#getVersion())
    +    *     - Savepoint => bytes (serialized via version-specific SavepointSerializer)
         */
    -   public interface SavepointStore {
    +   public class SavepointStore {
    +
    +       private static final Logger LOG = LoggerFactory.getLogger(SavepointStore.class);
    +
    +       /** Magic number for sanity checks against stored savepoints. */
    +       private static final int MAGIC_NUMBER = 0x4960672d;
    +
    +       /** Prefix for savepoint files. */
    +       private static final String prefix = "savepoint-";

        /**
         * Stores the savepoint.
         *
    +    * @param targetDirectory Target directory to store savepoint in
         * @param savepoint Savepoint to be stored
    +    * @param <T> Savepoint type
         * @return Path of stored savepoint
         * @throws Exception Failures during store are forwarded
         */
    -   <T extends Savepoint> String storeSavepoint(T savepoint) throws Exception;
    +   public static <T extends Savepoint> String storeSavepoint(
    +           String targetDirectory,
    +           T savepoint) throws IOException {
    +
    +       checkNotNull(targetDirectory, "Target directory");
    +       checkNotNull(savepoint, "Savepoint");
    +
    +       Exception latestException = null;
    +       Path path = null;
    +       FSDataOutputStream fdos = null;
    +
    +       FileSystem fs = null;
    +
    +       // Try to create a FS output stream
    +       for (int attempt = 0; attempt < 10; attempt++) {
    +           path = new Path(targetDirectory, FileUtils.getRandomFilename(prefix));
    +
    +           if (fs == null) {
    +               fs = FileSystem.get(path.toUri());
    +           }
    +
    +           try {
    +               fdos = fs.create(path, false);
    +               break;
    +           } catch (Exception e) {
    +               latestException = e;
    +           }
    +       }
    +
    +       if (fdos == null) {
    +           throw new IOException("Failed to create file output stream at " + path, latestException);
    +       }
    +
    +       boolean success = false;
    +       try (DataOutputStream dos = new DataOutputStream(fdos)) {
    +           // Write header
    +           dos.writeInt(MAGIC_NUMBER);
    +           dos.writeInt(savepoint.getVersion());
    +
    +           // Write savepoint
    +           SavepointSerializer<T> serializer = SavepointSerializers.getSerializer(savepoint);
    +           serializer.serialize(savepoint, dos);
    +           success = true;
    +       } finally {
    +           if (!success && fs.exists(path)) {
    +               if (!fs.delete(path, true)) {
    +                   LOG.warn("Failed to delete file " + path + " after failed write.");

--- End diff --

Use an SLF4J placeholder (`{}`) here as well.
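The header layout quoted above (magic number, then version, then payload) can be sketched independently of Flink's file system abstractions; here the payload is a plain byte array standing in for the serialized savepoint, and the helper names are illustrative only:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

class SavepointFormatSketch {

    // Same magic number as in the quoted diff.
    static final int MAGIC_NUMBER = 0x4960672d;

    // Writes: MagicNumber (int), SavepointVersion (int), payload bytes.
    static byte[] write(int version, byte[] payload) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(bos)) {
            dos.writeInt(MAGIC_NUMBER);
            dos.writeInt(version);
            dos.write(payload);
        }
        return bos.toByteArray();
    }

    // Reads the header back, failing fast on an unexpected magic number --
    // the same sanity check loadSavepoint performs before picking a
    // version-specific serializer.
    static int readVersion(byte[] stored) throws IOException {
        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(stored))) {
            if (dis.readInt() != MAGIC_NUMBER) {
                throw new IOException("Unexpected magic number; not a savepoint file?");
            }
            return dis.readInt();
        }
    }
}
```

Writing the version before the payload is what lets a newer Flink pick the matching `SavepointSerializer` before touching any version-specific bytes.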
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2608#discussion_r82778103

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---

    @@ -219,33 +245,9 @@ public CheckpointCoordinator(
         * Shuts down the checkpoint coordinator.
         *
         * After this method has been called, the coordinator does not accept
    -    * and further messages and cannot trigger any further checkpoints. All
    -    * checkpoint state is discarded.
    -    */
    -   public void shutdown() throws Exception {
    -       shutdown(true);
    -   }
    -
    -   /**
    -    * Suspends the checkpoint coordinator.
    -    *
    -    * After this method has been called, the coordinator does not accept
         * and further messages and cannot trigger any further checkpoints.
    -    *
    -    * The difference to shutdown is that checkpoint state in the store
    -    * and counter is kept around if possible to recover later.
    -    */
    -   public void suspend() throws Exception {
    -       shutdown(false);
    -   }
    -
    -   /**
    -    * Shuts down the checkpoint coordinator.
    -    *
    -    * @param shutdownStoreAndCounter Depending on this flag the checkpoint
    -    *                                state services are shut down or suspended.
         */
    -   private void shutdown(boolean shutdownStoreAndCounter) throws Exception {
    +   public void shutdown(JobStatus jobStatus) throws Exception {

--- End diff --

Didn't you tell me that the verb is `shutDown`?
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2608#discussion_r82787864

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java ---

    @@ -48,13 +48,12 @@
        public static CompletedCheckpoint loadAndValidateSavepoint(
                JobID jobId,
                Map<JobVertexID, ExecutionJobVertex> tasks,
    -           SavepointStore savepointStore,
                String savepointPath) throws Exception {

            // (1) load the savepoint
    -       Savepoint savepoint = savepointStore.loadSavepoint(savepointPath);
    +       Savepoint savepoint = SavepointStore.loadSavepoint(savepointPath);
            final Map<JobVertexID, TaskState> taskStates = new HashMap<>(savepoint.getTaskStates().size());

            // (2) validate it (parallelism, etc)
            for (TaskState taskState : savepoint.getTaskStates()) {
                ExecutionJobVertex executionJobVertex = tasks.get(taskState.getJobVertexID());

--- End diff --

Just a question about line 74: shouldn't this be `taskState.getMaxParallelism()` and `executionJobVertex.getParallelism()`?
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2608#discussion_r82782971

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java ---

    @@ -51,9 +57,11 @@
    [... same hunk as in the earlier comment on this file ...]
    +   /** External path if persisted checkpoint; null otherwise. */
    +   private final String externalPath;

--- End diff --

Ok I see, the path is not fully known when creating the `CheckpointProperties`...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2608#discussion_r82784661

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java ---

    @@ -64,9 +62,9 @@ public void recover() throws Exception {

        @Override
        public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception {
    -       checkpoints.addLast(checkpoint);
    +       checkpoints.add(checkpoint);
            if (checkpoints.size() > maxNumberOfCheckpointsToRetain) {
    -           checkpoints.removeFirst().discardState();
    +           checkpoints.remove().subsume();

--- End diff --

Maybe we shouldn't allow all different combinations of `CheckpointProperties`, only those that make sense.
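Restricting a properties class to meaningful combinations is typically done with a private constructor and named factory methods, much like the `forStandardSavepoint()` call seen elsewhere in this PR. A hedged sketch (field and method names are illustrative, not Flink's actual `CheckpointProperties`):

```java
// Illustrative sketch: only valid property combinations are reachable,
// because construction goes through named factory methods.
// Not Flink's actual API.
final class CheckpointPropsSketch {

    final boolean persisted;
    final boolean discardOnCancellation;

    private CheckpointPropsSketch(boolean persisted, boolean discardOnCancellation) {
        this.persisted = persisted;
        this.discardOnCancellation = discardOnCancellation;
    }

    // A standard checkpoint is never persisted externally and is always
    // cleaned up on cancellation.
    static CheckpointPropsSketch forStandardCheckpoint() {
        return new CheckpointPropsSketch(false, true);
    }

    // A persistent checkpoint may or may not survive cancellation.
    static CheckpointPropsSketch forPersistentCheckpoint(boolean discardOnCancellation) {
        return new CheckpointPropsSketch(true, discardOnCancellation);
    }
}
```

The invalid combination (not persisted, yet retained on cancellation) simply has no factory method, so it cannot be constructed.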
GitHub user uce opened a pull request: https://github.com/apache/flink/pull/2608

[FLINK-4512] [FLIP-10] Add option to persist periodic checkpoints

## Introduction

This is the first part of [FLIP-10](https://cwiki.apache.org/confluence/display/FLINK/FLIP-10%3A+Unify+Checkpoints+and+Savepoints), allowing users to persist periodic checkpoints. Persistent checkpoints behave very much like regular periodic checkpoints, except for the following differences:

1. They persist their meta data (like savepoints).
2. They are not discarded when the owning job fails permanently. Furthermore, they can be configured to not be discarded when the job is cancelled.

This means that if a job fails permanently, the user will have a checkpoint available to restore from. As an example, think of the following scenario: a job runs smoothly until it hits a bad record that it cannot handle. The current behaviour is that the job will try to recover, but it will hit the bad record again and keep on failing. With persistent checkpoints, the user can update the program to handle bad records and restore from the most recent persistent checkpoint.

## CheckpointConfig

This adds the following `@PublicEvolving` methods to `CheckpointConfig`:

```
enablePersistentCheckpoints(String targetDirectory);
enablePersistentCheckpoints(String targetDirectory, PersistentCheckpointCleanup cleanup);
```

The `PersistentCheckpointCleanup` defines how persistent checkpoints are cleaned up when the owning job is cancelled. Since currently most streaming jobs are stopped via cancellation, the default is to clean persistent checkpoints up. The user can override this behaviour via the enum.

## REST API

The REST API exposes the external path of the most recent persistent checkpoint. This is also displayed in the web UI for the most recent persistent checkpoint.

![screen shot 2016-10-07 at 17 50 44](https://cloud.githubusercontent.com/assets/1756620/19196699/d0d5065a-8cb6-11e6-8b13-c6bacc4ebe19.png)

## Deprecate savepoint state backends (FLINK-4507)

Furthermore, the savepoint state backends have been removed and all savepoints now go to files. The corresponding configuration keys have been removed or deprecated: `savepoints.state.backend.fs.dir` has been deprecated in favour of `state.savepoints.dir`; `savepoints.state.backend` has been removed.

## Allow to specify custom savepoint directory (FLINK-4509)

The target directory for savepoints was configured per Flink configuration. With this change, it can be overwritten:

```
bin/flink savepoint <jobId> [targetDirectory]
```

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uce/flink 4512-persistent_checkpoints

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2608.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2608

commit 004ba0b38ac2b75148910660242808c13746c444
Author: Ufuk Celebi
Date: 2016-10-06T14:43:42Z

    [FLINK-4512] [FLIP-10] Add option to persist periodic checkpoints
    [FLINK-4509] [FLIP-10] Specify savepoint directory per savepoint
    [FLINK-4507] [FLIP-10] Deprecate savepoint backend config
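The cancellation-cleanup behaviour described above (persistent checkpoints are cleaned up on cancellation by default, unless the user opts into retaining them, and are never discarded on permanent job failure) can be modeled in isolation; the enum constants and method name below are assumptions for illustration, not necessarily the names the PR introduces:

```java
// Models the retention decision described in the PR text. Names are
// illustrative: the PR only confirms that a PersistentCheckpointCleanup
// enum controls behaviour on cancellation.
class CleanupSketch {

    enum PersistentCheckpointCleanup { DELETE_ON_CANCELLATION, RETAIN_ON_CANCELLATION }

    // A persistent checkpoint is discarded only when the job was cancelled
    // AND the configured cleanup mode says to delete on cancellation.
    // Permanent failure never discards it.
    static boolean shouldDiscard(boolean jobCancelled, PersistentCheckpointCleanup cleanup) {
        return jobCancelled && cleanup == PersistentCheckpointCleanup.DELETE_ON_CANCELLATION;
    }
}
```

Under this model the scenario from the introduction works out: a permanently failing job (`jobCancelled == false`) keeps its persistent checkpoint regardless of the cleanup setting.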