This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit ed75795e97800177cb67141ab838632d5ec55bb5 Author: Jinzhong Li <lijinzhong2...@gmail.com> AuthorDate: Wed Feb 28 12:19:36 2024 +0800 [FLINK-34484][state] Split 'state.backend.local-recovery' into two options for checkpointing and recovery --- .../generated/checkpointing_configuration.html | 6 ++++ .../generated/common_state_backends_section.html | 6 ++++ .../generated/state_recovery_configuration.html | 2 +- .../flink/configuration/CheckpointingOptions.java | 24 +++++++++++++- .../flink/configuration/StateRecoveryOptions.java | 16 +++++---- .../fs/DuplicatingStateChangeFsUploader.java | 10 +++--- .../changelog/fs/FsStateChangelogStorage.java | 2 +- .../flink/changelog/fs/FsStateChangelogWriter.java | 2 +- .../changelog/fs/StateChangeUploadScheduler.java | 2 +- .../api/runtime/SavepointTaskStateManager.java | 2 +- .../DefaultSlotPoolServiceSchedulerFactory.java | 1 - .../state/ChangelogTaskLocalStateStore.java | 2 +- .../state/CheckpointStreamWithResultProvider.java | 2 +- .../flink/runtime/state/LocalRecoveryConfig.java | 38 +++++++++++++++++++--- ...er.java => LocalSnapshotDirectoryProvider.java} | 14 ++++---- ...ava => LocalSnapshotDirectoryProviderImpl.java} | 10 +++--- .../state/TaskExecutorLocalStateStoresManager.java | 21 +++++++----- .../runtime/state/TaskLocalStateStoreImpl.java | 14 +++++++- .../runtime/state/heap/HeapSnapshotStrategy.java | 2 +- .../runtime/taskexecutor/TaskManagerServices.java | 1 + .../TaskManagerServicesConfiguration.java | 14 ++++++-- .../state/ChangelogTaskLocalStateStoreTest.java | 15 +++++---- .../CheckpointStreamWithResultProviderTest.java | 8 ++--- ...=> LocalSnapshotDirectoryProviderImplTest.java} | 10 +++--- .../TaskExecutorLocalStateStoresManagerTest.java | 2 +- .../runtime/state/TaskLocalStateStoreImplTest.java | 7 ++-- .../runtime/state/TaskStateManagerImplTest.java | 7 ++-- .../runtime/state/TestLocalRecoveryConfig.java | 6 ++-- .../state/changelog/ChangelogStateDiscardTest.java | 2 +- .../snapshot/RocksDBSnapshotStrategyBase.java | 8 ++--- .../runtime/tasks/LocalStateForwardingTest.java | 9 ++--- .../runtime/tasks/StreamTaskTestHarness.java | 6 ++-- .../benchmark/StateBackendBenchmarkUtils.java | 4 +-- .../restore/StreamOperatorSnapshotRestoreTest.java | 13 +++++--- 34 files changed, 194 insertions(+), 94 deletions(-) diff --git a/docs/layouts/shortcodes/generated/checkpointing_configuration.html b/docs/layouts/shortcodes/generated/checkpointing_configuration.html index 0cd3dd53755..c87a9c33803 100644 --- a/docs/layouts/shortcodes/generated/checkpointing_configuration.html +++ b/docs/layouts/shortcodes/generated/checkpointing_configuration.html @@ -8,6 +8,12 @@ </tr> </thead> <tbody> + <tr> + <td><h5>execution.checkpointing.local-backup.enabled</h5></td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>This option configures local backup for the state backend, which indicates whether to make backup checkpoint on local disk. If not configured, fallback to execution.state-recovery.from-local. By default, local backup is deactivated. Local backup currently only covers keyed state backends (including both the EmbeddedRocksDBStateBackend and the HashMapStateBackend).</td> + </tr> <tr> <td><h5>state.backend.incremental</h5></td> <td style="word-wrap: break-word;">false</td> diff --git a/docs/layouts/shortcodes/generated/common_state_backends_section.html b/docs/layouts/shortcodes/generated/common_state_backends_section.html index a16650bf412..ab664f51bfb 100644 --- a/docs/layouts/shortcodes/generated/common_state_backends_section.html +++ b/docs/layouts/shortcodes/generated/common_state_backends_section.html @@ -32,6 +32,12 @@ <td>String</td> <td>The default directory for savepoints. Used by the state backends that write savepoints to file systems (HashMapStateBackend, EmbeddedRocksDBStateBackend).</td> </tr> + <tr> + <td><h5>execution.state-recovery.from-local</h5></td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>This option configures local recovery for the state backend, which indicates whether to recovery from local snapshot.By default, local recovery is deactivated. Local recovery currently only covers keyed state backends (including both the EmbeddedRocksDBStateBackend and the HashMapStateBackend)."</td> + </tr> <tr> <td><h5>state.backend.incremental</h5></td> <td style="word-wrap: break-word;">false</td> diff --git a/docs/layouts/shortcodes/generated/state_recovery_configuration.html b/docs/layouts/shortcodes/generated/state_recovery_configuration.html index 2a1d914e2fd..2df071ea4d5 100644 --- a/docs/layouts/shortcodes/generated/state_recovery_configuration.html +++ b/docs/layouts/shortcodes/generated/state_recovery_configuration.html @@ -18,7 +18,7 @@ <td><h5>execution.state-recovery.from-local</h5></td> <td style="word-wrap: break-word;">false</td> <td>Boolean</td> - <td> This option configures local recovery for the state backend. By default, local recovery is deactivated. Local recovery currently only covers keyed state backends (including both the EmbeddedRocksDBStateBackend and the HashMapStateBackend)."</td> + <td>This option configures local recovery for the state backend, which indicates whether to recovery from local snapshot.By default, local recovery is deactivated. Local recovery currently only covers keyed state backends (including both the EmbeddedRocksDBStateBackend and the HashMapStateBackend)."</td> </tr> <tr> <td><h5>execution.state-recovery.ignore-unclaimed-state</h5></td> diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java index a5a00e813a0..5626da3d467 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java @@ -179,7 +179,8 @@ public class CheckpointingOptions { * <p>Local recovery currently only covers keyed state backends (including both the * EmbeddedRocksDBStateBackend and the HashMapStateBackend). * - * @Deprecated use {@link StateRecoveryOptions#LOCAL_RECOVERY} instead. + * @deprecated use {@link StateRecoveryOptions#LOCAL_RECOVERY} and {@link + * CheckpointingOptions#LOCAL_BACKUP_ENABLED} instead. */ @Documentation.Section(Documentation.Sections.COMMON_STATE_BACKENDS) @Documentation.ExcludeFromDocumentation("Hidden for deprecated") @@ -309,4 +310,25 @@ public class CheckpointingOptions { + "The actual write buffer size is determined to be the maximum of the value of this option and option '%s'.", FS_SMALL_FILE_THRESHOLD.key())) .withDeprecatedKeys("state.backend.fs.write-buffer-size"); + + /** + * This option configures local backup for the state backend, which indicates whether to make + * backup checkpoint on local disk. If not configured, fallback to {@link + * StateRecoveryOptions#LOCAL_RECOVERY}. By default, local backup is deactivated. Local backup + * currently only covers keyed state backends (including both the EmbeddedRocksDBStateBackend + * and the HashMapStateBackend). + */ + public static final ConfigOption<Boolean> LOCAL_BACKUP_ENABLED = + ConfigOptions.key("execution.checkpointing.local-backup.enabled") + .booleanType() + .defaultValue(StateRecoveryOptions.LOCAL_RECOVERY.defaultValue()) + .withFallbackKeys(StateRecoveryOptions.LOCAL_RECOVERY.key()) + .withDeprecatedKeys(LOCAL_RECOVERY.key()) + .withDescription( + "This option configures local backup for the state backend, " + + "which indicates whether to make backup checkpoint on local disk. " + + "If not configured, fallback to " + + StateRecoveryOptions.LOCAL_RECOVERY.key() + + ". By default, local backup is deactivated. Local backup currently only " + + "covers keyed state backends (including both the EmbeddedRocksDBStateBackend and the HashMapStateBackend)."); } diff --git a/flink-core/src/main/java/org/apache/flink/configuration/StateRecoveryOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/StateRecoveryOptions.java index cfd585b89f6..c19329c5d92 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/StateRecoveryOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/StateRecoveryOptions.java @@ -99,19 +99,21 @@ public class StateRecoveryOptions { .build()); /** - * This option configures local recovery for the state backend. By default, local recovery is - * deactivated. + * This option configures local recovery for the state backend, which indicates whether to + * recovery from local snapshot. By default, local recovery is deactivated. * * <p>Local recovery currently only covers keyed state backends (including both the * EmbeddedRocksDBStateBackend and the HashMapStateBackend). - * */ + @Documentation.Section(Documentation.Sections.COMMON_STATE_BACKENDS) public static final ConfigOption<Boolean> LOCAL_RECOVERY = ConfigOptions.key("execution.state-recovery.from-local") .booleanType() .defaultValue(false) - .withDeprecatedKeys(CheckpointingOptions.LOCAL_RECOVERY.key()) - .withDescription(" This option configures local recovery for the state backend. " - + "By default, local recovery is deactivated. Local recovery currently only " - + "covers keyed state backends (including both the EmbeddedRocksDBStateBackend and the HashMapStateBackend).\""); + .withDeprecatedKeys("state.backend.local-recovery") + .withDescription( + "This option configures local recovery for the state backend, " + + "which indicates whether to recovery from local snapshot." + + "By default, local recovery is deactivated. Local recovery currently only " + + "covers keyed state backends (including both the EmbeddedRocksDBStateBackend and the HashMapStateBackend).\""); } diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingStateChangeFsUploader.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingStateChangeFsUploader.java index 50db749b973..babe8fa8d28 100644 --- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingStateChangeFsUploader.java +++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingStateChangeFsUploader.java @@ -24,7 +24,7 @@ import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.FileSystem.WriteMode; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.ChangelogTaskLocalStateStore; -import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider; +import org.apache.flink.runtime.state.LocalSnapshotDirectoryProvider; import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.changelog.LocalChangelogRegistry; import org.apache.flink.runtime.state.filesystem.FileStateHandle; @@ -68,7 +68,7 @@ public class DuplicatingStateChangeFsUploader extends AbstractStateChangeFsUploa private final Path basePath; private final FileSystem fileSystem; - private final LocalRecoveryDirectoryProvider localRecoveryDirectoryProvider; + private final LocalSnapshotDirectoryProvider localSnapshotDirectoryProvider; private final JobID jobID; public DuplicatingStateChangeFsUploader( @@ -79,12 +79,12 @@ public class DuplicatingStateChangeFsUploader extends AbstractStateChangeFsUploa int bufferSize, ChangelogStorageMetricGroup metrics, TaskChangelogRegistry changelogRegistry, - LocalRecoveryDirectoryProvider localRecoveryDirectoryProvider) { + LocalSnapshotDirectoryProvider localSnapshotDirectoryProvider) { super(compression, bufferSize, metrics, changelogRegistry, FileStateHandle::new); this.basePath = new Path(basePath, String.format("%s/%s", jobID.toHexString(), PATH_SUB_DIR)); this.fileSystem = fileSystem; - this.localRecoveryDirectoryProvider = localRecoveryDirectoryProvider; + this.localSnapshotDirectoryProvider = localSnapshotDirectoryProvider; this.jobID = jobID; } @@ -96,7 +96,7 @@ public class DuplicatingStateChangeFsUploader extends AbstractStateChangeFsUploa FSDataOutputStream primaryStream = fileSystem.create(path, WriteMode.NO_OVERWRITE); Path localPath = new Path( - getLocalTaskOwnedDirectory(localRecoveryDirectoryProvider, jobID), + getLocalTaskOwnedDirectory(localSnapshotDirectoryProvider, jobID), fileName); FSDataOutputStream secondaryStream = localPath.getFileSystem().create(localPath, WriteMode.NO_OVERWRITE); diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java index 1d4668cb8e8..66d6409c806 100644 --- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java +++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java @@ -143,7 +143,7 @@ public class FsStateChangelogStorage extends FsStateChangelogStorageForRecovery this.changelogRegistry = changelogRegistry; this.uploader = uploader; this.localRecoveryConfig = localRecoveryConfig; - if (localRecoveryConfig.isLocalRecoveryEnabled()) { + if (localRecoveryConfig.isLocalBackupEnabled()) { this.localChangelogRegistry = new LocalChangelogRegistryImpl(Executors.newSingleThreadExecutor()); } diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java index f0a4b21646e..9f0c74964b2 100644 --- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java +++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java @@ -421,7 +421,7 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl size, incrementalSize, FsStateChangelogStorageFactory.IDENTIFIER); - if (localRecoveryConfig.isLocalRecoveryEnabled()) { + if (localRecoveryConfig.isLocalBackupEnabled()) { size = 0; List<Tuple2<StreamStateHandle, Long>> localTuples = new ArrayList<>(); for (UploadResult uploadResult : results.values()) { diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java index 19f09a151e6..45386f842b2 100644 --- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java +++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java @@ -97,7 +97,7 @@ public interface StateChangeUploadScheduler extends AutoCloseable { checkArgument(bytes <= Integer.MAX_VALUE); int bufferSize = (int) bytes; StateChangeUploader store = - localRecoveryConfig.isLocalRecoveryEnabled() + localRecoveryConfig.isLocalBackupEnabled() ? new DuplicatingStateChangeFsUploader( jobID, basePath, diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java index 6ce30d61243..1fb84df416d 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java @@ -89,7 +89,7 @@ final class SavepointTaskStateManager implements TaskStateManager { @Nonnull @Override public LocalRecoveryConfig createLocalRecoveryConfig() { - return new LocalRecoveryConfig(null); + return LocalRecoveryConfig.BACKUP_AND_RECOVERY_DISABLED; } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java index 8e85563fdf9..1472f247a10 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java @@ -21,7 +21,6 @@ package org.apache.flink.runtime.jobmaster; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; -import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.RpcOptions; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStore.java index 23b2b6fd42a..7be6d9ead80 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStore.java @@ -101,7 +101,7 @@ public class ChangelogTaskLocalStateStore extends TaskLocalStateStoreImpl { } public static Path getLocalTaskOwnedDirectory( - LocalRecoveryDirectoryProvider provider, JobID jobID) { + LocalSnapshotDirectoryProvider provider, JobID jobID) { File outDir = provider.selectAllocationBaseDirectory( (jobID.hashCode() & Integer.MAX_VALUE) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java index 3378ccb7713..a4bc3c27f54 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java @@ -164,7 +164,7 @@ public interface CheckpointStreamWithResultProvider extends Closeable { @Nonnegative long checkpointId, @Nonnull CheckpointedStateScope checkpointedStateScope, @Nonnull CheckpointStreamFactory primaryStreamFactory, - @Nonnull LocalRecoveryDirectoryProvider secondaryStreamDirProvider) + @Nonnull LocalSnapshotDirectoryProvider secondaryStreamDirProvider) throws IOException { CheckpointStateOutputStream primaryOut = diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java index 5a378b60ae2..e4b833540d0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java @@ -18,6 +18,9 @@ package org.apache.flink.runtime.state; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.Optional; @@ -30,18 +33,40 @@ import java.util.function.Supplier; */ public class LocalRecoveryConfig { + public static final LocalRecoveryConfig BACKUP_AND_RECOVERY_DISABLED = + new LocalRecoveryConfig(false, false, null); + + /** Whether to recover from the local snapshot. */ + private final boolean localRecoveryEnabled; + + /** Whether to do backup checkpoint on local disk. */ + private final boolean localBackupEnabled; + /** Encapsulates the root directories and the subtask-specific path. */ - @Nullable private final LocalRecoveryDirectoryProvider localStateDirectories; + @Nullable private final LocalSnapshotDirectoryProvider localStateDirectories; - public LocalRecoveryConfig(@Nullable LocalRecoveryDirectoryProvider directoryProvider) { + public LocalRecoveryConfig( + boolean localRecoveryEnabled, + boolean localBackupEnabled, + @Nullable LocalSnapshotDirectoryProvider directoryProvider) { + this.localRecoveryEnabled = localRecoveryEnabled; + this.localBackupEnabled = localBackupEnabled; this.localStateDirectories = directoryProvider; } public boolean isLocalRecoveryEnabled() { - return localStateDirectories != null; + return localRecoveryEnabled; + } + + public boolean isLocalBackupEnabled() { + return localBackupEnabled; + } + + public boolean isLocalRecoveryOrLocalBackupEnabled() { + return localRecoveryEnabled || localBackupEnabled; } - public Optional<LocalRecoveryDirectoryProvider> getLocalStateDirectoryProvider() { + public Optional<LocalSnapshotDirectoryProvider> getLocalStateDirectoryProvider() { return Optional.ofNullable(localStateDirectories); } @@ -55,4 +80,9 @@ public class LocalRecoveryConfig { new IllegalStateException( "Getting a LocalRecoveryDirectoryProvider is only supported with the local recovery enabled. This is a bug and should be reported."); } + + public static LocalRecoveryConfig backupAndRecoveryEnabled( + @Nonnull LocalSnapshotDirectoryProvider directoryProvider) { + return new LocalRecoveryConfig(true, true, Preconditions.checkNotNull(directoryProvider)); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryDirectoryProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalSnapshotDirectoryProvider.java similarity index 83% rename from flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryDirectoryProvider.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalSnapshotDirectoryProvider.java index 85d2ab00161..0ab95bcf00b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryDirectoryProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalSnapshotDirectoryProvider.java @@ -22,12 +22,12 @@ import java.io.File; import java.io.Serializable; /** - * Provides directories for local recovery. It offers access to the allocation base directories - * (i.e. the root directories for all local state that is created under the same allocation id) and - * the subtask-specific paths, which contain the local state for one subtask. Access by checkpoint - * id rotates over all root directory indexes, in case that there is more than one. Selection - * methods are provided to pick the directory under a certain index. Directory structures are of the - * following shape: + * Provides directories for local backup or local recovery. It offers access to the allocation base + * directories (i.e. the root directories for all local state that is created under the same + * allocation id) and the subtask-specific paths, which contain the local state for one subtask. + * Access by checkpoint id rotates over all root directory indexes, in case that there is more than + * one. Selection methods are provided to pick the directory under a certain index. Directory + * structures are of the following shape: * * <p> * @@ -48,7 +48,7 @@ import java.io.Serializable; * * <p> */ -public interface LocalRecoveryDirectoryProvider extends Serializable { +public interface LocalSnapshotDirectoryProvider extends Serializable { /** * Returns the local state allocation base directory for given checkpoint id w.r.t. our rotation diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryDirectoryProviderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalSnapshotDirectoryProviderImpl.java similarity index 93% rename from flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryDirectoryProviderImpl.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalSnapshotDirectoryProviderImpl.java index c2ee005806f..128e16758f5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryDirectoryProviderImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalSnapshotDirectoryProviderImpl.java @@ -33,15 +33,15 @@ import java.io.File; import java.nio.file.Paths; import java.util.Arrays; -/** Implementation of {@link LocalRecoveryDirectoryProvider}. */ -public class LocalRecoveryDirectoryProviderImpl implements LocalRecoveryDirectoryProvider { +/** Implementation of {@link LocalSnapshotDirectoryProvider}. */ +public class LocalSnapshotDirectoryProviderImpl implements LocalSnapshotDirectoryProvider { /** Serial version. */ private static final long serialVersionUID = 1L; /** Logger for this class. */ private static final Logger LOG = - LoggerFactory.getLogger(LocalRecoveryDirectoryProviderImpl.class); + LoggerFactory.getLogger(LocalSnapshotDirectoryProviderImpl.class); /** All available root directories that this can potentially deliver. */ @Nonnull private final File[] allocationBaseDirs; @@ -55,7 +55,7 @@ public class LocalRecoveryDirectoryProviderImpl implements LocalRecoveryDirector /** Index of the owning subtask. */ @Nonnegative private final int subtaskIndex; - public LocalRecoveryDirectoryProviderImpl( + public LocalSnapshotDirectoryProviderImpl( File allocationBaseDir, @Nonnull JobID jobID, @Nonnull JobVertexID jobVertexID, @@ -63,7 +63,7 @@ public class LocalRecoveryDirectoryProviderImpl implements LocalRecoveryDirector this(new File[] {allocationBaseDir}, jobID, jobVertexID, subtaskIndex); } - public LocalRecoveryDirectoryProviderImpl( + public LocalSnapshotDirectoryProviderImpl( @Nonnull File[] allocationBaseDirs, @Nonnull JobID jobID, @Nonnull JobVertexID jobVertexID, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java index b58b7fbde93..efb0f54f4ec 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java @@ -67,9 +67,12 @@ public class TaskExecutorLocalStateStoresManager { private final Map<AllocationID, Map<JobVertexSubtaskKey, OwnedTaskLocalStateStore>> taskStateStoresByAllocationID; - /** The configured mode for local recovery on this task manager. */ + /** Whether to recover from the local snapshot. */ private final boolean localRecoveryEnabled; + /** Whether to do backup checkpoint on local disk. */ + private final boolean localBackupEnabled; + /** This is the root directory for all local state of this task manager / executor. */ private final Reference<File[]> localStateRootDirectories; @@ -86,6 +89,7 @@ public class TaskExecutorLocalStateStoresManager { public TaskExecutorLocalStateStoresManager( boolean localRecoveryEnabled, + boolean localBackupEnabled, @Nonnull Reference<File[]> localStateRootDirectories, @Nonnull Executor discardExecutor) throws IOException { @@ -97,6 +101,7 @@ public class TaskExecutorLocalStateStoresManager { this.taskStateStoresByAllocationID = new HashMap<>(); this.localRecoveryEnabled = localRecoveryEnabled; + this.localBackupEnabled = localBackupEnabled; this.localStateRootDirectories = localStateRootDirectories; this.discardExecutor = discardExecutor; this.lock = new Object(); @@ -157,17 +162,17 @@ public class TaskExecutorLocalStateStoresManager { if (taskLocalStateStore == null) { - LocalRecoveryDirectoryProviderImpl directoryProvider = null; - if (localRecoveryEnabled) { + LocalSnapshotDirectoryProviderImpl directoryProvider = null; + if (localRecoveryEnabled || localBackupEnabled) { // create the allocation base dirs, one inside each root dir. File[] allocationBaseDirectories = allocationBaseDirectories(allocationID); directoryProvider = - new LocalRecoveryDirectoryProviderImpl( + new LocalSnapshotDirectoryProviderImpl( allocationBaseDirectories, jobId, jobVertexID, subtaskIndex); } - LocalRecoveryConfig localRecoveryConfig = - new LocalRecoveryConfig(directoryProvider); + new LocalRecoveryConfig( + localRecoveryEnabled, localBackupEnabled, directoryProvider); boolean changelogEnabled = jobConfiguration @@ -176,7 +181,7 @@ public class TaskExecutorLocalStateStoresManager { clusterConfiguration.get( StateChangelogOptions.ENABLE_STATE_CHANGE_LOG)); - if (localRecoveryConfig.isLocalRecoveryEnabled() && changelogEnabled) { + if (localRecoveryConfig.isLocalRecoveryOrLocalBackupEnabled() && changelogEnabled) { taskLocalStateStore = new ChangelogTaskLocalStateStore( jobId, @@ -185,7 +190,7 @@ public class TaskExecutorLocalStateStoresManager { subtaskIndex, localRecoveryConfig, discardExecutor); - } else if (localRecoveryConfig.isLocalRecoveryEnabled()) { + } else if (localRecoveryConfig.isLocalRecoveryOrLocalBackupEnabled()) { taskLocalStateStore = new TaskLocalStateStoreImpl( jobId, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java index cf6a25545f8..a3c3b35fba6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java @@ -203,7 +203,7 @@ public class TaskLocalStateStoreImpl implements OwnedTaskLocalStateStore { } } - protected LocalRecoveryDirectoryProvider getLocalRecoveryDirectoryProvider() { + protected LocalSnapshotDirectoryProvider getLocalRecoveryDirectoryProvider() { return localRecoveryConfig .getLocalStateDirectoryProvider() .orElseThrow(() -> new IllegalStateException("Local recovery must be enabled.")); @@ -219,6 +219,18 @@ public class TaskLocalStateStoreImpl implements OwnedTaskLocalStateStore { snapshot = loadTaskStateSnapshot(checkpointID); } + // Even if local recovery is disabled, it is still necessary to load the TaskStateSnapshot + // so that it can be managed by the TaskLocalStateStore. + if (!localRecoveryConfig.isLocalRecoveryEnabled()) { + LOG.debug( + "Local recovery is disabled for checkpoint {} in subtask ({} - {} - {})", + checkpointID, + jobID, + jobVertexID, + subtaskIndex); + return null; + } + if (snapshot != null) { LOG.info( "Found registered local state for checkpoint {} in subtask ({} - {} - {}) : {}", diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java index a403cfce5d2..69da532a6f3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java @@ -119,7 +119,7 @@ class HeapSnapshotStrategy<K> final SupplierWithException<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier = - localRecoveryConfig.isLocalRecoveryEnabled() + localRecoveryConfig.isLocalBackupEnabled() && !checkpointOptions.getCheckpointType().isSavepoint() ? () -> createDuplicatingStream( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java index 5009ef733c6..6398107ea48 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java @@ -389,6 +389,7 @@ public class TaskManagerServices { final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager( taskManagerServicesConfiguration.isLocalRecoveryEnabled(), + taskManagerServicesConfiguration.isLocalBackupEnabled(), taskManagerServicesConfiguration.getLocalRecoveryStateDirectories(), ioExecutor); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java index a6577c26b93..962d0e0605d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java @@ -83,6 +83,8 @@ public class TaskManagerServicesConfiguration { private final boolean localRecoveryEnabled; + private final boolean localBackupEnabled; + private final RetryingRegistrationConfiguration retryingRegistrationConfiguration; private Optional<Time> systemResourceMetricsProbingInterval; @@ -105,6 +107,7 @@ public class TaskManagerServicesConfiguration { String[] tmpDirPaths, Reference<File[]> localRecoveryStateDirectories, boolean localRecoveryEnabled, + boolean localBackupEnabled, @Nullable QueryableStateConfiguration queryableStateConfig, int numberOfSlots, int pageSize, @@ -126,6 +129,7 @@ public class TaskManagerServicesConfiguration { this.tmpDirPaths = checkNotNull(tmpDirPaths); this.localRecoveryStateDirectories = checkNotNull(localRecoveryStateDirectories); this.localRecoveryEnabled = localRecoveryEnabled; + this.localBackupEnabled = localBackupEnabled; this.queryableStateConfig = queryableStateConfig; this.numberOfSlots = numberOfSlots; @@ -188,6 +192,10 @@ public class TaskManagerServicesConfiguration { return localRecoveryEnabled; } + boolean isLocalBackupEnabled() { + return localBackupEnabled; + } + @Nullable QueryableStateConfiguration getQueryableStateConfig() { return queryableStateConfig; @@ -284,7 +292,8 @@ public class TaskManagerServicesConfiguration { localStateDirs = Reference.owned(createdLocalStateDirs); } - boolean localRecoveryMode = configuration.get(StateRecoveryOptions.LOCAL_RECOVERY); + boolean localRecoveryEnabled = configuration.get(StateRecoveryOptions.LOCAL_RECOVERY); + boolean localBackupEnabled = configuration.get(CheckpointingOptions.LOCAL_BACKUP_ENABLED); final QueryableStateConfiguration queryableStateConfig = QueryableStateConfiguration.fromConfiguration(configuration); @@ -327,7 +336,8 @@ public class TaskManagerServicesConfiguration { localCommunicationOnly, tmpDirs, localStateDirs, - localRecoveryMode, + localRecoveryEnabled, + localBackupEnabled, queryableStateConfig, ConfigurationParserUtils.getSlot(configuration), ConfigurationParserUtils.getPageSize(configuration), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStoreTest.java index 7833339e770..3ce11715331 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStoreTest.java @@ -44,7 +44,7 @@ import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link ChangelogTaskLocalStateStore}. */ class ChangelogTaskLocalStateStoreTest extends TaskLocalStateStoreImplTest { - private LocalRecoveryDirectoryProvider localRecoveryDirectoryProvider; + private LocalSnapshotDirectoryProvider localSnapshotDirectoryProvider; @BeforeEach @Override @@ -62,12 +62,13 @@ class ChangelogTaskLocalStateStoreTest extends TaskLocalStateStoreImplTest { AllocationID allocationID, JobVertexID jobVertexID, int subtaskIdx) { - LocalRecoveryDirectoryProviderImpl directoryProvider = - new LocalRecoveryDirectoryProviderImpl( + LocalSnapshotDirectoryProviderImpl directoryProvider = + new LocalSnapshotDirectoryProviderImpl( allocationBaseDirs, jobID, jobVertexID, subtaskIdx); - this.localRecoveryDirectoryProvider = directoryProvider; + this.localSnapshotDirectoryProvider = directoryProvider; - LocalRecoveryConfig localRecoveryConfig = new LocalRecoveryConfig(directoryProvider); + LocalRecoveryConfig localRecoveryConfig = + LocalRecoveryConfig.backupAndRecoveryEnabled(directoryProvider); return new ChangelogTaskLocalStateStore( jobID, allocationID, @@ -173,14 +174,14 @@ class ChangelogTaskLocalStateStoreTest extends TaskLocalStateStoreImplTest { private boolean checkMaterializedDirExists(long materializationID) { File materializedDir = - localRecoveryDirectoryProvider.subtaskSpecificCheckpointDirectory( + localSnapshotDirectoryProvider.subtaskSpecificCheckpointDirectory( materializationID); return materializedDir.exists(); } private void writeToMaterializedDir(long materializationID) { File materializedDir = - localRecoveryDirectoryProvider.subtaskSpecificCheckpointDirectory( + localSnapshotDirectoryProvider.subtaskSpecificCheckpointDirectory( materializationID); if (!materializedDir.exists() && !materializedDir.mkdirs()) { throw new FlinkRuntimeException( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProviderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProviderTest.java index a2f5a41d319..150de862b83 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProviderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProviderTest.java @@ -53,7 +53,7 @@ class CheckpointStreamWithResultProviderTest { .isInstanceOf(CheckpointStreamWithResultProvider.PrimaryStreamOnly.class); } - LocalRecoveryDirectoryProvider directoryProvider = createLocalRecoveryDirectoryProvider(); + LocalSnapshotDirectoryProvider directoryProvider = createLocalRecoveryDirectoryProvider(); try (CheckpointStreamWithResultProvider primaryAndSecondary = CheckpointStreamWithResultProvider.createDuplicatingStream( 42L, CheckpointedStateScope.EXCLUSIVE, primaryFactory, directoryProvider)) { @@ -87,7 +87,7 @@ class CheckpointStreamWithResultProviderTest { @Test void testCloseAndFinalizeCheckpointStreamResultPrimaryAndSecondary() throws Exception { CheckpointStreamFactory primaryFactory = createCheckpointStreamFactory(); - LocalRecoveryDirectoryProvider directoryProvider = createLocalRecoveryDirectoryProvider(); + LocalSnapshotDirectoryProvider directoryProvider = createLocalRecoveryDirectoryProvider(); CheckpointStreamWithResultProvider resultProvider = CheckpointStreamWithResultProvider.createDuplicatingStream( @@ -201,13 +201,13 @@ class CheckpointStreamWithResultProviderTest { resultProvider.close(); } - private LocalRecoveryDirectoryProvider createLocalRecoveryDirectoryProvider() + private LocalSnapshotDirectoryProvider createLocalRecoveryDirectoryProvider() throws IOException { File localStateDir = TempDirUtils.newFolder(temporaryFolder); JobID jobID = new JobID(); JobVertexID jobVertexID = new JobVertexID(); int subtaskIdx = 0; - return new LocalRecoveryDirectoryProviderImpl( + return new LocalSnapshotDirectoryProviderImpl( localStateDir, jobID, jobVertexID, subtaskIdx); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/LocalRecoveryDirectoryProviderImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/LocalSnapshotDirectoryProviderImplTest.java similarity index 93% rename from flink-runtime/src/test/java/org/apache/flink/runtime/state/LocalRecoveryDirectoryProviderImplTest.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/state/LocalSnapshotDirectoryProviderImplTest.java index e617dd8335c..0b5572373f0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/LocalRecoveryDirectoryProviderImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/LocalSnapshotDirectoryProviderImplTest.java @@ -33,8 +33,8 @@ import java.io.IOException; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -/** Tests for {@link LocalRecoveryDirectoryProvider}. */ -class LocalRecoveryDirectoryProviderImplTest { +/** Tests for {@link LocalSnapshotDirectoryProvider}. */ +class LocalSnapshotDirectoryProviderImplTest { private static final JobID JOB_ID = new JobID(); private static final JobVertexID JOB_VERTEX_ID = new JobVertexID(); @@ -42,7 +42,7 @@ class LocalRecoveryDirectoryProviderImplTest { @TempDir private java.nio.file.Path tmpFolder; - private LocalRecoveryDirectoryProviderImpl directoryProvider; + private LocalSnapshotDirectoryProviderImpl directoryProvider; private File[] allocBaseFolders; @BeforeEach @@ -54,7 +54,7 @@ class LocalRecoveryDirectoryProviderImplTest { TempDirUtils.newFolder(tmpFolder) }; this.directoryProvider = - new LocalRecoveryDirectoryProviderImpl( + new LocalSnapshotDirectoryProviderImpl( allocBaseFolders, JOB_ID, JOB_VERTEX_ID, SUBTASK_INDEX); } @@ -123,7 +123,7 @@ class LocalRecoveryDirectoryProviderImplTest { void testPreconditionsNotNullFiles() { assertThatThrownBy( () -> - new LocalRecoveryDirectoryProviderImpl( + new LocalSnapshotDirectoryProviderImpl( new File[] {null}, JOB_ID, JOB_VERTEX_ID, SUBTASK_INDEX)) .isInstanceOf(NullPointerException.class); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java index fe5bdb739f3..31c2f406f30 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java @@ -220,7 +220,7 @@ class TaskExecutorLocalStateStoresManagerTest { new Configuration(), new Configuration()); - LocalRecoveryDirectoryProvider directoryProvider = + LocalSnapshotDirectoryProvider directoryProvider = taskLocalStateStore .getLocalRecoveryConfig() .getLocalStateDirectoryProvider() diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java index 64b47ff55b9..7acf36f341b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java @@ -79,11 +79,12 @@ class TaskLocalStateStoreImplTest { AllocationID allocationID, JobVertexID jobVertexID, int subtaskIdx) { - LocalRecoveryDirectoryProviderImpl directoryProvider = - new LocalRecoveryDirectoryProviderImpl( + LocalSnapshotDirectoryProviderImpl directoryProvider = + new LocalSnapshotDirectoryProviderImpl( allocationBaseDirs, jobID, jobVertexID, subtaskIdx); - LocalRecoveryConfig localRecoveryConfig = new LocalRecoveryConfig(directoryProvider); + LocalRecoveryConfig localRecoveryConfig = + LocalRecoveryConfig.backupAndRecoveryEnabled(directoryProvider); return new TaskLocalStateStoreImpl( jobID, allocationID, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java index 76f78e26fda..3c8fe04a794 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java @@ -214,10 +214,11 @@ class TaskStateManagerImplTest { TempDirUtils.newFolder(tmpFolder) }; - LocalRecoveryDirectoryProviderImpl directoryProvider = - new LocalRecoveryDirectoryProviderImpl(allocBaseDirs, jobID, jobVertexID, 0); + LocalSnapshotDirectoryProviderImpl directoryProvider = + new LocalSnapshotDirectoryProviderImpl(allocBaseDirs, jobID, jobVertexID, 0); - LocalRecoveryConfig localRecoveryConfig = new LocalRecoveryConfig(directoryProvider); + LocalRecoveryConfig localRecoveryConfig = + LocalRecoveryConfig.backupAndRecoveryEnabled(directoryProvider); TaskLocalStateStore taskLocalStateStore = new TaskLocalStateStoreImpl( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalRecoveryConfig.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalRecoveryConfig.java index a0782edd10f..85f0f4b9d79 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalRecoveryConfig.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalRecoveryConfig.java @@ -24,14 +24,14 @@ import java.io.File; public class TestLocalRecoveryConfig { public static LocalRecoveryConfig disabled() { - return new LocalRecoveryConfig(null); + return LocalRecoveryConfig.BACKUP_AND_RECOVERY_DISABLED; } public static LocalRecoveryConfig enabledForTest() { - return new LocalRecoveryConfig(new TestDummyLocalDirectoryProvider()); + return LocalRecoveryConfig.backupAndRecoveryEnabled(new TestDummyLocalDirectoryProvider()); } - public static class TestDummyLocalDirectoryProvider implements LocalRecoveryDirectoryProvider { + public static class TestDummyLocalDirectoryProvider implements LocalSnapshotDirectoryProvider { private TestDummyLocalDirectoryProvider() {} diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateDiscardTest.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateDiscardTest.java index f869f5cae63..bf50a1bb4ad 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateDiscardTest.java +++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateDiscardTest.java @@ -274,7 +274,7 @@ public class ChangelogStateDiscardTest { LatencyTrackingStateConfig.disabled(), emptyList(), UncompressedStreamCompressionDecorator.INSTANCE, - new LocalRecoveryConfig(null), + LocalRecoveryConfig.BACKUP_AND_RECOVERY_DISABLED, new HeapPriorityQueueSetFactory( kgRange, kgRange.getNumberOfKeyGroups(), 128), true, diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java index 9cb73d01dc9..6c3948a6303 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java @@ -34,7 +34,7 @@ import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedBackendSerializationProxy; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.LocalRecoveryConfig; -import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider; +import org.apache.flink.runtime.state.LocalSnapshotDirectoryProvider; import org.apache.flink.runtime.state.PlaceholderStreamStateHandle; import org.apache.flink.runtime.state.SnapshotDirectory; import org.apache.flink.runtime.state.SnapshotResources; @@ -184,9 +184,9 @@ public abstract class RocksDBSnapshotStrategyBase<K, R extends SnapshotResources protected SnapshotDirectory prepareLocalSnapshotDirectory(long checkpointId) throws IOException { - if (localRecoveryConfig.isLocalRecoveryEnabled()) { + if (localRecoveryConfig.isLocalBackupEnabled()) { // create a "permanent" snapshot directory for local recovery. - LocalRecoveryDirectoryProvider directoryProvider = + LocalSnapshotDirectoryProvider directoryProvider = localRecoveryConfig .getLocalStateDirectoryProvider() .orElseThrow(LocalRecoveryConfig.localRecoveryNotEnabled()); @@ -260,7 +260,7 @@ public abstract class RocksDBSnapshotStrategyBase<K, R extends SnapshotResources throws Exception { CheckpointStreamWithResultProvider streamWithResultProvider = - localRecoveryConfig.isLocalRecoveryEnabled() + localRecoveryConfig.isLocalBackupEnabled() ? CheckpointStreamWithResultProvider.createDuplicatingStream( checkpointId, CheckpointedStateScope.EXCLUSIVE, diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java index 929cc3d2109..2ae3839cde3 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java @@ -36,7 +36,7 @@ import org.apache.flink.runtime.state.DoneFuture; import org.apache.flink.runtime.state.InputChannelStateHandle; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.LocalRecoveryConfig; -import org.apache.flink.runtime.state.LocalRecoveryDirectoryProviderImpl; +import org.apache.flink.runtime.state.LocalSnapshotDirectoryProviderImpl; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.ResultSubpartitionStateHandle; import org.apache.flink.runtime.state.SnapshotResult; @@ -220,11 +220,12 @@ public class LocalStateForwardingTest extends TestLogger { Executor executor = Executors.directExecutor(); - LocalRecoveryDirectoryProviderImpl directoryProvider = - new LocalRecoveryDirectoryProviderImpl( + LocalSnapshotDirectoryProviderImpl directoryProvider = + new LocalSnapshotDirectoryProviderImpl( temporaryFolder.newFolder(), jobID, jobVertexID, subtaskIdx); - LocalRecoveryConfig localRecoveryConfig = new LocalRecoveryConfig(directoryProvider); + LocalRecoveryConfig localRecoveryConfig = + LocalRecoveryConfig.backupAndRecoveryEnabled(directoryProvider); TaskLocalStateStore taskLocalStateStore = new TaskLocalStateStoreImpl( diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java index 63c30457856..0b8fe998736 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java @@ -46,7 +46,7 @@ import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; import org.apache.flink.runtime.state.LocalRecoveryConfig; -import org.apache.flink.runtime.state.LocalRecoveryDirectoryProviderImpl; +import org.apache.flink.runtime.state.LocalSnapshotDirectoryProviderImpl; import org.apache.flink.runtime.state.TestLocalRecoveryConfig; import org.apache.flink.runtime.state.TestTaskStateManager; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; @@ -153,8 +153,8 @@ public class StreamTaskTestHarness<OUT> { this( taskFactory, outputType, - new LocalRecoveryConfig( - new LocalRecoveryDirectoryProviderImpl( + LocalRecoveryConfig.backupAndRecoveryEnabled( + new LocalSnapshotDirectoryProviderImpl( localRootDir, new JobID(), new JobVertexID(), 0))); } diff --git a/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/state/benchmark/StateBackendBenchmarkUtils.java b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/state/benchmark/StateBackendBenchmarkUtils.java index 7d766858483..2f2e80dd07c 100644 --- a/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/state/benchmark/StateBackendBenchmarkUtils.java +++ b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/state/benchmark/StateBackendBenchmarkUtils.java @@ -187,7 +187,7 @@ public class StateBackendBenchmarkUtils { 2, new KeyGroupRange(0, 1), executionConfig, - new LocalRecoveryConfig(null), + LocalRecoveryConfig.BACKUP_AND_RECOVERY_DISABLED, RocksDBPriorityQueueConfig.buildWithPriorityQueueType( EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB), ttlTimeProvider, @@ -225,7 +225,7 @@ public class StateBackendBenchmarkUtils { LatencyTrackingStateConfig.disabled(), Collections.emptyList(), AbstractStateBackend.getCompressionDecorator(executionConfig), - new LocalRecoveryConfig(null), + LocalRecoveryConfig.BACKUP_AND_RECOVERY_DISABLED, priorityQueueSetFactory, false, new CloseableRegistry()); diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/StreamOperatorSnapshotRestoreTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/StreamOperatorSnapshotRestoreTest.java index 69b8ce40260..a2e0c7e5c71 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/StreamOperatorSnapshotRestoreTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/StreamOperatorSnapshotRestoreTest.java @@ -40,8 +40,8 @@ import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend; import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream; import org.apache.flink.runtime.state.LocalRecoveryConfig; -import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider; -import org.apache.flink.runtime.state.LocalRecoveryDirectoryProviderImpl; +import org.apache.flink.runtime.state.LocalSnapshotDirectoryProvider; +import org.apache.flink.runtime.state.LocalSnapshotDirectoryProviderImpl; import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.StateInitializationContext; @@ -177,13 +177,16 @@ public class StreamOperatorSnapshotRestoreTest extends TestLogger { JobVertexID jobVertexID = new JobVertexID(); int subtaskIdx = 0; - LocalRecoveryDirectoryProvider directoryProvider = + LocalSnapshotDirectoryProvider directoryProvider = mode == ONLY_JM_RECOVERY ? null - : new LocalRecoveryDirectoryProviderImpl( + : new LocalSnapshotDirectoryProviderImpl( temporaryFolder.newFolder(), jobID, jobVertexID, subtaskIdx); - LocalRecoveryConfig localRecoveryConfig = new LocalRecoveryConfig(directoryProvider); + LocalRecoveryConfig localRecoveryConfig = + (directoryProvider == null) + ? LocalRecoveryConfig.BACKUP_AND_RECOVERY_DISABLED + : LocalRecoveryConfig.backupAndRecoveryEnabled(directoryProvider); MockEnvironment mockEnvironment = new MockEnvironmentBuilder()