This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 72b0e9f2b0ab8478268a26845719791a7f25834c
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Wed Jan 12 12:30:05 2022 +0100

    [refactor] Make CheckpointStateOutputStream a top level class
---
 .../channel/ChannelStateCheckpointWriter.java      |  2 +-
 .../checkpoint/channel/ChannelStateWriterImpl.java |  7 +--
 .../state/CheckpointMetadataOutputStream.java      |  4 +-
 .../runtime/state/CheckpointStateOutputStream.java | 72 ++++++++++++++++++++++
 .../runtime/state/CheckpointStorageWorkerView.java |  3 +-
 .../runtime/state/CheckpointStreamFactory.java     | 50 ---------------
 .../state/CheckpointStreamWithResultProvider.java  | 19 +++---
 ...efaultOperatorStateBackendSnapshotStrategy.java |  2 +-
 .../state/DuplicatingCheckpointOutputStream.java   | 19 +++---
 .../runtime/state/FullSnapshotAsyncWriter.java     |  2 +-
 .../state/KeyedStateCheckpointOutputStream.java    |  3 +-
 .../state/NonClosingCheckpointOutputStream.java    |  7 +--
 .../state/OperatorStateCheckpointOutputStream.java |  4 +-
 .../runtime/state/SavepointSnapshotStrategy.java   |  2 +-
 .../state/StateSnapshotContextSynchronousImpl.java |  7 +--
 .../filesystem/FileBasedStateOutputStream.java     |  2 +-
 .../filesystem/FsCheckpointStorageAccess.java      |  2 +-
 .../filesystem/FsCheckpointStreamFactory.java      |  8 +--
 .../runtime/state/heap/HeapSnapshotStrategy.java   |  3 +-
 .../state/memory/MemCheckpointStreamFactory.java   |  1 +
 .../MemoryBackendCheckpointStorageAccess.java      |  2 +-
 .../channel/ChannelStateCheckpointWriterTest.java  |  2 +-
 .../runtime/state/ChannelPersistenceITCase.java    |  3 +-
 .../CheckpointStreamWithResultProviderTest.java    |  2 +-
 .../DuplicatingCheckpointOutputStreamTest.java     |  3 +-
 .../KeyedStateCheckpointOutputStreamTest.java      |  2 +-
 .../OperatorStateOutputCheckpointStreamTest.java   |  2 +-
 .../state/TestCheckpointStorageWorkerView.java     |  3 +-
 ...tingCheckpointStorageAccessCoordinatorView.java |  2 +-
 .../FsCheckpointStateOutputStreamTest.java         | 17 +++--
 .../filesystem/FsCheckpointStorageAccessTest.java  |  2 +-
 .../filesystem/FsStateBackendEntropyTest.java      |  2 +-
 .../memory/MemoryCheckpointOutputStreamTest.java   |  2 +-
 .../memory/MemoryCheckpointStorageAccessTest.java  |  2 +-
 .../state/testutils/BackendForTestStream.java      |  2 +-
 .../testutils/TestCheckpointStreamFactory.java     |  1 +
 .../state/ttl/mock/MockCheckpointStorage.java      |  4 +-
 .../util/BlockerCheckpointStreamFactory.java       |  1 +
 .../util/BlockingCheckpointOutputStream.java       | 10 ++-
 .../changelog/ChangelogKeyedStateBackend.java      | 27 +++++++-
 .../changelog/ChangelogStateBackendTestUtils.java  | 10 ++-
 .../streaming/state/RocksDBStateUploader.java      |  3 +-
 .../streaming/state/RocksDBAsyncSnapshotTest.java  |  2 +-
 .../streaming/state/RocksDBStateUploaderTest.java  | 32 ++++++++--
 .../state/NonCheckpointingStorageAccess.java       |  3 +-
 .../runtime/operators/GenericWriteAheadSink.java   |  4 +-
 .../tasks/SubtaskCheckpointCoordinatorImpl.java    |  4 +-
 .../StateSnapshotContextSynchronousImplTest.java   | 13 ++--
 .../tasks/TaskCheckpointingBehaviourTest.java      |  2 +-
 .../UnalignedCheckpointFailureHandlingITCase.java  |  4 +-
 50 files changed, 227 insertions(+), 160 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java
index f84cf50..8b3a9f9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java
@@ -23,8 +23,8 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.logger.NetworkActionsLogger;
 import org.apache.flink.runtime.state.AbstractChannelStateHandle;
 import 
org.apache.flink.runtime.state.AbstractChannelStateHandle.StateContentMetaInfo;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import 
org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.InputChannelStateHandle;
 import org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
index 456bcab..ef8f58b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
@@ -20,8 +20,8 @@ package org.apache.flink.runtime.checkpoint.channel;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.Preconditions;
 
@@ -39,9 +39,8 @@ import static 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteReque
 import static 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.write;
 
 /**
- * {@link ChannelStateWriter} implemented using {@link
- * CheckpointStreamFactory.CheckpointStateOutputStream 
CheckpointStateOutputStreams}. Internally, it
- * has by default
+ * {@link ChannelStateWriter} implemented using {@link 
CheckpointStateOutputStream
+ * CheckpointStateOutputStreams}. Internally, it has by default
  *
  * <ul>
  *   <li>one stream per checkpoint; having multiple streams would mean more 
files written and more
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointMetadataOutputStream.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointMetadataOutputStream.java
index 418107b..525e21b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointMetadataOutputStream.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointMetadataOutputStream.java
@@ -25,8 +25,8 @@ import java.io.IOException;
 /**
  * An output stream for checkpoint metadata.
  *
- * <p>This stream is similar to the {@link 
CheckpointStreamFactory.CheckpointStateOutputStream}, but
- * for metadata files rather thancdata files.
+ * <p>This stream is similar to the {@link CheckpointStateOutputStream}, but 
for metadata files
+ * rather than data files.
  *
  * <p>This stream always creates a file, regardless of the amount of data 
written.
  */
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStateOutputStream.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStateOutputStream.java
new file mode 100644
index 0000000..4858235
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStateOutputStream.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.FSDataOutputStream;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A dedicated output stream that produces a {@link StreamStateHandle} when 
closed.
+ *
+ * <p><b>Important:</b> When closing this stream after the successful case, 
you must call {@link
+ * #closeAndGetHandle()} - only that method will actually retain the resource 
written to. The method
+ * has the semantics of "close on success". The {@link #close()} method is 
supposed to remove the
+ * target resource if called before {@link #closeAndGetHandle()}, hence having 
the semantics of
+ * "close on failure". That way, simple try-with-resources statements 
automatically clean up
+ * unsuccessful partial state resources in case the writing does not complete.
+ *
+ * <p>Note: This is an abstract class and not an interface because {@link 
OutputStream} is an
+ * abstract class.
+ */
+@Internal
+public abstract class CheckpointStateOutputStream extends FSDataOutputStream {
+    /**
+     * Closes the stream and gets a state handle that can create an input 
stream producing the data
+     * written to this stream.
+     *
+     * <p>This closing must be called (also when the caller is not interested 
in the handle) to
+     * successfully close the stream and retain the produced resource. In 
contrast, the {@link
+     * #close()} method removes the target resource when called.
+     *
+     * @return A state handle that can create an input stream producing the 
data written to this
+     *     stream.
+     * @throws IOException Thrown, if the stream cannot be closed.
+     */
+    @Nullable
+    public abstract StreamStateHandle closeAndGetHandle() throws IOException;
+
+    /**
+     * This method should close the stream, if it has not been closed before. If 
this method actually
+     * closes the stream, it should delete/release the resource behind the 
stream, such as the file
+     * that the stream writes to.
+     *
+     * <p>The above implies that this method is intended to be the 
"unsuccessful close", such as
+     * when cancelling the stream writing, or when an exception occurs. 
Closing the stream for the
+     * successful case must go through {@link #closeAndGetHandle()}.
+     *
+     * @throws IOException Thrown, if the stream cannot be closed.
+     */
+    @Override
+    public abstract void close() throws IOException;
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageWorkerView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageWorkerView.java
index 064e42b..39d4a74 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageWorkerView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageWorkerView.java
@@ -63,6 +63,5 @@ public interface CheckpointStorageWorkerView {
      * @return A checkpoint state stream to the location for state owned by 
tasks.
      * @throws IOException Thrown, if the stream cannot be opened.
      */
-    CheckpointStreamFactory.CheckpointStateOutputStream 
createTaskOwnedStateStream()
-            throws IOException;
+    CheckpointStateOutputStream createTaskOwnedStateStream() throws 
IOException;
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java
index b864711..c905438 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java
@@ -18,12 +18,7 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.core.fs.FSDataOutputStream;
-
-import javax.annotation.Nullable;
-
 import java.io.IOException;
-import java.io.OutputStream;
 
 /**
  * A factory for checkpoint output streams, which are used to persist data for 
checkpoints.
@@ -44,49 +39,4 @@ public interface CheckpointStreamFactory {
      */
     CheckpointStateOutputStream 
createCheckpointStateOutputStream(CheckpointedStateScope scope)
             throws IOException;
-
-    /**
-     * A dedicated output stream that produces a {@link StreamStateHandle} 
when closed.
-     *
-     * <p><b>Important:</b> When closing this stream after the successful 
case, you must call {@link
-     * #closeAndGetHandle()} - only that method will actually retain the 
resource written to. The
-     * method has the semantics of "close on success". The {@link #close()} 
method is supposed to
-     * remove the target resource if called before {@link 
#closeAndGetHandle()}, hence having the
-     * semantics of "close on failure". That way, simple try-with-resources 
statements automatically
-     * clean up unsuccessful partial state resources in case the writing does 
not complete.
-     *
-     * <p>Note: This is an abstract class and not an interface because {@link 
OutputStream} is an
-     * abstract class.
-     */
-    abstract class CheckpointStateOutputStream extends FSDataOutputStream {
-
-        /**
-         * Closes the stream and gets a state handle that can create an input 
stream producing the
-         * data written to this stream.
-         *
-         * <p>This closing must be called (also when the caller is not 
interested in the handle) to
-         * successfully close the stream and retain the produced resource. In 
contrast, the {@link
-         * #close()} method removes the target resource when called.
-         *
-         * @return A state handle that can create an input stream producing 
the data written to this
-         *     stream.
-         * @throws IOException Thrown, if the stream cannot be closed.
-         */
-        @Nullable
-        public abstract StreamStateHandle closeAndGetHandle() throws 
IOException;
-
-        /**
-         * This method should close the stream, if has not been closed before. 
If this method
-         * actually closes the stream, it should delete/release the resource 
behind the stream, such
-         * as the file that the stream writes to.
-         *
-         * <p>The above implies that this method is intended to be the 
"unsuccessful close", such as
-         * when cancelling the stream writing, or when an exception occurs. 
Closing the stream for
-         * the successful case must go through {@link #closeAndGetHandle()}.
-         *
-         * @throws IOException Thrown, if the stream cannot be closed.
-         */
-        @Override
-        public abstract void close() throws IOException;
-    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java
index cd7b181..3378ccb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java
@@ -48,7 +48,7 @@ public interface CheckpointStreamWithResultProvider extends 
Closeable {
 
     /** Returns the encapsulated output stream. */
     @Nonnull
-    CheckpointStreamFactory.CheckpointStateOutputStream 
getCheckpointOutputStream();
+    CheckpointStateOutputStream getCheckpointOutputStream();
 
     @Override
     default void close() throws IOException {
@@ -61,10 +61,9 @@ public interface CheckpointStreamWithResultProvider extends 
Closeable {
      */
     class PrimaryStreamOnly implements CheckpointStreamWithResultProvider {
 
-        @Nonnull private final 
CheckpointStreamFactory.CheckpointStateOutputStream outputStream;
+        @Nonnull private final CheckpointStateOutputStream outputStream;
 
-        public PrimaryStreamOnly(
-                @Nonnull CheckpointStreamFactory.CheckpointStateOutputStream 
outputStream) {
+        public PrimaryStreamOnly(@Nonnull CheckpointStateOutputStream 
outputStream) {
             this.outputStream = outputStream;
         }
 
@@ -77,7 +76,7 @@ public interface CheckpointStreamWithResultProvider extends 
Closeable {
 
         @Nonnull
         @Override
-        public CheckpointStreamFactory.CheckpointStateOutputStream 
getCheckpointOutputStream() {
+        public CheckpointStateOutputStream getCheckpointOutputStream() {
             return outputStream;
         }
     }
@@ -93,8 +92,8 @@ public interface CheckpointStreamWithResultProvider extends 
Closeable {
         @Nonnull private final DuplicatingCheckpointOutputStream outputStream;
 
         public PrimaryAndSecondaryStream(
-                @Nonnull CheckpointStreamFactory.CheckpointStateOutputStream 
primaryOut,
-                CheckpointStreamFactory.CheckpointStateOutputStream 
secondaryOut)
+                @Nonnull CheckpointStateOutputStream primaryOut,
+                CheckpointStateOutputStream secondaryOut)
                 throws IOException {
             this(new DuplicatingCheckpointOutputStream(primaryOut, 
secondaryOut));
         }
@@ -154,7 +153,7 @@ public interface CheckpointStreamWithResultProvider extends 
Closeable {
             @Nonnull CheckpointStreamFactory primaryStreamFactory)
             throws IOException {
 
-        CheckpointStreamFactory.CheckpointStateOutputStream primaryOut =
+        CheckpointStateOutputStream primaryOut =
                 
primaryStreamFactory.createCheckpointStateOutputStream(checkpointedStateScope);
 
         return new 
CheckpointStreamWithResultProvider.PrimaryStreamOnly(primaryOut);
@@ -168,7 +167,7 @@ public interface CheckpointStreamWithResultProvider extends 
Closeable {
             @Nonnull LocalRecoveryDirectoryProvider secondaryStreamDirProvider)
             throws IOException {
 
-        CheckpointStreamFactory.CheckpointStateOutputStream primaryOut =
+        CheckpointStateOutputStream primaryOut =
                 
primaryStreamFactory.createCheckpointStateOutputStream(checkpointedStateScope);
 
         try {
@@ -179,7 +178,7 @@ public interface CheckpointStreamWithResultProvider extends 
Closeable {
                             String.valueOf(UUID.randomUUID()));
             Path outPath = new Path(outFile.toURI());
 
-            CheckpointStreamFactory.CheckpointStateOutputStream secondaryOut =
+            CheckpointStateOutputStream secondaryOut =
                     new FileBasedStateOutputStream(outPath.getFileSystem(), 
outPath);
 
             return new 
CheckpointStreamWithResultProvider.PrimaryAndSecondaryStream(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java
index 6055836..6dc6739 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java
@@ -117,7 +117,7 @@ class DefaultOperatorStateBackendSnapshotStrategy
         }
 
         return (snapshotCloseableRegistry) -> {
-            CheckpointStreamFactory.CheckpointStateOutputStream localOut =
+            CheckpointStateOutputStream localOut =
                     streamFactory.createCheckpointStateOutputStream(
                             CheckpointedStateScope.EXCLUSIVE);
             snapshotCloseableRegistry.registerCloseable(localOut);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DuplicatingCheckpointOutputStream.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DuplicatingCheckpointOutputStream.java
index 5535aeb..fd9e7e32 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DuplicatingCheckpointOutputStream.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DuplicatingCheckpointOutputStream.java
@@ -35,8 +35,7 @@ import java.io.IOException;
  * user. This class is used to write state for local recovery as a local 
(secondary) copy of the
  * (primary) state snapshot that is written to a (slower but highly-available) 
remote filesystem.
  */
-public class DuplicatingCheckpointOutputStream
-        extends CheckpointStreamFactory.CheckpointStateOutputStream {
+public class DuplicatingCheckpointOutputStream extends 
CheckpointStateOutputStream {
 
     /** Default buffer size of 8KB. */
     private static final int DEFAULT_BUFFER_SIZER = 8 * 1024;
@@ -48,13 +47,13 @@ public class DuplicatingCheckpointOutputStream
     private int bufferIdx;
 
     /** Primary stream for writing the checkpoint data. Failures from this 
stream are forwarded. */
-    private final CheckpointStreamFactory.CheckpointStateOutputStream 
primaryOutputStream;
+    private final CheckpointStateOutputStream primaryOutputStream;
 
     /**
      * Primary stream for writing the checkpoint data. Failures from this 
stream are not forwarded
      * until {@link #closeAndGetSecondaryHandle()}.
      */
-    private final CheckpointStreamFactory.CheckpointStateOutputStream 
secondaryOutputStream;
+    private final CheckpointStateOutputStream secondaryOutputStream;
 
     /**
      * Stores a potential exception that occurred while interacting with {@link
@@ -63,15 +62,15 @@ public class DuplicatingCheckpointOutputStream
     private Exception secondaryStreamException;
 
     public DuplicatingCheckpointOutputStream(
-            CheckpointStreamFactory.CheckpointStateOutputStream 
primaryOutputStream,
-            CheckpointStreamFactory.CheckpointStateOutputStream 
secondaryOutputStream)
+            CheckpointStateOutputStream primaryOutputStream,
+            CheckpointStateOutputStream secondaryOutputStream)
             throws IOException {
         this(primaryOutputStream, secondaryOutputStream, DEFAULT_BUFFER_SIZER);
     }
 
     public DuplicatingCheckpointOutputStream(
-            CheckpointStreamFactory.CheckpointStateOutputStream 
primaryOutputStream,
-            CheckpointStreamFactory.CheckpointStateOutputStream 
secondaryOutputStream,
+            CheckpointStateOutputStream primaryOutputStream,
+            CheckpointStateOutputStream secondaryOutputStream,
             int bufferSize)
             throws IOException {
 
@@ -280,12 +279,12 @@ public class DuplicatingCheckpointOutputStream
     }
 
     @VisibleForTesting
-    CheckpointStreamFactory.CheckpointStateOutputStream 
getPrimaryOutputStream() {
+    CheckpointStateOutputStream getPrimaryOutputStream() {
         return primaryOutputStream;
     }
 
     @VisibleForTesting
-    CheckpointStreamFactory.CheckpointStateOutputStream 
getSecondaryOutputStream() {
+    CheckpointStateOutputStream getSecondaryOutputStream() {
         return secondaryOutputStream;
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/FullSnapshotAsyncWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/FullSnapshotAsyncWriter.java
index c5a7277..96a3bd4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/FullSnapshotAsyncWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/FullSnapshotAsyncWriter.java
@@ -136,7 +136,7 @@ public class FullSnapshotAsyncWriter<K>
         byte[] previousValue = null;
         DataOutputView kgOutView = null;
         OutputStream kgOutStream = null;
-        CheckpointStreamFactory.CheckpointStateOutputStream 
checkpointOutputStream =
+        CheckpointStateOutputStream checkpointOutputStream =
                 checkpointStreamWithResultProvider.getCheckpointOutputStream();
 
         try {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateCheckpointOutputStream.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateCheckpointOutputStream.java
index 7384fbe..a487e4e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateCheckpointOutputStream.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateCheckpointOutputStream.java
@@ -39,8 +39,7 @@ public final class KeyedStateCheckpointOutputStream
     private final KeyGroupRangeOffsets keyGroupRangeOffsets;
 
     public KeyedStateCheckpointOutputStream(
-            CheckpointStreamFactory.CheckpointStateOutputStream delegate,
-            KeyGroupRange keyGroupRange) {
+            CheckpointStateOutputStream delegate, KeyGroupRange keyGroupRange) 
{
 
         super(delegate);
         Preconditions.checkNotNull(keyGroupRange);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/NonClosingCheckpointOutputStream.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/NonClosingCheckpointOutputStream.java
index d39b198..76530b0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/NonClosingCheckpointOutputStream.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/NonClosingCheckpointOutputStream.java
@@ -34,11 +34,10 @@ import java.io.OutputStream;
 public abstract class NonClosingCheckpointOutputStream<T extends 
StreamStateHandle>
         extends OutputStream {
 
-    protected final CheckpointStreamFactory.CheckpointStateOutputStream 
delegate;
+    protected final CheckpointStateOutputStream delegate;
     private final ResourceGuard resourceGuard = new ResourceGuard();
 
-    public NonClosingCheckpointOutputStream(
-            CheckpointStreamFactory.CheckpointStateOutputStream delegate) {
+    public NonClosingCheckpointOutputStream(CheckpointStateOutputStream 
delegate) {
         this.delegate = Preconditions.checkNotNull(delegate);
     }
 
@@ -79,7 +78,7 @@ public abstract class NonClosingCheckpointOutputStream<T 
extends StreamStateHand
     }
 
     /** This method should not be public so as to not expose internals to user 
code. */
-    CheckpointStreamFactory.CheckpointStateOutputStream getDelegate() {
+    CheckpointStateOutputStream getDelegate() {
         return delegate;
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateCheckpointOutputStream.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateCheckpointOutputStream.java
index 34d700a..5f46187 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateCheckpointOutputStream.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateCheckpointOutputStream.java
@@ -33,8 +33,8 @@ public final class OperatorStateCheckpointOutputStream
     private LongArrayList partitionOffsets;
     private final long initialPosition;
 
-    public OperatorStateCheckpointOutputStream(
-            CheckpointStreamFactory.CheckpointStateOutputStream delegate) 
throws IOException {
+    public OperatorStateCheckpointOutputStream(CheckpointStateOutputStream 
delegate)
+            throws IOException {
 
         super(delegate);
         this.partitionOffsets = new LongArrayList(16);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SavepointSnapshotStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SavepointSnapshotStrategy.java
index 126150b..114bf97 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SavepointSnapshotStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SavepointSnapshotStrategy.java
@@ -85,7 +85,7 @@ public class SavepointSnapshotStrategy<K>
     static CheckpointStreamWithResultProvider createSimpleStream(
             @Nonnull CheckpointStreamFactory primaryStreamFactory) throws 
IOException {
 
-        CheckpointStreamFactory.CheckpointStateOutputStream primaryOut =
+        CheckpointStateOutputStream primaryOut =
                 primaryStreamFactory.createCheckpointStateOutputStream(
                         CheckpointedStateScope.EXCLUSIVE);
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
index a6b281b..4df7015 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
@@ -94,9 +94,8 @@ public class StateSnapshotContextSynchronousImpl implements 
StateSnapshotContext
         return checkpointTimestamp;
     }
 
-    private CheckpointStreamFactory.CheckpointStateOutputStream 
openAndRegisterNewStream()
-            throws Exception {
-        CheckpointStreamFactory.CheckpointStateOutputStream cout =
+    private CheckpointStateOutputStream openAndRegisterNewStream() throws 
Exception {
+        CheckpointStateOutputStream cout =
                 
streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
 
         closableRegistry.registerCloseable(cout);
@@ -160,7 +159,7 @@ public class StateSnapshotContextSynchronousImpl implements 
StateSnapshotContext
 
         Preconditions.checkNotNull(stream);
 
-        CheckpointStreamFactory.CheckpointStateOutputStream delegate = 
stream.getDelegate();
+        CheckpointStateOutputStream delegate = stream.getDelegate();
 
         if (closableRegistry.unregisterCloseable(delegate)) {
             delegate.close();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileBasedStateOutputStream.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileBasedStateOutputStream.java
index 7021ea50..12f0ba0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileBasedStateOutputStream.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileBasedStateOutputStream.java
@@ -22,7 +22,7 @@ import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.core.fs.Path;
-import 
org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java
index a33c47b..1c3ebb2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java
@@ -22,10 +22,10 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.CheckpointStorageLocation;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import 
org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
 import 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream;
 
 import javax.annotation.Nullable;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
index 8069bdd..5151130 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
@@ -24,6 +24,7 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.core.fs.OutputStreamAndPath;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
 import org.apache.flink.runtime.state.StreamStateHandle;
@@ -161,11 +162,10 @@ public class FsCheckpointStreamFactory implements 
CheckpointStreamFactory {
     // ------------------------------------------------------------------------
 
     /**
-     * A {@link CheckpointStreamFactory.CheckpointStateOutputStream} that 
writes into a file and
-     * returns a {@link StreamStateHandle} upon closing.
+     * A {@link CheckpointStateOutputStream} that writes into a file and 
returns a {@link
+     * StreamStateHandle} upon closing.
      */
-    public static class FsCheckpointStateOutputStream
-            extends CheckpointStreamFactory.CheckpointStateOutputStream {
+    public static class FsCheckpointStateOutputStream extends 
CheckpointStateOutputStream {
 
         private final byte[] writeBuffer;
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java
index 78b96e5..8b2bc3e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.state.heap;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
@@ -140,7 +141,7 @@ class HeapSnapshotStrategy<K>
 
             
snapshotCloseableRegistry.registerCloseable(streamWithResultProvider);
 
-            final CheckpointStreamFactory.CheckpointStateOutputStream 
localStream =
+            final CheckpointStateOutputStream localStream =
                     streamWithResultProvider.getCheckpointOutputStream();
 
             final DataOutputViewStreamWrapper outView =
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
index 5f36a42..98d09be 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.state.memory;
 
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
 import org.apache.flink.runtime.state.StreamStateHandle;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorageAccess.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorageAccess.java
index 5ff029d..10e33a9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorageAccess.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorageAccess.java
@@ -22,10 +22,10 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.CheckpointStorageLocation;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import 
org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
 import 
org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
 import 
org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory.MemoryCheckpointOutputStream;
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.java
index 28b81d7..29cd71f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.java
@@ -23,7 +23,7 @@ import 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter.ChannelSta
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
-import 
org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.InputChannelStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java
index 71fbdfd..8363e2e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java
@@ -268,8 +268,7 @@ public class ChannelPersistenceITCase {
             }
 
             @Override
-            public CheckpointStreamFactory.CheckpointStateOutputStream
-                    createTaskOwnedStateStream() {
+            public CheckpointStateOutputStream createTaskOwnedStateStream() {
                 throw new UnsupportedOperationException();
             }
         };
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProviderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProviderTest.java
index 915c9bf..4e928c1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProviderTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProviderTest.java
@@ -183,7 +183,7 @@ public class CheckpointStreamWithResultProviderTest extends 
TestLogger {
     private SnapshotResult<StreamStateHandle> writeCheckpointTestData(
             CheckpointStreamWithResultProvider resultProvider) throws 
IOException {
 
-        CheckpointStreamFactory.CheckpointStateOutputStream 
checkpointOutputStream =
+        CheckpointStateOutputStream checkpointOutputStream =
                 resultProvider.getCheckpointOutputStream();
         checkpointOutputStream.write(0x42);
         return resultProvider.closeAndFinalizeCheckpointStreamResult();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/DuplicatingCheckpointOutputStreamTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/DuplicatingCheckpointOutputStreamTest.java
index 9fa30de..0b09655e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/DuplicatingCheckpointOutputStreamTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/DuplicatingCheckpointOutputStreamTest.java
@@ -285,8 +285,7 @@ public class DuplicatingCheckpointOutputStreamTest extends 
TestLogger {
     }
 
     /** Stream that throws {@link IOException} on all relevant methods under 
test. */
-    private static class FailingCheckpointOutStream
-            extends CheckpointStreamFactory.CheckpointStateOutputStream {
+    private static class FailingCheckpointOutStream extends 
CheckpointStateOutputStream {
 
         private boolean closed = false;
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyedStateCheckpointOutputStreamTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyedStateCheckpointOutputStreamTest.java
index a6114ba..31f46f9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyedStateCheckpointOutputStreamTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyedStateCheckpointOutputStreamTest.java
@@ -33,7 +33,7 @@ public class KeyedStateCheckpointOutputStreamTest {
     private static final int STREAM_CAPACITY = 128;
 
     private static KeyedStateCheckpointOutputStream createStream(KeyGroupRange 
keyGroupRange) {
-        CheckpointStreamFactory.CheckpointStateOutputStream checkStream =
+        CheckpointStateOutputStream checkStream =
                 new TestMemoryCheckpointOutputStream(STREAM_CAPACITY);
         return new KeyedStateCheckpointOutputStream(checkStream, 
keyGroupRange);
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateOutputCheckpointStreamTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateOutputCheckpointStreamTest.java
index d92c526..80af8e5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateOutputCheckpointStreamTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateOutputCheckpointStreamTest.java
@@ -35,7 +35,7 @@ public class OperatorStateOutputCheckpointStreamTest {
     private static final int STREAM_CAPACITY = 128;
 
     private static OperatorStateCheckpointOutputStream createStream() throws 
IOException {
-        CheckpointStreamFactory.CheckpointStateOutputStream checkStream =
+        CheckpointStateOutputStream checkStream =
                 new TestMemoryCheckpointOutputStream(STREAM_CAPACITY);
         return new OperatorStateCheckpointOutputStream(checkStream);
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestCheckpointStorageWorkerView.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestCheckpointStorageWorkerView.java
index 95b0248..9b8d298 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestCheckpointStorageWorkerView.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestCheckpointStorageWorkerView.java
@@ -49,8 +49,7 @@ public class TestCheckpointStorageWorkerView implements 
CheckpointStorageWorkerV
     }
 
     @Override
-    public CheckpointStreamFactory.CheckpointStateOutputStream 
createTaskOwnedStateStream()
-            throws IOException {
+    public CheckpointStateOutputStream createTaskOwnedStateStream() throws 
IOException {
         return 
taskOwnedCheckpointStreamFactory.createCheckpointStateOutputStream(
                 taskOwnedStateScope);
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingCheckpointStorageAccessCoordinatorView.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingCheckpointStorageAccessCoordinatorView.java
index ff674b9..9e05949 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingCheckpointStorageAccessCoordinatorView.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingCheckpointStorageAccessCoordinatorView.java
@@ -93,7 +93,7 @@ public class TestingCheckpointStorageAccessCoordinatorView
     }
 
     @Override
-    public CheckpointStreamFactory.CheckpointStateOutputStream 
createTaskOwnedStateStream() {
+    public CheckpointStateOutputStream createTaskOwnedStateStream() {
         return new 
MemCheckpointStreamFactory.MemoryCheckpointOutputStream(Integer.MAX_VALUE);
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
index bf2c264..472c35c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
@@ -22,8 +22,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import 
org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
@@ -90,7 +89,7 @@ public class FsCheckpointStateOutputStreamTest {
 
     @Test
     public void testEmptyState() throws Exception {
-        FsCheckpointStreamFactory.CheckpointStateOutputStream stream =
+        CheckpointStateOutputStream stream =
                 new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
                         Path.fromLocalFile(tempDir.newFolder()),
                         FileSystem.getLocalFileSystem(),
@@ -124,7 +123,7 @@ public class FsCheckpointStateOutputStreamTest {
 
     @Test
     public void testGetPos() throws Exception {
-        FsCheckpointStreamFactory.CheckpointStateOutputStream stream =
+        CheckpointStateOutputStream stream =
                 new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
                         Path.fromLocalFile(tempDir.newFolder()),
                         FileSystem.getLocalFileSystem(),
@@ -172,7 +171,7 @@ public class FsCheckpointStateOutputStreamTest {
         when(fs.create(pathCaptor.capture(), any(FileSystem.WriteMode.class)))
                 .thenReturn(outputStream);
 
-        CheckpointStreamFactory.CheckpointStateOutputStream stream =
+        CheckpointStateOutputStream stream =
                 new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
                         Path.fromLocalFile(tempDir.newFolder()), fs, 4, 0, 
relativePaths);
 
@@ -198,7 +197,7 @@ public class FsCheckpointStateOutputStreamTest {
                 .thenReturn(outputStream);
         doThrow(new IOException("Test 
IOException.")).when(outputStream).close();
 
-        CheckpointStreamFactory.CheckpointStateOutputStream stream =
+        CheckpointStateOutputStream stream =
                 new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
                         Path.fromLocalFile(tempDir.newFolder()), fs, 4, 0, 
relativePaths);
 
@@ -219,7 +218,7 @@ public class FsCheckpointStateOutputStreamTest {
 
     private void runTest(int numBytes, int bufferSize, int threshold, boolean 
expectFile)
             throws Exception {
-        FsCheckpointStreamFactory.CheckpointStateOutputStream stream =
+        CheckpointStateOutputStream stream =
                 new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
                         Path.fromLocalFile(tempDir.newFolder()),
                         FileSystem.getLocalFileSystem(),
@@ -335,13 +334,13 @@ public class FsCheckpointStateOutputStreamTest {
 
         // use with try-with-resources
         StreamStateHandle handle4;
-        try (CheckpointStreamFactory.CheckpointStateOutputStream stream4 = 
factory.get()) {
+        try (CheckpointStateOutputStream stream4 = factory.get()) {
             stream4.write(state4);
             handle4 = stream4.closeAndGetHandle();
         }
 
         // close before accessing handle
-        CheckpointStreamFactory.CheckpointStateOutputStream stream5 = 
factory.get();
+        CheckpointStateOutputStream stream5 = factory.get();
         stream5.write(state4);
         stream5.close();
         try {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccessTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccessTest.java
index 77cb9b6..7b24f3d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccessTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccessTest.java
@@ -22,9 +22,9 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.local.LocalFileSystem;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.CheckpointStorageAccess;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
-import 
org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java
index a31a09c..8a4cf38 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.local.LocalFileSystem;
 import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
-import 
org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
 
 import org.junit.Rule;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/memory/MemoryCheckpointOutputStreamTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/memory/MemoryCheckpointOutputStreamTest.java
index 07d43cd..63dc7a6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/memory/MemoryCheckpointOutputStreamTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/memory/MemoryCheckpointOutputStreamTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.state.memory;
 
-import 
org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import 
org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory.MemoryCheckpointOutputStream;
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/memory/MemoryCheckpointStorageAccessTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/memory/MemoryCheckpointStorageAccessTest.java
index aef780d..bebe5da 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/memory/MemoryCheckpointStorageAccessTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/memory/MemoryCheckpointStorageAccessTest.java
@@ -21,9 +21,9 @@ package org.apache.flink.runtime.state.memory;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.CheckpointStorageAccess;
 import org.apache.flink.runtime.state.CheckpointStorageLocation;
-import 
org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import 
org.apache.flink.runtime.state.filesystem.AbstractFileCheckpointStorageAccessTestBase;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java
index 9c57d0f..0331d5b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java
@@ -20,11 +20,11 @@ package org.apache.flink.runtime.state.testutils;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.CheckpointStorageAccess;
 import org.apache.flink.runtime.state.CheckpointStorageLocation;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import 
org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
 import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/TestCheckpointStreamFactory.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/TestCheckpointStreamFactory.java
index a4e1efb..80f36b2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/TestCheckpointStreamFactory.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/TestCheckpointStreamFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state.testutils;
 
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockCheckpointStorage.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockCheckpointStorage.java
index a71d9bc..fb4a8a0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockCheckpointStorage.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockCheckpointStorage.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state.ttl.mock;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.CheckpointStorage;
 import org.apache.flink.runtime.state.CheckpointStorageAccess;
 import org.apache.flink.runtime.state.CheckpointStorageLocation;
@@ -95,8 +96,7 @@ public class MockCheckpointStorage implements 
CheckpointStorage {
             }
 
             @Override
-            public CheckpointStreamFactory.CheckpointStateOutputStream
-                    createTaskOwnedStateStream() {
+            public CheckpointStateOutputStream createTaskOwnedStateStream() {
                 return null;
             }
         };
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
index 66e8b25..021e340 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.util;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
 import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockingCheckpointOutputStream.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockingCheckpointOutputStream.java
index 00aac67..e6217c5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockingCheckpointOutputStream.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockingCheckpointOutputStream.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.util;
 
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.IOUtils;
 
@@ -35,8 +35,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
  * until the stream is closed. This is typically used to test that a blocking 
read can be
  * interrupted / closed.
  */
-public class BlockingCheckpointOutputStream
-        extends CheckpointStreamFactory.CheckpointStateOutputStream {
+public class BlockingCheckpointOutputStream extends 
CheckpointStateOutputStream {
 
     /** Optional delegate stream to which all ops are forwarded. */
     private final FSDataOutputStream delegate;
@@ -192,10 +191,9 @@ public class BlockingCheckpointOutputStream
             throw new IOException("Stream was already closed!");
         }
 
-        if (delegate instanceof 
CheckpointStreamFactory.CheckpointStateOutputStream) {
+        if (delegate instanceof CheckpointStateOutputStream) {
             StreamStateHandle streamStateHandle =
-                    ((CheckpointStreamFactory.CheckpointStateOutputStream) 
delegate)
-                            .closeAndGetHandle();
+                    ((CheckpointStateOutputStream) 
delegate).closeAndGetHandle();
             unblockAll();
             return streamStateHandle;
         } else {
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
index cf08092..9fc4f56 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
@@ -29,10 +29,12 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
 import org.apache.flink.runtime.state.Keyed;
@@ -45,6 +47,7 @@ import 
org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInf
 import org.apache.flink.runtime.state.SavepointResources;
 import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.TestableKeyedStateBackend;
 import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
 import 
org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl;
@@ -217,7 +220,29 @@ public class ChangelogKeyedStateBackend<K>
         this.stateChangelogWriter = stateChangelogWriter;
         this.changelogStates = new HashMap<>();
         this.changelogSnapshotState = completeRestore(initialState);
-        this.streamFactory = shared -> checkpointStorageWorkerView.createTaskOwnedStateStream();
+        this.streamFactory =
+                new CheckpointStreamFactory() {
+
+                    @Override
+                    public CheckpointStateOutputStream createCheckpointStateOutputStream(
+                            CheckpointedStateScope scope) throws IOException {
+                        return checkpointStorageWorkerView.createTaskOwnedStateStream();
+                    }
+
+                    @Override
+                    public boolean canFastDuplicate(
+                            StreamStateHandle stateHandle, CheckpointedStateScope scope)
+                            throws IOException {
+                        return false;
+                    }
+
+                    @Override
+                    public List<StreamStateHandle> duplicate(
+                            List<StreamStateHandle> stateHandles, CheckpointedStateScope scope)
+                            throws IOException {
+                        return null;
+                    }
+                };
         this.closer.register(keyedStateBackend);
     }
 
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java
 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java
index 4672c7c..bed4773 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java
@@ -35,6 +35,8 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.CheckpointStateToolset;
 import org.apache.flink.runtime.state.CheckpointStorageAccess;
 import org.apache.flink.runtime.state.CheckpointStorageLocation;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
@@ -398,7 +400,13 @@ public class ChangelogStateBackendTestUtils {
         }
 
         @Override
-        public CheckpointStreamFactory.CheckpointStateOutputStream createTaskOwnedStateStream() {
+        public CheckpointStateOutputStream createTaskOwnedStateStream() {
+            throw new UnsupportedOperationException(
+                    "Checkpoints are not supported in a single key state backend");
+        }
+
+        @Override
+        public CheckpointStateToolset createTaskOwnedCheckpointStateToolset() {
             throw new UnsupportedOperationException(
                     "Checkpoints are not supported in a single key state backend");
         }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java
index 09b4ea0..68d1ed9 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java
@@ -19,6 +19,7 @@
 package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
 import org.apache.flink.runtime.state.StateHandleID;
@@ -115,7 +116,7 @@ public class RocksDBStateUploader extends RocksDBStateDataTransfer {
             throws IOException {
 
         InputStream inputStream = null;
-        CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null;
+        CheckpointStateOutputStream outputStream = null;
 
         try {
             final byte[] buffer = new byte[READ_BUFFER_SIZE];
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index 6ea1da5..f61394b 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -40,8 +40,8 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploaderTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploaderTest.java
index 8ef843a..434976d 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploaderTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploaderTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.contrib.streaming.state;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
 import org.apache.flink.runtime.state.StateHandleID;
@@ -42,6 +43,7 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
@@ -59,10 +61,30 @@ public class RocksDBStateUploaderTest extends TestLogger {
         SpecifiedException expectedException =
                 new SpecifiedException("throw exception while multi thread upload states.");
 
-        CheckpointStreamFactory.CheckpointStateOutputStream outputStream =
+        CheckpointStateOutputStream outputStream =
                 createFailingCheckpointStateOutputStream(expectedException);
         CheckpointStreamFactory checkpointStreamFactory =
-                (CheckpointedStateScope scope) -> outputStream;
+                new CheckpointStreamFactory() {
+                    @Override
+                    public CheckpointStateOutputStream createCheckpointStateOutputStream(
+                            CheckpointedStateScope scope) throws IOException {
+                        return outputStream;
+                    }
+
+                    @Override
+                    public boolean canFastDuplicate(
+                            StreamStateHandle stateHandle, CheckpointedStateScope scope)
+                            throws IOException {
+                        return false;
+                    }
+
+                    @Override
+                    public List<StreamStateHandle> duplicate(
+                            List<StreamStateHandle> stateHandles, CheckpointedStateScope scope)
+                            throws IOException {
+                        return null;
+                    }
+                };
 
         File file = temporaryFolder.newFile(String.valueOf(UUID.randomUUID()));
         generateRandomFileContent(file.getPath(), 20);
@@ -119,9 +141,9 @@ public class RocksDBStateUploaderTest extends TestLogger {
         }
     }
 
-    private CheckpointStreamFactory.CheckpointStateOutputStream
-            createFailingCheckpointStateOutputStream(IOException failureException) {
-        return new CheckpointStreamFactory.CheckpointStateOutputStream() {
+    private CheckpointStateOutputStream createFailingCheckpointStateOutputStream(
+            IOException failureException) {
+        return new CheckpointStateOutputStream() {
             @Nullable
             @Override
             public StreamStateHandle closeAndGetHandle() {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/NonCheckpointingStorageAccess.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/NonCheckpointingStorageAccess.java
index 0de86e1..3422850 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/NonCheckpointingStorageAccess.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/NonCheckpointingStorageAccess.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.api.operators.sorted.state;
 
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.CheckpointStorageAccess;
 import org.apache.flink.runtime.state.CheckpointStorageLocation;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
@@ -68,7 +69,7 @@ class NonCheckpointingStorageAccess implements CheckpointStorageAccess {
     }
 
     @Override
-    public CheckpointStreamFactory.CheckpointStateOutputStream createTaskOwnedStateStream() {
+    public CheckpointStateOutputStream createTaskOwnedStateStream() {
         throw new UnsupportedOperationException(
                 "Checkpoints are not supported in a single key state backend");
     }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
index 4627c7d..a93c35d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
@@ -25,8 +25,8 @@ import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.io.disk.InputViewIterator;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.JavaSerializer;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -69,7 +69,7 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I
     private final CheckpointCommitter committer;
     protected final TypeSerializer<IN> serializer;
 
-    private transient CheckpointStreamFactory.CheckpointStateOutputStream out;
+    private transient CheckpointStateOutputStream out;
     private transient CheckpointStorageWorkerView checkpointStorage;
 
     private transient ListState<PendingCheckpoint> checkpointedState;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
index c8cc362..bcb4bdd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
@@ -713,8 +714,7 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
         }
 
         @Override
-        public CheckpointStreamFactory.CheckpointStateOutputStream createTaskOwnedStateStream()
-                throws IOException {
+        public CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException {
             return delegate.createTaskOwnedStateStream();
         }
     }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
index 9a7f04b..7dd7f32 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -83,10 +84,8 @@ public class StateSnapshotContextSynchronousImplTest extends TestLogger {
         long checkpointId = 42L;
         long checkpointTimestamp = 1L;
 
-        CheckpointStreamFactory.CheckpointStateOutputStream outputStream1 =
-                mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
-        CheckpointStreamFactory.CheckpointStateOutputStream outputStream2 =
-                mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
+        CheckpointStateOutputStream outputStream1 = mock(CheckpointStateOutputStream.class);
+        CheckpointStateOutputStream outputStream2 = mock(CheckpointStateOutputStream.class);
 
         CheckpointStreamFactory streamFactory = mock(CheckpointStreamFactory.class);
         when(streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE))
@@ -129,10 +128,8 @@ public class StateSnapshotContextSynchronousImplTest extends TestLogger {
         long checkpointId = 42L;
         long checkpointTimestamp = 1L;
 
-        CheckpointStreamFactory.CheckpointStateOutputStream outputStream1 =
-                mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
-        CheckpointStreamFactory.CheckpointStateOutputStream outputStream2 =
-                mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
+        CheckpointStateOutputStream outputStream1 = mock(CheckpointStateOutputStream.class);
+        CheckpointStateOutputStream outputStream2 = mock(CheckpointStateOutputStream.class);
 
         CheckpointStreamFactory streamFactory = mock(CheckpointStreamFactory.class);
         when(streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE))
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
index 598bf52..517d632 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
@@ -55,8 +55,8 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder;
 import org.apache.flink.runtime.state.OperatorStateBackend;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java
index 15252c7..ef7fc8a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.CheckpointStorage;
 import org.apache.flink.runtime.state.CheckpointStorageAccess;
 import org.apache.flink.runtime.state.CheckpointStorageLocation;
@@ -227,8 +228,7 @@ public class UnalignedCheckpointFailureHandlingITCase {
         }
 
         @Override
-        public CheckpointStreamFactory.CheckpointStateOutputStream createTaskOwnedStateStream()
-                throws IOException {
+        public CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException {
             return delegate.createTaskOwnedStateStream();
         }
 

Reply via email to