This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new b6b4baf [FLINK-11874][checkpoint] Split CheckpointStorage interface to distinguish JM and TM side b6b4baf is described below commit b6b4baf588fae0494e32492ac16df8977d8cfc1b Author: Yun Tang <myas...@live.com> AuthorDate: Thu Mar 14 18:48:39 2019 +0800 [FLINK-11874][checkpoint] Split CheckpointStorage interface to distinguish JM and TM side This closes #7970. --- .../runtime/checkpoint/CheckpointCoordinator.java | 12 ++- .../flink/runtime/state/CheckpointStorage.java | 117 +-------------------- ....java => CheckpointStorageCoordinatorView.java} | 57 ++-------- .../runtime/state/CheckpointStorageWorkerView.java | 71 +++++++++++++ .../state/filesystem/FsCheckpointStorage.java | 4 +- .../memory/MemoryBackendCheckpointStorage.java | 6 +- .../flink/runtime/dispatcher/DispatcherTest.java | 4 +- .../flink/runtime/state/StateBackendTestBase.java | 10 +- .../state/testutils/BackendForTestStream.java | 12 +-- .../runtime/state/ttl/StateBackendTestContext.java | 14 +-- .../runtime/operators/GenericWriteAheadSink.java | 4 +- .../flink/streaming/runtime/tasks/StreamTask.java | 6 +- .../flink/streaming/util/MockStreamTask.java | 8 +- .../streaming/util/MockStreamTaskBuilder.java | 3 +- 14 files changed, 129 insertions(+), 199 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 9935455..ac4c1e2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -35,13 +35,14 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; -import org.apache.flink.runtime.state.CheckpointStorage; +import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView; import org.apache.flink.runtime.state.CheckpointStorageLocation; import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation; import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.SharedStateRegistryFactory; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory; +import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; @@ -50,6 +51,7 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import java.io.IOException; import java.util.ArrayDeque; import java.util.HashMap; import java.util.Iterator; @@ -119,7 +121,7 @@ public class CheckpointCoordinator { /** The root checkpoint state backend, which is responsible for initializing the * checkpoint, storing the metadata, and cleaning up the checkpoint. */ - private final CheckpointStorage checkpointStorage; + private final CheckpointStorageCoordinatorView checkpointStorage; /** A list of recent checkpoint IDs, to identify late messages (vs invalid ones). */ private final ArrayDeque<Long> recentPendingCheckpoints; @@ -247,7 +249,11 @@ public class CheckpointCoordinator { try { this.checkpointStorage = checkpointStateBackend.createCheckpointStorage(job); + } catch (IOException e) { + throw new FlinkRuntimeException("Failed to create checkpoint storage at checkpoint coordinator side.", e); + } + try { // Make sure the checkpoint ID enumerator is running. Possibly // issues a blocking call to ZooKeeper. checkpointIDCounter.start(); @@ -1140,7 +1146,7 @@ public class CheckpointCoordinator { } } - public CheckpointStorage getCheckpointStorage() { + public CheckpointStorageCoordinatorView getCheckpointStorage() { return checkpointStorage; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java index 0f8aa69..af02da5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java @@ -18,119 +18,10 @@ package org.apache.flink.runtime.state; -import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream; - -import javax.annotation.Nullable; - -import java.io.IOException; - /** - * CheckpointStorage implements the durable storage of checkpoint data and metadata streams. - * An individual checkpoint or savepoint is stored to a {@link CheckpointStorageLocation}, - * created by this class. + * Implementations of this interface should implement methods acting as an administration role for checkpoint storage, + * which defined in {@link CheckpointStorageCoordinatorView}. And also implement methods acting as a worker role, + * which defined in {@link CheckpointStorageWorkerView}. */ -public interface CheckpointStorage { - - /** - * Checks whether this backend supports highly available storage of data. - * - * <p>Some state backends may not support highly-available durable storage, with default settings, - * which makes them suitable for zero-config prototyping, but not for actual production setups. - */ - boolean supportsHighlyAvailableStorage(); - - /** - * Checks whether the storage has a default savepoint location configured. - */ - boolean hasDefaultSavepointLocation(); - - /** - * Resolves the given pointer to a checkpoint/savepoint into a checkpoint location. The location - * supports reading the checkpoint metadata, or disposing the checkpoint storage location. - * - * <p>If the state backend cannot understand the format of the pointer (for example because it - * was created by a different state backend) this method should throw an {@code IOException}. - * - * @param externalPointer The external checkpoint pointer to resolve. - * @return The checkpoint location handle. - * - * @throws IOException Thrown, if the state backend does not understand the pointer, or if - * the pointer could not be resolved due to an I/O error. - */ - CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) throws IOException; - - /** - * Initializes a storage location for new checkpoint with the given ID. - * - * <p>The returned storage location can be used to write the checkpoint data and metadata - * to and to obtain the pointers for the location(s) where the actual checkpoint data should be - * stored. - * - * @param checkpointId The ID (logical timestamp) of the checkpoint that should be persisted. - * @return A storage location for the data and metadata of the given checkpoint. - * - * @throws IOException Thrown if the storage location cannot be initialized due to an I/O exception. - */ - CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException; - - /** - * Initializes a storage location for new savepoint with the given ID. - * - * <p>If an external location pointer is passed, the savepoint storage location - * will be initialized at the location of that pointer. If the external location pointer is null, - * the default savepoint location will be used. If no default savepoint location is configured, - * this will throw an exception. Whether a default savepoint location is configured can be - * checked via {@link #hasDefaultSavepointLocation()}. - * - * @param checkpointId The ID (logical timestamp) of the savepoint's checkpoint. - * @param externalLocationPointer Optionally, a pointer to the location where the savepoint should - * be stored. May be null. - * - * @return A storage location for the data and metadata of the savepoint. - * - * @throws IOException Thrown if the storage location cannot be initialized due to an I/O exception. - */ - CheckpointStorageLocation initializeLocationForSavepoint( - long checkpointId, - @Nullable String externalLocationPointer) throws IOException; - - /** - * Resolves a storage location reference into a CheckpointStreamFactory. - * - * <p>The reference may be the {@link CheckpointStorageLocationReference#isDefaultReference() default reference}, - * in which case the method should return the default location, taking existing configuration - * and checkpoint ID into account. - * - * @param checkpointId The ID of the checkpoint that the location is initialized for. - * @param reference The checkpoint location reference. - * - * @return A checkpoint storage location reflecting the reference and checkpoint ID. - * - * @throws IOException Thrown, if the storage location cannot be initialized from the reference. - */ - CheckpointStreamFactory resolveCheckpointStorageLocation( - long checkpointId, - CheckpointStorageLocationReference reference) throws IOException; - - /** - * Opens a stream to persist checkpoint state data that is owned strictly by tasks and - * not attached to the life cycle of a specific checkpoint. - * - * <p>This method should be used when the persisted data cannot be immediately dropped once - * the checkpoint that created it is dropped. Examples are write-ahead-logs. - * For those, the state can only be dropped once the data has been moved to the target system, - * which may sometimes take longer than one checkpoint (if the target system is temporarily unable - * to keep up). - * - * <p>The fact that the job manager does not own the life cycle of this type of state means also - * that it is strictly the responsibility of the tasks to handle the cleanup of this data. - * - * <p>Developer note: In the future, we may be able to make this a special case of "shared state", - * where the task re-emits the shared state reference as long as it needs to hold onto the - * persisted state data. - * - * @return A checkpoint state stream to the location for state owned by tasks. - * @throws IOException Thrown, if the stream cannot be opened. - */ - CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException; +public interface CheckpointStorage extends CheckpointStorageCoordinatorView, CheckpointStorageWorkerView { } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageCoordinatorView.java similarity index 60% copy from flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java copy to flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageCoordinatorView.java index 0f8aa69..0d31328 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageCoordinatorView.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,18 +18,17 @@ package org.apache.flink.runtime.state; -import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream; - import javax.annotation.Nullable; import java.io.IOException; /** - * CheckpointStorage implements the durable storage of checkpoint data and metadata streams. - * An individual checkpoint or savepoint is stored to a {@link CheckpointStorageLocation}, - * created by this class. + * This interface creates a {@link CheckpointStorageLocation} to which + * an individual checkpoint or savepoint is stored. + * + * <p>Methods of this interface act as an administration role in checkpoint coordinator. */ -public interface CheckpointStorage { +public interface CheckpointStorageCoordinatorView { /** * Checks whether this backend supports highly available storage of data. @@ -91,46 +90,6 @@ public interface CheckpointStorage { * @throws IOException Thrown if the storage location cannot be initialized due to an I/O exception. */ CheckpointStorageLocation initializeLocationForSavepoint( - long checkpointId, - @Nullable String externalLocationPointer) throws IOException; - - /** - * Resolves a storage location reference into a CheckpointStreamFactory. - * - * <p>The reference may be the {@link CheckpointStorageLocationReference#isDefaultReference() default reference}, - * in which case the method should return the default location, taking existing configuration - * and checkpoint ID into account. - * - * @param checkpointId The ID of the checkpoint that the location is initialized for. - * @param reference The checkpoint location reference. - * - * @return A checkpoint storage location reflecting the reference and checkpoint ID. - * - * @throws IOException Thrown, if the storage location cannot be initialized from the reference. - */ - CheckpointStreamFactory resolveCheckpointStorageLocation( - long checkpointId, - CheckpointStorageLocationReference reference) throws IOException; - - /** - * Opens a stream to persist checkpoint state data that is owned strictly by tasks and - * not attached to the life cycle of a specific checkpoint. - * - * <p>This method should be used when the persisted data cannot be immediately dropped once - * the checkpoint that created it is dropped. Examples are write-ahead-logs. - * For those, the state can only be dropped once the data has been moved to the target system, - * which may sometimes take longer than one checkpoint (if the target system is temporarily unable - * to keep up). - * - * <p>The fact that the job manager does not own the life cycle of this type of state means also - * that it is strictly the responsibility of the tasks to handle the cleanup of this data. - * - * <p>Developer note: In the future, we may be able to make this a special case of "shared state", - * where the task re-emits the shared state reference as long as it needs to hold onto the - * persisted state data. - * - * @return A checkpoint state stream to the location for state owned by tasks. - * @throws IOException Thrown, if the stream cannot be opened. - */ - CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException; + long checkpointId, + @Nullable String externalLocationPointer) throws IOException; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageWorkerView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageWorkerView.java new file mode 100644 index 0000000..dead842 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageWorkerView.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import java.io.IOException; + +/** + * This interface implements the durable storage of checkpoint data and metadata streams. + * An individual checkpoint or savepoint is stored to a {@link CheckpointStorageLocation} + * which created by {@link CheckpointStorageCoordinatorView}. + * + * <p>Methods of this interface act as a worker role in task manager. + */ +public interface CheckpointStorageWorkerView { + + /** + * Resolves a storage location reference into a CheckpointStreamFactory. + * + * <p>The reference may be the {@link CheckpointStorageLocationReference#isDefaultReference() default reference}, + * in which case the method should return the default location, taking existing configuration + * and checkpoint ID into account. + * + * @param checkpointId The ID of the checkpoint that the location is initialized for. + * @param reference The checkpoint location reference. + * + * @return A checkpoint storage location reflecting the reference and checkpoint ID. + * + * @throws IOException Thrown, if the storage location cannot be initialized from the reference. + */ + CheckpointStreamFactory resolveCheckpointStorageLocation( + long checkpointId, + CheckpointStorageLocationReference reference) throws IOException; + + /** + * Opens a stream to persist checkpoint state data that is owned strictly by tasks and + * not attached to the life cycle of a specific checkpoint. + * + * <p>This method should be used when the persisted data cannot be immediately dropped once + * the checkpoint that created it is dropped. Examples are write-ahead-logs. + * For those, the state can only be dropped once the data has been moved to the target system, + * which may sometimes take longer than one checkpoint (if the target system is temporarily unable + * to keep up). + * + * <p>The fact that the job manager does not own the life cycle of this type of state means also + * that it is strictly the responsibility of the tasks to handle the cleanup of this data. + * + * <p>Developer note: In the future, we may be able to make this a special case of "shared state", + * where the task re-emits the shared state reference as long as it needs to hold onto the + * persisted state data. + * + * @return A checkpoint state stream to the location for state owned by tasks. + * @throws IOException Thrown, if the stream cannot be opened. + */ + CheckpointStreamFactory.CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException; +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java index 1549e01..7c88d0d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java @@ -151,7 +151,7 @@ public class FsCheckpointStorage extends AbstractFsCheckpointStorage { } @Override - public CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException { + public CheckpointStateOutputStream createTaskOwnedStateStream() { return new FsCheckpointStateOutputStream( taskOwnedStateDirectory, fileSystem, @@ -160,7 +160,7 @@ public class FsCheckpointStorage extends AbstractFsCheckpointStorage { } @Override - protected CheckpointStorageLocation createSavepointLocation(FileSystem fs, Path location) throws IOException { + protected CheckpointStorageLocation createSavepointLocation(FileSystem fs, Path location) { final CheckpointStorageLocationReference reference = encodePathAsReference(location); return new FsCheckpointStorageLocation(fs, location, location, location, reference, fileSizeThreshold); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorage.java index 6f0ca4c..41a0359 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorage.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorage.java @@ -133,7 +133,7 @@ public class MemoryBackendCheckpointStorage extends AbstractFsCheckpointStorage @Override public CheckpointStreamFactory resolveCheckpointStorageLocation( long checkpointId, - CheckpointStorageLocationReference reference) throws IOException { + CheckpointStorageLocationReference reference) { // no matter where the checkpoint goes, we always return the storage location that stores // state inline with the state handles. @@ -141,12 +141,12 @@ public class MemoryBackendCheckpointStorage extends AbstractFsCheckpointStorage } @Override - public CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException { + public CheckpointStateOutputStream createTaskOwnedStateStream() { return new MemoryCheckpointOutputStream(maxStateSize); } @Override - protected CheckpointStorageLocation createSavepointLocation(FileSystem fs, Path location) throws IOException { + protected CheckpointStorageLocation createSavepointLocation(FileSystem fs, Path location) { return new PersistentMetadataCheckpointStorageLocation(fs, location, maxStateSize); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index f0d9ce7..889160c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -56,7 +56,7 @@ import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.state.CheckpointMetadataOutputStream; -import org.apache.flink.runtime.state.CheckpointStorage; +import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView; import org.apache.flink.runtime.state.CheckpointStorageLocation; import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation; import org.apache.flink.runtime.state.StateBackend; @@ -466,7 +466,7 @@ public class DispatcherTest extends TestLogger { @Nonnull private URI createTestingSavepoint() throws IOException, URISyntaxException { final StateBackend stateBackend = Checkpoints.loadStateBackend(configuration, Thread.currentThread().getContextClassLoader(), log); - final CheckpointStorage checkpointStorage = stateBackend.createCheckpointStorage(jobGraph.getJobID()); + final CheckpointStorageCoordinatorView checkpointStorage = stateBackend.createCheckpointStorage(jobGraph.getJobID()); final File savepointFile = temporaryFolder.newFolder(); final long checkpointId = 1L; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index 08cc6ac..462df70 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -139,20 +139,20 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten public final ExpectedException expectedException = ExpectedException.none(); // lazily initialized stream storage - private CheckpointStorageLocation checkpointStorageLocation; + private CheckpointStreamFactory checkpointStreamFactory; protected abstract B getStateBackend() throws Exception; protected abstract boolean isSerializerPresenceRequiredOnRestore(); protected CheckpointStreamFactory createStreamFactory() throws Exception { - if (checkpointStorageLocation == null) { - checkpointStorageLocation = getStateBackend() + if (checkpointStreamFactory == null) { + checkpointStreamFactory = getStateBackend() .createCheckpointStorage(new JobID()) - .initializeLocationForCheckpoint(1L); + .resolveCheckpointStorageLocation(1L, CheckpointStorageLocationReference.getDefault()); } - return checkpointStorageLocation; + return checkpointStreamFactory; } protected <K> AbstractKeyedStateBackend<K> createKeyedBackend(TypeSerializer<K> keySerializer) throws Exception { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java index 3f42952..df28514 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java @@ -61,7 +61,7 @@ public class BackendForTestStream extends MemoryStateBackend { } @Override - public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException { + public CheckpointStorage createCheckpointStorage(JobID jobId) { return new TestCheckpointStorage(); } @@ -85,27 +85,27 @@ public class BackendForTestStream extends MemoryStateBackend { } @Override - public CompletedCheckpointStorageLocation resolveCheckpoint(String pointer) throws IOException { + public CompletedCheckpointStorageLocation resolveCheckpoint(String pointer) { throw new UnsupportedOperationException(); } @Override - public CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException { + public CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) { throw new UnsupportedOperationException(); } @Override - public CheckpointStorageLocation initializeLocationForSavepoint(long checkpointId, @Nullable String externalLocationPointer) throws IOException { + public CheckpointStorageLocation initializeLocationForSavepoint(long checkpointId, @Nullable String externalLocationPointer) { throw new UnsupportedOperationException(); } @Override - public CheckpointStreamFactory resolveCheckpointStorageLocation(long checkpointId, CheckpointStorageLocationReference reference) throws IOException { + public CheckpointStreamFactory resolveCheckpointStorageLocation(long checkpointId, CheckpointStorageLocationReference reference) { return streamFactory; } @Override - public CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException { + public CheckpointStateOutputStream createTaskOwnedStateStream() { throw new UnsupportedOperationException(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java index 887dedf..3a54242 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java @@ -28,7 +28,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; -import org.apache.flink.runtime.state.CheckpointStorageLocation; +import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.SharedStateRegistry; @@ -51,7 +51,8 @@ public abstract class StateBackendTestContext { public static final int NUMBER_OF_KEY_GROUPS = 10; private final StateBackend stateBackend; - private final CheckpointStorageLocation checkpointStorageLocation; + private final CheckpointOptions checkpointOptions; + private final CheckpointStreamFactory checkpointStreamFactory; private final TtlTimeProvider timeProvider; private final SharedStateRegistry sharedStateRegistry; private final List<KeyedStateHandle> snapshots; @@ -61,18 +62,19 @@ public abstract class StateBackendTestContext { protected StateBackendTestContext(TtlTimeProvider timeProvider) { this.timeProvider = Preconditions.checkNotNull(timeProvider); this.stateBackend = Preconditions.checkNotNull(createStateBackend()); - this.checkpointStorageLocation = createCheckpointStorageLocation(); + this.checkpointOptions = CheckpointOptions.forCheckpointWithDefaultLocation(); + this.checkpointStreamFactory = createCheckpointStreamFactory(); this.sharedStateRegistry = new SharedStateRegistry(); this.snapshots = new ArrayList<>(); } protected abstract StateBackend createStateBackend(); - private CheckpointStorageLocation createCheckpointStorageLocation() { + private CheckpointStreamFactory createCheckpointStreamFactory() { try { return stateBackend .createCheckpointStorage(new JobID()) - .initializeLocationForCheckpoint(2L); + .resolveCheckpointStorageLocation(2L, checkpointOptions.getTargetLocation()); } catch (IOException e) { throw new RuntimeException("unexpected"); } @@ -139,7 +141,7 @@ public abstract class StateBackendTestContext { RunnableFuture<SnapshotResult<KeyedStateHandle>> triggerSnapshot() throws Exception { RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotRunnableFuture = keyedStateBackend.snapshot(682375462392L, 10L, - checkpointStorageLocation, CheckpointOptions.forCheckpointWithDefaultLocation()); + checkpointStreamFactory, checkpointOptions); if (!snapshotRunnableFuture.isDone()) { snapshotRunnableFuture.run(); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java index 27db126..4ad0fc6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java @@ -24,7 +24,7 @@ import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.io.disk.InputViewIterator; -import org.apache.flink.runtime.state.CheckpointStorage; +import org.apache.flink.runtime.state.CheckpointStorageWorkerView; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; @@ -66,7 +66,7 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I protected final TypeSerializer<IN> serializer; private transient CheckpointStreamFactory.CheckpointStateOutputStream out; - private transient CheckpointStorage checkpointStorage; + private transient CheckpointStorageWorkerView checkpointStorage; private transient ListState<PendingCheckpoint> checkpointedState; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 6d5ebea..40e31dd 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -35,7 +35,7 @@ import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.plugable.SerializationDelegate; -import org.apache.flink.runtime.state.CheckpointStorage; +import org.apache.flink.runtime.state.CheckpointStorageWorkerView; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.StateBackendLoader; @@ -144,7 +144,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> protected StateBackend stateBackend; /** The external storage where checkpoint data is persisted. */ - private CheckpointStorage checkpointStorage; + private CheckpointStorageWorkerView checkpointStorage; /** * The internal {@link ProcessingTimeService} used to define the current @@ -529,7 +529,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> return lock; } - public CheckpointStorage getCheckpointStorage() { + public CheckpointStorageWorkerView getCheckpointStorage() { return checkpointStorage; } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java index 45c80da..230c68a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java @@ -22,7 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.runtime.execution.Environment; -import org.apache.flink.runtime.state.CheckpointStorage; +import org.apache.flink.runtime.state.CheckpointStorageWorkerView; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer; import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; @@ -44,7 +44,7 @@ public class MockStreamTask extends StreamTask { private StreamTaskStateInitializer streamTaskStateInitializer; private final CloseableRegistry closableRegistry; private final StreamStatusMaintainer streamStatusMaintainer; - private final CheckpointStorage checkpointStorage; + private final CheckpointStorageWorkerView checkpointStorage; private final ProcessingTimeService processingTimeService; private final BiConsumer<String, Throwable> handleAsyncException; private final Map<String, Accumulator<?, ?>> accumulatorMap; @@ -58,7 +58,7 @@ public class MockStreamTask extends StreamTask { StreamTaskStateInitializer streamTaskStateInitializer, CloseableRegistry closableRegistry, StreamStatusMaintainer streamStatusMaintainer, - CheckpointStorage checkpointStorage, + CheckpointStorageWorkerView checkpointStorage, ProcessingTimeService processingTimeService, BiConsumer<String, Throwable> handleAsyncException, Map<String, Accumulator<?, ?>> accumulatorMap @@ -134,7 +134,7 @@ public class MockStreamTask extends StreamTask { } @Override - public CheckpointStorage getCheckpointStorage() { + public CheckpointStorageWorkerView getCheckpointStorage() { return checkpointStorage; } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTaskBuilder.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTaskBuilder.java index b5cdaf2..4353d3a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTaskBuilder.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTaskBuilder.java @@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.state.CheckpointStorage; +import org.apache.flink.runtime.state.CheckpointStorageWorkerView; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.graph.StreamConfig; @@ -50,7 +51,7 @@ public class MockStreamTaskBuilder { private ExecutionConfig executionConfig = new ExecutionConfig(); private CloseableRegistry closableRegistry = new CloseableRegistry(); private StreamStatusMaintainer streamStatusMaintainer = new MockStreamStatusMaintainer(); - private CheckpointStorage checkpointStorage; + private CheckpointStorageWorkerView checkpointStorage; private ProcessingTimeService processingTimeService = new TestProcessingTimeService(); private StreamTaskStateInitializer streamTaskStateInitializer; private BiConsumer<String, Throwable> handleAsyncException = (message, throwable) -> { };