GitHub user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165340438 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java --- @@ -501,4 +529,53 @@ public String toString() { "', asynchronous: " + asynchronousSnapshots + ", fileStateThreshold: " + fileStateThreshold + ")"; } + + /** + * This enum represents the different modes for local recovery. + */ + public enum LocalRecoveryMode { + DISABLED, ENABLE_FILE_BASED, ENABLE_HEAP_BASED + } + + /** + * This class encapsulates the configuration for local recovery of this backend. + */ + public static final class LocalRecoveryConfig implements Serializable { + + private static final long serialVersionUID = 1L; + private static final LocalRecoveryConfig DISABLED_SINGLETON = + new LocalRecoveryConfig(LocalRecoveryMode.DISABLED, null); + + private final LocalRecoveryMode localRecoveryMode; + private final File localStateDirectory; + + LocalRecoveryConfig(LocalRecoveryMode localRecoveryMode, File localStateDirectory) { + this.localRecoveryMode = Preconditions.checkNotNull(localRecoveryMode); + this.localStateDirectory = localStateDirectory; + if (LocalRecoveryMode.ENABLE_FILE_BASED.equals(localRecoveryMode) && localStateDirectory == null) { + throw new IllegalStateException("Local state directory must be specified if local recovery mode is " + + LocalRecoveryMode.ENABLE_FILE_BASED); + } + } + + public LocalRecoveryMode getLocalRecoveryMode() { + return localRecoveryMode; + } + + public File getLocalStateDirectory() { + return localStateDirectory; --- End diff -- How does this getter play together with the `ENABLE_HEAP_BASED` `LocalRecoveryMode`, for which the constructor allows `localStateDirectory` to be `null`?
---