This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ec4acd2c99 [FLINK-27155][changelog] Reduce multiple reads to the same 
Changelog file in the same taskmanager during restore
1ec4acd2c99 is described below

commit 1ec4acd2c993409092ddcb7121e2c9647bb4a086
Author: wangfeifan <zoltar9...@163.com>
AuthorDate: Sun Apr 17 15:34:07 2022 +0800

    [FLINK-27155][changelog] Reduce multiple reads to the same Changelog file 
in the same taskmanager during restore
---
 .../fs_state_changelog_configuration.html          |   6 +
 .../changelog/fs/ChangelogStreamHandleReader.java  |  30 ++-
 .../fs/ChangelogStreamHandleReaderWithCache.java   | 221 ++++++++++++++++++
 .../flink/changelog/fs/ChangelogStreamWrapper.java |  62 +++++
 .../changelog/fs/FsStateChangelogOptions.java      |   8 +
 .../changelog/fs/FsStateChangelogStorage.java      |   1 +
 .../fs/FsStateChangelogStorageFactory.java         |   5 +-
 .../fs/FsStateChangelogStorageForRecovery.java     |  17 +-
 .../flink/changelog/fs/StateChangeFormat.java      |  35 +--
 ...rRecovery.java => StateChangeIteratorImpl.java} |  30 +--
 .../api/runtime/SavepointTaskStateManager.java     |  10 +
 .../TaskExecutorStateChangelogStoragesManager.java |  95 +++++++-
 .../flink/runtime/state/TaskStateManager.java      |  10 +
 .../flink/runtime/state/TaskStateManagerImpl.java  |  26 +++
 .../changelog/StateChangelogStorageFactory.java    |   2 +-
 .../changelog/StateChangelogStorageLoader.java     |   5 +-
 .../InMemoryStateChangelogStorageFactory.java      |   2 +-
 .../flink/runtime/taskexecutor/TaskExecutor.java   |   3 +-
 ...kExecutorStateChangelogStoragesManagerTest.java |   5 +-
 .../runtime/state/TaskStateManagerImplTest.java    |   4 +
 .../flink/runtime/state/TestTaskStateManager.java  |  21 ++
 .../inmemory/StateChangelogStorageLoaderTest.java  |   3 +-
 .../runtime/util/JvmExitOnFatalErrorTest.java      |   2 +
 .../state/changelog/ChangelogStateBackend.java     |   2 +
 .../DeactivatedChangelogStateBackend.java          |   3 +
 .../restore/ChangelogBackendRestoreOperation.java  |  17 +-
 .../StateInitializationContextImplTest.java        |   2 +
 .../runtime/tasks/LocalStateForwardingTest.java    |   2 +
 .../streaming/runtime/tasks/StreamTaskTest.java    |   3 +
 .../test/state/ChangelogRecoveryCachingITCase.java | 253 +++++++++++++++++++++
 30 files changed, 807 insertions(+), 78 deletions(-)

diff --git 
a/docs/layouts/shortcodes/generated/fs_state_changelog_configuration.html 
b/docs/layouts/shortcodes/generated/fs_state_changelog_configuration.html
index cc3b64dfcec..d5ca1bfadf1 100644
--- a/docs/layouts/shortcodes/generated/fs_state_changelog_configuration.html
+++ b/docs/layouts/shortcodes/generated/fs_state_changelog_configuration.html
@@ -38,6 +38,12 @@
             <td>Integer</td>
             <td>Number of threads to use to discard changelog (e.g. 
pre-emptively uploaded unused state).</td>
         </tr>
+        <tr>
+            <td><h5>dstl.dfs.download.local-cache.idle-timeout-ms</h5></td>
+            <td style="word-wrap: break-word;">10 min</td>
+            <td>Duration</td>
+            <td>Maximum idle time for cache files of distributed changelog 
file, after which the cache files will be deleted.</td>
+        </tr>
         <tr>
             <td><h5>dstl.dfs.preemptive-persist-threshold</h5></td>
             <td style="word-wrap: break-word;">5 mb</td>
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageFactory.java
 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStreamHandleReader.java
similarity index 50%
copy from 
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageFactory.java
copy to 
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStreamHandleReader.java
index f60ad065094..c633acf3bfd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageFactory.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStreamHandleReader.java
@@ -16,29 +16,30 @@
  * limitations under the License.
  */

-package org.apache.flink.runtime.state.changelog;
+package org.apache.flink.changelog.fs;

 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
+import org.apache.flink.runtime.state.StreamStateHandle;

+import java.io.DataInputStream;
 import java.io.IOException;

-/**
- * A factory for {@link StateChangelogStorage}. Please use {@link StateChangelogStorageLoader} to
- * create {@link StateChangelogStorage}.
- */
+import static org.apache.flink.changelog.fs.ChangelogStreamWrapper.wrapAndSeek;
+
+/** Opens changelog handles for reading; used by {@link StateChangeIteratorImpl}. */
 @Internal
-public interface StateChangelogStorageFactory {
-    /** Get the identifier for user to use this changelog storage factory. */
-    String getIdentifier();
+interface ChangelogStreamHandleReader extends AutoCloseable {
+
+    /**
+     * Opens {@code handle} and positions the returned stream at {@code offset} of its
+     * decompressed content. The caller owns the returned stream and must close it.
+     */
+    DataInputStream openAndSeek(StreamStateHandle handle, Long offset) throws IOException;

-    /** Create the storage based on a configuration. */
-    StateChangelogStorage<?> createStorage(
-            JobID jobID, Configuration configuration, TaskManagerJobMetricGroup metricGroup)
-            throws IOException;
+    @Override
+    default void close() throws Exception {}

-    /** Create the storage for recovery. */
-    StateChangelogStorageView<?> createStorageView() throws IOException;
+    /** Reader that opens every handle directly, without caching. */
+    ChangelogStreamHandleReader DIRECT_READER =
+            (handle, offset) -> wrapAndSeek(handle.openInputStream(), offset);
 }
diff --git 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStreamHandleReaderWithCache.java
 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStreamHandleReaderWithCache.java
new file mode 100644
index 00000000000..1501cd7482f
--- /dev/null
+++ 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStreamHandleReaderWithCache.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RefCountedFile;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.changelog.fs.ChangelogStreamWrapper.wrap;
+import static org.apache.flink.changelog.fs.ChangelogStreamWrapper.wrapAndSeek;
+import static org.apache.flink.changelog.fs.FsStateChangelogOptions.CACHE_IDLE_TIMEOUT;
+
+/**
+ * {@link ChangelogStreamHandleReader} that keeps a local, decompressed copy of changelog files
+ * stored on a distributed file system, so that repeated reads of the same file (e.g. by several
+ * tasks restoring on the same TaskManager) download it only once.
+ *
+ * <p>Cached files are reference counted: the cache holds one reference and every open stream
+ * holds another. Once only the cache's reference is left, a cleanup is scheduled after {@link
+ * FsStateChangelogOptions#CACHE_IDLE_TIMEOUT}; it drops the entry if it is still unused then.
+ */
+class ChangelogStreamHandleReaderWithCache implements ChangelogStreamHandleReader {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(ChangelogStreamHandleReaderWithCache.class);
+
+    private static final String CACHE_FILE_SUB_DIR = "dstl-cache-file";
+    private static final String CACHE_FILE_PREFIX = "dstl";
+
+    // reference count == 1 means only the cache itself references the cache file
+    private static final int NO_USING_REF_COUNT = 1;
+
+    private final File[] cacheDirectories;
+    private final AtomicInteger next;
+
+    private final ConcurrentHashMap<Path, RefCountedFile> cache = new ConcurrentHashMap<>();
+    private final ScheduledExecutorService cacheCleanScheduler;
+    private final long cacheIdleMillis;
+
+    ChangelogStreamHandleReaderWithCache(Configuration config) {
+        this.cacheDirectories =
+                Arrays.stream(ConfigurationUtils.parseTempDirectories(config))
+                        .map(path -> new File(path, CACHE_FILE_SUB_DIR))
+                        .toArray(File[]::new);
+        Arrays.stream(this.cacheDirectories).forEach(File::mkdirs);
+
+        this.next = new AtomicInteger(new Random().nextInt(this.cacheDirectories.length));
+
+        this.cacheCleanScheduler =
+                SchedulerFactory.create(1, "ChangelogCacheFileCleanScheduler", LOG);
+        this.cacheIdleMillis = config.get(CACHE_IDLE_TIMEOUT).toMillis();
+    }
+
+    /**
+     * Opens {@code handle} positioned at {@code offset}. Files on a distributed file system are
+     * served from the local cache (downloaded first if necessary); other handles are read
+     * directly. For cached files, closing the returned stream releases its cache reference.
+     */
+    @Override
+    public DataInputStream openAndSeek(StreamStateHandle handle, Long offset) throws IOException {
+        if (!canBeCached(handle)) {
+            return wrapAndSeek(handle.openInputStream(), offset);
+        }
+
+        final FileStateHandle fileHandle = (FileStateHandle) handle;
+        final RefCountedFile refCountedFile = getRefCountedFile(fileHandle);
+
+        final FileInputStream fin;
+        try {
+            fin = openAndSeek(refCountedFile, offset);
+        } catch (Throwable t) {
+            // no stream owns the reference taken above yet, so give it back here
+            releaseReference(fileHandle.getFilePath());
+            throw t;
+        }
+
+        LOG.debug(
+                "return cached file {} (rc={}) for {} (offset={})",
+                refCountedFile.getFile(),
+                refCountedFile.getReferenceCounter(),
+                handle.getStreamStateHandleID(),
+                offset);
+        return wrapStream(fileHandle.getFilePath(), fin);
+    }
+
+    private boolean canBeCached(StreamStateHandle handle) throws IOException {
+        if (handle instanceof FileStateHandle) {
+            FileStateHandle fileHandle = (FileStateHandle) handle;
+            return fileHandle.getFilePath().getFileSystem().isDistributedFS();
+        } else {
+            return false;
+        }
+    }
+
+    /**
+     * Returns the cache entry for {@code fileHandle} with one more reference taken, downloading
+     * the file first if it is not cached yet. The download runs inside {@link
+     * ConcurrentHashMap#compute}, so concurrent callers for the same path share one download.
+     */
+    private RefCountedFile getRefCountedFile(FileStateHandle fileHandle) {
+        return cache.compute(
+                fileHandle.getFilePath(),
+                (key, oldValue) -> {
+                    if (oldValue == null) {
+                        oldValue = downloadToCacheFile(fileHandle);
+                    }
+                    oldValue.retain();
+                    return oldValue;
+                });
+    }
+
+    /** Downloads and decompresses {@code fileHandle} into a new local cache file. */
+    private RefCountedFile downloadToCacheFile(FileStateHandle fileHandle) {
+        // floorMod keeps the index non-negative even after the counter overflows
+        File directory =
+                cacheDirectories[Math.floorMod(next.getAndIncrement(), cacheDirectories.length)];
+        File file;
+        try {
+            file = File.createTempFile(CACHE_FILE_PREFIX, null, directory);
+        } catch (IOException e) {
+            ExceptionUtils.rethrow(e);
+            return null;
+        }
+
+        try (InputStream in = wrap(fileHandle.openInputStream());
+                OutputStream out = new FileOutputStream(file)) {
+            LOG.debug(
+                    "download and decompress dstl file : {} to cache file : {}",
+                    fileHandle.getFilePath(),
+                    file.getPath());
+            IOUtils.copyBytes(in, out, false);
+
+            return new RefCountedFile(file);
+        } catch (IOException | RuntimeException e) {
+            // do not leave a partially written cache file behind
+            deleteQuietly(file);
+            ExceptionUtils.rethrow(e);
+            return null;
+        }
+    }
+
+    private FileInputStream openAndSeek(RefCountedFile refCountedFile, long offset)
+            throws IOException {
+        FileInputStream fin = new FileInputStream(refCountedFile.getFile());
+        if (offset != 0) {
+            LOG.trace("seek to {}", offset);
+            try {
+                fin.getChannel().position(offset);
+            } catch (Throwable t) {
+                IOUtils.closeQuietly(fin);
+                throw t;
+            }
+        }
+        return fin;
+    }
+
+    /**
+     * Wraps {@code fin} so that closing it releases its reference on the cache entry of {@code
+     * dfsPath}. Repeated {@code close()} calls release the reference only once.
+     */
+    private DataInputStream wrapStream(Path dfsPath, FileInputStream fin) {
+        return new DataInputStream(new BufferedInputStream(fin)) {
+            private boolean released;
+
+            @Override
+            public void close() throws IOException {
+                if (released) {
+                    return;
+                }
+                released = true;
+                try {
+                    super.close();
+                } finally {
+                    releaseReference(dfsPath);
+                }
+            }
+        };
+    }
+
+    /**
+     * Drops one reference on the cache entry of {@code dfsPath}. When only the cache's own
+     * reference is left, a cleanup is scheduled after the configured idle time.
+     */
+    private void releaseReference(Path dfsPath) {
+        cache.computeIfPresent(
+                dfsPath,
+                (key, value) -> {
+                    value.release();
+                    if (value.getReferenceCounter() == NO_USING_REF_COUNT) {
+                        scheduleCleanup(dfsPath);
+                    }
+                    return value;
+                });
+    }
+
+    private void scheduleCleanup(Path dfsPath) {
+        try {
+            cacheCleanScheduler.schedule(
+                    () -> cleanCacheFile(dfsPath), cacheIdleMillis, TimeUnit.MILLISECONDS);
+        } catch (RejectedExecutionException e) {
+            // the scheduler is only shut down by close(), which deletes the cached files itself
+            LOG.debug("Skip scheduling cleanup for {}: the reader is closed", dfsPath);
+        }
+    }
+
+    private void cleanCacheFile(Path dfsPath) {
+        cache.computeIfPresent(
+                dfsPath,
+                (key, value) -> {
+                    if (value.getReferenceCounter() == NO_USING_REF_COUNT) {
+                        LOG.debug(
+                                "clean cached file : {} after {}ms idle",
+                                value.getFile().getPath(),
+                                cacheIdleMillis);
+                        value.release();
+                        // remove the entry from the cache
+                        return null;
+                    } else {
+                        return value;
+                    }
+                });
+    }
+
+    /**
+     * Stops the cleanup scheduler and deletes all cached files. Deletion is attempted for every
+     * file; the first failure is rethrown with later ones attached as suppressed exceptions.
+     */
+    @Override
+    public void close() throws Exception {
+        cacheCleanScheduler.shutdownNow();
+        try {
+            if (!cacheCleanScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
+                LOG.warn(
+                        "Unable to cleanly shutdown cache clean scheduler of "
+                                + "ChangelogHandleReaderWithCache in 5s");
+            }
+        } finally {
+            deleteAllCacheFiles();
+        }
+    }
+
+    private void deleteAllCacheFiles() throws IOException {
+        IOException firstException = null;
+        Iterator<RefCountedFile> iterator = cache.values().iterator();
+        while (iterator.hasNext()) {
+            RefCountedFile cacheFile = iterator.next();
+            iterator.remove();
+            LOG.debug("cleanup on close: {}", cacheFile.getFile().toPath());
+            try {
+                Files.deleteIfExists(cacheFile.getFile().toPath());
+            } catch (IOException e) {
+                firstException = ExceptionUtils.firstOrSuppressed(e, firstException);
+            }
+        }
+        if (firstException != null) {
+            throw firstException;
+        }
+    }
+
+    /** Deletes {@code file}, logging instead of throwing if that fails. */
+    private static void deleteQuietly(File file) {
+        try {
+            Files.deleteIfExists(file.toPath());
+        } catch (IOException e) {
+            LOG.warn("Failed to delete cache file {}", file, e);
+        }
+    }
+}
diff --git 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStreamWrapper.java
 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStreamWrapper.java
new file mode 100644
index 00000000000..b4e724b8f41
--- /dev/null
+++ 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStreamWrapper.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Helpers for opening changelog streams: reading the leading compression flag, decompressing
+ * the content if needed and skipping to an offset within the decompressed content.
+ */
+class ChangelogStreamWrapper {
+    private static final Logger LOG = LoggerFactory.getLogger(ChangelogStreamWrapper.class);
+
+    /**
+     * Wraps a raw changelog stream. Its first byte is read as a flag ({@code 1} means Snappy
+     * compressed) and the returned stream yields the remaining, possibly decompressed, bytes.
+     * Closing the returned stream closes {@code stream}; so does a failure while wrapping it.
+     */
+    static DataInputViewStreamWrapper wrap(InputStream stream) throws IOException {
+        BufferedInputStream bufferedStream = new BufferedInputStream(stream);
+        final InputStream content;
+        try {
+            boolean compressed = bufferedStream.read() == 1;
+            content =
+                    compressed
+                            ? SnappyStreamCompressionDecorator.INSTANCE.decorateWithCompression(
+                                    bufferedStream)
+                            : bufferedStream;
+        } catch (Throwable t) {
+            closeSuppressing(bufferedStream, t);
+            throw t;
+        }
+        return new DataInputViewStreamWrapper(content) {
+            @Override
+            public void close() throws IOException {
+                try {
+                    super.close();
+                } finally {
+                    bufferedStream.close();
+                }
+            }
+        };
+    }
+
+    /**
+     * Like {@link #wrap(InputStream)}, then skips {@code offset} bytes of the decompressed
+     * content; offsets larger than {@link Integer#MAX_VALUE} are supported. If skipping fails,
+     * the stream is closed before the exception is propagated.
+     */
+    static DataInputViewStreamWrapper wrapAndSeek(InputStream stream, long offset)
+            throws IOException {
+        DataInputViewStreamWrapper wrappedStream = wrap(stream);
+        if (offset != 0) {
+            LOG.debug("seek to {}", offset);
+            try {
+                skipFully(wrappedStream, offset);
+            } catch (Throwable t) {
+                closeSuppressing(wrappedStream, t);
+                throw t;
+            }
+        }
+        return wrappedStream;
+    }
+
+    /**
+     * Skips {@code offset} bytes in chunks that fit into an {@code int}, since {@link
+     * DataInputViewStreamWrapper#skipBytesToRead} only accepts {@code int} counts.
+     */
+    private static void skipFully(DataInputViewStreamWrapper stream, long offset)
+            throws IOException {
+        if (offset < 0) {
+            throw new IOException("Cannot seek to a negative offset: " + offset);
+        }
+        long remaining = offset;
+        while (remaining > 0) {
+            int chunk = (int) Math.min(remaining, Integer.MAX_VALUE);
+            stream.skipBytesToRead(chunk);
+            remaining -= chunk;
+        }
+    }
+
+    /** Closes {@code stream}, adding any close failure to {@code cause} as suppressed. */
+    private static void closeSuppressing(InputStream stream, Throwable cause) {
+        try {
+            stream.close();
+        } catch (Throwable closeException) {
+            cause.addSuppressed(closeException);
+        }
+    }
+}
diff --git 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogOptions.java
 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogOptions.java
index 25791f1cea2..4d05962f2ee 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogOptions.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogOptions.java
@@ -144,4 +144,12 @@ public class FsStateChangelogOptions {
                     .defaultValue(Duration.ofMillis(500))
                     .withDescription(
                             "Delay before the next attempt (if the failure was 
not caused by a timeout).");
+
+    public static final ConfigOption<Duration> CACHE_IDLE_TIMEOUT =
+            ConfigOptions.key("dstl.dfs.download.local-cache.idle-timeout-ms")
+                    .durationType()
+                    .defaultValue(Duration.ofMinutes(10))
+                    .withDescription(
+                            "Maximum idle time for cache files of distributed 
changelog file, "
+                                    + "after which the cache files will be 
deleted.");
 }
diff --git 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java
 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java
index 972135b7845..de7718521a0 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java
@@ -112,6 +112,7 @@ public class FsStateChangelogStorage extends 
FsStateChangelogStorageForRecovery
             StateChangeUploadScheduler uploader,
             long preEmptivePersistThresholdInBytes,
             TaskChangelogRegistry changelogRegistry) {
+        super(ChangelogStreamHandleReader.DIRECT_READER);
         this.preEmptivePersistThresholdInBytes = 
preEmptivePersistThresholdInBytes;
         this.changelogRegistry = changelogRegistry;
         this.uploader = uploader;
diff --git 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorageFactory.java
 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorageFactory.java
index 561a25de153..c81931241a5 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorageFactory.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorageFactory.java
@@ -53,8 +53,9 @@ public class FsStateChangelogStorageFactory implements 
StateChangelogStorageFact
     }
 
     @Override
-    public StateChangelogStorageView<?> createStorageView() {
-        return new FsStateChangelogStorageForRecovery();
+    public StateChangelogStorageView<?> createStorageView(Configuration 
configuration) {
+        return new FsStateChangelogStorageForRecovery(
+                new ChangelogStreamHandleReaderWithCache(configuration));
     }
 
     public static void configure(
diff --git 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorageForRecovery.java
 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorageForRecovery.java
index 377ac81e053..de40328abac 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorageForRecovery.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorageForRecovery.java
@@ -33,8 +33,23 @@ import javax.annotation.concurrent.ThreadSafe;
 public class FsStateChangelogStorageForRecovery
         implements StateChangelogStorageView<ChangelogStateHandleStreamImpl> {
 
+    private final ChangelogStreamHandleReader changelogStreamHandleReader;
+
+    public FsStateChangelogStorageForRecovery(
+            ChangelogStreamHandleReader changelogStreamHandleReader) {
+        this.changelogStreamHandleReader = changelogStreamHandleReader;
+    }
+
     @Override
     public StateChangelogHandleReader<ChangelogStateHandleStreamImpl> 
createReader() {
-        return new StateChangelogHandleStreamHandleReader(new 
StateChangeFormat());
+        return new StateChangelogHandleStreamHandleReader(
+                new StateChangeIteratorImpl(changelogStreamHandleReader));
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (changelogStreamHandleReader != null) {
+            changelogStreamHandleReader.close();
+        }
     }
 }
diff --git 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java
 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java
index ff2149301e4..7a672da4ba4 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java
@@ -18,13 +18,8 @@
 package org.apache.flink.changelog.fs;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.changelog.StateChange;
-import 
org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.IOUtils;
@@ -32,9 +27,8 @@ import org.apache.flink.util.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedInputStream;
+import java.io.DataInputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -48,8 +42,7 @@ import static java.util.Comparator.comparing;
 
 /** Serialization format for state changes. */
 @Internal
-public class StateChangeFormat
-        implements StateChangelogHandleStreamHandleReader.StateChangeIterator {
+public class StateChangeFormat {
     private static final Logger LOG = 
LoggerFactory.getLogger(StateChangeFormat.class);
 
     Map<StateChangeSet, Long> write(OutputStreamWithPos os, 
Collection<StateChangeSet> changeSets)
@@ -86,15 +79,8 @@ public class StateChangeFormat
         }
     }
 
-    @Override
-    public CloseableIterator<StateChange> read(StreamStateHandle handle, long 
offset)
-            throws IOException {
-        FSDataInputStream stream = handle.openInputStream();
-        DataInputViewStreamWrapper input = wrap(stream);
-        if (offset != 0) {
-            LOG.debug("seek from {} to {}", stream.getPos(), offset);
-            input.skipBytesToRead((int) offset);
-        }
+    CloseableIterator<StateChange> read(DataInputStream input) throws 
IOException {
+
         return new CloseableIterator<StateChange>() {
             int numUnreadGroups = input.readInt();
             int numLeftInGroup = numUnreadGroups-- == 0 ? 0 : input.readInt();
@@ -141,18 +127,9 @@ public class StateChangeFormat
 
             @Override
             public void close() throws Exception {
-                LOG.trace("close {}", stream);
-                stream.close();
+                LOG.trace("close {}", input);
+                input.close();
             }
         };
     }
-
-    private DataInputViewStreamWrapper wrap(InputStream stream) throws 
IOException {
-        stream = new BufferedInputStream(stream);
-        boolean compressed = stream.read() == 1;
-        return new DataInputViewStreamWrapper(
-                compressed
-                        ? 
SnappyStreamCompressionDecorator.INSTANCE.decorateWithCompression(stream)
-                        : stream);
-    }
 }
diff --git 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorageForRecovery.java
 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeIteratorImpl.java
similarity index 53%
copy from 
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorageForRecovery.java
copy to 
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeIteratorImpl.java
index 377ac81e053..720cbcb75f3 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorageForRecovery.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeIteratorImpl.java
@@ -18,23 +18,45 @@

 package org.apache.flink.changelog.fs;

-import org.apache.flink.annotation.Experimental;
-import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
-import org.apache.flink.runtime.state.changelog.StateChangelogHandleReader;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.changelog.StateChange;
 import org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader;
-import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
-import org.apache.flink.runtime.state.changelog.StateChangelogStorageView;
+import org.apache.flink.util.CloseableIterator;

-import javax.annotation.concurrent.ThreadSafe;
+import java.io.DataInputStream;
+import java.io.IOException;

-/** Filesystem-based implementation of {@link StateChangelogStorage} just for recovery. */
-@Experimental
-@ThreadSafe
-public class FsStateChangelogStorageForRecovery
-        implements StateChangelogStorageView<ChangelogStateHandleStreamImpl> {
+/**
+ * Default {@link StateChangelogHandleStreamHandleReader.StateChangeIterator}: opens each handle
+ * through a {@link ChangelogStreamHandleReader} and parses it with {@link StateChangeFormat}.
+ */
+class StateChangeIteratorImpl
+        implements StateChangelogHandleStreamHandleReader.StateChangeIterator {
+
+    private final ChangelogStreamHandleReader changelogStreamHandleReader;
+
+    public StateChangeIteratorImpl(ChangelogStreamHandleReader changelogStreamHandleReader) {
+        this.changelogStreamHandleReader = changelogStreamHandleReader;
+    }

+    /**
+     * Opens {@code handle} at {@code offset} and returns an iterator over its state changes. The
+     * returned iterator owns the stream and closes it when the iterator is closed.
+     */
     @Override
-    public StateChangelogHandleReader<ChangelogStateHandleStreamImpl> createReader() {
-        return new StateChangelogHandleStreamHandleReader(new StateChangeFormat());
+    public CloseableIterator<StateChange> read(StreamStateHandle handle, long offset)
+            throws IOException {
+        DataInputStream input = changelogStreamHandleReader.openAndSeek(handle, offset);
+        try {
+            return new StateChangeFormat().read(input);
+        } catch (Throwable t) {
+            // the iterator was not created, so nobody else will close the stream
+            try {
+                input.close();
+            } catch (Throwable closeException) {
+                t.addSuppressed(closeException);
+            }
+            throw t;
+        }
     }
 }
diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java
index 7c4c592c4fd..cb6193e7724 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.state.api.runtime;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor;
@@ -27,7 +28,9 @@ import 
org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReader;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.TaskStateManager;
+import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
 import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
+import org.apache.flink.runtime.state.changelog.StateChangelogStorageView;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
@@ -104,6 +107,13 @@ final class SavepointTaskStateManager implements 
TaskStateManager {
         return null;
     }
 
+    @Nullable
+    @Override
+    public StateChangelogStorageView<?> getStateChangelogStorageView(
+            Configuration configuration, ChangelogStateHandle 
changelogStateHandle) {
+        return null;
+    }
+
     @Override
     public void notifyCheckpointComplete(long checkpointId) {
         throw new UnsupportedOperationException(MSG);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.java
index d5cf1711a93..59c3c090bf3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.java
@@ -22,8 +22,11 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
+import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
+import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
 import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
 import org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader;
+import org.apache.flink.runtime.state.changelog.StateChangelogStorageView;
 import org.apache.flink.util.ShutdownHookUtil;
 
 import org.slf4j.Logger;
@@ -55,6 +58,15 @@ public class TaskExecutorStateChangelogStoragesManager {
     @GuardedBy("lock")
     private final Map<JobID, Optional<StateChangelogStorage<?>>> 
changelogStoragesByJobId;
 
+    /**
+     * This map holds all state changelog storage views of {@link ChangelogStateHandleStreamImpl}
+     * for tasks running on the task manager / executor that own the instance of this, keyed by
+     * job. Unlike {@link #changelogStoragesByJobId}, values are not wrapped in {@link Optional}:
+     * an entry only exists once a storage view has actually been loaded for the job.
+     */
+    @GuardedBy("lock")
+    private final Map<JobID, StateChangelogStorageView<ChangelogStateHandleStreamImpl>>
+            changelogStorageViewsByJobId;
+
     @GuardedBy("lock")
     private boolean closed;
 
@@ -65,6 +77,7 @@ public class TaskExecutorStateChangelogStoragesManager {
 
     public TaskExecutorStateChangelogStoragesManager() {
         this.changelogStoragesByJobId = new HashMap<>();
+        this.changelogStorageViewsByJobId = new HashMap<>();
         this.closed = false;
 
         // register a shutdown hook
@@ -120,7 +133,7 @@ public class TaskExecutorStateChangelogStoragesManager {
         }
     }
 
-    public void releaseStateChangelogStorageForJob(@Nonnull JobID jobId) {
+    private void releaseStateChangelogStorageForJob(@Nonnull JobID jobId) {
         LOG.debug("Releasing state changelog storage under job id {}.", jobId);
         Optional<StateChangelogStorage<?>> cleanupChangelogStorage;
         synchronized (lock) {
@@ -135,28 +148,100 @@ public class TaskExecutorStateChangelogStoragesManager {
         }
     }
 
+    /**
+     * Returns the state changelog storage view for the given job. On the first request for a job
+     * the view is created via {@link StateChangelogStorageLoader#loadFromStateHandle} and cached;
+     * later requests for the same job reuse the cached instance.
+     *
+     * @param jobID the job the storage view is registered under
+     * @param configuration configuration passed to the loader when a new view is created
+     * @param changelogStateHandle handle used by the loader to pick the storage implementation
+     * @return the cached or newly created storage view
+     * @throws IOException if loading a new storage view fails
+     * @throws IllegalStateException if this manager has already been shut down
+     */
+    @Nullable
+    StateChangelogStorageView<?> stateChangelogStorageViewForJob(
+            @Nonnull JobID jobID,
+            Configuration configuration,
+            ChangelogStateHandle changelogStateHandle)
+            throws IOException {
+        // This implementation assumes there is only one production implementation of DSTL
+        // (FsStateChangelogStorage). Maybe we should change the type of
+        // changelogStorageViewsByJobId to map<jobId, map<dstl-identifier, dstl>> when there is
+        // another implementation.
+
+        synchronized (lock) {
+            // `closed` is guarded by `lock`. Checking it outside the critical section would let a
+            // concurrent shutdown() drain the map between the check and the put below, so the
+            // newly loaded view would never be released.
+            if (closed) {
+                throw new IllegalStateException(
+                        "TaskExecutorStateChangelogStoragesManager is already closed and cannot "
+                                + "register a new StateChangelogStorageView.");
+            }
+
+            StateChangelogStorageView<ChangelogStateHandleStreamImpl> storageView =
+                    changelogStorageViewsByJobId.get(jobID);
+
+            if (storageView == null) {
+                StateChangelogStorageView<?> loaded =
+                        StateChangelogStorageLoader.loadFromStateHandle(
+                                configuration, changelogStateHandle);
+                // Unchecked: relies on the single-DSTL assumption described above.
+                @SuppressWarnings("unchecked")
+                StateChangelogStorageView<ChangelogStateHandleStreamImpl> typedView =
+                        (StateChangelogStorageView<ChangelogStateHandleStreamImpl>) loaded;
+                storageView = typedView;
+                changelogStorageViewsByJobId.put(jobID, storageView);
+
+                LOG.debug(
+                        "Registered new state changelog storage view for job {} : {}.",
+                        jobID,
+                        loaded);
+            } else {
+                LOG.debug(
+                        "Found existing state changelog storage view for job {}: {}.",
+                        jobID,
+                        storageView);
+            }
+
+            return storageView;
+        }
+    }
+
+    private void releaseStateChangelogStorageViewForJob(@Nonnull JobID jobID) {
+        LOG.debug("Releasing state changelog storage view under job id {}.", 
jobID);
+        StateChangelogStorageView<ChangelogStateHandleStreamImpl> 
cleanupStorageView;
+        synchronized (lock) {
+            if (closed) {
+                return;
+            }
+            cleanupStorageView = changelogStorageViewsByJobId.remove(jobID);
+        }
+
+        if (cleanupStorageView != null) {
+            doRelease(cleanupStorageView);
+        }
+    }
+
+    /**
+     * Releases both the state changelog storage and the state changelog storage view registered
+     * for {@code jobID}, in that order.
+     *
+     * @param jobID the job whose changelog resources are released
+     */
+    public void releaseResourcesForJob(@Nonnull JobID jobID) {
+        this.releaseStateChangelogStorageForJob(jobID);
+        this.releaseStateChangelogStorageViewForJob(jobID);
+    }
+
+    /**
+     * Marks this manager as closed and releases all state changelog storages and storage views it
+     * still holds. Calls after the first one return immediately.
+     */
     public void shutdown() {
-        HashMap<JobID, Optional<StateChangelogStorage<?>>> toRelease;
+        HashMap<JobID, Optional<StateChangelogStorage<?>>> toReleaseStorage;
+        HashMap<JobID, 
StateChangelogStorageView<ChangelogStateHandleStreamImpl>>
+                toReleaseStorageView;
         synchronized (lock) {
             if (closed) {
                 return;
             }
             closed = true;
 
-            toRelease = new HashMap<>(changelogStoragesByJobId);
+            // Snapshot and clear both maps while holding the lock; entries are released below.
+            toReleaseStorage = new HashMap<>(changelogStoragesByJobId);
+            toReleaseStorageView = new HashMap<>(changelogStorageViewsByJobId);
             changelogStoragesByJobId.clear();
+            changelogStorageViewsByJobId.clear();
         }
 
         ShutdownHookUtil.removeShutdownHook(shutdownHook, 
getClass().getSimpleName(), LOG);
 
         LOG.info("Shutting down TaskExecutorStateChangelogStoragesManager.");
 
+        // Release the snapshotted storages and storage views outside the lock.
-        for (Map.Entry<JobID, Optional<StateChangelogStorage<?>>> entry : 
toRelease.entrySet()) {
+        for (Map.Entry<JobID, Optional<StateChangelogStorage<?>>> entry :
+                toReleaseStorage.entrySet()) {
             entry.getValue().ifPresent(this::doRelease);
         }
+        for (Map.Entry<JobID, 
StateChangelogStorageView<ChangelogStateHandleStreamImpl>> entry :
+                toReleaseStorageView.entrySet()) {
+            doRelease(entry.getValue());
+        }
     }
 
-    private void doRelease(StateChangelogStorage<?> storage) {
+    private void doRelease(StateChangelogStorageView<?> storage) {
         if (storage != null) {
             try {
                 storage.close();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java
index 60041104923..7c979b6b0d8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor;
@@ -26,7 +27,9 @@ import 
org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import 
org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReader;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
 import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
+import org.apache.flink.runtime.state.changelog.StateChangelogStorageView;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -101,4 +104,11 @@ public interface TaskStateManager extends 
CheckpointListener, AutoCloseable {
     /** Returns the configured state changelog storage for this task. */
     @Nullable
     StateChangelogStorage<?> getStateChangelogStorage();
+
+    /**
+     * Returns the state changelog storage view that can read the given {@link
+     * ChangelogStateHandle} for this task.
+     *
+     * @param configuration configuration an implementation may use to create the storage view
+     * @param changelogStateHandle the handle whose storage view is requested
+     * @return the storage view, or {@code null} if this task state manager does not provide one
+     */
+    @Nullable
+    StateChangelogStorageView<?> getStateChangelogStorageView(
+            Configuration configuration, ChangelogStateHandle changelogStateHandle);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
index 2d14d37e01a..a82b55b155b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor;
@@ -30,8 +31,11 @@ import 
org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReader;
 import 
org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReaderImpl;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
 import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
+import org.apache.flink.runtime.state.changelog.StateChangelogStorageView;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
+import org.apache.flink.util.ExceptionUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,6 +43,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
@@ -75,6 +80,8 @@ public class TaskStateManagerImpl implements TaskStateManager 
{
     /** The changelog storage where the manager reads and writes the changelog 
*/
     @Nullable private final StateChangelogStorage<?> stateChangelogStorage;
 
+    private final TaskExecutorStateChangelogStoragesManager 
changelogStoragesManager;
+
     /** The checkpoint responder through which this manager can report to the 
job manager. */
     private final CheckpointResponder checkpointResponder;
 
@@ -85,6 +92,7 @@ public class TaskStateManagerImpl implements TaskStateManager 
{
             @Nonnull ExecutionAttemptID executionAttemptID,
             @Nonnull TaskLocalStateStore localStateStore,
             @Nullable StateChangelogStorage<?> stateChangelogStorage,
+            @Nonnull TaskExecutorStateChangelogStoragesManager 
changelogStoragesManager,
             @Nullable JobManagerTaskRestore jobManagerTaskRestore,
             @Nonnull CheckpointResponder checkpointResponder) {
         this(
@@ -92,6 +100,7 @@ public class TaskStateManagerImpl implements 
TaskStateManager {
                 executionAttemptID,
                 localStateStore,
                 stateChangelogStorage,
+                changelogStoragesManager,
                 jobManagerTaskRestore,
                 checkpointResponder,
                 new SequentialChannelStateReaderImpl(
@@ -105,12 +114,14 @@ public class TaskStateManagerImpl implements 
TaskStateManager {
             @Nonnull ExecutionAttemptID executionAttemptID,
             @Nonnull TaskLocalStateStore localStateStore,
             @Nullable StateChangelogStorage<?> stateChangelogStorage,
+            @Nonnull TaskExecutorStateChangelogStoragesManager 
changelogStoragesManager,
             @Nullable JobManagerTaskRestore jobManagerTaskRestore,
             @Nonnull CheckpointResponder checkpointResponder,
             @Nonnull SequentialChannelStateReaderImpl 
sequentialChannelStateReader) {
         this.jobId = jobId;
         this.localStateStore = localStateStore;
         this.stateChangelogStorage = stateChangelogStorage;
+        this.changelogStoragesManager = changelogStoragesManager;
         this.jobManagerTaskRestore = jobManagerTaskRestore;
         this.executionAttemptID = executionAttemptID;
         this.checkpointResponder = checkpointResponder;
@@ -244,6 +255,21 @@ public class TaskStateManagerImpl implements 
TaskStateManager {
         return stateChangelogStorage;
     }
 
+    /**
+     * Delegates to the {@link TaskExecutorStateChangelogStoragesManager} passed at construction,
+     * which caches one storage view per job. Any {@link IOException} is rethrown unchecked.
+     */
+    @Nullable
+    @Override
+    public StateChangelogStorageView<?> getStateChangelogStorageView(
+            Configuration configuration, ChangelogStateHandle changelogStateHandle) {
+        try {
+            return changelogStoragesManager.stateChangelogStorageViewForJob(
+                    jobId, configuration, changelogStateHandle);
+        } catch (IOException e) {
+            ExceptionUtils.rethrow(e);
+            // Not reached: rethrow(...) always throws; the return only satisfies the compiler.
+            return null;
+        }
+    }
+
     /** Tracking when local state can be confirmed and disposed. */
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageFactory.java
index f60ad065094..be86a416e6c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageFactory.java
@@ -40,5 +40,5 @@ public interface StateChangelogStorageFactory {
             throws IOException;
 
-    /** Create the storage for recovery. */
+    /**
+     * Create the storage for recovery.
+     *
+     * @param configuration configuration the storage view may be created with; implementations
+     *     are free to ignore it
+     */
-    StateChangelogStorageView<?> createStorageView() throws IOException;
+    StateChangelogStorageView<?> createStorageView(Configuration 
configuration) throws IOException;
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageLoader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageLoader.java
index 7947e83e145..622c09f9956 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageLoader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageLoader.java
@@ -106,7 +106,8 @@ public class StateChangelogStorageLoader {
 
     @Nonnull
     public static StateChangelogStorageView<?> loadFromStateHandle(
-            ChangelogStateHandle changelogStateHandle) throws IOException {
+            Configuration configuration, ChangelogStateHandle 
changelogStateHandle)
+            throws IOException {
         StateChangelogStorageFactory factory =
                 
STATE_CHANGELOG_STORAGE_FACTORIES.get(changelogStateHandle.getStorageIdentifier());
         if (factory == null) {
@@ -120,7 +121,7 @@ public class StateChangelogStorageLoader {
                     "Creating a changelog storage with name '{}' to restore 
from '{}'.",
                     changelogStateHandle.getStorageIdentifier(),
                     changelogStateHandle.getClass().getSimpleName());
-            return factory.createStorageView();
+            return factory.createStorageView(configuration);
         }
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorageFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorageFactory.java
index d09d638d242..786a6a7be29 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorageFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorageFactory.java
@@ -41,7 +41,7 @@ public class InMemoryStateChangelogStorageFactory implements 
StateChangelogStora
     }
 
     @Override
-    public StateChangelogStorageView<?> createStorageView() {
+    public StateChangelogStorageView<?> createStorageView(Configuration 
configuration) {
+        // configuration is not used by the in-memory storage view.
         return new InMemoryStateChangelogStorage();
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 3b04d8a7a40..26b274f5a5c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -713,6 +713,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                             tdd.getExecutionAttemptId(),
                             localStateStore,
                             changelogStorage,
+                            changelogStoragesManager,
                             taskRestore,
                             checkpointResponder);
 
@@ -1808,7 +1809,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                             closeJob(job, cause);
                         });
         taskManagerMetricGroup.removeJobMetricsGroup(jobId);
-        changelogStoragesManager.releaseStateChangelogStorageForJob(jobId);
+        changelogStoragesManager.releaseResourcesForJob(jobId);
         currentSlotOfferPerJob.remove(jobId);
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManagerTest.java
index cf6ef1deb3e..b0efaa9c427 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManagerTest.java
@@ -82,7 +82,7 @@ public class TaskExecutorStateChangelogStoragesManagerTest {
                         jobId1, configuration, 
createUnregisteredTaskManagerJobMetricGroup());
         Assert.assertTrue(storage1 instanceof TestStateChangelogStorage);
         Assert.assertFalse(((TestStateChangelogStorage) storage1).closed);
-        manager.releaseStateChangelogStorageForJob(jobId1);
+        manager.releaseResourcesForJob(jobId1);
         Assert.assertTrue(((TestStateChangelogStorage) storage1).closed);
 
         StateChangelogStorage<?> storage2 =
@@ -212,7 +212,8 @@ public class TaskExecutorStateChangelogStoragesManagerTest {
         }
 
         @Override
-        public StateChangelogStorageView<?> createStorageView() throws 
IOException {
+        public StateChangelogStorageView<?> createStorageView(Configuration 
configuration)
+                throws IOException {
+            // The test storage view ignores configuration.
             return new TestStateChangelogStorage();
         }
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
index 5eb2664b5d3..9b3f6f17342 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
@@ -282,6 +282,7 @@ public class TaskStateManagerImplTest extends TestLogger {
                         createExecutionAttemptId(),
                         new TestTaskLocalStateStore(),
                         null,
+                        new TaskExecutorStateChangelogStoragesManager(),
                         jobManagerTaskRestore,
                         new TestCheckpointResponder());
         Assert.assertTrue(stateManager.isTaskDeployedAsFinished());
@@ -294,6 +295,7 @@ public class TaskStateManagerImplTest extends TestLogger {
                         createExecutionAttemptId(),
                         new TestTaskLocalStateStore(),
                         null,
+                        new TaskExecutorStateChangelogStoragesManager(),
                         null,
                         new TestCheckpointResponder());
         
Assert.assertFalse(emptyStateManager.getRestoreCheckpointId().isPresent());
@@ -304,6 +306,7 @@ public class TaskStateManagerImplTest extends TestLogger {
                         createExecutionAttemptId(),
                         new TestTaskLocalStateStore(),
                         null,
+                        new TaskExecutorStateChangelogStoragesManager(),
                         new JobManagerTaskRestore(2, new TaskStateSnapshot()),
                         new TestCheckpointResponder());
         Assert.assertEquals(2L, (long) 
nonEmptyStateManager.getRestoreCheckpointId().get());
@@ -322,6 +325,7 @@ public class TaskStateManagerImplTest extends TestLogger {
                 executionAttemptID,
                 localStateStore,
                 stateChangelogStorage,
+                new TaskExecutorStateChangelogStoragesManager(),
                 jobManagerTaskRestore,
                 checkpointResponderMock);
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
index 64b3da7a327..a98d0fd7b06 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
@@ -29,14 +30,19 @@ import 
org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import 
org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReader;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
 import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
+import org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader;
+import org.apache.flink.runtime.state.changelog.StateChangelogStorageView;
 import 
org.apache.flink.runtime.state.changelog.inmemory.InMemoryStateChangelogStorage;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
+import org.apache.flink.util.ExceptionUtils;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -223,6 +229,21 @@ public class TestTaskStateManager implements 
TaskStateManager {
         return stateChangelogStorage;
     }
 
+    /**
+     * Loads a fresh storage view for {@code changelogStateHandle} on every call (no caching).
+     * Any {@link IOException} is rethrown unchecked.
+     */
+    @Nullable
+    @Override
+    public StateChangelogStorageView<?> getStateChangelogStorageView(
+            Configuration configuration, ChangelogStateHandle changelogStateHandle) {
+        try {
+            return StateChangelogStorageLoader.loadFromStateHandle(
+                    configuration, changelogStateHandle);
+        } catch (IOException e) {
+            ExceptionUtils.rethrow(e);
+            // Not reached: rethrow(...) always throws; the return only satisfies the compiler.
+            return null;
+        }
+    }
+
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
         this.notifiedCompletedCheckpointId = checkpointId;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageLoaderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageLoaderTest.java
index 988a707242c..6c091eeb546 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageLoaderTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageLoaderTest.java
@@ -125,7 +125,8 @@ public class StateChangelogStorageLoaderTest {
         }
 
         @Override
-        public StateChangelogStorageView<?> createStorageView() throws 
IOException {
+        public StateChangelogStorageView<?> createStorageView(Configuration 
configuration)
+                throws IOException {
+            // The test storage view ignores configuration.
             return new TestStateChangelogStorage();
         }
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
index 2e142af9593..d86e2219af5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
@@ -48,6 +48,7 @@ import org.apache.flink.runtime.memory.MemoryManagerBuilder;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
+import 
org.apache.flink.runtime.state.TaskExecutorStateChangelogStoragesManager;
 import org.apache.flink.runtime.state.TaskLocalStateStore;
 import org.apache.flink.runtime.state.TaskLocalStateStoreImpl;
 import org.apache.flink.runtime.state.TaskStateManager;
@@ -219,6 +220,7 @@ public class JvmExitOnFatalErrorTest extends TestLogger {
                                 executionAttemptID,
                                 localStateStore,
                                 changelogStorage,
+                                new 
TaskExecutorStateChangelogStoragesManager(),
                                 null,
                                 mock(CheckpointResponder.class));
 
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java
index b73f7f7a5a1..b186cd4bedb 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java
@@ -90,7 +90,9 @@ public class ChangelogStateBackend extends 
AbstractChangelogStateBackend
         ChangelogStateFactory changelogStateFactory = new 
ChangelogStateFactory();
         CheckpointableKeyedStateBackend<K> keyedStateBackend =
                 ChangelogBackendRestoreOperation.restore(
+                        env.getTaskManagerInfo().getConfiguration(),
                         env.getUserCodeClassLoader().asClassLoader(),
+                        env.getTaskStateManager(),
                         stateBackendHandles,
                         baseBackendBuilder,
                         (baseBackend, baseState) ->
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/DeactivatedChangelogStateBackend.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/DeactivatedChangelogStateBackend.java
index d8f1ec70e47..88808064f9e 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/DeactivatedChangelogStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/DeactivatedChangelogStateBackend.java
@@ -66,8 +66,11 @@ public class DeactivatedChangelogStateBackend extends 
AbstractChangelogStateBack
         // So we need to rebound the checkpoint id to the real checkpoint id 
here.
         stateBackendHandles = reboundCheckpoint(stateBackendHandles);
         ChangelogStateFactory changelogStateFactory = new 
ChangelogStateFactory();
+
         return ChangelogBackendRestoreOperation.restore(
+                env.getTaskManagerInfo().getConfiguration(),
                 env.getUserCodeClassLoader().asClassLoader(),
+                env.getTaskStateManager(),
                 stateBackendHandles,
                 baseBackendBuilder,
                 (baseBackend, baseState) ->
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendRestoreOperation.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendRestoreOperation.java
index 832ca0fff40..ef237019678 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendRestoreOperation.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendRestoreOperation.java
@@ -18,14 +18,15 @@
 package org.apache.flink.state.changelog.restore;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
 import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
 import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
 import org.apache.flink.runtime.state.changelog.StateChange;
 import org.apache.flink.runtime.state.changelog.StateChangelogHandleReader;
-import org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader;
 import org.apache.flink.state.changelog.ChangelogKeyedStateBackend;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.Preconditions;
@@ -60,7 +61,9 @@ public class ChangelogBackendRestoreOperation {
                     Exception> {}
 
     public static <K> CheckpointableKeyedStateBackend<K> restore(
+            Configuration configuration,
             ClassLoader classLoader,
+            TaskStateManager taskStateManager,
             Collection<ChangelogStateBackendHandle> stateHandles,
             BaseBackendBuilder<K> baseBackendBuilder,
             ChangelogRestoreTargetBuilder<K> changelogRestoreTargetBuilder)
@@ -72,7 +75,12 @@ public class ChangelogBackendRestoreOperation {
 
         for (ChangelogStateBackendHandle handle : stateHandles) {
             if (handle != null) { // null is empty state (no change)
-                readBackendHandle(changelogRestoreTarget, handle, classLoader);
+                readBackendHandle(
+                        configuration,
+                        taskStateManager,
+                        changelogRestoreTarget,
+                        handle,
+                        classLoader);
             }
         }
         return changelogRestoreTarget.getRestoredKeyedStateBackend();
@@ -80,6 +88,8 @@ public class ChangelogBackendRestoreOperation {
 
     @SuppressWarnings("unchecked")
     private static <T extends ChangelogStateHandle> void readBackendHandle(
+            Configuration configuration,
+            TaskStateManager taskStateManager,
             ChangelogRestoreTarget<?> changelogRestoreTarget,
             ChangelogStateBackendHandle backendHandle,
             ClassLoader classLoader)
@@ -89,7 +99,8 @@ public class ChangelogBackendRestoreOperation {
                 backendHandle.getNonMaterializedStateHandles()) {
             StateChangelogHandleReader<T> changelogHandleReader =
                     (StateChangelogHandleReader<T>)
-                            
StateChangelogStorageLoader.loadFromStateHandle(changelogHandle)
+                            taskStateManager
+                                    
.getStateChangelogStorageView(configuration, changelogHandle)
                                     .createReader();
             try (CloseableIterator<StateChange> changes =
                     changelogHandleReader.getChanges((T) changelogHandle)) {
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
index bacc3a6cb06..916caba04b1 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
@@ -47,6 +47,7 @@ import 
org.apache.flink.runtime.state.OperatorStreamStateHandle;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.StateInitializationContextImpl;
 import org.apache.flink.runtime.state.StatePartitionStreamProvider;
+import 
org.apache.flink.runtime.state.TaskExecutorStateChangelogStoragesManager;
 import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.state.TaskStateManagerImpl;
 import org.apache.flink.runtime.state.TestTaskLocalStateStore;
@@ -169,6 +170,7 @@ public class StateInitializationContextImplTest {
                         createExecutionAttemptId(),
                         new TestTaskLocalStateStore(),
                         new InMemoryStateChangelogStorage(),
+                        new TaskExecutorStateChangelogStoragesManager(),
                         jobManagerTaskRestore,
                         mock(CheckpointResponder.class));
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
index 318256fea7f..c7f13aad326 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
 import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.StateObject;
+import 
org.apache.flink.runtime.state.TaskExecutorStateChangelogStoragesManager;
 import org.apache.flink.runtime.state.TaskLocalStateStore;
 import org.apache.flink.runtime.state.TaskLocalStateStoreImpl;
 import org.apache.flink.runtime.state.TaskStateManagerImpl;
@@ -251,6 +252,7 @@ public class LocalStateForwardingTest extends TestLogger {
                         executionAttemptID,
                         taskLocalStateStore,
                         stateChangelogStorage,
+                        new TaskExecutorStateChangelogStoragesManager(),
                         null,
                         checkpointResponder);
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 782344dfe20..8777de72fcc 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -86,6 +86,7 @@ import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StatePartitionStreamProvider;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import 
org.apache.flink.runtime.state.TaskExecutorStateChangelogStoragesManager;
 import org.apache.flink.runtime.state.TaskLocalStateStoreImpl;
 import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.state.TaskStateManagerImpl;
@@ -789,6 +790,7 @@ public class StreamTaskTest extends TestLogger {
                         createExecutionAttemptId(),
                         mock(TaskLocalStateStoreImpl.class),
                         new InMemoryStateChangelogStorage(),
+                        new TaskExecutorStateChangelogStoragesManager(),
                         null,
                         checkpointResponder);
 
@@ -982,6 +984,7 @@ public class StreamTaskTest extends TestLogger {
                         createExecutionAttemptId(),
                         mock(TaskLocalStateStoreImpl.class),
                         new InMemoryStateChangelogStorage(),
+                        new TaskExecutorStateChangelogStoragesManager(),
                         null,
                         checkpointResponder);
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRecoveryCachingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRecoveryCachingITCase.java
new file mode 100644
index 00000000000..01f6353473c
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRecoveryCachingITCase.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.local.LocalFileSystem;
+import org.apache.flink.core.plugin.TestingPluginManager;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+
+import static java.util.Collections.singletonMap;
+import static 
org.apache.flink.changelog.fs.FsStateChangelogOptions.CACHE_IDLE_TIMEOUT;
+import static 
org.apache.flink.changelog.fs.FsStateChangelogOptions.PREEMPTIVE_PERSIST_THRESHOLD;
+import static 
org.apache.flink.configuration.CheckpointingOptions.CHECKPOINTS_DIRECTORY;
+import static 
org.apache.flink.configuration.CheckpointingOptions.CHECKPOINT_STORAGE;
+import static 
org.apache.flink.configuration.CheckpointingOptions.LOCAL_RECOVERY;
+import static org.apache.flink.configuration.CoreOptions.DEFAULT_PARALLELISM;
+import static 
org.apache.flink.configuration.RestartStrategyOptions.RESTART_STRATEGY;
+import static org.apache.flink.configuration.StateBackendOptions.STATE_BACKEND;
+import static 
org.apache.flink.configuration.StateChangelogOptions.ENABLE_STATE_CHANGE_LOG;
+import static 
org.apache.flink.configuration.StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL;
+import static 
org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_ENABLED;
+import static 
org.apache.flink.runtime.jobgraph.SavepointRestoreSettings.forPath;
+import static 
org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
+import static 
org.apache.flink.runtime.testutils.CommonTestUtils.waitForCheckpoint;
+import static 
org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION;
+import static 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT;
+import static 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
+import static 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_MODE;
+import static 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.ENABLE_UNALIGNED;
+import static 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Tests caching of changelog segments downloaded during recovery.
+ *
+ * <p>All {@link #PARALLELISM} subtasks run in a single task manager, and every file on the
+ * temporary folder's scheme is opened through {@link OpenOnceFileSystem}, which throws an
+ * {@link AssertionError} when the same path is opened a second time. Restoring from a checkpoint
+ * whose DSTL files are multiplexed across subtasks therefore only passes if the downloaded
+ * changelog files are cached and reused instead of being re-read by each subtask.
+ */
+public class ChangelogRecoveryCachingITCase extends TestLogger {
+    private static final int ACCUMULATE_TIME_MILLIS = 500; // high enough to build some state
+    private static final int PARALLELISM = 10; // high enough to trigger DSTL file multiplexing
+
+    @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    private OpenOnceFileSystem fileSystem;
+
+    private MiniClusterWithClientResource cluster;
+
+    /**
+     * Registers a fresh {@link OpenOnceFileSystem} for the temporary folder's scheme and starts a
+     * mini cluster with one task manager holding one slot per subtask. The changelog cache idle
+     * timeout is set to one year so cached files are never evicted during the test.
+     */
+    @Before
+    public void before() throws Exception {
+        File tmpFolder = temporaryFolder.newFolder();
+        fileSystem = new OpenOnceFileSystem();
+        registerFileSystem(fileSystem, tmpFolder.toURI().getScheme());
+
+        Configuration configuration = new Configuration();
+        configuration.set(CACHE_IDLE_TIMEOUT, Duration.ofDays(365)); // cache forever
+
+        FsStateChangelogStorageFactory.configure(
+                configuration, tmpFolder, Duration.ofMinutes(1), 10);
+        cluster =
+                new MiniClusterWithClientResource(
+                        new MiniClusterResourceConfiguration.Builder()
+                                .setConfiguration(configuration)
+                                .setNumberTaskManagers(1)
+                                .setNumberSlotsPerTaskManager(PARALLELISM)
+                                .build());
+        cluster.before();
+    }
+
+    /**
+     * Stops the mini cluster (if running) and re-initializes {@link FileSystem} with an empty
+     * configuration and no plugin manager, replacing the registration made in {@link #before()}.
+     */
+    @After
+    public void after() throws Exception {
+        if (cluster != null) {
+            cluster.after();
+            cluster = null;
+        }
+        FileSystem.initialize(new Configuration(), null);
+    }
+
+    /**
+     * Runs a job long enough to accumulate changelog state, checkpoints and cancels it, then
+     * restores a second job from that checkpoint. {@link OpenOnceFileSystem} throws an
+     * {@link AssertionError} if any file is opened twice during that restore.
+     */
+    @Test
+    public void test() throws Exception {
+        JobID jobID1 = submit(configureJob(temporaryFolder.newFolder()), graph -> {});
+
+        Thread.sleep(ACCUMULATE_TIME_MILLIS);
+        String cpLocation = checkpointAndCancel(jobID1);
+
+        JobID jobID2 =
+                submit(
+                        configureJob(temporaryFolder.newFolder()),
+                        graph -> graph.setSavepointRestoreSettings(forPath(cpLocation)));
+        waitForAllTaskRunning(cluster.getMiniCluster(), jobID2, true);
+        cluster.getClusterClient().cancel(jobID2).get();
+
+        // Guard against a vacuous pass: at least one file must have gone through fileSystem.
+        checkState(fileSystem.hasOpenedPaths(), "No files were opened through the test FS");
+    }
+
+    /**
+     * Builds the job graph for {@code conf}, lets {@code updateGraph} adjust it (e.g. to set
+     * restore settings), submits it to the cluster and returns the assigned job ID.
+     */
+    private JobID submit(Configuration conf, Consumer<JobGraph> updateGraph)
+            throws InterruptedException, ExecutionException {
+        JobGraph jobGraph = createJobGraph(conf);
+        updateGraph.accept(jobGraph);
+        return cluster.getClusterClient().submitJob(jobGraph).get();
+    }
+
+    /**
+     * Builds a pipeline over a practically endless number sequence, keyed into 1000 groups,
+     * in which every element is written into keyed value state before being discarded.
+     */
+    private JobGraph createJobGraph(Configuration conf) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
+        env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE)
+                .keyBy(num -> num % 1000)
+                .map(
+                        new RichMapFunction<Long, Long>() {
+                            @Override
+                            public Long map(Long value) throws Exception {
+                                getRuntimeContext()
+                                        .getState(new ValueStateDescriptor<>("state", Long.class))
+                                        .update(value);
+                                return value;
+                            }
+                        })
+                .addSink(new DiscardingSink<>());
+
+        return env.getStreamGraph().getJobGraph();
+    }
+
+    /**
+     * Creates the per-job configuration. It enables the changelog on the hashmap backend, takes
+     * exactly-once unaligned checkpoints every 10 ms into {@code cpDir} and retains them on
+     * cancellation. It also disables local recovery so state is downloaded, and disables
+     * restarts.
+     */
+    private Configuration configureJob(File cpDir) {
+        Configuration conf = new Configuration();
+
+        conf.set(EXTERNALIZED_CHECKPOINT, RETAIN_ON_CANCELLATION);
+        conf.set(DEFAULT_PARALLELISM, PARALLELISM);
+        conf.set(ENABLE_STATE_CHANGE_LOG, true);
+        conf.set(CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
+        conf.set(CHECKPOINTING_INTERVAL, Duration.ofMillis(10));
+        conf.set(CHECKPOINT_STORAGE, "filesystem");
+        conf.set(CHECKPOINTS_DIRECTORY, cpDir.toURI().toString());
+        conf.set(STATE_BACKEND, "hashmap");
+        conf.set(LOCAL_RECOVERY, false); // force download
+        // tune changelog
+        conf.set(PREEMPTIVE_PERSIST_THRESHOLD, MemorySize.ofMebiBytes(10));
+        conf.set(PERIODIC_MATERIALIZATION_INTERVAL, Duration.ofDays(365));
+
+        conf.set(ENABLE_UNALIGNED, true); // speedup
+        conf.set(ALIGNED_CHECKPOINT_TIMEOUT, Duration.ZERO); // prevent randomization
+        conf.set(BUFFER_DEBLOAT_ENABLED, false); // prevent randomization
+        conf.set(RESTART_STRATEGY, "none"); // not expecting any failures
+
+        return conf;
+    }
+
+    /**
+     * Waits for a completed checkpoint of {@code jobID} and cancels the job. It then rethrows any
+     * job failure and returns the path of the latest completed checkpoint.
+     *
+     * @throws NoSuchElementException if no completed checkpoint path is available
+     */
+    private String checkpointAndCancel(JobID jobID) throws Exception {
+        waitForCheckpoint(jobID, cluster.getMiniCluster(), 1);
+        cluster.getClusterClient().cancel(jobID).get();
+        checkStatus(jobID);
+        return CommonTestUtils.getLatestCompletedCheckpointPath(jobID, cluster.getMiniCluster())
+                .orElseThrow(() -> new NoSuchElementException("No checkpoint was created yet"));
+    }
+
+    /**
+     * If {@code jobID} has reached a globally terminal state and its result carries a failure
+     * cause, rethrows that cause wrapped in a {@link RuntimeException}.
+     */
+    private void checkStatus(JobID jobID) throws InterruptedException, ExecutionException {
+        if (cluster.getClusterClient().getJobStatus(jobID).get().isGloballyTerminalState()) {
+            cluster.getClusterClient()
+                    .requestJobResult(jobID)
+                    .get()
+                    .getSerializedThrowable()
+                    .ifPresent(
+                            serializedThrowable -> {
+                                throw new RuntimeException(serializedThrowable);
+                            });
+        }
+    }
+
+    /**
+     * A {@link LocalFileSystem} that records every opened {@link Path} and throws an
+     * {@link AssertionError} when the same path is opened a second time.
+     */
+    private static class OpenOnceFileSystem extends LocalFileSystem {
+        // One instance is shared by all subtasks restoring concurrently in the task manager, so
+        // the set must be thread-safe for add() to reliably detect a second open of a path.
+        private final Set<Path> openedPaths = Collections.synchronizedSet(new HashSet<>());
+
+        @Override
+        public FSDataInputStream open(Path f) throws IOException {
+            Assert.assertTrue(f + " was already opened", openedPaths.add(f));
+            return super.open(f);
+        }
+
+        // NOTE(review): presumably makes Flink treat this local FS like remote storage; confirm.
+        @Override
+        public boolean isDistributedFS() {
+            return true;
+        }
+
+        /** Returns whether at least one path has been opened through this file system. */
+        private boolean hasOpenedPaths() {
+            return !openedPaths.isEmpty();
+        }
+    }
+
+    /**
+     * Re-initializes {@link FileSystem} with a plugin manager exposing a single
+     * {@link FileSystemFactory} for {@code scheme} that returns {@code fs} for every URI.
+     */
+    private static void registerFileSystem(FileSystem fs, String scheme) {
+        FileSystem.initialize(
+                new Configuration(),
+                new TestingPluginManager(
+                        singletonMap(
+                                FileSystemFactory.class,
+                                Collections.singleton(
+                                                new FileSystemFactory() {
+                                                    @Override
+                                                    public FileSystem create(URI fsUri) {
+                                                        return fs;
+                                                    }
+
+                                                    @Override
+                                                    public String getScheme() {
+                                                        return scheme;
+                                                    }
+                                                })
+                                        .iterator())));
+    }
+}

Reply via email to