This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 72b0e9f2b0ab8478268a26845719791a7f25834c Author: Dawid Wysakowicz <dwysakow...@apache.org> AuthorDate: Wed Jan 12 12:30:05 2022 +0100 [refactor] Make CheckpointStateOutputStream a top level class --- .../channel/ChannelStateCheckpointWriter.java | 2 +- .../checkpoint/channel/ChannelStateWriterImpl.java | 7 +-- .../state/CheckpointMetadataOutputStream.java | 4 +- .../runtime/state/CheckpointStateOutputStream.java | 72 ++++++++++++++++++++++ .../runtime/state/CheckpointStorageWorkerView.java | 3 +- .../runtime/state/CheckpointStreamFactory.java | 50 --------------- .../state/CheckpointStreamWithResultProvider.java | 19 +++--- ...efaultOperatorStateBackendSnapshotStrategy.java | 2 +- .../state/DuplicatingCheckpointOutputStream.java | 19 +++--- .../runtime/state/FullSnapshotAsyncWriter.java | 2 +- .../state/KeyedStateCheckpointOutputStream.java | 3 +- .../state/NonClosingCheckpointOutputStream.java | 7 +-- .../state/OperatorStateCheckpointOutputStream.java | 4 +- .../runtime/state/SavepointSnapshotStrategy.java | 2 +- .../state/StateSnapshotContextSynchronousImpl.java | 7 +-- .../filesystem/FileBasedStateOutputStream.java | 2 +- .../filesystem/FsCheckpointStorageAccess.java | 2 +- .../filesystem/FsCheckpointStreamFactory.java | 8 +-- .../runtime/state/heap/HeapSnapshotStrategy.java | 3 +- .../state/memory/MemCheckpointStreamFactory.java | 1 + .../MemoryBackendCheckpointStorageAccess.java | 2 +- .../channel/ChannelStateCheckpointWriterTest.java | 2 +- .../runtime/state/ChannelPersistenceITCase.java | 3 +- .../CheckpointStreamWithResultProviderTest.java | 2 +- .../DuplicatingCheckpointOutputStreamTest.java | 3 +- .../KeyedStateCheckpointOutputStreamTest.java | 2 +- .../OperatorStateOutputCheckpointStreamTest.java | 2 +- .../state/TestCheckpointStorageWorkerView.java | 3 +- ...tingCheckpointStorageAccessCoordinatorView.java | 2 +- .../FsCheckpointStateOutputStreamTest.java | 17 +++-- .../filesystem/FsCheckpointStorageAccessTest.java | 2 +- .../filesystem/FsStateBackendEntropyTest.java | 2 +- .../memory/MemoryCheckpointOutputStreamTest.java | 2 +- .../memory/MemoryCheckpointStorageAccessTest.java | 2 +- .../state/testutils/BackendForTestStream.java | 2 +- .../testutils/TestCheckpointStreamFactory.java | 1 + .../state/ttl/mock/MockCheckpointStorage.java | 4 +- .../util/BlockerCheckpointStreamFactory.java | 1 + .../util/BlockingCheckpointOutputStream.java | 10 ++- .../changelog/ChangelogKeyedStateBackend.java | 27 +++++++- .../changelog/ChangelogStateBackendTestUtils.java | 10 ++- .../streaming/state/RocksDBStateUploader.java | 3 +- .../streaming/state/RocksDBAsyncSnapshotTest.java | 2 +- .../streaming/state/RocksDBStateUploaderTest.java | 32 ++++++++-- .../state/NonCheckpointingStorageAccess.java | 3 +- .../runtime/operators/GenericWriteAheadSink.java | 4 +- .../tasks/SubtaskCheckpointCoordinatorImpl.java | 4 +- .../StateSnapshotContextSynchronousImplTest.java | 13 ++-- .../tasks/TaskCheckpointingBehaviourTest.java | 2 +- .../UnalignedCheckpointFailureHandlingITCase.java | 4 +- 50 files changed, 227 insertions(+), 160 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java index f84cf50..8b3a9f9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java @@ -23,8 +23,8 @@ import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.logger.NetworkActionsLogger; import org.apache.flink.runtime.state.AbstractChannelStateHandle; import org.apache.flink.runtime.state.AbstractChannelStateHandle.StateContentMetaInfo; +import org.apache.flink.runtime.state.CheckpointStateOutputStream; import org.apache.flink.runtime.state.CheckpointStreamFactory; -import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream; import org.apache.flink.runtime.state.InputChannelStateHandle; import org.apache.flink.runtime.state.ResultSubpartitionStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java index 456bcab..ef8f58b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java @@ -20,8 +20,8 @@ package org.apache.flink.runtime.checkpoint.channel; import org.apache.flink.annotation.Internal; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.state.CheckpointStateOutputStream; import org.apache.flink.runtime.state.CheckpointStorageWorkerView; -import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.util.CloseableIterator; import org.apache.flink.util.Preconditions; @@ -39,9 +39,8 @@ import static org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteReque import static org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.write; /** - * {@link ChannelStateWriter} implemented using {@link - * CheckpointStreamFactory.CheckpointStateOutputStream CheckpointStateOutputStreams}. Internally, it - * has by default + * {@link ChannelStateWriter} implemented using {@link CheckpointStateOutputStream + * CheckpointStateOutputStreams}. Internally, it has by default * * <ul> * <li>one stream per checkpoint; having multiple streams would mean more files written and more diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointMetadataOutputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointMetadataOutputStream.java index 418107b..525e21b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointMetadataOutputStream.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointMetadataOutputStream.java @@ -25,8 +25,8 @@ import java.io.IOException; /** * An output stream for checkpoint metadata. * - * <p>This stream is similar to the {@link CheckpointStreamFactory.CheckpointStateOutputStream}, but - * for metadata files rather thancdata files. + * <p>This stream is similar to the {@link CheckpointStateOutputStream}, but for metadata files + * rather thancdata files. * * <p>This stream always creates a file, regardless of the amount of data written. */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStateOutputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStateOutputStream.java new file mode 100644 index 0000000..4858235 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStateOutputStream.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.core.fs.FSDataOutputStream; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * A dedicated output stream that produces a {@link StreamStateHandle} when closed. + * + * <p><b>Important:</b> When closing this stream after the successful case, you must call {@link + * #closeAndGetHandle()} - only that method will actually retain the resource written to. The method + * has the semantics of "close on success". The {@link #close()} method is supposed to remove the + * target resource if called before {@link #closeAndGetHandle()}, hence having the semantics of + * "close on failure". That way, simple try-with-resources statements automatically clean up + * unsuccessful partial state resources in case the writing does not complete. + * + * <p>Note: This is an abstract class and not an interface because {@link OutputStream} is an + * abstract class. + */ +@Internal +public abstract class CheckpointStateOutputStream extends FSDataOutputStream { + /** + * Closes the stream and gets a state handle that can create an input stream producing the data + * written to this stream. + * + * <p>This closing must be called (also when the caller is not interested in the handle) to + * successfully close the stream and retain the produced resource. In contrast, the {@link + * #close()} method removes the target resource when called. + * + * @return A state handle that can create an input stream producing the data written to this + * stream. + * @throws IOException Thrown, if the stream cannot be closed. + */ + @Nullable + public abstract StreamStateHandle closeAndGetHandle() throws IOException; + + /** + * This method should close the stream, if has not been closed before. If this method actually + * closes the stream, it should delete/release the resource behind the stream, such as the file + * that the stream writes to. + * + * <p>The above implies that this method is intended to be the "unsuccessful close", such as + * when cancelling the stream writing, or when an exception occurs. Closing the stream for the + * successful case must go through {@link #closeAndGetHandle()}. + * + * @throws IOException Thrown, if the stream cannot be closed. + */ + @Override + public abstract void close() throws IOException; +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageWorkerView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageWorkerView.java index 064e42b..39d4a74 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageWorkerView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageWorkerView.java @@ -63,6 +63,5 @@ public interface CheckpointStorageWorkerView { * @return A checkpoint state stream to the location for state owned by tasks. * @throws IOException Thrown, if the stream cannot be opened. */ - CheckpointStreamFactory.CheckpointStateOutputStream createTaskOwnedStateStream() - throws IOException; + CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java index b864711..c905438 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java @@ -18,12 +18,7 @@ package org.apache.flink.runtime.state; -import org.apache.flink.core.fs.FSDataOutputStream; - -import javax.annotation.Nullable; - import java.io.IOException; -import java.io.OutputStream; /** * A factory for checkpoint output streams, which are used to persist data for checkpoints. @@ -44,49 +39,4 @@ public interface CheckpointStreamFactory { */ CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws IOException; - - /** - * A dedicated output stream that produces a {@link StreamStateHandle} when closed. - * - * <p><b>Important:</b> When closing this stream after the successful case, you must call {@link - * #closeAndGetHandle()} - only that method will actually retain the resource written to. The - * method has the semantics of "close on success". The {@link #close()} method is supposed to - * remove the target resource if called before {@link #closeAndGetHandle()}, hence having the - * semantics of "close on failure". That way, simple try-with-resources statements automatically - * clean up unsuccessful partial state resources in case the writing does not complete. - * - * <p>Note: This is an abstract class and not an interface because {@link OutputStream} is an - * abstract class. - */ - abstract class CheckpointStateOutputStream extends FSDataOutputStream { - - /** - * Closes the stream and gets a state handle that can create an input stream producing the - * data written to this stream. - * - * <p>This closing must be called (also when the caller is not interested in the handle) to - * successfully close the stream and retain the produced resource. In contrast, the {@link - * #close()} method removes the target resource when called. - * - * @return A state handle that can create an input stream producing the data written to this - * stream. - * @throws IOException Thrown, if the stream cannot be closed. - */ - @Nullable - public abstract StreamStateHandle closeAndGetHandle() throws IOException; - - /** - * This method should close the stream, if has not been closed before. If this method - * actually closes the stream, it should delete/release the resource behind the stream, such - * as the file that the stream writes to. - * - * <p>The above implies that this method is intended to be the "unsuccessful close", such as - * when cancelling the stream writing, or when an exception occurs. Closing the stream for - * the successful case must go through {@link #closeAndGetHandle()}. - * - * @throws IOException Thrown, if the stream cannot be closed. - */ - @Override - public abstract void close() throws IOException; - } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java index cd7b181..3378ccb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java @@ -48,7 +48,7 @@ public interface CheckpointStreamWithResultProvider extends Closeable { /** Returns the encapsulated output stream. */ @Nonnull - CheckpointStreamFactory.CheckpointStateOutputStream getCheckpointOutputStream(); + CheckpointStateOutputStream getCheckpointOutputStream(); @Override default void close() throws IOException { @@ -61,10 +61,9 @@ public interface CheckpointStreamWithResultProvider extends Closeable { */ class PrimaryStreamOnly implements CheckpointStreamWithResultProvider { - @Nonnull private final CheckpointStreamFactory.CheckpointStateOutputStream outputStream; + @Nonnull private final CheckpointStateOutputStream outputStream; - public PrimaryStreamOnly( - @Nonnull CheckpointStreamFactory.CheckpointStateOutputStream outputStream) { + public PrimaryStreamOnly(@Nonnull CheckpointStateOutputStream outputStream) { this.outputStream = outputStream; } @@ -77,7 +76,7 @@ public interface CheckpointStreamWithResultProvider extends Closeable { @Nonnull @Override - public CheckpointStreamFactory.CheckpointStateOutputStream getCheckpointOutputStream() { + public CheckpointStateOutputStream getCheckpointOutputStream() { return outputStream; } } @@ -93,8 +92,8 @@ public interface CheckpointStreamWithResultProvider extends Closeable { @Nonnull private final DuplicatingCheckpointOutputStream outputStream; public PrimaryAndSecondaryStream( - @Nonnull CheckpointStreamFactory.CheckpointStateOutputStream primaryOut, - CheckpointStreamFactory.CheckpointStateOutputStream secondaryOut) + @Nonnull CheckpointStateOutputStream primaryOut, + CheckpointStateOutputStream secondaryOut) throws IOException { this(new DuplicatingCheckpointOutputStream(primaryOut, secondaryOut)); } @@ -154,7 +153,7 @@ public interface CheckpointStreamWithResultProvider extends Closeable { @Nonnull CheckpointStreamFactory primaryStreamFactory) throws IOException { - CheckpointStreamFactory.CheckpointStateOutputStream primaryOut = + CheckpointStateOutputStream primaryOut = primaryStreamFactory.createCheckpointStateOutputStream(checkpointedStateScope); return new CheckpointStreamWithResultProvider.PrimaryStreamOnly(primaryOut); @@ -168,7 +167,7 @@ public interface CheckpointStreamWithResultProvider extends Closeable { @Nonnull LocalRecoveryDirectoryProvider secondaryStreamDirProvider) throws IOException { - CheckpointStreamFactory.CheckpointStateOutputStream primaryOut = + CheckpointStateOutputStream primaryOut = primaryStreamFactory.createCheckpointStateOutputStream(checkpointedStateScope); try { @@ -179,7 +178,7 @@ public interface CheckpointStreamWithResultProvider extends Closeable { String.valueOf(UUID.randomUUID())); Path outPath = new Path(outFile.toURI()); - CheckpointStreamFactory.CheckpointStateOutputStream secondaryOut = + CheckpointStateOutputStream secondaryOut = new FileBasedStateOutputStream(outPath.getFileSystem(), outPath); return new CheckpointStreamWithResultProvider.PrimaryAndSecondaryStream( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java index 6055836..6dc6739 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java @@ -117,7 +117,7 @@ class DefaultOperatorStateBackendSnapshotStrategy } return (snapshotCloseableRegistry) -> { - CheckpointStreamFactory.CheckpointStateOutputStream localOut = + CheckpointStateOutputStream localOut = streamFactory.createCheckpointStateOutputStream( CheckpointedStateScope.EXCLUSIVE); snapshotCloseableRegistry.registerCloseable(localOut); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DuplicatingCheckpointOutputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DuplicatingCheckpointOutputStream.java index 5535aeb..fd9e7e32 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DuplicatingCheckpointOutputStream.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DuplicatingCheckpointOutputStream.java @@ -35,8 +35,7 @@ import java.io.IOException; * user. This class is used to write state for local recovery as a local (secondary) copy of the * (primary) state snapshot that is written to a (slower but highly-available) remote filesystem. */ -public class DuplicatingCheckpointOutputStream - extends CheckpointStreamFactory.CheckpointStateOutputStream { +public class DuplicatingCheckpointOutputStream extends CheckpointStateOutputStream { /** Default buffer size of 8KB. */ private static final int DEFAULT_BUFFER_SIZER = 8 * 1024; @@ -48,13 +47,13 @@ public class DuplicatingCheckpointOutputStream private int bufferIdx; /** Primary stream for writing the checkpoint data. Failures from this stream are forwarded. */ - private final CheckpointStreamFactory.CheckpointStateOutputStream primaryOutputStream; + private final CheckpointStateOutputStream primaryOutputStream; /** * Primary stream for writing the checkpoint data. Failures from this stream are not forwarded * until {@link #closeAndGetSecondaryHandle()}. */ - private final CheckpointStreamFactory.CheckpointStateOutputStream secondaryOutputStream; + private final CheckpointStateOutputStream secondaryOutputStream; /** * Stores a potential exception that occurred while interacting with {@link @@ -63,15 +62,15 @@ public class DuplicatingCheckpointOutputStream private Exception secondaryStreamException; public DuplicatingCheckpointOutputStream( - CheckpointStreamFactory.CheckpointStateOutputStream primaryOutputStream, - CheckpointStreamFactory.CheckpointStateOutputStream secondaryOutputStream) + CheckpointStateOutputStream primaryOutputStream, + CheckpointStateOutputStream secondaryOutputStream) throws IOException { this(primaryOutputStream, secondaryOutputStream, DEFAULT_BUFFER_SIZER); } public DuplicatingCheckpointOutputStream( - CheckpointStreamFactory.CheckpointStateOutputStream primaryOutputStream, - CheckpointStreamFactory.CheckpointStateOutputStream secondaryOutputStream, + CheckpointStateOutputStream primaryOutputStream, + CheckpointStateOutputStream secondaryOutputStream, int bufferSize) throws IOException { @@ -280,12 +279,12 @@ public class DuplicatingCheckpointOutputStream } @VisibleForTesting - CheckpointStreamFactory.CheckpointStateOutputStream getPrimaryOutputStream() { + CheckpointStateOutputStream getPrimaryOutputStream() { return primaryOutputStream; } @VisibleForTesting - CheckpointStreamFactory.CheckpointStateOutputStream getSecondaryOutputStream() { + CheckpointStateOutputStream getSecondaryOutputStream() { return secondaryOutputStream; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/FullSnapshotAsyncWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/FullSnapshotAsyncWriter.java index c5a7277..96a3bd4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/FullSnapshotAsyncWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/FullSnapshotAsyncWriter.java @@ -136,7 +136,7 @@ public class FullSnapshotAsyncWriter<K> byte[] previousValue = null; DataOutputView kgOutView = null; OutputStream kgOutStream = null; - CheckpointStreamFactory.CheckpointStateOutputStream checkpointOutputStream = + CheckpointStateOutputStream checkpointOutputStream = checkpointStreamWithResultProvider.getCheckpointOutputStream(); try { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateCheckpointOutputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateCheckpointOutputStream.java index 7384fbe..a487e4e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateCheckpointOutputStream.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateCheckpointOutputStream.java @@ -39,8 +39,7 @@ public final class KeyedStateCheckpointOutputStream private final KeyGroupRangeOffsets keyGroupRangeOffsets; public KeyedStateCheckpointOutputStream( - CheckpointStreamFactory.CheckpointStateOutputStream delegate, - KeyGroupRange keyGroupRange) { + CheckpointStateOutputStream delegate, KeyGroupRange keyGroupRange) { super(delegate); Preconditions.checkNotNull(keyGroupRange); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/NonClosingCheckpointOutputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/NonClosingCheckpointOutputStream.java index d39b198..76530b0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/NonClosingCheckpointOutputStream.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/NonClosingCheckpointOutputStream.java @@ -34,11 +34,10 @@ import java.io.OutputStream; public abstract class NonClosingCheckpointOutputStream<T extends StreamStateHandle> extends OutputStream { - protected final CheckpointStreamFactory.CheckpointStateOutputStream delegate; + protected final CheckpointStateOutputStream delegate; private final ResourceGuard resourceGuard = new ResourceGuard(); - public NonClosingCheckpointOutputStream( - CheckpointStreamFactory.CheckpointStateOutputStream delegate) { + public NonClosingCheckpointOutputStream(CheckpointStateOutputStream delegate) { this.delegate = Preconditions.checkNotNull(delegate); } @@ -79,7 +78,7 @@ public abstract class NonClosingCheckpointOutputStream<T extends StreamStateHand } /** This method should not be public so as to not expose internals to user code. */ - CheckpointStreamFactory.CheckpointStateOutputStream getDelegate() { + CheckpointStateOutputStream getDelegate() { return delegate; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateCheckpointOutputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateCheckpointOutputStream.java index 34d700a..5f46187 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateCheckpointOutputStream.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateCheckpointOutputStream.java @@ -33,8 +33,8 @@ public final class OperatorStateCheckpointOutputStream private LongArrayList partitionOffsets; private final long initialPosition; - public OperatorStateCheckpointOutputStream( - CheckpointStreamFactory.CheckpointStateOutputStream delegate) throws IOException { + public OperatorStateCheckpointOutputStream(CheckpointStateOutputStream delegate) + throws IOException { super(delegate); this.partitionOffsets = new LongArrayList(16); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SavepointSnapshotStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SavepointSnapshotStrategy.java index 126150b..114bf97 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SavepointSnapshotStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SavepointSnapshotStrategy.java @@ -85,7 +85,7 @@ public class SavepointSnapshotStrategy<K> static CheckpointStreamWithResultProvider createSimpleStream( @Nonnull CheckpointStreamFactory primaryStreamFactory) throws IOException { - CheckpointStreamFactory.CheckpointStateOutputStream primaryOut = + CheckpointStateOutputStream primaryOut = primaryStreamFactory.createCheckpointStateOutputStream( CheckpointedStateScope.EXCLUSIVE); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java index a6b281b..4df7015 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java @@ -94,9 +94,8 @@ public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext return checkpointTimestamp; } - private CheckpointStreamFactory.CheckpointStateOutputStream openAndRegisterNewStream() - throws Exception { - CheckpointStreamFactory.CheckpointStateOutputStream cout = + private CheckpointStateOutputStream openAndRegisterNewStream() throws Exception { + CheckpointStateOutputStream cout = streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE); closableRegistry.registerCloseable(cout); @@ -160,7 +159,7 @@ public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext Preconditions.checkNotNull(stream); - CheckpointStreamFactory.CheckpointStateOutputStream delegate = stream.getDelegate(); + CheckpointStateOutputStream delegate = stream.getDelegate(); if (closableRegistry.unregisterCloseable(delegate)) { delegate.close(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileBasedStateOutputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileBasedStateOutputStream.java index 7021ea50..12f0ba0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileBasedStateOutputStream.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileBasedStateOutputStream.java @@ -22,7 +22,7 @@ import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.FileSystem.WriteMode; import org.apache.flink.core.fs.Path; -import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream; +import org.apache.flink.runtime.state.CheckpointStateOutputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java index a33c47b..1c3ebb2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java @@ -22,10 +22,10 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.CheckpointStateOutputStream; import org.apache.flink.runtime.state.CheckpointStorageLocation; import org.apache.flink.runtime.state.CheckpointStorageLocationReference; import org.apache.flink.runtime.state.CheckpointStreamFactory; -import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream; import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream; import javax.annotation.Nullable; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java index 8069bdd..5151130 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java @@ -24,6 +24,7 @@ import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.FileSystem.WriteMode; import org.apache.flink.core.fs.OutputStreamAndPath; import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.CheckpointStateOutputStream; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.CheckpointedStateScope; import org.apache.flink.runtime.state.StreamStateHandle; @@ -161,11 +162,10 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory { // ------------------------------------------------------------------------ /** - * A {@link CheckpointStreamFactory.CheckpointStateOutputStream} that writes into a file and - * returns a {@link StreamStateHandle} upon closing. + * A {@link CheckpointStateOutputStream} that writes into a file and returns a {@link + * StreamStateHandle} upon closing. */ - public static class FsCheckpointStateOutputStream - extends CheckpointStreamFactory.CheckpointStateOutputStream { + public static class FsCheckpointStateOutputStream extends CheckpointStateOutputStream { private final byte[] writeBuffer; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java index 78b96e5..8b2bc3e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.state.heap; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.state.CheckpointStateOutputStream; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider; import org.apache.flink.runtime.state.CheckpointedStateScope; @@ -140,7 +141,7 @@ class HeapSnapshotStrategy<K> snapshotCloseableRegistry.registerCloseable(streamWithResultProvider); - final CheckpointStreamFactory.CheckpointStateOutputStream localStream = + final CheckpointStateOutputStream localStream = streamWithResultProvider.getCheckpointOutputStream(); final DataOutputViewStreamWrapper outView = diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java index 5f36a42..98d09be 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.state.memory; import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.runtime.state.CheckpointStateOutputStream; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.CheckpointedStateScope; import org.apache.flink.runtime.state.StreamStateHandle; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorageAccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorageAccess.java index 5ff029d..10e33a9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorageAccess.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorageAccess.java @@ -22,10 +22,10 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.CheckpointStateOutputStream; import org.apache.flink.runtime.state.CheckpointStorageLocation; import org.apache.flink.runtime.state.CheckpointStorageLocationReference; import org.apache.flink.runtime.state.CheckpointStreamFactory; -import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream; import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess; import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory.MemoryCheckpointOutputStream; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.java index 28b81d7..29cd71f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.java @@ -23,7 +23,7 @@ import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter.ChannelSta import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; -import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream; +import org.apache.flink.runtime.state.CheckpointStateOutputStream; import org.apache.flink.runtime.state.InputChannelStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java index 71fbdfd..8363e2e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java @@ -268,8 +268,7 @@ public class ChannelPersistenceITCase { } @Override - public CheckpointStreamFactory.CheckpointStateOutputStream - createTaskOwnedStateStream() { + public CheckpointStateOutputStream createTaskOwnedStateStream() { throw new UnsupportedOperationException(); } }; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProviderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProviderTest.java index 915c9bf..4e928c1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProviderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProviderTest.java @@ -183,7 +183,7 @@ public class CheckpointStreamWithResultProviderTest extends TestLogger { private SnapshotResult<StreamStateHandle> writeCheckpointTestData( CheckpointStreamWithResultProvider resultProvider) throws IOException { - CheckpointStreamFactory.CheckpointStateOutputStream checkpointOutputStream = + CheckpointStateOutputStream checkpointOutputStream = resultProvider.getCheckpointOutputStream(); checkpointOutputStream.write(0x42); return resultProvider.closeAndFinalizeCheckpointStreamResult(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/DuplicatingCheckpointOutputStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/DuplicatingCheckpointOutputStreamTest.java index 9fa30de..0b09655e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/DuplicatingCheckpointOutputStreamTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/DuplicatingCheckpointOutputStreamTest.java @@ -285,8 +285,7 @@ public class DuplicatingCheckpointOutputStreamTest extends TestLogger { } /** Stream that throws {@link IOException} on all relevant methods under test. */ - private static class FailingCheckpointOutStream - extends CheckpointStreamFactory.CheckpointStateOutputStream { + private static class FailingCheckpointOutStream extends CheckpointStateOutputStream { private boolean closed = false; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyedStateCheckpointOutputStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyedStateCheckpointOutputStreamTest.java index a6114ba..31f46f9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyedStateCheckpointOutputStreamTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyedStateCheckpointOutputStreamTest.java @@ -33,7 +33,7 @@ public class KeyedStateCheckpointOutputStreamTest { private static final int STREAM_CAPACITY = 128; private static KeyedStateCheckpointOutputStream createStream(KeyGroupRange keyGroupRange) { - CheckpointStreamFactory.CheckpointStateOutputStream checkStream = + CheckpointStateOutputStream checkStream = new TestMemoryCheckpointOutputStream(STREAM_CAPACITY); return new KeyedStateCheckpointOutputStream(checkStream, keyGroupRange); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateOutputCheckpointStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateOutputCheckpointStreamTest.java index d92c526..80af8e5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateOutputCheckpointStreamTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateOutputCheckpointStreamTest.java @@ -35,7 +35,7 @@ public class OperatorStateOutputCheckpointStreamTest { private static final int STREAM_CAPACITY = 128; private static OperatorStateCheckpointOutputStream createStream() throws IOException { - CheckpointStreamFactory.CheckpointStateOutputStream checkStream = + CheckpointStateOutputStream checkStream = new TestMemoryCheckpointOutputStream(STREAM_CAPACITY); return new OperatorStateCheckpointOutputStream(checkStream); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestCheckpointStorageWorkerView.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestCheckpointStorageWorkerView.java index 95b0248..9b8d298 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestCheckpointStorageWorkerView.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestCheckpointStorageWorkerView.java @@ -49,8 +49,7 @@ public class TestCheckpointStorageWorkerView implements CheckpointStorageWorkerV } @Override - public CheckpointStreamFactory.CheckpointStateOutputStream createTaskOwnedStateStream() - throws IOException { + public CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException { return taskOwnedCheckpointStreamFactory.createCheckpointStateOutputStream( taskOwnedStateScope); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingCheckpointStorageAccessCoordinatorView.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingCheckpointStorageAccessCoordinatorView.java index ff674b9..9e05949 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingCheckpointStorageAccessCoordinatorView.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingCheckpointStorageAccessCoordinatorView.java @@ -93,7 +93,7 @@ public class TestingCheckpointStorageAccessCoordinatorView } @Override - public CheckpointStreamFactory.CheckpointStateOutputStream createTaskOwnedStateStream() { + public CheckpointStateOutputStream createTaskOwnedStateStream() { return new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(Integer.MAX_VALUE); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java index bf2c264..472c35c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java @@ -22,8 +22,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; -import org.apache.flink.runtime.state.CheckpointStreamFactory; -import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream; +import org.apache.flink.runtime.state.CheckpointStateOutputStream; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; @@ -90,7 +89,7 @@ public class FsCheckpointStateOutputStreamTest { @Test public void testEmptyState() throws Exception { - FsCheckpointStreamFactory.CheckpointStateOutputStream stream = + CheckpointStateOutputStream stream = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream( Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), @@ -124,7 +123,7 @@ public class FsCheckpointStateOutputStreamTest { @Test public void testGetPos() throws Exception { - FsCheckpointStreamFactory.CheckpointStateOutputStream stream = + CheckpointStateOutputStream stream = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream( Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), @@ -172,7 +171,7 @@ public class FsCheckpointStateOutputStreamTest { when(fs.create(pathCaptor.capture(), any(FileSystem.WriteMode.class))) .thenReturn(outputStream); - CheckpointStreamFactory.CheckpointStateOutputStream stream = + CheckpointStateOutputStream stream = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream( Path.fromLocalFile(tempDir.newFolder()), fs, 4, 0, relativePaths); @@ -198,7 +197,7 @@ public class FsCheckpointStateOutputStreamTest { .thenReturn(outputStream); doThrow(new IOException("Test IOException.")).when(outputStream).close(); - CheckpointStreamFactory.CheckpointStateOutputStream stream = + CheckpointStateOutputStream stream = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream( Path.fromLocalFile(tempDir.newFolder()), fs, 4, 0, relativePaths); @@ -219,7 +218,7 @@ public class FsCheckpointStateOutputStreamTest { private void runTest(int numBytes, int bufferSize, int threshold, boolean expectFile) throws Exception { - FsCheckpointStreamFactory.CheckpointStateOutputStream stream = + CheckpointStateOutputStream stream = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream( Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), @@ -335,13 +334,13 @@ public class FsCheckpointStateOutputStreamTest { // use with try-with-resources StreamStateHandle handle4; - try (CheckpointStreamFactory.CheckpointStateOutputStream stream4 = factory.get()) { + try (CheckpointStateOutputStream stream4 = factory.get()) { stream4.write(state4); handle4 = stream4.closeAndGetHandle(); } // close before accessing handle - CheckpointStreamFactory.CheckpointStateOutputStream stream5 = factory.get(); + CheckpointStateOutputStream stream5 = factory.get(); stream5.write(state4); stream5.close(); try { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccessTest.java index 77cb9b6..7b24f3d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccessTest.java @@ -22,9 +22,9 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.local.LocalFileSystem; +import org.apache.flink.runtime.state.CheckpointStateOutputStream; import org.apache.flink.runtime.state.CheckpointStorageAccess; import org.apache.flink.runtime.state.CheckpointStorageLocationReference; -import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream; import org.apache.flink.runtime.state.CheckpointedStateScope; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java index a31a09c..8a4cf38 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java @@ -24,7 +24,7 @@ import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.local.LocalFileSystem; import org.apache.flink.runtime.state.CheckpointMetadataOutputStream; -import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream; +import org.apache.flink.runtime.state.CheckpointStateOutputStream; import org.apache.flink.runtime.state.CheckpointedStateScope; import org.junit.Rule; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/memory/MemoryCheckpointOutputStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/memory/MemoryCheckpointOutputStreamTest.java index 07d43cd..63dc7a6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/memory/MemoryCheckpointOutputStreamTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/memory/MemoryCheckpointOutputStreamTest.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.state.memory; -import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream; +import org.apache.flink.runtime.state.CheckpointStateOutputStream; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory.MemoryCheckpointOutputStream; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/memory/MemoryCheckpointStorageAccessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/memory/MemoryCheckpointStorageAccessTest.java index aef780d..bebe5da 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/memory/MemoryCheckpointStorageAccessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/memory/MemoryCheckpointStorageAccessTest.java @@ -21,9 +21,9 @@ package org.apache.flink.runtime.state.memory; import org.apache.flink.api.common.JobID; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.CheckpointMetadataOutputStream; +import org.apache.flink.runtime.state.CheckpointStateOutputStream; import org.apache.flink.runtime.state.CheckpointStorageAccess; import org.apache.flink.runtime.state.CheckpointStorageLocation; -import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream; import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.filesystem.AbstractFileCheckpointStorageAccessTestBase; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java index 9c57d0f..0331d5b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java @@ -20,11 +20,11 @@ package org.apache.flink.runtime.state.testutils; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.runtime.state.CheckpointStateOutputStream; import org.apache.flink.runtime.state.CheckpointStorageAccess; import org.apache.flink.runtime.state.CheckpointStorageLocation; import org.apache.flink.runtime.state.CheckpointStorageLocationReference; import org.apache.flink.runtime.state.CheckpointStreamFactory; -import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream; import org.apache.flink.runtime.state.CheckpointedStateScope; import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation; import org.apache.flink.runtime.state.memory.MemoryStateBackend; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/TestCheckpointStreamFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/TestCheckpointStreamFactory.java index a4e1efb..80f36b2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/TestCheckpointStreamFactory.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/TestCheckpointStreamFactory.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.state.testutils; +import org.apache.flink.runtime.state.CheckpointStateOutputStream; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.CheckpointedStateScope; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockCheckpointStorage.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockCheckpointStorage.java index a71d9bc..fb4a8a0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockCheckpointStorage.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockCheckpointStorage.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.state.ttl.mock; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.state.CheckpointMetadataOutputStream; +import org.apache.flink.runtime.state.CheckpointStateOutputStream; import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.CheckpointStorageAccess; import org.apache.flink.runtime.state.CheckpointStorageLocation; @@ -95,8 +96,7 @@ public class MockCheckpointStorage implements CheckpointStorage { } @Override - public CheckpointStreamFactory.CheckpointStateOutputStream - createTaskOwnedStateStream() { + public CheckpointStateOutputStream createTaskOwnedStateStream() { return null; } }; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java index 66e8b25..021e340 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.util; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.state.CheckpointStateOutputStream; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.CheckpointedStateScope; import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockingCheckpointOutputStream.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockingCheckpointOutputStream.java index 00aac67..e6217c5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockingCheckpointOutputStream.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockingCheckpointOutputStream.java @@ -20,7 +20,7 @@ package org.apache.flink.runtime.util; import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.testutils.OneShotLatch; -import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.CheckpointStateOutputStream; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.util.IOUtils; @@ -35,8 +35,7 @@ import java.util.concurrent.atomic.AtomicBoolean; * until the stream is closed. This is typically used to test that a blocking read can be * interrupted / closed. */ -public class BlockingCheckpointOutputStream - extends CheckpointStreamFactory.CheckpointStateOutputStream { +public class BlockingCheckpointOutputStream extends CheckpointStateOutputStream { /** Optional delegate stream to which all ops are forwarded. */ private final FSDataOutputStream delegate; @@ -192,10 +191,9 @@ public class BlockingCheckpointOutputStream throw new IOException("Stream was already closed!"); } - if (delegate instanceof CheckpointStreamFactory.CheckpointStateOutputStream) { + if (delegate instanceof CheckpointStateOutputStream) { StreamStateHandle streamStateHandle = - ((CheckpointStreamFactory.CheckpointStateOutputStream) delegate) - .closeAndGetHandle(); + ((CheckpointStateOutputStream) delegate).closeAndGetHandle(); unblockAll(); return streamStateHandle; } else { diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java index cf08092..9fc4f56 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java @@ -29,10 +29,12 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.CheckpointType; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.CheckpointStateOutputStream; import org.apache.flink.runtime.state.CheckpointStorageLocationReference; import org.apache.flink.runtime.state.CheckpointStorageWorkerView; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend; +import org.apache.flink.runtime.state.CheckpointedStateScope; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue; import org.apache.flink.runtime.state.Keyed; @@ -45,6 +47,7 @@ import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInf import org.apache.flink.runtime.state.SavepointResources; import org.apache.flink.runtime.state.SnapshotResult; import org.apache.flink.runtime.state.StateSnapshotTransformer; +import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.TestableKeyedStateBackend; import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle; import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl; @@ -217,7 +220,29 @@ public class ChangelogKeyedStateBackend<K> this.stateChangelogWriter = stateChangelogWriter; this.changelogStates = new HashMap<>(); this.changelogSnapshotState = completeRestore(initialState); - this.streamFactory = shared -> checkpointStorageWorkerView.createTaskOwnedStateStream(); + this.streamFactory = + new CheckpointStreamFactory() { + + @Override + public CheckpointStateOutputStream createCheckpointStateOutputStream( + CheckpointedStateScope scope) throws IOException { + return checkpointStorageWorkerView.createTaskOwnedStateStream(); + } + + @Override + public boolean canFastDuplicate( + StreamStateHandle stateHandle, CheckpointedStateScope scope) + throws IOException { + return false; + } + + @Override + public List<StreamStateHandle> duplicate( + List<StreamStateHandle> stateHandles, CheckpointedStateScope scope) + throws IOException { + return null; + } + }; this.closer.register(keyedStateBackend); } diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java index 4672c7c..bed4773 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java +++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java @@ -35,6 +35,8 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.mailbox.SyncMailboxExecutor; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.state.CheckpointStateOutputStream; +import org.apache.flink.runtime.state.CheckpointStateToolset; import org.apache.flink.runtime.state.CheckpointStorageAccess; import org.apache.flink.runtime.state.CheckpointStorageLocation; import org.apache.flink.runtime.state.CheckpointStorageLocationReference; @@ -398,7 +400,13 @@ public class ChangelogStateBackendTestUtils { } @Override - public CheckpointStreamFactory.CheckpointStateOutputStream createTaskOwnedStateStream() { + public CheckpointStateOutputStream createTaskOwnedStateStream() { + throw new UnsupportedOperationException( + "Checkpoints are not supported in a single key state backend"); + } + + @Override + public CheckpointStateToolset createTaskOwnedCheckpointStateToolset() { throw new UnsupportedOperationException( "Checkpoints are not supported in a single key state backend"); } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java index 09b4ea0..68d1ed9 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java @@ -19,6 +19,7 @@ package org.apache.flink.contrib.streaming.state; import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.runtime.state.CheckpointStateOutputStream; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.CheckpointedStateScope; import org.apache.flink.runtime.state.StateHandleID; @@ -115,7 +116,7 @@ public class RocksDBStateUploader extends RocksDBStateDataTransfer { throws IOException { InputStream inputStream = null; - CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null; + CheckpointStateOutputStream outputStream = null; try { final byte[] buffer = new byte[READ_BUFFER_SIZE]; diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java index 6ea1da5..f61394b 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java @@ -40,8 +40,8 @@ import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.CheckpointStateOutputStream; import org.apache.flink.runtime.state.CheckpointStreamFactory; -import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream; import org.apache.flink.runtime.state.CheckpointedStateScope; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateHandle; diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploaderTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploaderTest.java index 8ef843a..434976d 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploaderTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploaderTest.java @@ -21,6 +21,7 @@ package org.apache.flink.contrib.streaming.state; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.runtime.state.CheckpointStateOutputStream; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.CheckpointedStateScope; import org.apache.flink.runtime.state.StateHandleID; @@ -42,6 +43,7 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; @@ -59,10 +61,30 @@ public class RocksDBStateUploaderTest extends TestLogger { SpecifiedException expectedException = new SpecifiedException("throw exception while multi thread upload states."); - CheckpointStreamFactory.CheckpointStateOutputStream outputStream = + CheckpointStateOutputStream outputStream = createFailingCheckpointStateOutputStream(expectedException); CheckpointStreamFactory checkpointStreamFactory = - (CheckpointedStateScope scope) -> outputStream; + new CheckpointStreamFactory() { + @Override + public CheckpointStateOutputStream createCheckpointStateOutputStream( + CheckpointedStateScope scope) throws IOException { + return outputStream; + } + + @Override + public boolean canFastDuplicate( + StreamStateHandle stateHandle, CheckpointedStateScope scope) + throws IOException { + return false; + } + + @Override + public List<StreamStateHandle> duplicate( + List<StreamStateHandle> stateHandles, CheckpointedStateScope scope) + throws IOException { + return null; + } + }; File file = temporaryFolder.newFile(String.valueOf(UUID.randomUUID())); generateRandomFileContent(file.getPath(), 20); @@ -119,9 +141,9 @@ public class RocksDBStateUploaderTest extends TestLogger { } } - private CheckpointStreamFactory.CheckpointStateOutputStream - createFailingCheckpointStateOutputStream(IOException failureException) { - return new CheckpointStreamFactory.CheckpointStateOutputStream() { + private CheckpointStateOutputStream createFailingCheckpointStateOutputStream( + IOException failureException) { + return new CheckpointStateOutputStream() { @Nullable @Override public StreamStateHandle closeAndGetHandle() { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/NonCheckpointingStorageAccess.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/NonCheckpointingStorageAccess.java index 0de86e1..3422850 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/NonCheckpointingStorageAccess.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/NonCheckpointingStorageAccess.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.api.operators.sorted.state; +import org.apache.flink.runtime.state.CheckpointStateOutputStream; import org.apache.flink.runtime.state.CheckpointStorageAccess; import org.apache.flink.runtime.state.CheckpointStorageLocation; import org.apache.flink.runtime.state.CheckpointStorageLocationReference; @@ -68,7 +69,7 @@ class NonCheckpointingStorageAccess implements CheckpointStorageAccess { } @Override - public CheckpointStreamFactory.CheckpointStateOutputStream createTaskOwnedStateStream() { + public CheckpointStateOutputStream createTaskOwnedStateStream() { throw new UnsupportedOperationException( "Checkpoints are not supported in a single key state backend"); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java index 4627c7d..a93c35d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java @@ -25,8 +25,8 @@ import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.io.disk.InputViewIterator; +import org.apache.flink.runtime.state.CheckpointStateOutputStream; import org.apache.flink.runtime.state.CheckpointStorageWorkerView; -import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.JavaSerializer; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; @@ -69,7 +69,7 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I private final CheckpointCommitter committer; protected final TypeSerializer<IN> serializer; - private transient CheckpointStreamFactory.CheckpointStateOutputStream out; + private transient CheckpointStateOutputStream out; private transient CheckpointStorageWorkerView checkpointStorage; private transient ListState<PendingCheckpoint> checkpointedState; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java index c8cc362..bcb4bdd 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.state.CheckpointStateOutputStream; import org.apache.flink.runtime.state.CheckpointStorageLocationReference; import org.apache.flink.runtime.state.CheckpointStorageWorkerView; import org.apache.flink.runtime.state.CheckpointStreamFactory; @@ -713,8 +714,7 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { } @Override - public CheckpointStreamFactory.CheckpointStateOutputStream createTaskOwnedStateStream() - throws IOException { + public CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException { return delegate.createTaskOwnedStateStream(); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java index 9a7f04b..7dd7f32 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.operators; import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.runtime.state.CheckpointStateOutputStream; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.CheckpointedStateScope; import org.apache.flink.runtime.state.KeyGroupRange; @@ -83,10 +84,8 @@ public class StateSnapshotContextSynchronousImplTest extends TestLogger { long checkpointId = 42L; long checkpointTimestamp = 1L; - CheckpointStreamFactory.CheckpointStateOutputStream outputStream1 = - mock(CheckpointStreamFactory.CheckpointStateOutputStream.class); - CheckpointStreamFactory.CheckpointStateOutputStream outputStream2 = - mock(CheckpointStreamFactory.CheckpointStateOutputStream.class); + CheckpointStateOutputStream outputStream1 = mock(CheckpointStateOutputStream.class); + CheckpointStateOutputStream outputStream2 = mock(CheckpointStateOutputStream.class); CheckpointStreamFactory streamFactory = mock(CheckpointStreamFactory.class); when(streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE)) @@ -129,10 +128,8 @@ public class StateSnapshotContextSynchronousImplTest extends TestLogger { long checkpointId = 42L; long checkpointTimestamp = 1L; - CheckpointStreamFactory.CheckpointStateOutputStream outputStream1 = - mock(CheckpointStreamFactory.CheckpointStateOutputStream.class); - CheckpointStreamFactory.CheckpointStateOutputStream outputStream2 = - mock(CheckpointStreamFactory.CheckpointStateOutputStream.class); + CheckpointStateOutputStream outputStream1 = mock(CheckpointStateOutputStream.class); + CheckpointStateOutputStream outputStream2 = mock(CheckpointStateOutputStream.class); CheckpointStreamFactory streamFactory = mock(CheckpointStreamFactory.class); when(streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE)) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java index 598bf52..517d632 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java @@ -55,8 +55,8 @@ import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.shuffle.ShuffleEnvironment; +import org.apache.flink.runtime.state.CheckpointStateOutputStream; import org.apache.flink.runtime.state.CheckpointStreamFactory; -import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream; import org.apache.flink.runtime.state.DefaultOperatorStateBackend; import org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder; import org.apache.flink.runtime.state.OperatorStateBackend; diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java index 15252c7..ef7fc8a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java @@ -23,6 +23,7 @@ import org.apache.flink.api.connector.source.lib.NumberSequenceSource; import org.apache.flink.core.execution.JobClient; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.state.CheckpointStateOutputStream; import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.CheckpointStorageAccess; import org.apache.flink.runtime.state.CheckpointStorageLocation; @@ -227,8 +228,7 @@ public class UnalignedCheckpointFailureHandlingITCase { } @Override - public CheckpointStreamFactory.CheckpointStateOutputStream createTaskOwnedStateStream() - throws IOException { + public CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException { return delegate.createTaskOwnedStateStream(); }