[ 
https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15565540#comment-15565540
 ] 

ASF GitHub Bot commented on FLINK-4512:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2608#discussion_r82796670
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
 ---
    @@ -18,23 +18,105 @@
     
     package org.apache.flink.runtime.checkpoint.savepoint;
     
    +import org.apache.flink.core.fs.FSDataInputStream;
    +import org.apache.flink.core.fs.FSDataOutputStream;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.util.FileUtils;
    +import org.apache.flink.util.Preconditions;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
     /**
    - * Savepoint store used to persist {@link Savepoint} instances.
    + * A file system based savepoint store.
      *
    - * <p>The main implementation is the {@link FsSavepointStore}. We also 
have the
    - * {@link HeapSavepointStore} for historical reasons (introduced in Flink 
1.0).
    + * <p>Stored savepoints have the following format:
    + * <pre>
    + * MagicNumber SavepointVersion Savepoint
    + *   - MagicNumber => int
    + *   - SavepointVersion => int (returned by Savepoint#getVersion())
    + *   - Savepoint => bytes (serialized via version-specific 
SavepointSerializer)
    + * </pre>
      */
    -public interface SavepointStore {
    +public class SavepointStore {
    +
    +   private static final Logger LOG = 
LoggerFactory.getLogger(SavepointStore.class);
    +
    +   /** Magic number for sanity checks against stored savepoints. */
    +   private static final int MAGIC_NUMBER = 0x4960672d;
    +
    +   /** Prefix for savepoint files. */
    +   private static final String prefix = "savepoint-";
     
        /**
         * Stores the savepoint.
         *
    +    * @param targetDirectory Target directory to store savepoint in
         * @param savepoint Savepoint to be stored
         * @param <T>       Savepoint type
         * @return Path of stored savepoint
         * @throws Exception Failures during store are forwarded
         */
    -   <T extends Savepoint> String storeSavepoint(T savepoint) throws 
Exception;
    +   public static <T extends Savepoint> String storeSavepoint(
    +                   String targetDirectory,
    +                   T savepoint) throws IOException {
    +
    +           checkNotNull(targetDirectory, "Target directory");
    +           checkNotNull(savepoint, "Savepoint");
    +
    +           Exception latestException = null;
    +           Path path = null;
    +           FSDataOutputStream fdos = null;
    +
    +           FileSystem fs = null;
    +
    +           // Try to create a FS output stream
    +           for (int attempt = 0; attempt < 10; attempt++) {
    +                   path = new Path(targetDirectory, 
FileUtils.getRandomFilename(prefix));
    +
    +                   if (fs == null) {
    +                           fs = FileSystem.get(path.toUri());
    +                   }
    +
    +                   try {
    +                           fdos = fs.create(path, false);
    +                           break;
    +                   } catch (Exception e) {
    +                           latestException = e;
    +                   }
    +           }
    +
    +           if (fdos == null) {
    +                   throw new IOException("Failed to create file output 
stream at " + path, latestException);
    +           }
    +
    +           boolean success = false;
    +           try (DataOutputStream dos = new DataOutputStream(fdos)) {
    +                   // Write header
    +                   dos.writeInt(MAGIC_NUMBER);
    +                   dos.writeInt(savepoint.getVersion());
    +
    +                   // Write savepoint
    +                   SavepointSerializer<T> serializer = 
SavepointSerializers.getSerializer(savepoint);
    +                   serializer.serialize(savepoint, dos);
    +                   success = true;
    +           } finally {
    +                   if (!success && fs.exists(path)) {
    +                           if (!fs.delete(path, true)) {
    +                                   LOG.warn("Failed to delete file " + 
path + " after failed write.");
    --- End diff --
    
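    Please use an SLF4J `{}` placeholder here instead of string concatenation, 
    e.g. LOG.warn("Failed to delete file {} after failed write.", path);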


> Add option for persistent checkpoints
> -------------------------------------
>
>                 Key: FLINK-4512
>                 URL: https://issues.apache.org/jira/browse/FLINK-4512
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>            Reporter: Ufuk Celebi
>            Assignee: Ufuk Celebi
>
> Allow periodic checkpoints to be persisted by writing out their metadata. 
> This is what we currently do for savepoints, but in the future checkpoints 
> and savepoints are likely to diverge with respect to guarantees they give for 
> updatability, etc.
> This means that the difference between persistent checkpoints and savepoints 
> in the long term will be that persistent checkpoints can only be restored 
> with the same job settings (parallelism, etc.).
> Regular and persistent checkpoints should behave differently with respect to 
> disposal in *globally* terminal job states (FINISHED, CANCELLED, FAILED): 
> regular checkpoints are cleaned up in all of these cases, whereas persistent 
> checkpoints are cleaned up only on FINISHED, possibly with an option to 
> customize the behaviour on CANCELLED or FAILED.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to