This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ed75795e97800177cb67141ab838632d5ec55bb5
Author: Jinzhong Li <lijinzhong2...@gmail.com>
AuthorDate: Wed Feb 28 12:19:36 2024 +0800

    [FLINK-34484][state] Split 'state.backend.local-recovery' into two options 
for checkpointing and recovery
---
 .../generated/checkpointing_configuration.html     |  6 ++++
 .../generated/common_state_backends_section.html   |  6 ++++
 .../generated/state_recovery_configuration.html    |  2 +-
 .../flink/configuration/CheckpointingOptions.java  | 24 +++++++++++++-
 .../flink/configuration/StateRecoveryOptions.java  | 16 +++++----
 .../fs/DuplicatingStateChangeFsUploader.java       | 10 +++---
 .../changelog/fs/FsStateChangelogStorage.java      |  2 +-
 .../flink/changelog/fs/FsStateChangelogWriter.java |  2 +-
 .../changelog/fs/StateChangeUploadScheduler.java   |  2 +-
 .../api/runtime/SavepointTaskStateManager.java     |  2 +-
 .../DefaultSlotPoolServiceSchedulerFactory.java    |  1 -
 .../state/ChangelogTaskLocalStateStore.java        |  2 +-
 .../state/CheckpointStreamWithResultProvider.java  |  2 +-
 .../flink/runtime/state/LocalRecoveryConfig.java   | 38 +++++++++++++++++++---
 ...er.java => LocalSnapshotDirectoryProvider.java} | 14 ++++----
 ...ava => LocalSnapshotDirectoryProviderImpl.java} | 10 +++---
 .../state/TaskExecutorLocalStateStoresManager.java | 21 +++++++-----
 .../runtime/state/TaskLocalStateStoreImpl.java     | 14 +++++++-
 .../runtime/state/heap/HeapSnapshotStrategy.java   |  2 +-
 .../runtime/taskexecutor/TaskManagerServices.java  |  1 +
 .../TaskManagerServicesConfiguration.java          | 14 ++++++--
 .../state/ChangelogTaskLocalStateStoreTest.java    | 15 +++++----
 .../CheckpointStreamWithResultProviderTest.java    |  8 ++---
 ...=> LocalSnapshotDirectoryProviderImplTest.java} | 10 +++---
 .../TaskExecutorLocalStateStoresManagerTest.java   |  2 +-
 .../runtime/state/TaskLocalStateStoreImplTest.java |  7 ++--
 .../runtime/state/TaskStateManagerImplTest.java    |  7 ++--
 .../runtime/state/TestLocalRecoveryConfig.java     |  6 ++--
 .../state/changelog/ChangelogStateDiscardTest.java |  2 +-
 .../snapshot/RocksDBSnapshotStrategyBase.java      |  8 ++---
 .../runtime/tasks/LocalStateForwardingTest.java    |  9 ++---
 .../runtime/tasks/StreamTaskTestHarness.java       |  6 ++--
 .../benchmark/StateBackendBenchmarkUtils.java      |  4 +--
 .../restore/StreamOperatorSnapshotRestoreTest.java | 13 +++++---
 34 files changed, 194 insertions(+), 94 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/checkpointing_configuration.html 
b/docs/layouts/shortcodes/generated/checkpointing_configuration.html
index 0cd3dd53755..c87a9c33803 100644
--- a/docs/layouts/shortcodes/generated/checkpointing_configuration.html
+++ b/docs/layouts/shortcodes/generated/checkpointing_configuration.html
@@ -8,6 +8,12 @@
         </tr>
     </thead>
     <tbody>
+        <tr>
+            <td><h5>execution.checkpointing.local-backup.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>This option configures local backup for the state backend, 
which indicates whether to make backup checkpoint on local disk.  If not 
configured, fallback to execution.state-recovery.from-local. By default, local 
backup is deactivated. Local backup currently only covers keyed state backends 
(including both the EmbeddedRocksDBStateBackend and the 
HashMapStateBackend).</td>
+        </tr>
         <tr>
             <td><h5>state.backend.incremental</h5></td>
             <td style="word-wrap: break-word;">false</td>
diff --git 
a/docs/layouts/shortcodes/generated/common_state_backends_section.html 
b/docs/layouts/shortcodes/generated/common_state_backends_section.html
index a16650bf412..ab664f51bfb 100644
--- a/docs/layouts/shortcodes/generated/common_state_backends_section.html
+++ b/docs/layouts/shortcodes/generated/common_state_backends_section.html
@@ -32,6 +32,12 @@
             <td>String</td>
             <td>The default directory for savepoints. Used by the state 
backends that write savepoints to file systems (HashMapStateBackend, 
EmbeddedRocksDBStateBackend).</td>
         </tr>
+        <tr>
+            <td><h5>execution.state-recovery.from-local</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>This option configures local recovery for the state backend, 
which indicates whether to recovery from local snapshot.By default, local 
recovery is deactivated. Local recovery currently only covers keyed state 
backends (including both the EmbeddedRocksDBStateBackend and the 
HashMapStateBackend)."</td>
+        </tr>
         <tr>
             <td><h5>state.backend.incremental</h5></td>
             <td style="word-wrap: break-word;">false</td>
diff --git 
a/docs/layouts/shortcodes/generated/state_recovery_configuration.html 
b/docs/layouts/shortcodes/generated/state_recovery_configuration.html
index 2a1d914e2fd..2df071ea4d5 100644
--- a/docs/layouts/shortcodes/generated/state_recovery_configuration.html
+++ b/docs/layouts/shortcodes/generated/state_recovery_configuration.html
@@ -18,7 +18,7 @@
             <td><h5>execution.state-recovery.from-local</h5></td>
             <td style="word-wrap: break-word;">false</td>
             <td>Boolean</td>
-            <td> This option configures local recovery for the state backend. 
By default, local recovery is deactivated. Local recovery currently only covers 
keyed state backends (including both the EmbeddedRocksDBStateBackend and the 
HashMapStateBackend)."</td>
+            <td>This option configures local recovery for the state backend, 
which indicates whether to recovery from local snapshot.By default, local 
recovery is deactivated. Local recovery currently only covers keyed state 
backends (including both the EmbeddedRocksDBStateBackend and the 
HashMapStateBackend)."</td>
         </tr>
         <tr>
             <td><h5>execution.state-recovery.ignore-unclaimed-state</h5></td>
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
index a5a00e813a0..5626da3d467 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
@@ -179,7 +179,8 @@ public class CheckpointingOptions {
      * <p>Local recovery currently only covers keyed state backends (including 
both the
      * EmbeddedRocksDBStateBackend and the HashMapStateBackend).
      *
-     * @Deprecated use {@link StateRecoveryOptions#LOCAL_RECOVERY} instead.
+     * @deprecated use {@link StateRecoveryOptions#LOCAL_RECOVERY} and {@link
+     *     CheckpointingOptions#LOCAL_BACKUP_ENABLED} instead.
      */
     @Documentation.Section(Documentation.Sections.COMMON_STATE_BACKENDS)
     @Documentation.ExcludeFromDocumentation("Hidden for deprecated")
@@ -309,4 +310,25 @@ public class CheckpointingOptions {
                                             + "The actual write buffer size is 
determined to be the maximum of the value of this option and option '%s'.",
                                     FS_SMALL_FILE_THRESHOLD.key()))
                     .withDeprecatedKeys("state.backend.fs.write-buffer-size");
+
+    /**
+     * This option configures local backup for the state backend, which 
indicates whether to make a
+     * backup of the checkpoint on local disk. If not configured, it falls 
back to {@link
+     * StateRecoveryOptions#LOCAL_RECOVERY}. By default, local backup is 
deactivated. Local backup
+     * currently only covers keyed state backends (including both the 
EmbeddedRocksDBStateBackend
+     * and the HashMapStateBackend).
+     */
+    public static final ConfigOption<Boolean> LOCAL_BACKUP_ENABLED =
+            ConfigOptions.key("execution.checkpointing.local-backup.enabled")
+                    .booleanType()
+                    
.defaultValue(StateRecoveryOptions.LOCAL_RECOVERY.defaultValue())
+                    
.withFallbackKeys(StateRecoveryOptions.LOCAL_RECOVERY.key())
+                    .withDeprecatedKeys(LOCAL_RECOVERY.key())
+                    .withDescription(
+                            "This option configures local backup for the state 
backend, "
+                                    + "which indicates whether to make backup 
checkpoint on local disk.  "
+                                    + "If not configured, fallback to "
+                                    + StateRecoveryOptions.LOCAL_RECOVERY.key()
+                                    + ". By default, local backup is 
deactivated. Local backup currently only "
+                                    + "covers keyed state backends (including 
both the EmbeddedRocksDBStateBackend and the HashMapStateBackend).");
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/StateRecoveryOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/StateRecoveryOptions.java
index cfd585b89f6..c19329c5d92 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/StateRecoveryOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/StateRecoveryOptions.java
@@ -99,19 +99,21 @@ public class StateRecoveryOptions {
                                     .build());
 
     /**
-     * This option configures local recovery for the state backend. By 
default, local recovery is
-     * deactivated.
+     * This option configures local recovery for the state backend, which 
indicates whether to
+     * recover from the local snapshot. By default, local recovery is deactivated.
      *
      * <p>Local recovery currently only covers keyed state backends (including 
both the
      * EmbeddedRocksDBStateBackend and the HashMapStateBackend).
-     *
      */
+    @Documentation.Section(Documentation.Sections.COMMON_STATE_BACKENDS)
     public static final ConfigOption<Boolean> LOCAL_RECOVERY =
             ConfigOptions.key("execution.state-recovery.from-local")
                     .booleanType()
                     .defaultValue(false)
-                    
.withDeprecatedKeys(CheckpointingOptions.LOCAL_RECOVERY.key())
-                    .withDescription(" This option configures local recovery 
for the state backend. "
-                            + "By default, local recovery is deactivated. 
Local recovery currently only "
-                            + "covers keyed state backends (including both the 
EmbeddedRocksDBStateBackend and the HashMapStateBackend).\"");
+                    .withDeprecatedKeys("state.backend.local-recovery")
+                    .withDescription(
+                            "This option configures local recovery for the 
state backend, "
+                                    + "which indicates whether to recovery 
from local snapshot."
+                                    + "By default, local recovery is 
deactivated. Local recovery currently only "
+                                    + "covers keyed state backends (including 
both the EmbeddedRocksDBStateBackend and the HashMapStateBackend).\"");
 }
diff --git 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingStateChangeFsUploader.java
 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingStateChangeFsUploader.java
index 50db749b973..babe8fa8d28 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingStateChangeFsUploader.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingStateChangeFsUploader.java
@@ -24,7 +24,7 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.ChangelogTaskLocalStateStore;
-import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
+import org.apache.flink.runtime.state.LocalSnapshotDirectoryProvider;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.changelog.LocalChangelogRegistry;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
@@ -68,7 +68,7 @@ public class DuplicatingStateChangeFsUploader extends 
AbstractStateChangeFsUploa
 
     private final Path basePath;
     private final FileSystem fileSystem;
-    private final LocalRecoveryDirectoryProvider 
localRecoveryDirectoryProvider;
+    private final LocalSnapshotDirectoryProvider 
localSnapshotDirectoryProvider;
     private final JobID jobID;
 
     public DuplicatingStateChangeFsUploader(
@@ -79,12 +79,12 @@ public class DuplicatingStateChangeFsUploader extends 
AbstractStateChangeFsUploa
             int bufferSize,
             ChangelogStorageMetricGroup metrics,
             TaskChangelogRegistry changelogRegistry,
-            LocalRecoveryDirectoryProvider localRecoveryDirectoryProvider) {
+            LocalSnapshotDirectoryProvider localSnapshotDirectoryProvider) {
         super(compression, bufferSize, metrics, changelogRegistry, 
FileStateHandle::new);
         this.basePath =
                 new Path(basePath, String.format("%s/%s", jobID.toHexString(), 
PATH_SUB_DIR));
         this.fileSystem = fileSystem;
-        this.localRecoveryDirectoryProvider = localRecoveryDirectoryProvider;
+        this.localSnapshotDirectoryProvider = localSnapshotDirectoryProvider;
         this.jobID = jobID;
     }
 
@@ -96,7 +96,7 @@ public class DuplicatingStateChangeFsUploader extends 
AbstractStateChangeFsUploa
         FSDataOutputStream primaryStream = fileSystem.create(path, 
WriteMode.NO_OVERWRITE);
         Path localPath =
                 new Path(
-                        
getLocalTaskOwnedDirectory(localRecoveryDirectoryProvider, jobID),
+                        
getLocalTaskOwnedDirectory(localSnapshotDirectoryProvider, jobID),
                         fileName);
         FSDataOutputStream secondaryStream =
                 localPath.getFileSystem().create(localPath, 
WriteMode.NO_OVERWRITE);
diff --git 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java
 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java
index 1d4668cb8e8..66d6409c806 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java
@@ -143,7 +143,7 @@ public class FsStateChangelogStorage extends 
FsStateChangelogStorageForRecovery
         this.changelogRegistry = changelogRegistry;
         this.uploader = uploader;
         this.localRecoveryConfig = localRecoveryConfig;
-        if (localRecoveryConfig.isLocalRecoveryEnabled()) {
+        if (localRecoveryConfig.isLocalBackupEnabled()) {
             this.localChangelogRegistry =
                     new 
LocalChangelogRegistryImpl(Executors.newSingleThreadExecutor());
         }
diff --git 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
index f0a4b21646e..9f0c74964b2 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
@@ -421,7 +421,7 @@ class FsStateChangelogWriter implements 
StateChangelogWriter<ChangelogStateHandl
                         size,
                         incrementalSize,
                         FsStateChangelogStorageFactory.IDENTIFIER);
-        if (localRecoveryConfig.isLocalRecoveryEnabled()) {
+        if (localRecoveryConfig.isLocalBackupEnabled()) {
             size = 0;
             List<Tuple2<StreamStateHandle, Long>> localTuples = new 
ArrayList<>();
             for (UploadResult uploadResult : results.values()) {
diff --git 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java
 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java
index 19f09a151e6..45386f842b2 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java
@@ -97,7 +97,7 @@ public interface StateChangeUploadScheduler extends 
AutoCloseable {
         checkArgument(bytes <= Integer.MAX_VALUE);
         int bufferSize = (int) bytes;
         StateChangeUploader store =
-                localRecoveryConfig.isLocalRecoveryEnabled()
+                localRecoveryConfig.isLocalBackupEnabled()
                         ? new DuplicatingStateChangeFsUploader(
                                 jobID,
                                 basePath,
diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java
index 6ce30d61243..1fb84df416d 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java
@@ -89,7 +89,7 @@ final class SavepointTaskStateManager implements 
TaskStateManager {
     @Nonnull
     @Override
     public LocalRecoveryConfig createLocalRecoveryConfig() {
-        return new LocalRecoveryConfig(null);
+        return LocalRecoveryConfig.BACKUP_AND_RECOVERY_DISABLED;
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
index 8e85563fdf9..1472f247a10 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.jobmaster;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RpcOptions;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStore.java
index 23b2b6fd42a..7be6d9ead80 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStore.java
@@ -101,7 +101,7 @@ public class ChangelogTaskLocalStateStore extends 
TaskLocalStateStoreImpl {
     }
 
     public static Path getLocalTaskOwnedDirectory(
-            LocalRecoveryDirectoryProvider provider, JobID jobID) {
+            LocalSnapshotDirectoryProvider provider, JobID jobID) {
         File outDir =
                 provider.selectAllocationBaseDirectory(
                         (jobID.hashCode() & Integer.MAX_VALUE)
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java
index 3378ccb7713..a4bc3c27f54 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java
@@ -164,7 +164,7 @@ public interface CheckpointStreamWithResultProvider extends 
Closeable {
             @Nonnegative long checkpointId,
             @Nonnull CheckpointedStateScope checkpointedStateScope,
             @Nonnull CheckpointStreamFactory primaryStreamFactory,
-            @Nonnull LocalRecoveryDirectoryProvider secondaryStreamDirProvider)
+            @Nonnull LocalSnapshotDirectoryProvider secondaryStreamDirProvider)
             throws IOException {
 
         CheckpointStateOutputStream primaryOut =
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java
index 5a378b60ae2..e4b833540d0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.util.Optional;
@@ -30,18 +33,40 @@ import java.util.function.Supplier;
  */
 public class LocalRecoveryConfig {
 
+    public static final LocalRecoveryConfig BACKUP_AND_RECOVERY_DISABLED =
+            new LocalRecoveryConfig(false, false, null);
+
+    /** Whether to recover from the local snapshot. */
+    private final boolean localRecoveryEnabled;
+
+    /** Whether to write a backup of the checkpoint to local disk. */
+    private final boolean localBackupEnabled;
+
     /** Encapsulates the root directories and the subtask-specific path. */
-    @Nullable private final LocalRecoveryDirectoryProvider 
localStateDirectories;
+    @Nullable private final LocalSnapshotDirectoryProvider 
localStateDirectories;
 
-    public LocalRecoveryConfig(@Nullable LocalRecoveryDirectoryProvider 
directoryProvider) {
+    public LocalRecoveryConfig(
+            boolean localRecoveryEnabled,
+            boolean localBackupEnabled,
+            @Nullable LocalSnapshotDirectoryProvider directoryProvider) {
+        this.localRecoveryEnabled = localRecoveryEnabled;
+        this.localBackupEnabled = localBackupEnabled;
         this.localStateDirectories = directoryProvider;
     }
 
     public boolean isLocalRecoveryEnabled() {
-        return localStateDirectories != null;
+        return localRecoveryEnabled;
+    }
+
+    public boolean isLocalBackupEnabled() {
+        return localBackupEnabled;
+    }
+
+    public boolean isLocalRecoveryOrLocalBackupEnabled() {
+        return localRecoveryEnabled || localBackupEnabled;
     }
 
-    public Optional<LocalRecoveryDirectoryProvider> 
getLocalStateDirectoryProvider() {
+    public Optional<LocalSnapshotDirectoryProvider> 
getLocalStateDirectoryProvider() {
         return Optional.ofNullable(localStateDirectories);
     }
 
@@ -55,4 +80,9 @@ public class LocalRecoveryConfig {
                 new IllegalStateException(
                         "Getting a LocalRecoveryDirectoryProvider is only 
supported with the local recovery enabled. This is a bug and should be 
reported.");
     }
+
+    public static LocalRecoveryConfig backupAndRecoveryEnabled(
+            @Nonnull LocalSnapshotDirectoryProvider directoryProvider) {
+        return new LocalRecoveryConfig(true, true, 
Preconditions.checkNotNull(directoryProvider));
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryDirectoryProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalSnapshotDirectoryProvider.java
similarity index 83%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryDirectoryProvider.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalSnapshotDirectoryProvider.java
index 85d2ab00161..0ab95bcf00b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryDirectoryProvider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalSnapshotDirectoryProvider.java
@@ -22,12 +22,12 @@ import java.io.File;
 import java.io.Serializable;
 
 /**
- * Provides directories for local recovery. It offers access to the allocation 
base directories
- * (i.e. the root directories for all local state that is created under the 
same allocation id) and
- * the subtask-specific paths, which contain the local state for one subtask. 
Access by checkpoint
- * id rotates over all root directory indexes, in case that there is more than 
one. Selection
- * methods are provided to pick the directory under a certain index. Directory 
structures are of the
- * following shape:
+ * Provides directories for local backup or local recovery. It offers access 
to the allocation base
+ * directories (i.e. the root directories for all local state that is created 
under the same
+ * allocation id) and the subtask-specific paths, which contain the local 
state for one subtask.
+ * Access by checkpoint id rotates over all root directory indexes, in case 
that there is more than
+ * one. Selection methods are provided to pick the directory under a certain 
index. Directory
+ * structures are of the following shape:
  *
  * <p>
  *
@@ -48,7 +48,7 @@ import java.io.Serializable;
  *
  * <p>
  */
-public interface LocalRecoveryDirectoryProvider extends Serializable {
+public interface LocalSnapshotDirectoryProvider extends Serializable {
 
     /**
      * Returns the local state allocation base directory for given checkpoint 
id w.r.t. our rotation
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryDirectoryProviderImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalSnapshotDirectoryProviderImpl.java
similarity index 93%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryDirectoryProviderImpl.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalSnapshotDirectoryProviderImpl.java
index c2ee005806f..128e16758f5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryDirectoryProviderImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalSnapshotDirectoryProviderImpl.java
@@ -33,15 +33,15 @@ import java.io.File;
 import java.nio.file.Paths;
 import java.util.Arrays;
 
-/** Implementation of {@link LocalRecoveryDirectoryProvider}. */
-public class LocalRecoveryDirectoryProviderImpl implements 
LocalRecoveryDirectoryProvider {
+/** Implementation of {@link LocalSnapshotDirectoryProvider}. */
+public class LocalSnapshotDirectoryProviderImpl implements 
LocalSnapshotDirectoryProvider {
 
     /** Serial version. */
     private static final long serialVersionUID = 1L;
 
     /** Logger for this class. */
     private static final Logger LOG =
-            LoggerFactory.getLogger(LocalRecoveryDirectoryProviderImpl.class);
+            LoggerFactory.getLogger(LocalSnapshotDirectoryProviderImpl.class);
 
     /** All available root directories that this can potentially deliver. */
     @Nonnull private final File[] allocationBaseDirs;
@@ -55,7 +55,7 @@ public class LocalRecoveryDirectoryProviderImpl implements 
LocalRecoveryDirector
     /** Index of the owning subtask. */
     @Nonnegative private final int subtaskIndex;
 
-    public LocalRecoveryDirectoryProviderImpl(
+    public LocalSnapshotDirectoryProviderImpl(
             File allocationBaseDir,
             @Nonnull JobID jobID,
             @Nonnull JobVertexID jobVertexID,
@@ -63,7 +63,7 @@ public class LocalRecoveryDirectoryProviderImpl implements 
LocalRecoveryDirector
         this(new File[] {allocationBaseDir}, jobID, jobVertexID, subtaskIndex);
     }
 
-    public LocalRecoveryDirectoryProviderImpl(
+    public LocalSnapshotDirectoryProviderImpl(
             @Nonnull File[] allocationBaseDirs,
             @Nonnull JobID jobID,
             @Nonnull JobVertexID jobVertexID,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
index b58b7fbde93..efb0f54f4ec 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
@@ -67,9 +67,12 @@ public class TaskExecutorLocalStateStoresManager {
     private final Map<AllocationID, Map<JobVertexSubtaskKey, 
OwnedTaskLocalStateStore>>
             taskStateStoresByAllocationID;
 
-    /** The configured mode for local recovery on this task manager. */
+    /** Whether to recover from the local snapshot. */
     private final boolean localRecoveryEnabled;
 
+    /** Whether to write a backup of the checkpoint to local disk. */
+    private final boolean localBackupEnabled;
+
     /** This is the root directory for all local state of this task manager / 
executor. */
     private final Reference<File[]> localStateRootDirectories;
 
@@ -86,6 +89,7 @@ public class TaskExecutorLocalStateStoresManager {
 
     public TaskExecutorLocalStateStoresManager(
             boolean localRecoveryEnabled,
+            boolean localBackupEnabled,
             @Nonnull Reference<File[]> localStateRootDirectories,
             @Nonnull Executor discardExecutor)
             throws IOException {
@@ -97,6 +101,7 @@ public class TaskExecutorLocalStateStoresManager {
 
         this.taskStateStoresByAllocationID = new HashMap<>();
         this.localRecoveryEnabled = localRecoveryEnabled;
+        this.localBackupEnabled = localBackupEnabled;
         this.localStateRootDirectories = localStateRootDirectories;
         this.discardExecutor = discardExecutor;
         this.lock = new Object();
@@ -157,17 +162,17 @@ public class TaskExecutorLocalStateStoresManager {
 
             if (taskLocalStateStore == null) {
 
-                LocalRecoveryDirectoryProviderImpl directoryProvider = null;
-                if (localRecoveryEnabled) {
+                LocalSnapshotDirectoryProviderImpl directoryProvider = null;
+                if (localRecoveryEnabled || localBackupEnabled) {
                     // create the allocation base dirs, one inside each root 
dir.
                     File[] allocationBaseDirectories = 
allocationBaseDirectories(allocationID);
                     directoryProvider =
-                            new LocalRecoveryDirectoryProviderImpl(
+                            new LocalSnapshotDirectoryProviderImpl(
                                     allocationBaseDirectories, jobId, 
jobVertexID, subtaskIndex);
                 }
-
                 LocalRecoveryConfig localRecoveryConfig =
-                        new LocalRecoveryConfig(directoryProvider);
+                        new LocalRecoveryConfig(
+                                localRecoveryEnabled, localBackupEnabled, 
directoryProvider);
 
                 boolean changelogEnabled =
                         jobConfiguration
@@ -176,7 +181,7 @@ public class TaskExecutorLocalStateStoresManager {
                                         clusterConfiguration.get(
                                                 
StateChangelogOptions.ENABLE_STATE_CHANGE_LOG));
 
-                if (localRecoveryConfig.isLocalRecoveryEnabled() && 
changelogEnabled) {
+                if (localRecoveryConfig.isLocalRecoveryOrLocalBackupEnabled() 
&& changelogEnabled) {
                     taskLocalStateStore =
                             new ChangelogTaskLocalStateStore(
                                     jobId,
@@ -185,7 +190,7 @@ public class TaskExecutorLocalStateStoresManager {
                                     subtaskIndex,
                                     localRecoveryConfig,
                                     discardExecutor);
-                } else if (localRecoveryConfig.isLocalRecoveryEnabled()) {
+                } else if 
(localRecoveryConfig.isLocalRecoveryOrLocalBackupEnabled()) {
                     taskLocalStateStore =
                             new TaskLocalStateStoreImpl(
                                     jobId,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
index cf6a25545f8..a3c3b35fba6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
@@ -203,7 +203,7 @@ public class TaskLocalStateStoreImpl implements 
OwnedTaskLocalStateStore {
         }
     }
 
-    protected LocalRecoveryDirectoryProvider 
getLocalRecoveryDirectoryProvider() {
+    protected LocalSnapshotDirectoryProvider 
getLocalRecoveryDirectoryProvider() {
         return localRecoveryConfig
                 .getLocalStateDirectoryProvider()
                 .orElseThrow(() -> new IllegalStateException("Local recovery 
must be enabled."));
@@ -219,6 +219,18 @@ public class TaskLocalStateStoreImpl implements 
OwnedTaskLocalStateStore {
             snapshot = loadTaskStateSnapshot(checkpointID);
         }
 
+        // Even if local recovery is disabled, it is still necessary to load the TaskStateSnapshot
+        // so that it can be managed by the TaskLocalStateStore.
+        if (!localRecoveryConfig.isLocalRecoveryEnabled()) {
+            LOG.debug(
+                    "Local recovery is disabled for checkpoint {} in subtask 
({} - {} - {})",
+                    checkpointID,
+                    jobID,
+                    jobVertexID,
+                    subtaskIndex);
+            return null;
+        }
+
         if (snapshot != null) {
             LOG.info(
                     "Found registered local state for checkpoint {} in subtask 
({} - {} - {}) : {}",
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java
index a403cfce5d2..69da532a6f3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java
@@ -119,7 +119,7 @@ class HeapSnapshotStrategy<K>
 
         final SupplierWithException<CheckpointStreamWithResultProvider, 
Exception>
                 checkpointStreamSupplier =
-                        localRecoveryConfig.isLocalRecoveryEnabled()
+                        localRecoveryConfig.isLocalBackupEnabled()
                                         && 
!checkpointOptions.getCheckpointType().isSavepoint()
                                 ? () ->
                                         createDuplicatingStream(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 5009ef733c6..6398107ea48 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -389,6 +389,7 @@ public class TaskManagerServices {
         final TaskExecutorLocalStateStoresManager taskStateManager =
                 new TaskExecutorLocalStateStoresManager(
                         
taskManagerServicesConfiguration.isLocalRecoveryEnabled(),
+                        
taskManagerServicesConfiguration.isLocalBackupEnabled(),
                         
taskManagerServicesConfiguration.getLocalRecoveryStateDirectories(),
                         ioExecutor);
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index a6577c26b93..962d0e0605d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -83,6 +83,8 @@ public class TaskManagerServicesConfiguration {
 
     private final boolean localRecoveryEnabled;
 
+    private final boolean localBackupEnabled;
+
     private final RetryingRegistrationConfiguration 
retryingRegistrationConfiguration;
 
     private Optional<Time> systemResourceMetricsProbingInterval;
@@ -105,6 +107,7 @@ public class TaskManagerServicesConfiguration {
             String[] tmpDirPaths,
             Reference<File[]> localRecoveryStateDirectories,
             boolean localRecoveryEnabled,
+            boolean localBackupEnabled,
             @Nullable QueryableStateConfiguration queryableStateConfig,
             int numberOfSlots,
             int pageSize,
@@ -126,6 +129,7 @@ public class TaskManagerServicesConfiguration {
         this.tmpDirPaths = checkNotNull(tmpDirPaths);
         this.localRecoveryStateDirectories = 
checkNotNull(localRecoveryStateDirectories);
         this.localRecoveryEnabled = localRecoveryEnabled;
+        this.localBackupEnabled = localBackupEnabled;
         this.queryableStateConfig = queryableStateConfig;
         this.numberOfSlots = numberOfSlots;
 
@@ -188,6 +192,10 @@ public class TaskManagerServicesConfiguration {
         return localRecoveryEnabled;
     }
 
+    boolean isLocalBackupEnabled() {
+        return localBackupEnabled;
+    }
+
     @Nullable
     QueryableStateConfiguration getQueryableStateConfig() {
         return queryableStateConfig;
@@ -284,7 +292,8 @@ public class TaskManagerServicesConfiguration {
             localStateDirs = Reference.owned(createdLocalStateDirs);
         }
 
-        boolean localRecoveryMode = 
configuration.get(StateRecoveryOptions.LOCAL_RECOVERY);
+        boolean localRecoveryEnabled = 
configuration.get(StateRecoveryOptions.LOCAL_RECOVERY);
+        boolean localBackupEnabled = 
configuration.get(CheckpointingOptions.LOCAL_BACKUP_ENABLED);
 
         final QueryableStateConfiguration queryableStateConfig =
                 QueryableStateConfiguration.fromConfiguration(configuration);
@@ -327,7 +336,8 @@ public class TaskManagerServicesConfiguration {
                 localCommunicationOnly,
                 tmpDirs,
                 localStateDirs,
-                localRecoveryMode,
+                localRecoveryEnabled,
+                localBackupEnabled,
                 queryableStateConfig,
                 ConfigurationParserUtils.getSlot(configuration),
                 ConfigurationParserUtils.getPageSize(configuration),
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStoreTest.java
index 7833339e770..3ce11715331 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStoreTest.java
@@ -44,7 +44,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** Test for {@link ChangelogTaskLocalStateStore}. */
 class ChangelogTaskLocalStateStoreTest extends TaskLocalStateStoreImplTest {
 
-    private LocalRecoveryDirectoryProvider localRecoveryDirectoryProvider;
+    private LocalSnapshotDirectoryProvider localSnapshotDirectoryProvider;
 
     @BeforeEach
     @Override
@@ -62,12 +62,13 @@ class ChangelogTaskLocalStateStoreTest extends 
TaskLocalStateStoreImplTest {
             AllocationID allocationID,
             JobVertexID jobVertexID,
             int subtaskIdx) {
-        LocalRecoveryDirectoryProviderImpl directoryProvider =
-                new LocalRecoveryDirectoryProviderImpl(
+        LocalSnapshotDirectoryProviderImpl directoryProvider =
+                new LocalSnapshotDirectoryProviderImpl(
                         allocationBaseDirs, jobID, jobVertexID, subtaskIdx);
-        this.localRecoveryDirectoryProvider = directoryProvider;
+        this.localSnapshotDirectoryProvider = directoryProvider;
 
-        LocalRecoveryConfig localRecoveryConfig = new 
LocalRecoveryConfig(directoryProvider);
+        LocalRecoveryConfig localRecoveryConfig =
+                
LocalRecoveryConfig.backupAndRecoveryEnabled(directoryProvider);
         return new ChangelogTaskLocalStateStore(
                 jobID,
                 allocationID,
@@ -173,14 +174,14 @@ class ChangelogTaskLocalStateStoreTest extends 
TaskLocalStateStoreImplTest {
 
     private boolean checkMaterializedDirExists(long materializationID) {
         File materializedDir =
-                
localRecoveryDirectoryProvider.subtaskSpecificCheckpointDirectory(
+                
localSnapshotDirectoryProvider.subtaskSpecificCheckpointDirectory(
                         materializationID);
         return materializedDir.exists();
     }
 
     private void writeToMaterializedDir(long materializationID) {
         File materializedDir =
-                
localRecoveryDirectoryProvider.subtaskSpecificCheckpointDirectory(
+                
localSnapshotDirectoryProvider.subtaskSpecificCheckpointDirectory(
                         materializationID);
         if (!materializedDir.exists() && !materializedDir.mkdirs()) {
             throw new FlinkRuntimeException(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProviderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProviderTest.java
index a2f5a41d319..150de862b83 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProviderTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProviderTest.java
@@ -53,7 +53,7 @@ class CheckpointStreamWithResultProviderTest {
                     
.isInstanceOf(CheckpointStreamWithResultProvider.PrimaryStreamOnly.class);
         }
 
-        LocalRecoveryDirectoryProvider directoryProvider = 
createLocalRecoveryDirectoryProvider();
+        LocalSnapshotDirectoryProvider directoryProvider = 
createLocalRecoveryDirectoryProvider();
         try (CheckpointStreamWithResultProvider primaryAndSecondary =
                 CheckpointStreamWithResultProvider.createDuplicatingStream(
                         42L, CheckpointedStateScope.EXCLUSIVE, primaryFactory, 
directoryProvider)) {
@@ -87,7 +87,7 @@ class CheckpointStreamWithResultProviderTest {
     @Test
     void testCloseAndFinalizeCheckpointStreamResultPrimaryAndSecondary() 
throws Exception {
         CheckpointStreamFactory primaryFactory = 
createCheckpointStreamFactory();
-        LocalRecoveryDirectoryProvider directoryProvider = 
createLocalRecoveryDirectoryProvider();
+        LocalSnapshotDirectoryProvider directoryProvider = 
createLocalRecoveryDirectoryProvider();
 
         CheckpointStreamWithResultProvider resultProvider =
                 CheckpointStreamWithResultProvider.createDuplicatingStream(
@@ -201,13 +201,13 @@ class CheckpointStreamWithResultProviderTest {
         resultProvider.close();
     }
 
-    private LocalRecoveryDirectoryProvider 
createLocalRecoveryDirectoryProvider()
+    private LocalSnapshotDirectoryProvider 
createLocalRecoveryDirectoryProvider()
             throws IOException {
         File localStateDir = TempDirUtils.newFolder(temporaryFolder);
         JobID jobID = new JobID();
         JobVertexID jobVertexID = new JobVertexID();
         int subtaskIdx = 0;
-        return new LocalRecoveryDirectoryProviderImpl(
+        return new LocalSnapshotDirectoryProviderImpl(
                 localStateDir, jobID, jobVertexID, subtaskIdx);
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/LocalRecoveryDirectoryProviderImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/LocalSnapshotDirectoryProviderImplTest.java
similarity index 93%
rename from 
flink-runtime/src/test/java/org/apache/flink/runtime/state/LocalRecoveryDirectoryProviderImplTest.java
rename to 
flink-runtime/src/test/java/org/apache/flink/runtime/state/LocalSnapshotDirectoryProviderImplTest.java
index e617dd8335c..0b5572373f0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/LocalRecoveryDirectoryProviderImplTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/LocalSnapshotDirectoryProviderImplTest.java
@@ -33,8 +33,8 @@ import java.io.IOException;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-/** Tests for {@link LocalRecoveryDirectoryProvider}. */
-class LocalRecoveryDirectoryProviderImplTest {
+/** Tests for {@link LocalSnapshotDirectoryProvider}. */
+class LocalSnapshotDirectoryProviderImplTest {
 
     private static final JobID JOB_ID = new JobID();
     private static final JobVertexID JOB_VERTEX_ID = new JobVertexID();
@@ -42,7 +42,7 @@ class LocalRecoveryDirectoryProviderImplTest {
 
     @TempDir private java.nio.file.Path tmpFolder;
 
-    private LocalRecoveryDirectoryProviderImpl directoryProvider;
+    private LocalSnapshotDirectoryProviderImpl directoryProvider;
     private File[] allocBaseFolders;
 
     @BeforeEach
@@ -54,7 +54,7 @@ class LocalRecoveryDirectoryProviderImplTest {
                     TempDirUtils.newFolder(tmpFolder)
                 };
         this.directoryProvider =
-                new LocalRecoveryDirectoryProviderImpl(
+                new LocalSnapshotDirectoryProviderImpl(
                         allocBaseFolders, JOB_ID, JOB_VERTEX_ID, 
SUBTASK_INDEX);
     }
 
@@ -123,7 +123,7 @@ class LocalRecoveryDirectoryProviderImplTest {
     void testPreconditionsNotNullFiles() {
         assertThatThrownBy(
                         () ->
-                                new LocalRecoveryDirectoryProviderImpl(
+                                new LocalSnapshotDirectoryProviderImpl(
                                         new File[] {null}, JOB_ID, 
JOB_VERTEX_ID, SUBTASK_INDEX))
                 .isInstanceOf(NullPointerException.class);
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
index fe5bdb739f3..31c2f406f30 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
@@ -220,7 +220,7 @@ class TaskExecutorLocalStateStoresManagerTest {
                         new Configuration(),
                         new Configuration());
 
-        LocalRecoveryDirectoryProvider directoryProvider =
+        LocalSnapshotDirectoryProvider directoryProvider =
                 taskLocalStateStore
                         .getLocalRecoveryConfig()
                         .getLocalStateDirectoryProvider()
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java
index 64b47ff55b9..7acf36f341b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java
@@ -79,11 +79,12 @@ class TaskLocalStateStoreImplTest {
             AllocationID allocationID,
             JobVertexID jobVertexID,
             int subtaskIdx) {
-        LocalRecoveryDirectoryProviderImpl directoryProvider =
-                new LocalRecoveryDirectoryProviderImpl(
+        LocalSnapshotDirectoryProviderImpl directoryProvider =
+                new LocalSnapshotDirectoryProviderImpl(
                         allocationBaseDirs, jobID, jobVertexID, subtaskIdx);
 
-        LocalRecoveryConfig localRecoveryConfig = new 
LocalRecoveryConfig(directoryProvider);
+        LocalRecoveryConfig localRecoveryConfig =
+                
LocalRecoveryConfig.backupAndRecoveryEnabled(directoryProvider);
         return new TaskLocalStateStoreImpl(
                 jobID,
                 allocationID,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
index 76f78e26fda..3c8fe04a794 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
@@ -214,10 +214,11 @@ class TaskStateManagerImplTest {
                     TempDirUtils.newFolder(tmpFolder)
                 };
 
-        LocalRecoveryDirectoryProviderImpl directoryProvider =
-                new LocalRecoveryDirectoryProviderImpl(allocBaseDirs, jobID, 
jobVertexID, 0);
+        LocalSnapshotDirectoryProviderImpl directoryProvider =
+                new LocalSnapshotDirectoryProviderImpl(allocBaseDirs, jobID, 
jobVertexID, 0);
 
-        LocalRecoveryConfig localRecoveryConfig = new 
LocalRecoveryConfig(directoryProvider);
+        LocalRecoveryConfig localRecoveryConfig =
+                
LocalRecoveryConfig.backupAndRecoveryEnabled(directoryProvider);
 
         TaskLocalStateStore taskLocalStateStore =
                 new TaskLocalStateStoreImpl(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalRecoveryConfig.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalRecoveryConfig.java
index a0782edd10f..85f0f4b9d79 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalRecoveryConfig.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalRecoveryConfig.java
@@ -24,14 +24,14 @@ import java.io.File;
 public class TestLocalRecoveryConfig {
 
     public static LocalRecoveryConfig disabled() {
-        return new LocalRecoveryConfig(null);
+        return LocalRecoveryConfig.BACKUP_AND_RECOVERY_DISABLED;
     }
 
     public static LocalRecoveryConfig enabledForTest() {
-        return new LocalRecoveryConfig(new TestDummyLocalDirectoryProvider());
+        return LocalRecoveryConfig.backupAndRecoveryEnabled(new 
TestDummyLocalDirectoryProvider());
     }
 
-    public static class TestDummyLocalDirectoryProvider implements 
LocalRecoveryDirectoryProvider {
+    public static class TestDummyLocalDirectoryProvider implements 
LocalSnapshotDirectoryProvider {
 
         private TestDummyLocalDirectoryProvider() {}
 
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateDiscardTest.java
 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateDiscardTest.java
index f869f5cae63..bf50a1bb4ad 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateDiscardTest.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateDiscardTest.java
@@ -274,7 +274,7 @@ public class ChangelogStateDiscardTest {
                                 LatencyTrackingStateConfig.disabled(),
                                 emptyList(),
                                 
UncompressedStreamCompressionDecorator.INSTANCE,
-                                new LocalRecoveryConfig(null),
+                                
LocalRecoveryConfig.BACKUP_AND_RECOVERY_DISABLED,
                                 new HeapPriorityQueueSetFactory(
                                         kgRange, 
kgRange.getNumberOfKeyGroups(), 128),
                                 true,
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java
index 9cb73d01dc9..6c3948a6303 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java
@@ -34,7 +34,7 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.LocalRecoveryConfig;
-import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
+import org.apache.flink.runtime.state.LocalSnapshotDirectoryProvider;
 import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
 import org.apache.flink.runtime.state.SnapshotDirectory;
 import org.apache.flink.runtime.state.SnapshotResources;
@@ -184,9 +184,9 @@ public abstract class RocksDBSnapshotStrategyBase<K, R 
extends SnapshotResources
     protected SnapshotDirectory prepareLocalSnapshotDirectory(long 
checkpointId)
             throws IOException {
 
-        if (localRecoveryConfig.isLocalRecoveryEnabled()) {
+        if (localRecoveryConfig.isLocalBackupEnabled()) {
             // create a "permanent" snapshot directory for local recovery.
-            LocalRecoveryDirectoryProvider directoryProvider =
+            LocalSnapshotDirectoryProvider directoryProvider =
                     localRecoveryConfig
                             .getLocalStateDirectoryProvider()
                             
.orElseThrow(LocalRecoveryConfig.localRecoveryNotEnabled());
@@ -260,7 +260,7 @@ public abstract class RocksDBSnapshotStrategyBase<K, R 
extends SnapshotResources
             throws Exception {
 
         CheckpointStreamWithResultProvider streamWithResultProvider =
-                localRecoveryConfig.isLocalRecoveryEnabled()
+                localRecoveryConfig.isLocalBackupEnabled()
                         ? 
CheckpointStreamWithResultProvider.createDuplicatingStream(
                                 checkpointId,
                                 CheckpointedStateScope.EXCLUSIVE,
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
index 929cc3d2109..2ae3839cde3 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
@@ -36,7 +36,7 @@ import org.apache.flink.runtime.state.DoneFuture;
 import org.apache.flink.runtime.state.InputChannelStateHandle;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.LocalRecoveryConfig;
-import org.apache.flink.runtime.state.LocalRecoveryDirectoryProviderImpl;
+import org.apache.flink.runtime.state.LocalSnapshotDirectoryProviderImpl;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
 import org.apache.flink.runtime.state.SnapshotResult;
@@ -220,11 +220,12 @@ public class LocalStateForwardingTest extends TestLogger {
 
         Executor executor = Executors.directExecutor();
 
-        LocalRecoveryDirectoryProviderImpl directoryProvider =
-                new LocalRecoveryDirectoryProviderImpl(
+        LocalSnapshotDirectoryProviderImpl directoryProvider =
+                new LocalSnapshotDirectoryProviderImpl(
                         temporaryFolder.newFolder(), jobID, jobVertexID, 
subtaskIdx);
 
-        LocalRecoveryConfig localRecoveryConfig = new 
LocalRecoveryConfig(directoryProvider);
+        LocalRecoveryConfig localRecoveryConfig =
+                
LocalRecoveryConfig.backupAndRecoveryEnabled(directoryProvider);
 
         TaskLocalStateStore taskLocalStateStore =
                 new TaskLocalStateStoreImpl(
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 63c30457856..0b8fe998736 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -46,7 +46,7 @@ import 
org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.state.LocalRecoveryConfig;
-import org.apache.flink.runtime.state.LocalRecoveryDirectoryProviderImpl;
+import org.apache.flink.runtime.state.LocalSnapshotDirectoryProviderImpl;
 import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
 import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
@@ -153,8 +153,8 @@ public class StreamTaskTestHarness<OUT> {
         this(
                 taskFactory,
                 outputType,
-                new LocalRecoveryConfig(
-                        new LocalRecoveryDirectoryProviderImpl(
+                LocalRecoveryConfig.backupAndRecoveryEnabled(
+                        new LocalSnapshotDirectoryProviderImpl(
                                 localRootDir, new JobID(), new JobVertexID(), 
0)));
     }
 
diff --git 
a/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/state/benchmark/StateBackendBenchmarkUtils.java
 
b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/state/benchmark/StateBackendBenchmarkUtils.java
index 7d766858483..2f2e80dd07c 100644
--- 
a/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/state/benchmark/StateBackendBenchmarkUtils.java
+++ 
b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/state/benchmark/StateBackendBenchmarkUtils.java
@@ -187,7 +187,7 @@ public class StateBackendBenchmarkUtils {
                         2,
                         new KeyGroupRange(0, 1),
                         executionConfig,
-                        new LocalRecoveryConfig(null),
+                        LocalRecoveryConfig.BACKUP_AND_RECOVERY_DISABLED,
                         RocksDBPriorityQueueConfig.buildWithPriorityQueueType(
                                 
EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB),
                         ttlTimeProvider,
@@ -225,7 +225,7 @@ public class StateBackendBenchmarkUtils {
                         LatencyTrackingStateConfig.disabled(),
                         Collections.emptyList(),
                         
AbstractStateBackend.getCompressionDecorator(executionConfig),
-                        new LocalRecoveryConfig(null),
+                        LocalRecoveryConfig.BACKUP_AND_RECOVERY_DISABLED,
                         priorityQueueSetFactory,
                         false,
                         new CloseableRegistry());
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/StreamOperatorSnapshotRestoreTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/StreamOperatorSnapshotRestoreTest.java
index 69b8ce40260..a2e0c7e5c71 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/StreamOperatorSnapshotRestoreTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/StreamOperatorSnapshotRestoreTest.java
@@ -40,8 +40,8 @@ import 
org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
 import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
 import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
 import org.apache.flink.runtime.state.LocalRecoveryConfig;
-import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
-import org.apache.flink.runtime.state.LocalRecoveryDirectoryProviderImpl;
+import org.apache.flink.runtime.state.LocalSnapshotDirectoryProvider;
+import org.apache.flink.runtime.state.LocalSnapshotDirectoryProviderImpl;
 import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.StateInitializationContext;
@@ -177,13 +177,16 @@ public class StreamOperatorSnapshotRestoreTest extends 
TestLogger {
         JobVertexID jobVertexID = new JobVertexID();
         int subtaskIdx = 0;
 
-        LocalRecoveryDirectoryProvider directoryProvider =
+        LocalSnapshotDirectoryProvider directoryProvider =
                 mode == ONLY_JM_RECOVERY
                         ? null
-                        : new LocalRecoveryDirectoryProviderImpl(
+                        : new LocalSnapshotDirectoryProviderImpl(
                                 temporaryFolder.newFolder(), jobID, 
jobVertexID, subtaskIdx);
 
-        LocalRecoveryConfig localRecoveryConfig = new 
LocalRecoveryConfig(directoryProvider);
+        LocalRecoveryConfig localRecoveryConfig =
+                (directoryProvider == null)
+                        ? LocalRecoveryConfig.BACKUP_AND_RECOVERY_DISABLED
+                        : 
LocalRecoveryConfig.backupAndRecoveryEnabled(directoryProvider);
 
         MockEnvironment mockEnvironment =
                 new MockEnvironmentBuilder()

Reply via email to