[ https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15568325#comment-15568325 ]
ASF GitHub Bot commented on FLINK-4512: --------------------------------------- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/2608#discussion_r82977266 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java --- @@ -18,23 +18,105 @@ package org.apache.flink.runtime.checkpoint.savepoint; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +import static org.apache.flink.util.Preconditions.checkNotNull; + /** - * Savepoint store used to persist {@link Savepoint} instances. + * A file system based savepoint store. * - * <p>The main implementation is the {@link FsSavepointStore}. We also have the - * {@link HeapSavepointStore} for historical reasons (introduced in Flink 1.0). + * <p>Stored savepoints have the following format: + * <pre> + * MagicNumber SavepointVersion Savepoint + * - MagicNumber => int + * - SavepointVersion => int (returned by Savepoint#getVersion()) + * - Savepoint => bytes (serialized via version-specific SavepointSerializer) + * </pre> */ -public interface SavepointStore { +public class SavepointStore { + + private static final Logger LOG = LoggerFactory.getLogger(SavepointStore.class); + + /** Magic number for sanity checks against stored savepoints. */ + private static final int MAGIC_NUMBER = 0x4960672d; + + /** Prefix for savepoint files. */ + private static final String prefix = "savepoint-"; /** * Stores the savepoint. 
* + * @param targetDirectory Target directory to store savepoint in * @param savepoint Savepoint to be stored * @param <T> Savepoint type * @return Path of stored savepoint * @throws Exception Failures during store are forwarded */ - <T extends Savepoint> String storeSavepoint(T savepoint) throws Exception; + public static <T extends Savepoint> String storeSavepoint( + String targetDirectory, + T savepoint) throws IOException { + + checkNotNull(targetDirectory, "Target directory"); + checkNotNull(savepoint, "Savepoint"); + + Exception latestException = null; + Path path = null; + FSDataOutputStream fdos = null; + + FileSystem fs = null; + + // Try to create a FS output stream + for (int attempt = 0; attempt < 10; attempt++) { + path = new Path(targetDirectory, FileUtils.getRandomFilename(prefix)); + + if (fs == null) { + fs = FileSystem.get(path.toUri()); + } + + try { + fdos = fs.create(path, false); + break; + } catch (Exception e) { + latestException = e; + } + } + + if (fdos == null) { + throw new IOException("Failed to create file output stream at " + path, latestException); + } + + boolean success = false; + try (DataOutputStream dos = new DataOutputStream(fdos)) { + // Write header + dos.writeInt(MAGIC_NUMBER); + dos.writeInt(savepoint.getVersion()); + + // Write savepoint + SavepointSerializer<T> serializer = SavepointSerializers.getSerializer(savepoint); + serializer.serialize(savepoint, dos); + success = true; + } finally { + if (!success && fs.exists(path)) { + if (!fs.delete(path, true)) { + LOG.warn("Failed to delete file " + path + " after failed write."); --- End diff -- Replaced here and below > Add option for persistent checkpoints > ------------------------------------- > > Key: FLINK-4512 > URL: https://issues.apache.org/jira/browse/FLINK-4512 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing > Reporter: Ufuk Celebi > Assignee: Ufuk Celebi > > Allow periodic checkpoints to be persisted by writing out their meta 
data. > This is what we currently do for savepoints, but in the future checkpoints > and savepoints are likely to diverge with respect to the guarantees they give for > updatability, etc. > This means that the difference between persistent checkpoints and savepoints > in the long term will be that persistent checkpoints can only be restored > with the same job settings (like parallelism, etc.). > Regular and persistent checkpoints should behave differently with respect to > disposal in *globally* terminal job states (FINISHED, CANCELLED, FAILED): > regular checkpoints are cleaned up in all of these cases, whereas persistent > checkpoints are cleaned up only on FINISHED, possibly with an option to customize the behaviour on > CANCELLED or FAILED. -- This message was sent by Atlassian JIRA (v6.3.4#6332)