This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b6b4baf  [FLINK-11874][checkpoint] Split CheckpointStorage interface 
to distinguish JM and TM side
b6b4baf is described below

commit b6b4baf588fae0494e32492ac16df8977d8cfc1b
Author: Yun Tang <myas...@live.com>
AuthorDate: Thu Mar 14 18:48:39 2019 +0800

    [FLINK-11874][checkpoint] Split CheckpointStorage interface to distinguish 
JM and TM side
    
    This closes #7970.
---
 .../runtime/checkpoint/CheckpointCoordinator.java  |  12 ++-
 .../flink/runtime/state/CheckpointStorage.java     | 117 +--------------------
 ....java => CheckpointStorageCoordinatorView.java} |  57 ++--------
 .../runtime/state/CheckpointStorageWorkerView.java |  71 +++++++++++++
 .../state/filesystem/FsCheckpointStorage.java      |   4 +-
 .../memory/MemoryBackendCheckpointStorage.java     |   6 +-
 .../flink/runtime/dispatcher/DispatcherTest.java   |   4 +-
 .../flink/runtime/state/StateBackendTestBase.java  |  10 +-
 .../state/testutils/BackendForTestStream.java      |  12 +--
 .../runtime/state/ttl/StateBackendTestContext.java |  14 +--
 .../runtime/operators/GenericWriteAheadSink.java   |   4 +-
 .../flink/streaming/runtime/tasks/StreamTask.java  |   6 +-
 .../flink/streaming/util/MockStreamTask.java       |   8 +-
 .../streaming/util/MockStreamTaskBuilder.java      |   3 +-
 14 files changed, 129 insertions(+), 199 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 9935455..ac4c1e2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -35,13 +35,14 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
-import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView;
 import org.apache.flink.runtime.state.CheckpointStorageLocation;
 import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.SharedStateRegistryFactory;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
+import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
@@ -50,6 +51,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -119,7 +121,7 @@ public class CheckpointCoordinator {
 
        /** The root checkpoint state backend, which is responsible for 
initializing the
         * checkpoint, storing the metadata, and cleaning up the checkpoint. */
-       private final CheckpointStorage checkpointStorage;
+       private final CheckpointStorageCoordinatorView checkpointStorage;
 
        /** A list of recent checkpoint IDs, to identify late messages (vs 
invalid ones). */
        private final ArrayDeque<Long> recentPendingCheckpoints;
@@ -247,7 +249,11 @@ public class CheckpointCoordinator {
 
                try {
                        this.checkpointStorage = 
checkpointStateBackend.createCheckpointStorage(job);
+               } catch (IOException e) {
+                       throw new FlinkRuntimeException("Failed to create 
checkpoint storage at checkpoint coordinator side.", e);
+               }
 
+               try {
                        // Make sure the checkpoint ID enumerator is running. 
Possibly
                        // issues a blocking call to ZooKeeper.
                        checkpointIDCounter.start();
@@ -1140,7 +1146,7 @@ public class CheckpointCoordinator {
                }
        }
 
-       public CheckpointStorage getCheckpointStorage() {
+       public CheckpointStorageCoordinatorView getCheckpointStorage() {
                return checkpointStorage;
        }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java
index 0f8aa69..af02da5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java
@@ -18,119 +18,10 @@
 
 package org.apache.flink.runtime.state;
 
-import 
org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-
 /**
- * CheckpointStorage implements the durable storage of checkpoint data and 
metadata streams.
- * An individual checkpoint or savepoint is stored to a {@link 
CheckpointStorageLocation},
- * created by this class.
+ * Implementations of this interface should implement methods acting as an 
administration role for checkpoint storage,
+ * which defined in {@link CheckpointStorageCoordinatorView}. And also 
implement methods acting as a worker role,
+ * which defined in {@link CheckpointStorageWorkerView}.
  */
-public interface CheckpointStorage {
-
-       /**
-        * Checks whether this backend supports highly available storage of 
data.
-        *
-        * <p>Some state backends may not support highly-available durable 
storage, with default settings,
-        * which makes them suitable for zero-config prototyping, but not for 
actual production setups.
-        */
-       boolean supportsHighlyAvailableStorage();
-
-       /**
-        * Checks whether the storage has a default savepoint location 
configured.
-        */
-       boolean hasDefaultSavepointLocation();
-
-       /**
-        * Resolves the given pointer to a checkpoint/savepoint into a 
checkpoint location. The location
-        * supports reading the checkpoint metadata, or disposing the 
checkpoint storage location.
-        *
-        * <p>If the state backend cannot understand the format of the pointer 
(for example because it
-        * was created by a different state backend) this method should throw 
an {@code IOException}.
-        *
-        * @param externalPointer The external checkpoint pointer to resolve.
-        * @return The checkpoint location handle.
-        *
-        * @throws IOException Thrown, if the state backend does not understand 
the pointer, or if
-        *                     the pointer could not be resolved due to an I/O 
error.
-        */
-       CompletedCheckpointStorageLocation resolveCheckpoint(String 
externalPointer) throws IOException;
-
-       /**
-        * Initializes a storage location for new checkpoint with the given ID.
-        *
-        * <p>The returned storage location can be used to write the checkpoint 
data and metadata
-        * to and to obtain the pointers for the location(s) where the actual 
checkpoint data should be
-        * stored.
-        *
-        * @param checkpointId The ID (logical timestamp) of the checkpoint 
that should be persisted.
-        * @return A storage location for the data and metadata of the given 
checkpoint.
-        *
-        * @throws IOException Thrown if the storage location cannot be 
initialized due to an I/O exception.
-        */
-       CheckpointStorageLocation initializeLocationForCheckpoint(long 
checkpointId) throws IOException;
-
-       /**
-        * Initializes a storage location for new savepoint with the given ID.
-        *
-        * <p>If an external location pointer is passed, the savepoint storage 
location
-        * will be initialized at the location of that pointer. If the external 
location pointer is null,
-        * the default savepoint location will be used. If no default savepoint 
location is configured,
-        * this will throw an exception. Whether a default savepoint location 
is configured can be
-        * checked via {@link #hasDefaultSavepointLocation()}.
-        *
-        * @param checkpointId The ID (logical timestamp) of the savepoint's 
checkpoint.
-        * @param externalLocationPointer Optionally, a pointer to the location 
where the savepoint should
-        *                                be stored. May be null.
-        *
-        * @return A storage location for the data and metadata of the 
savepoint.
-        *
-        * @throws IOException Thrown if the storage location cannot be 
initialized due to an I/O exception.
-        */
-       CheckpointStorageLocation initializeLocationForSavepoint(
-                       long checkpointId,
-                       @Nullable String externalLocationPointer) throws 
IOException;
-
-       /**
-        * Resolves a storage location reference into a CheckpointStreamFactory.
-        *
-        * <p>The reference may be the {@link 
CheckpointStorageLocationReference#isDefaultReference() default reference},
-        * in which case the method should return the default location, taking 
existing configuration
-        * and checkpoint ID into account.
-        *
-        * @param checkpointId The ID of the checkpoint that the location is 
initialized for.
-        * @param reference The checkpoint location reference.
-        *
-        * @return A checkpoint storage location reflecting the reference and 
checkpoint ID.
-        *
-        * @throws IOException Thrown, if the storage location cannot be 
initialized from the reference.
-        */
-       CheckpointStreamFactory resolveCheckpointStorageLocation(
-                       long checkpointId,
-                       CheckpointStorageLocationReference reference) throws 
IOException;
-
-       /**
-        * Opens a stream to persist checkpoint state data that is owned 
strictly by tasks and
-        * not attached to the life cycle of a specific checkpoint.
-        *
-        * <p>This method should be used when the persisted data cannot be 
immediately dropped once
-        * the checkpoint that created it is dropped. Examples are 
write-ahead-logs.
-        * For those, the state can only be dropped once the data has been 
moved to the target system,
-        * which may sometimes take longer than one checkpoint (if the target 
system is temporarily unable
-        * to keep up).
-        *
-        * <p>The fact that the job manager does not own the life cycle of this 
type of state means also
-        * that it is strictly the responsibility of the tasks to handle the 
cleanup of this data.
-        *
-        * <p>Developer note: In the future, we may be able to make this a 
special case of "shared state",
-        * where the task re-emits the shared state reference as long as it 
needs to hold onto the
-        * persisted state data.
-        *
-        * @return A checkpoint state stream to the location for state owned by 
tasks.
-        * @throws IOException Thrown, if the stream cannot be opened.
-        */
-       CheckpointStateOutputStream createTaskOwnedStateStream() throws 
IOException;
+public interface CheckpointStorage extends CheckpointStorageCoordinatorView, 
CheckpointStorageWorkerView {
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageCoordinatorView.java
similarity index 60%
copy from 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java
copy to 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageCoordinatorView.java
index 0f8aa69..0d31328 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageCoordinatorView.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,18 +18,17 @@
 
 package org.apache.flink.runtime.state;
 
-import 
org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
-
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 
 /**
- * CheckpointStorage implements the durable storage of checkpoint data and 
metadata streams.
- * An individual checkpoint or savepoint is stored to a {@link 
CheckpointStorageLocation},
- * created by this class.
+ * This interface creates a {@link CheckpointStorageLocation} to which
+ * an individual checkpoint or savepoint is stored.
+ *
+ * <p>Methods of this interface act as an administration role in checkpoint 
coordinator.
  */
-public interface CheckpointStorage {
+public interface CheckpointStorageCoordinatorView {
 
        /**
         * Checks whether this backend supports highly available storage of 
data.
@@ -91,46 +90,6 @@ public interface CheckpointStorage {
         * @throws IOException Thrown if the storage location cannot be 
initialized due to an I/O exception.
         */
        CheckpointStorageLocation initializeLocationForSavepoint(
-                       long checkpointId,
-                       @Nullable String externalLocationPointer) throws 
IOException;
-
-       /**
-        * Resolves a storage location reference into a CheckpointStreamFactory.
-        *
-        * <p>The reference may be the {@link 
CheckpointStorageLocationReference#isDefaultReference() default reference},
-        * in which case the method should return the default location, taking 
existing configuration
-        * and checkpoint ID into account.
-        *
-        * @param checkpointId The ID of the checkpoint that the location is 
initialized for.
-        * @param reference The checkpoint location reference.
-        *
-        * @return A checkpoint storage location reflecting the reference and 
checkpoint ID.
-        *
-        * @throws IOException Thrown, if the storage location cannot be 
initialized from the reference.
-        */
-       CheckpointStreamFactory resolveCheckpointStorageLocation(
-                       long checkpointId,
-                       CheckpointStorageLocationReference reference) throws 
IOException;
-
-       /**
-        * Opens a stream to persist checkpoint state data that is owned 
strictly by tasks and
-        * not attached to the life cycle of a specific checkpoint.
-        *
-        * <p>This method should be used when the persisted data cannot be 
immediately dropped once
-        * the checkpoint that created it is dropped. Examples are 
write-ahead-logs.
-        * For those, the state can only be dropped once the data has been 
moved to the target system,
-        * which may sometimes take longer than one checkpoint (if the target 
system is temporarily unable
-        * to keep up).
-        *
-        * <p>The fact that the job manager does not own the life cycle of this 
type of state means also
-        * that it is strictly the responsibility of the tasks to handle the 
cleanup of this data.
-        *
-        * <p>Developer note: In the future, we may be able to make this a 
special case of "shared state",
-        * where the task re-emits the shared state reference as long as it 
needs to hold onto the
-        * persisted state data.
-        *
-        * @return A checkpoint state stream to the location for state owned by 
tasks.
-        * @throws IOException Thrown, if the stream cannot be opened.
-        */
-       CheckpointStateOutputStream createTaskOwnedStateStream() throws 
IOException;
+               long checkpointId,
+               @Nullable String externalLocationPointer) throws IOException;
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageWorkerView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageWorkerView.java
new file mode 100644
index 0000000..dead842
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageWorkerView.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import java.io.IOException;
+
+/**
+ * This interface implements the durable storage of checkpoint data and 
metadata streams.
+ * An individual checkpoint or savepoint is stored to a {@link 
CheckpointStorageLocation}
+ * which created by {@link CheckpointStorageCoordinatorView}.
+ *
+ * <p>Methods of this interface act as a worker role in task manager.
+ */
+public interface CheckpointStorageWorkerView {
+
+       /**
+        * Resolves a storage location reference into a CheckpointStreamFactory.
+        *
+        * <p>The reference may be the {@link 
CheckpointStorageLocationReference#isDefaultReference() default reference},
+        * in which case the method should return the default location, taking 
existing configuration
+        * and checkpoint ID into account.
+        *
+        * @param checkpointId The ID of the checkpoint that the location is 
initialized for.
+        * @param reference The checkpoint location reference.
+        *
+        * @return A checkpoint storage location reflecting the reference and 
checkpoint ID.
+        *
+        * @throws IOException Thrown, if the storage location cannot be 
initialized from the reference.
+        */
+       CheckpointStreamFactory resolveCheckpointStorageLocation(
+               long checkpointId,
+               CheckpointStorageLocationReference reference) throws 
IOException;
+
+       /**
+        * Opens a stream to persist checkpoint state data that is owned 
strictly by tasks and
+        * not attached to the life cycle of a specific checkpoint.
+        *
+        * <p>This method should be used when the persisted data cannot be 
immediately dropped once
+        * the checkpoint that created it is dropped. Examples are 
write-ahead-logs.
+        * For those, the state can only be dropped once the data has been 
moved to the target system,
+        * which may sometimes take longer than one checkpoint (if the target 
system is temporarily unable
+        * to keep up).
+        *
+        * <p>The fact that the job manager does not own the life cycle of this 
type of state means also
+        * that it is strictly the responsibility of the tasks to handle the 
cleanup of this data.
+        *
+        * <p>Developer note: In the future, we may be able to make this a 
special case of "shared state",
+        * where the task re-emits the shared state reference as long as it 
needs to hold onto the
+        * persisted state data.
+        *
+        * @return A checkpoint state stream to the location for state owned by 
tasks.
+        * @throws IOException Thrown, if the stream cannot be opened.
+        */
+       CheckpointStreamFactory.CheckpointStateOutputStream 
createTaskOwnedStateStream() throws IOException;
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
index 1549e01..7c88d0d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
@@ -151,7 +151,7 @@ public class FsCheckpointStorage extends 
AbstractFsCheckpointStorage {
        }
 
        @Override
-       public CheckpointStateOutputStream createTaskOwnedStateStream() throws 
IOException {
+       public CheckpointStateOutputStream createTaskOwnedStateStream() {
                return new FsCheckpointStateOutputStream(
                                taskOwnedStateDirectory,
                                fileSystem,
@@ -160,7 +160,7 @@ public class FsCheckpointStorage extends 
AbstractFsCheckpointStorage {
        }
 
        @Override
-       protected CheckpointStorageLocation createSavepointLocation(FileSystem 
fs, Path location) throws IOException {
+       protected CheckpointStorageLocation createSavepointLocation(FileSystem 
fs, Path location) {
                final CheckpointStorageLocationReference reference = 
encodePathAsReference(location);
                return new FsCheckpointStorageLocation(fs, location, location, 
location, reference, fileSizeThreshold);
        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorage.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorage.java
index 6f0ca4c..41a0359 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorage.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorage.java
@@ -133,7 +133,7 @@ public class MemoryBackendCheckpointStorage extends 
AbstractFsCheckpointStorage
        @Override
        public CheckpointStreamFactory resolveCheckpointStorageLocation(
                        long checkpointId,
-                       CheckpointStorageLocationReference reference) throws 
IOException {
+                       CheckpointStorageLocationReference reference) {
 
                // no matter where the checkpoint goes, we always return the 
storage location that stores
                // state inline with the state handles.
@@ -141,12 +141,12 @@ public class MemoryBackendCheckpointStorage extends 
AbstractFsCheckpointStorage
        }
 
        @Override
-       public CheckpointStateOutputStream createTaskOwnedStateStream() throws 
IOException {
+       public CheckpointStateOutputStream createTaskOwnedStateStream() {
                return new MemoryCheckpointOutputStream(maxStateSize);
        }
 
        @Override
-       protected CheckpointStorageLocation createSavepointLocation(FileSystem 
fs, Path location) throws IOException {
+       protected CheckpointStorageLocation createSavepointLocation(FileSystem 
fs, Path location) {
                return new PersistentMetadataCheckpointStorageLocation(fs, 
location, maxStateSize);
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index f0d9ce7..889160c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -56,7 +56,7 @@ import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
-import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView;
 import org.apache.flink.runtime.state.CheckpointStorageLocation;
 import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.state.StateBackend;
@@ -466,7 +466,7 @@ public class DispatcherTest extends TestLogger {
        @Nonnull
        private URI createTestingSavepoint() throws IOException, 
URISyntaxException {
                final StateBackend stateBackend = 
Checkpoints.loadStateBackend(configuration, 
Thread.currentThread().getContextClassLoader(), log);
-               final CheckpointStorage checkpointStorage = 
stateBackend.createCheckpointStorage(jobGraph.getJobID());
+               final CheckpointStorageCoordinatorView checkpointStorage = 
stateBackend.createCheckpointStorage(jobGraph.getJobID());
                final File savepointFile = temporaryFolder.newFolder();
                final long checkpointId = 1L;
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 08cc6ac..462df70 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -139,20 +139,20 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
        public final ExpectedException expectedException = 
ExpectedException.none();
 
        // lazily initialized stream storage
-       private CheckpointStorageLocation checkpointStorageLocation;
+       private CheckpointStreamFactory checkpointStreamFactory;
 
        protected abstract B getStateBackend() throws Exception;
 
        protected abstract boolean isSerializerPresenceRequiredOnRestore();
 
        protected CheckpointStreamFactory createStreamFactory() throws 
Exception {
-               if (checkpointStorageLocation == null) {
-                       checkpointStorageLocation = getStateBackend()
+               if (checkpointStreamFactory == null) {
+                       checkpointStreamFactory = getStateBackend()
                                        .createCheckpointStorage(new JobID())
-                                       .initializeLocationForCheckpoint(1L);
+                                       .resolveCheckpointStorageLocation(1L, 
CheckpointStorageLocationReference.getDefault());
                }
 
-               return checkpointStorageLocation;
+               return checkpointStreamFactory;
        }
 
        protected <K> AbstractKeyedStateBackend<K> 
createKeyedBackend(TypeSerializer<K> keySerializer) throws Exception {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java
index 3f42952..df28514 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java
@@ -61,7 +61,7 @@ public class BackendForTestStream extends MemoryStateBackend {
        }
 
        @Override
-       public CheckpointStorage createCheckpointStorage(JobID jobId) throws 
IOException {
+       public CheckpointStorage createCheckpointStorage(JobID jobId) {
                return new TestCheckpointStorage();
        }
 
@@ -85,27 +85,27 @@ public class BackendForTestStream extends 
MemoryStateBackend {
                }
 
                @Override
-               public CompletedCheckpointStorageLocation 
resolveCheckpoint(String pointer) throws IOException {
+               public CompletedCheckpointStorageLocation 
resolveCheckpoint(String pointer) {
                        throw new UnsupportedOperationException();
                }
 
                @Override
-               public CheckpointStorageLocation 
initializeLocationForCheckpoint(long checkpointId) throws IOException {
+               public CheckpointStorageLocation 
initializeLocationForCheckpoint(long checkpointId) {
                        throw new UnsupportedOperationException();
                }
 
                @Override
-               public CheckpointStorageLocation 
initializeLocationForSavepoint(long checkpointId, @Nullable String 
externalLocationPointer) throws IOException {
+               public CheckpointStorageLocation 
initializeLocationForSavepoint(long checkpointId, @Nullable String 
externalLocationPointer) {
                        throw new UnsupportedOperationException();
                }
 
                @Override
-               public CheckpointStreamFactory 
resolveCheckpointStorageLocation(long checkpointId, 
CheckpointStorageLocationReference reference) throws IOException {
+               public CheckpointStreamFactory 
resolveCheckpointStorageLocation(long checkpointId, 
CheckpointStorageLocationReference reference) {
                        return streamFactory;
                }
 
                @Override
-               public CheckpointStateOutputStream createTaskOwnedStateStream() 
throws IOException {
+               public CheckpointStateOutputStream createTaskOwnedStateStream() 
{
                        throw new UnsupportedOperationException();
                }
        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java
index 887dedf..3a54242 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java
@@ -28,7 +28,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.CheckpointStorageLocation;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
@@ -51,7 +51,8 @@ public abstract class StateBackendTestContext {
        public static final int NUMBER_OF_KEY_GROUPS = 10;
 
        private final StateBackend stateBackend;
-       private final CheckpointStorageLocation checkpointStorageLocation;
+       private final CheckpointOptions checkpointOptions;
+       private final CheckpointStreamFactory checkpointStreamFactory;
        private final TtlTimeProvider timeProvider;
        private final SharedStateRegistry sharedStateRegistry;
        private final List<KeyedStateHandle> snapshots;
@@ -61,18 +62,19 @@ public abstract class StateBackendTestContext {
        protected StateBackendTestContext(TtlTimeProvider timeProvider) {
                this.timeProvider = Preconditions.checkNotNull(timeProvider);
                this.stateBackend = 
Preconditions.checkNotNull(createStateBackend());
-               this.checkpointStorageLocation = 
createCheckpointStorageLocation();
+               this.checkpointOptions = 
CheckpointOptions.forCheckpointWithDefaultLocation();
+               this.checkpointStreamFactory = createCheckpointStreamFactory();
                this.sharedStateRegistry = new SharedStateRegistry();
                this.snapshots = new ArrayList<>();
        }
 
        protected abstract StateBackend createStateBackend();
 
-       private CheckpointStorageLocation createCheckpointStorageLocation() {
+       private CheckpointStreamFactory createCheckpointStreamFactory() {
                try {
                        return stateBackend
                                .createCheckpointStorage(new JobID())
-                               .initializeLocationForCheckpoint(2L);
+                               .resolveCheckpointStorageLocation(2L, 
checkpointOptions.getTargetLocation());
                } catch (IOException e) {
                        throw new RuntimeException("unexpected");
                }
@@ -139,7 +141,7 @@ public abstract class StateBackendTestContext {
        RunnableFuture<SnapshotResult<KeyedStateHandle>> triggerSnapshot() 
throws Exception {
                RunnableFuture<SnapshotResult<KeyedStateHandle>> 
snapshotRunnableFuture =
                        keyedStateBackend.snapshot(682375462392L, 10L,
-                               checkpointStorageLocation, 
CheckpointOptions.forCheckpointWithDefaultLocation());
+                               checkpointStreamFactory, checkpointOptions);
                if (!snapshotRunnableFuture.isDone()) {
                        snapshotRunnableFuture.run();
                }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
index 27db126..4ad0fc6 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
@@ -24,7 +24,7 @@ import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.io.disk.InputViewIterator;
-import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -66,7 +66,7 @@ public abstract class GenericWriteAheadSink<IN> extends 
AbstractStreamOperator<I
        protected final TypeSerializer<IN> serializer;
 
        private transient CheckpointStreamFactory.CheckpointStateOutputStream 
out;
-       private transient CheckpointStorage checkpointStorage;
+       private transient CheckpointStorageWorkerView checkpointStorage;
 
        private transient ListState<PendingCheckpoint> checkpointedState;
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 6d5ebea..40e31dd 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -35,7 +35,7 @@ import 
org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.StateBackendLoader;
@@ -144,7 +144,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
        protected StateBackend stateBackend;
 
        /** The external storage where checkpoint data is persisted. */
-       private CheckpointStorage checkpointStorage;
+       private CheckpointStorageWorkerView checkpointStorage;
 
        /**
         * The internal {@link ProcessingTimeService} used to define the current
@@ -529,7 +529,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                return lock;
        }
 
-       public CheckpointStorage getCheckpointStorage() {
+       public CheckpointStorageWorkerView getCheckpointStorage() {
                return checkpointStorage;
        }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java
index 45c80da..230c68a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
@@ -44,7 +44,7 @@ public class MockStreamTask extends StreamTask {
        private StreamTaskStateInitializer streamTaskStateInitializer;
        private final CloseableRegistry closableRegistry;
        private final StreamStatusMaintainer streamStatusMaintainer;
-       private final CheckpointStorage checkpointStorage;
+       private final CheckpointStorageWorkerView checkpointStorage;
        private final ProcessingTimeService processingTimeService;
        private final BiConsumer<String, Throwable> handleAsyncException;
        private final Map<String, Accumulator<?, ?>> accumulatorMap;
@@ -58,7 +58,7 @@ public class MockStreamTask extends StreamTask {
                StreamTaskStateInitializer streamTaskStateInitializer,
                CloseableRegistry closableRegistry,
                StreamStatusMaintainer streamStatusMaintainer,
-               CheckpointStorage checkpointStorage,
+               CheckpointStorageWorkerView checkpointStorage,
                ProcessingTimeService processingTimeService,
                BiConsumer<String, Throwable> handleAsyncException,
                Map<String, Accumulator<?, ?>> accumulatorMap
@@ -134,7 +134,7 @@ public class MockStreamTask extends StreamTask {
        }
 
        @Override
-       public CheckpointStorage getCheckpointStorage() {
+       public CheckpointStorageWorkerView getCheckpointStorage() {
                return checkpointStorage;
        }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTaskBuilder.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTaskBuilder.java
index b5cdaf2..4353d3a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTaskBuilder.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTaskBuilder.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -50,7 +51,7 @@ public class MockStreamTaskBuilder {
        private ExecutionConfig executionConfig = new ExecutionConfig();
        private CloseableRegistry closableRegistry = new CloseableRegistry();
        private StreamStatusMaintainer streamStatusMaintainer = new 
MockStreamStatusMaintainer();
-       private CheckpointStorage checkpointStorage;
+       private CheckpointStorageWorkerView checkpointStorage;
        private ProcessingTimeService processingTimeService = new 
TestProcessingTimeService();
        private StreamTaskStateInitializer streamTaskStateInitializer;
        private BiConsumer<String, Throwable> handleAsyncException = (message, 
throwable) -> { };

Reply via email to