This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 80bd85d50ec8d3939ab8e826fd7b5dd472516b9f
Author: sjwiesman <sjwies...@gmail.com>
AuthorDate: Mon Aug 16 10:45:48 2021 -0500

    [FLINK-23728][state-processor-api] State bootstrapping fails on new state backend factory stack
    
    This closes #16849
---
 .../flink/state/api/output/SnapshotUtils.java      | 31 ++++++++++++++++++----
 .../operators/BroadcastStateBootstrapOperator.java |  2 +-
 .../operators/KeyedStateBootstrapOperator.java     |  2 +-
 .../output/operators/StateBootstrapOperator.java   |  2 +-
 .../operators/StateBootstrapWrapperOperator.java   |  2 +-
 .../flink/state/api/SavepointWriterITCase.java     | 13 +++++++++
 .../state/api/SavepointWriterWindowITCase.java     |  6 ++++-
 .../flink/state/api/output/SnapshotUtilsTest.java  |  9 ++-----
 8 files changed, 50 insertions(+), 17 deletions(-)

diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java
index 9a1867f..e149607 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java
@@ -19,16 +19,23 @@
 package org.apache.flink.state.api.output;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
-import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import 
org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
+import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation;
 import org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer;
 import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
 import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.util.MathUtils;
+
+import java.io.IOException;
+
+import static 
org.apache.flink.configuration.CheckpointingOptions.FS_SMALL_FILE_THRESHOLD;
+import static 
org.apache.flink.configuration.CheckpointingOptions.FS_WRITE_BUFFER_SIZE;
 
 /** Takes a final snapshot of the state of an operator subtask. */
 @Internal
@@ -43,7 +50,7 @@ public final class SnapshotUtils {
             long timestamp,
             boolean isExactlyOnceMode,
             boolean isUnalignedCheckpoint,
-            CheckpointStorageWorkerView checkpointStorage,
+            Configuration configuration,
             Path savepointPath)
             throws Exception {
 
@@ -57,9 +64,7 @@ public final class SnapshotUtils {
 
         operator.prepareSnapshotPreBarrier(CHECKPOINT_ID);
 
-        CheckpointStreamFactory storage =
-                checkpointStorage.resolveCheckpointStorageLocation(
-                        CHECKPOINT_ID, options.getTargetLocation());
+        CheckpointStreamFactory storage = createStreamFactory(configuration, 
options);
 
         OperatorSnapshotFutures snapshotInProgress =
                 operator.snapshotState(CHECKPOINT_ID, timestamp, options, 
storage);
@@ -70,4 +75,20 @@ public final class SnapshotUtils {
         operator.notifyCheckpointComplete(CHECKPOINT_ID);
         return new TaggedOperatorSubtaskState(index, state);
     }
+
+    private static CheckpointStreamFactory createStreamFactory(
+            Configuration configuration, CheckpointOptions options) throws 
IOException {
+        final Path path =
+                AbstractFsCheckpointStorageAccess.decodePathFromReference(
+                        options.getTargetLocation());
+
+        return new FsCheckpointStorageLocation(
+                path.getFileSystem(),
+                path,
+                path,
+                path,
+                options.getTargetLocation(),
+                
MathUtils.checkedDownCast(configuration.get(FS_SMALL_FILE_THRESHOLD).getBytes()),
+                configuration.get(FS_WRITE_BUFFER_SIZE));
+    }
 }
diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/BroadcastStateBootstrapOperator.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/BroadcastStateBootstrapOperator.java
index 38c37ea..60ab7ff 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/BroadcastStateBootstrapOperator.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/BroadcastStateBootstrapOperator.java
@@ -78,7 +78,7 @@ public class BroadcastStateBootstrapOperator<IN>
                         timestamp,
                         
getContainingTask().getConfiguration().isExactlyOnceCheckpointMode(),
                         
getContainingTask().getConfiguration().isUnalignedCheckpointsEnabled(),
-                        getContainingTask().getCheckpointStorage(),
+                        
getContainingTask().getConfiguration().getConfiguration(),
                         savepointPath);
 
         output.collect(new StreamRecord<>(state));
diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/KeyedStateBootstrapOperator.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/KeyedStateBootstrapOperator.java
index 5e9f91d..1817304 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/KeyedStateBootstrapOperator.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/KeyedStateBootstrapOperator.java
@@ -93,7 +93,7 @@ public class KeyedStateBootstrapOperator<K, IN>
                         timestamp,
                         
getContainingTask().getConfiguration().isExactlyOnceCheckpointMode(),
                         
getContainingTask().getConfiguration().isUnalignedCheckpointsEnabled(),
-                        getContainingTask().getCheckpointStorage(),
+                        
getContainingTask().getConfiguration().getConfiguration(),
                         savepointPath);
 
         output.collect(new StreamRecord<>(state));
diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapOperator.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapOperator.java
index ab6f112..08dace2 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapOperator.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapOperator.java
@@ -74,7 +74,7 @@ public class StateBootstrapOperator<IN>
                         timestamp,
                         
getContainingTask().getConfiguration().isExactlyOnceCheckpointMode(),
                         
getContainingTask().getConfiguration().isUnalignedCheckpointsEnabled(),
-                        getContainingTask().getCheckpointStorage(),
+                        
getContainingTask().getConfiguration().getConfiguration(),
                         savepointPath);
 
         output.collect(new StreamRecord<>(state));
diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java
index a72fcdc..933602a 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java
@@ -196,7 +196,7 @@ public final class StateBootstrapWrapperOperator<
                         operator.getContainingTask()
                                 .getConfiguration()
                                 .isUnalignedCheckpointsEnabled(),
-                        operator.getContainingTask().getCheckpointStorage(),
+                        
operator.getContainingTask().getConfiguration().getConfiguration(),
                         savepointPath);
 
         output.collect(new StreamRecord<>(state));
diff --git 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java
 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java
index d13ba03..e35e409 100644
--- 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java
+++ 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
@@ -37,6 +38,7 @@ import 
org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
 import org.apache.flink.state.api.functions.BroadcastStateBootstrapFunction;
 import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
 import org.apache.flink.state.api.functions.StateBootstrapFunction;
@@ -100,6 +102,17 @@ public class SavepointWriterITCase extends 
AbstractTestBase {
         testStateBootstrapAndModification(backend);
     }
 
+    @Test
+    public void testHashMapStateBackend() throws Exception {
+        testStateBootstrapAndModification(new HashMapStateBackend());
+    }
+
+    @Test
+    public void testEmbeddedRocksDBStateBackend() throws Exception {
+        StateBackend backend = new EmbeddedRocksDBStateBackend();
+        testStateBootstrapAndModification(backend);
+    }
+
     public void testStateBootstrapAndModification(StateBackend backend) throws 
Exception {
         final String savepointPath = getTempDirPath(new 
AbstractID().toHexString());
 
diff --git 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterWindowITCase.java
 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterWindowITCase.java
index 9a1d774..7a93c12 100644
--- 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterWindowITCase.java
+++ 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterWindowITCase.java
@@ -28,10 +28,12 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.state.api.utils.MaxWatermarkSource;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -111,7 +113,9 @@ public class SavepointWriterWindowITCase extends 
AbstractTestBase {
 
     private static final List<Tuple2<String, StateBackend>> STATE_BACKENDS =
             Arrays.asList(
-                    Tuple2.of("MemoryStateBackend", new MemoryStateBackend()),
+                    Tuple2.of("HashMap", new HashMapStateBackend()),
+                    Tuple2.of("EmbeddedRocksDB", new 
EmbeddedRocksDBStateBackend()),
+                    Tuple2.of("Memory", new MemoryStateBackend()),
                     Tuple2.of(
                             "RocksDB",
                             new RocksDBStateBackend((StateBackend) new 
MemoryStateBackend())));
diff --git 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java
 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java
index f2056f3..773f80e 100644
--- 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java
+++ 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java
@@ -18,14 +18,12 @@
 
 package org.apache.flink.state.api.output;
 
-import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.ttl.mock.MockCheckpointStorage;
 import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
@@ -55,12 +53,9 @@ public class SnapshotUtilsTest {
     @Test
     public void testSnapshotUtilsLifecycle() throws Exception {
         StreamOperator<Void> operator = new LifecycleOperator();
-        CheckpointStorageWorkerView storage =
-                new MockCheckpointStorage().createCheckpointStorage(new 
JobID());
-
         Path path = new Path(folder.newFolder().getAbsolutePath());
 
-        SnapshotUtils.snapshot(operator, 0, 0L, true, false, storage, path);
+        SnapshotUtils.snapshot(operator, 0, 0L, true, false, new 
Configuration(), path);
 
         Assert.assertEquals(EXPECTED_CALL_OPERATOR_SNAPSHOT, 
ACTUAL_ORDER_TRACKING);
     }

Reply via email to