This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 80bd85d50ec8d3939ab8e826fd7b5dd472516b9f
Author: sjwiesman <sjwies...@gmail.com>
AuthorDate: Mon Aug 16 10:45:48 2021 -0500

    [FLINK-23728][state-processor-api] State bootstrapping fails on new state backend factory stack

    This closes #16849
---
 .../flink/state/api/output/SnapshotUtils.java      | 31 ++++++++++++++++++----
 .../operators/BroadcastStateBootstrapOperator.java |  2 +-
 .../operators/KeyedStateBootstrapOperator.java     |  2 +-
 .../output/operators/StateBootstrapOperator.java   |  2 +-
 .../operators/StateBootstrapWrapperOperator.java   |  2 +-
 .../flink/state/api/SavepointWriterITCase.java     | 13 +++++++++
 .../state/api/SavepointWriterWindowITCase.java     |  6 ++++-
 .../flink/state/api/output/SnapshotUtilsTest.java  |  9 ++-----
 8 files changed, 50 insertions(+), 17 deletions(-)

diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java
index 9a1867f..e149607 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java
@@ -19,16 +19,23 @@
 package org.apache.flink.state.api.output;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
-import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
+import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation;
 import org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer;
 import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
 import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.util.MathUtils;
+
+import java.io.IOException;
+
+import static org.apache.flink.configuration.CheckpointingOptions.FS_SMALL_FILE_THRESHOLD;
+import static org.apache.flink.configuration.CheckpointingOptions.FS_WRITE_BUFFER_SIZE;
 
 /** Takes a final snapshot of the state of an operator subtask. */
 @Internal
@@ -43,7 +50,7 @@ public final class SnapshotUtils {
             long timestamp,
             boolean isExactlyOnceMode,
             boolean isUnalignedCheckpoint,
-            CheckpointStorageWorkerView checkpointStorage,
+            Configuration configuration,
             Path savepointPath)
             throws Exception {
 
@@ -57,9 +64,7 @@ public final class SnapshotUtils {
 
         operator.prepareSnapshotPreBarrier(CHECKPOINT_ID);
 
-        CheckpointStreamFactory storage =
-                checkpointStorage.resolveCheckpointStorageLocation(
-                        CHECKPOINT_ID, options.getTargetLocation());
+        CheckpointStreamFactory storage = createStreamFactory(configuration, options);
 
         OperatorSnapshotFutures snapshotInProgress =
                 operator.snapshotState(CHECKPOINT_ID, timestamp, options, storage);
@@ -70,4 +75,20 @@ public final class SnapshotUtils {
         operator.notifyCheckpointComplete(CHECKPOINT_ID);
         return new TaggedOperatorSubtaskState(index, state);
     }
+
+    private static CheckpointStreamFactory createStreamFactory(
+            Configuration configuration, CheckpointOptions options) throws IOException {
+        final Path path =
+                AbstractFsCheckpointStorageAccess.decodePathFromReference(
+                        options.getTargetLocation());
+
+        return new FsCheckpointStorageLocation(
+                path.getFileSystem(),
+                path,
+                path,
+                path,
+                options.getTargetLocation(),
+                MathUtils.checkedDownCast(configuration.get(FS_SMALL_FILE_THRESHOLD).getBytes()),
+                configuration.get(FS_WRITE_BUFFER_SIZE));
+    }
 }
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/BroadcastStateBootstrapOperator.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/BroadcastStateBootstrapOperator.java
index 38c37ea..60ab7ff 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/BroadcastStateBootstrapOperator.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/BroadcastStateBootstrapOperator.java
@@ -78,7 +78,7 @@ public class BroadcastStateBootstrapOperator<IN>
                         timestamp,
                         getContainingTask().getConfiguration().isExactlyOnceCheckpointMode(),
                         getContainingTask().getConfiguration().isUnalignedCheckpointsEnabled(),
-                        getContainingTask().getCheckpointStorage(),
+                        getContainingTask().getConfiguration().getConfiguration(),
                         savepointPath);
 
         output.collect(new StreamRecord<>(state));
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/KeyedStateBootstrapOperator.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/KeyedStateBootstrapOperator.java
index 5e9f91d..1817304 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/KeyedStateBootstrapOperator.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/KeyedStateBootstrapOperator.java
@@ -93,7 +93,7 @@ public class KeyedStateBootstrapOperator<K, IN>
                         timestamp,
                         getContainingTask().getConfiguration().isExactlyOnceCheckpointMode(),
                         getContainingTask().getConfiguration().isUnalignedCheckpointsEnabled(),
-                        getContainingTask().getCheckpointStorage(),
+                        getContainingTask().getConfiguration().getConfiguration(),
                         savepointPath);
 
         output.collect(new StreamRecord<>(state));
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapOperator.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapOperator.java
index ab6f112..08dace2 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapOperator.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapOperator.java
@@ -74,7 +74,7 @@ public class StateBootstrapOperator<IN>
                         timestamp,
                         getContainingTask().getConfiguration().isExactlyOnceCheckpointMode(),
                         getContainingTask().getConfiguration().isUnalignedCheckpointsEnabled(),
-                        getContainingTask().getCheckpointStorage(),
+                        getContainingTask().getConfiguration().getConfiguration(),
                         savepointPath);
 
         output.collect(new StreamRecord<>(state));
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java
index a72fcdc..933602a 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java
@@ -196,7 +196,7 @@ public final class StateBootstrapWrapperOperator<
                         operator.getContainingTask()
                                 .getConfiguration()
                                 .isUnalignedCheckpointsEnabled(),
-                        operator.getContainingTask().getCheckpointStorage(),
+                        operator.getContainingTask().getConfiguration().getConfiguration(),
                         savepointPath);
 
         output.collect(new StreamRecord<>(state));
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java
index d13ba03..e35e409 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
@@ -37,6 +38,7 @@ import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
 import org.apache.flink.state.api.functions.BroadcastStateBootstrapFunction;
 import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
 import org.apache.flink.state.api.functions.StateBootstrapFunction;
@@ -100,6 +102,17 @@ public class SavepointWriterITCase extends AbstractTestBase {
         testStateBootstrapAndModification(backend);
     }
 
+    @Test
+    public void testHashMapStateBackend() throws Exception {
+        testStateBootstrapAndModification(new HashMapStateBackend());
+    }
+
+    @Test
+    public void testEmbeddedRocksDBStateBackend() throws Exception {
+        StateBackend backend = new EmbeddedRocksDBStateBackend();
+        testStateBootstrapAndModification(backend);
+    }
+
     public void testStateBootstrapAndModification(StateBackend backend) throws Exception {
         final String savepointPath = getTempDirPath(new AbstractID().toHexString());
 
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterWindowITCase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterWindowITCase.java
index 9a1d774..7a93c12 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterWindowITCase.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterWindowITCase.java
@@ -28,10 +28,12 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.state.api.utils.MaxWatermarkSource;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -111,7 +113,9 @@ public class SavepointWriterWindowITCase extends AbstractTestBase {
 
     private static final List<Tuple2<String, StateBackend>> STATE_BACKENDS =
             Arrays.asList(
-                    Tuple2.of("MemoryStateBackend", new MemoryStateBackend()),
+                    Tuple2.of("HashMap", new HashMapStateBackend()),
+                    Tuple2.of("EmbeddedRocksDB", new EmbeddedRocksDBStateBackend()),
+                    Tuple2.of("Memory", new MemoryStateBackend()),
                     Tuple2.of(
                             "RocksDB",
                             new RocksDBStateBackend((StateBackend) new MemoryStateBackend())));
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java
index f2056f3..773f80e 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java
@@ -18,14 +18,12 @@
 
 package org.apache.flink.state.api.output;
 
-import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.ttl.mock.MockCheckpointStorage;
 import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
@@ -55,12 +53,9 @@ public class SnapshotUtilsTest {
     @Test
     public void testSnapshotUtilsLifecycle() throws Exception {
         StreamOperator<Void> operator = new LifecycleOperator();
-        CheckpointStorageWorkerView storage =
-                new MockCheckpointStorage().createCheckpointStorage(new JobID());
-
         Path path = new Path(folder.newFolder().getAbsolutePath());
 
-        SnapshotUtils.snapshot(operator, 0, 0L, true, false, storage, path);
+        SnapshotUtils.snapshot(operator, 0, 0L, true, false, new Configuration(), path);
 
         Assert.assertEquals(EXPECTED_CALL_OPERATOR_SNAPSHOT, ACTUAL_ORDER_TRACKING);