sjwiesman commented on a change in pull request #13912:
URL: https://github.com/apache/flink/pull/13912#discussion_r566378707



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/storage/AbstractFileCheckpointStorage.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.storage;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
+import 
org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * A base class for all checkpoint storage instances that store their metadata 
(and data) in files.
+ *
+ * <p>This class takes the base checkpoint- and savepoint directory paths, but 
also accepts null for
+ * both of them, in which case creating externalized checkpoints is not 
possible, and it is not
+ * possible to create a savepoint with a default path. Null is accepted to 
enable implementations
+ * that only optionally support default savepoints.
+ *
+ * <h1>Checkpoint Layout</h1>
+ *
+ * <p>The checkpoint storage is configured with a base directory and persists 
the checkpoint data of
+ * specific checkpoints in specific subdirectories. For example, if the base 
directory was set to
+ * {@code hdfs://namenode:port/flink-checkpoints/}, the state backend will 
create a subdirectory
+ * with the job's ID that will contain the actual checkpoints: ({@code
+ * hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b})
+ *
+ * <p>Each checkpoint individually will store all its files in a subdirectory 
that includes the
+ * checkpoint number, such as {@code
+ * 
hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b/chk-17/}.
+ *
+ * <h1>Savepoint Layout</h1>
+ *
+ * <p>A savepoint that is set to be stored in path {@code 
hdfs://namenode:port/flink-savepoints/},
+ * will create a subdirectory {@code savepoint-jobId(0, 6)-randomDigits} in 
which it stores all
+ * savepoint data. The random digits are added as "entropy" to avoid directory 
collisions.
+ *
+ * <h1>Metadata File</h1>
+ *
+ * <p>A completed checkpoint writes its metadata into a file '{@value
+ * AbstractFsCheckpointStorageAccess#METADATA_FILE_NAME}'.
+ */
+@PublicEvolving
+public abstract class AbstractFileCheckpointStorage implements 
CheckpointStorage {
+
+    private static final long serialVersionUID = 1L;
+
+    // ------------------------------------------------------------------------
+    //  Checkpoint Storage Properties
+    // ------------------------------------------------------------------------
+
+    /** The path where checkpoints will be stored, or null, if none has been 
configured. */
+    @Nullable private Path baseCheckpointPath;
+
+    /** The path where savepoints will be stored, or null, if none has been 
configured. */
+    @Nullable private Path baseSavepointPath;
+
+    @Override
+    public CompletedCheckpointStorageLocation resolveCheckpoint(String 
pointer) throws IOException {
+        return 
AbstractFsCheckpointStorageAccess.resolveCheckpointPointer(pointer);
+    }
+
+    /**
+     * Gets the directory where savepoints are stored by default (when no 
custom path is given to
+     * the savepoint trigger command).
+     *
+     * @return The default directory for savepoints, or null, if no default 
directory has been
+     *     configured.
+     */
+    @Nullable
+    public Path getSavepointPath() {
+        return baseSavepointPath;
+    }
+
+    /** @param baseSavepointPath The base directory for savepoints. */
+    public void setSavepointPath(String baseSavepointPath) {
+        setSavepointPath(new Path(baseSavepointPath));
+    }
+
+    /** @param baseSavepointPath The base directory for savepoints. */
+    public void setSavepointPath(@Nullable Path baseSavepointPath) {
+        this.baseSavepointPath = baseSavepointPath == null ? null : 
validatePath(baseSavepointPath);
+    }
+
+    /**
+     * Sets the given savepoint directory, or the values defined in the given 
configuration. If a
+     * savepoint parameter is not null, that value takes precedence over the 
value in the
+     * configuration. If the configuration does not specify a value, it is 
possible that the
+     * savepoint directory in the checkpoint storage will be null.
+     *
+     * @param baseSavepointPath The savepoint base directory to use (or null).
+     * @param config The configuration to read values from.
+     */
+    public void setSavepointPath(Path baseSavepointPath, ReadableConfig 
config) {
+        this.baseSavepointPath =
+                parameterOrConfigured(
+                        baseSavepointPath, config, 
CheckpointingOptions.SAVEPOINT_DIRECTORY);
+    }
+
+    /**
+     * Gets the base directory where checkpoints are stored.
+     *
+     * @return The base directory for checkpoints, or null, if none has been 
configured.
+     */
+    @Nullable
+    public Path getCheckpointPath() {
+        return baseCheckpointPath;
+    }
+
+    /** @param baseCheckpointPath The base directory for checkpoints. */
+    public void setCheckpointPath(String baseCheckpointPath) {
+        setCheckpointPath(new Path(baseCheckpointPath));
+    }
+
+    /** @param baseCheckpointPath The base directory for checkpoints. */
+    public void setCheckpointPath(@Nullable Path baseCheckpointPath) {
+        this.baseCheckpointPath =
+                baseCheckpointPath == null ? null : 
validatePath(baseCheckpointPath);
+    }
+
+    /**
+     * Sets the given checkpoint directory, or the values defined in the given 
configuration. If a
+     * checkpoint parameter is not null, that value takes precedence over the 
value in the
+     * configuration. If the configuration does not specify a value, it is 
possible that the
+     * checkpoint directory in the checkpoint storage will be null.
+     *
+     * @param baseCheckpointPath The checkpoint base directory to use (or 
null).
+     * @param config The configuration to read values from.
+     */
+    public void setCheckpointPath(Path baseCheckpointPath, ReadableConfig 
config) {
+        this.baseCheckpointPath =
+                parameterOrConfigured(
+                        baseCheckpointPath, config, 
CheckpointingOptions.CHECKPOINTS_DIRECTORY);
+    }
+
+    // ------------------------------------------------------------------------
+    //  Utilities
+    // ------------------------------------------------------------------------
+
+    /**
+     * Checks the validity of the path's scheme and path.
+     *
+     * @param path The path to check.
+     * @return The URI as a Path.
+     * @throws IllegalArgumentException Thrown, if the path is missing its 
scheme or path component.
+     */
+    protected static Path validatePath(Path path) {

Review comment:
       I'm not sure; some of these are fairly checkpointing-specific, but I do 
think this can be cleaned up and made less low-level. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to