This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0b9f8956a1452a8b1259257af926eac7ad9e3bec Author: Sebastian Mattheis <sebast...@ververica.com> AuthorDate: Fri Feb 18 10:17:49 2022 +0100 [FLINK-26146] Add tests of flink version upgrades to cover native snapshots Turned SavepointMigrationTestBase into SnapshotMigrationTestBase that supports testing canonical savepoints, native savepoints, and checkpoints; and adapted subclasses accordingly. --- .../LegacyStatefulJobSavepointMigrationITCase.java | 8 +- ...ava => StatefulJobSnapshotMigrationITCase.java} | 145 ++++++++------- .../StatefulJobWBroadcastStateMigrationITCase.java | 139 +++++++------- .../checkpointing/utils/MigrationTestUtils.java | 4 +- ...estBase.java => SnapshotMigrationTestBase.java} | 200 +++++++++++++++++++-- .../TypeSerializerSnapshotMigrationITCase.java | 155 ++++++++-------- pom.xml | 4 +- 7 files changed, 430 insertions(+), 225 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/LegacyStatefulJobSavepointMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LegacyStatefulJobSavepointMigrationITCase.java similarity index 99% rename from flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/LegacyStatefulJobSavepointMigrationITCase.java rename to flink-tests/src/test/java/org/apache/flink/test/checkpointing/LegacyStatefulJobSavepointMigrationITCase.java index e59be25..fe16a25 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/LegacyStatefulJobSavepointMigrationITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LegacyStatefulJobSavepointMigrationITCase.java @@ -16,7 +16,7 @@ * limitations under the License. 
*/ -package org.apache.flink.test.checkpointing.utils; +package org.apache.flink.test.checkpointing; import org.apache.flink.FlinkVersion; import org.apache.flink.api.common.accumulators.IntCounter; @@ -45,6 +45,7 @@ import org.apache.flink.streaming.api.operators.TimestampedCollector; import org.apache.flink.streaming.api.operators.Triggerable; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase; import org.apache.flink.util.Collector; import org.junit.Ignore; @@ -62,7 +63,7 @@ import static org.junit.Assert.assertEquals; * previous Flink versions, as well as for different state backends. */ @RunWith(Parameterized.class) -public class LegacyStatefulJobSavepointMigrationITCase extends SavepointMigrationTestBase { +public class LegacyStatefulJobSavepointMigrationITCase extends SnapshotMigrationTestBase { private static final int NUM_SOURCE_ELEMENTS = 4; @@ -143,11 +144,12 @@ public class LegacyStatefulJobSavepointMigrationITCase extends SavepointMigratio .uid("TimelyStatefulOperator") .addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>()); - executeAndSavepoint( + executeAndSnapshot( env, "src/test/resources/" + getSavepointPath( flinkGenerateSavepointVersion, flinkGenerateSavepointBackendType), + SnapshotType.SAVEPOINT_CANONICAL, new Tuple2<>( AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS)); } diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StatefulJobSnapshotMigrationITCase.java similarity index 76% rename from flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java rename to flink-tests/src/test/java/org/apache/flink/test/checkpointing/StatefulJobSnapshotMigrationITCase.java index 260c1f2..df2a20b 
100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StatefulJobSnapshotMigrationITCase.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.test.checkpointing.utils; +package org.apache.flink.test.checkpointing; import org.apache.flink.FlinkVersion; import org.apache.flink.api.common.accumulators.IntCounter; @@ -28,8 +28,9 @@ import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; -import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; import org.apache.flink.runtime.state.StateBackendLoader; +import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; @@ -40,14 +41,17 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Triggerable; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.test.checkpointing.utils.MigrationTestUtils; +import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase; import org.apache.flink.util.Collector; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import java.util.Arrays; import java.util.Collection; +import java.util.LinkedList; +import java.util.stream.Collectors; import static org.junit.Assert.assertEquals; @@ -56,59 +60,69 @@ import static 
org.junit.Assert.assertEquals; * previous Flink versions, as well as for different state backends. */ @RunWith(Parameterized.class) -public class StatefulJobSavepointMigrationITCase extends SavepointMigrationTestBase { +public class StatefulJobSnapshotMigrationITCase extends SnapshotMigrationTestBase { private static final int NUM_SOURCE_ELEMENTS = 4; - /** - * This test runs in either of two modes: 1) we want to generate the binary savepoint, i.e. we - * have to run the checkpointing functions 2) we want to verify restoring, so we have to run the - * checking functions. - */ - public enum ExecutionMode { - PERFORM_SAVEPOINT, - VERIFY_SAVEPOINT - } + // TODO increase this to newer version to create and test snapshot migration for newer versions + private static final FlinkVersion currentVersion = FlinkVersion.v1_14; - // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints - // TODO Note: You should generate the savepoint based on the release branch instead of the + // TODO change this to CREATE_SNAPSHOT to (re)create binary snapshots + // TODO Note: You should generate the snapshot based on the release branch instead of the // master. 
- private final ExecutionMode executionMode = ExecutionMode.VERIFY_SAVEPOINT; - - @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}") - public static Collection<Tuple2<FlinkVersion, String>> parameters() { - return Arrays.asList( - Tuple2.of(FlinkVersion.v1_4, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_4, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_6, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_6, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_7, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_7, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_8, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_8, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_9, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_9, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_10, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_10, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_11, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_11, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_12, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_12, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_13, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_13, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_14, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_14, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME)); + private 
static final ExecutionMode executionMode = ExecutionMode.VERIFY_SNAPSHOT; + + @Parameterized.Parameters(name = "Test snapshot: {0}") + public static Collection<SnapshotSpec> parameters() { + Collection<SnapshotSpec> parameters = new LinkedList<>(); + parameters.addAll( + SnapshotSpec.withVersions( + StateBackendLoader.MEMORY_STATE_BACKEND_NAME, + SnapshotType.SAVEPOINT_CANONICAL, + FlinkVersion.rangeOf(FlinkVersion.v1_4, FlinkVersion.v1_14))); + parameters.addAll( + SnapshotSpec.withVersions( + StateBackendLoader.HASHMAP_STATE_BACKEND_NAME, + SnapshotType.SAVEPOINT_CANONICAL, + FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion))); + parameters.addAll( + SnapshotSpec.withVersions( + StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME, + SnapshotType.SAVEPOINT_CANONICAL, + FlinkVersion.rangeOf(FlinkVersion.v1_4, currentVersion))); + parameters.addAll( + SnapshotSpec.withVersions( + StateBackendLoader.HASHMAP_STATE_BACKEND_NAME, + SnapshotType.SAVEPOINT_NATIVE, + FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion))); + parameters.addAll( + SnapshotSpec.withVersions( + StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME, + SnapshotType.SAVEPOINT_NATIVE, + FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion))); + parameters.addAll( + SnapshotSpec.withVersions( + StateBackendLoader.HASHMAP_STATE_BACKEND_NAME, + SnapshotType.CHECKPOINT, + FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion))); + parameters.addAll( + SnapshotSpec.withVersions( + StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME, + SnapshotType.CHECKPOINT, + FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion))); + if (executionMode == ExecutionMode.CREATE_SNAPSHOT) { + parameters = + parameters.stream() + .filter(x -> x.getFlinkVersion().equals(currentVersion)) + .collect(Collectors.toList()); + } + return parameters; } - private final FlinkVersion testMigrateVersion; - private final String testStateBackend; + private final SnapshotSpec snapshotSpec; - public StatefulJobSavepointMigrationITCase( - 
Tuple2<FlinkVersion, String> testMigrateVersionAndBackend) throws Exception { - this.testMigrateVersion = testMigrateVersionAndBackend.f0; - this.testStateBackend = testMigrateVersionAndBackend.f1; + public StatefulJobSnapshotMigrationITCase(SnapshotSpec snapshotSpec) throws Exception { + this.snapshotSpec = snapshotSpec; } @Test @@ -119,13 +133,16 @@ public class StatefulJobSavepointMigrationITCase extends SavepointMigrationTestB final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRestartStrategy(RestartStrategies.noRestart()); - switch (testStateBackend) { + switch (snapshotSpec.getStateBackendType()) { case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME: - env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend())); + env.setStateBackend(new EmbeddedRocksDBStateBackend()); break; case StateBackendLoader.MEMORY_STATE_BACKEND_NAME: env.setStateBackend(new MemoryStateBackend()); break; + case StateBackendLoader.HASHMAP_STATE_BACKEND_NAME: + env.setStateBackend(new HashMapStateBackend()); + break; default: throw new UnsupportedOperationException(); } @@ -140,7 +157,7 @@ public class StatefulJobSavepointMigrationITCase extends SavepointMigrationTestB RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> flatMap; OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> timelyOperator; - if (executionMode == ExecutionMode.PERFORM_SAVEPOINT) { + if (executionMode == ExecutionMode.CREATE_SNAPSHOT) { nonParallelSource = new MigrationTestUtils.CheckpointingNonParallelSourceWithListState( NUM_SOURCE_ELEMENTS); @@ -149,7 +166,7 @@ public class StatefulJobSavepointMigrationITCase extends SavepointMigrationTestB NUM_SOURCE_ELEMENTS); flatMap = new CheckpointingKeyedStateFlatMap(); timelyOperator = new CheckpointingTimelyStatefulOperator(); - } else if (executionMode == ExecutionMode.VERIFY_SAVEPOINT) { + } else if (executionMode == ExecutionMode.VERIFY_SNAPSHOT) { nonParallelSource = new 
MigrationTestUtils.CheckingNonParallelSourceWithListState( NUM_SOURCE_ELEMENTS); @@ -190,17 +207,20 @@ public class StatefulJobSavepointMigrationITCase extends SavepointMigrationTestB .uid("CheckpointingTimelyStatefulOperator2") .addSink(new MigrationTestUtils.AccumulatorCountingSink<>()); - if (executionMode == ExecutionMode.PERFORM_SAVEPOINT) { - executeAndSavepoint( + final String snapshotPath = getSnapshotPath(snapshotSpec); + + if (executionMode == ExecutionMode.CREATE_SNAPSHOT) { + executeAndSnapshot( env, - "src/test/resources/" + getSavepointPath(testMigrateVersion, testStateBackend), + "src/test/resources/" + snapshotPath, + snapshotSpec.getSnapshotType(), new Tuple2<>( MigrationTestUtils.AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS * 2)); } else { restoreAndExecute( env, - getResourceFilename(getSavepointPath(testMigrateVersion, testStateBackend)), + getResourceFilename(snapshotPath), new Tuple2<>( MigrationTestUtils.CheckingNonParallelSourceWithListState .SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, @@ -228,17 +248,8 @@ public class StatefulJobSavepointMigrationITCase extends SavepointMigrationTestB } } - private String getSavepointPath(FlinkVersion savepointVersion, String backendType) { - switch (backendType) { - case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME: - return "new-stateful-udf-migration-itcase-flink" - + savepointVersion - + "-rocksdb-savepoint"; - case StateBackendLoader.MEMORY_STATE_BACKEND_NAME: - return "new-stateful-udf-migration-itcase-flink" + savepointVersion + "-savepoint"; - default: - throw new UnsupportedOperationException(); - } + private static String getSnapshotPath(SnapshotSpec snapshotSpec) { + return "new-stateful-udf-migration-itcase-" + snapshotSpec; } private static class CheckpointingKeyedStateFlatMap diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StatefulJobWBroadcastStateMigrationITCase.java similarity index 80% rename from flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java rename to flink-tests/src/test/java/org/apache/flink/test/checkpointing/StatefulJobWBroadcastStateMigrationITCase.java index 4470b85..459ce1b 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StatefulJobWBroadcastStateMigrationITCase.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.test.checkpointing.utils; +package org.apache.flink.test.checkpointing; import org.apache.flink.FlinkVersion; import org.apache.flink.api.common.restartstrategy.RestartStrategies; @@ -25,14 +25,17 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; -import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; import org.apache.flink.runtime.state.StateBackendLoader; +import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.datastream.BroadcastStream; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.checkpointing.utils.MigrationTestUtils; +import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase; import 
org.apache.flink.util.Collector; import org.junit.Assert; @@ -40,10 +43,11 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import java.util.Arrays; import java.util.Collection; import java.util.HashMap; +import java.util.LinkedList; import java.util.Map; +import java.util.stream.Collectors; /** * Migration ITCases for a stateful job with broadcast state. The tests are parameterized to @@ -51,48 +55,69 @@ import java.util.Map; * state backends. */ @RunWith(Parameterized.class) -public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigrationTestBase { +public class StatefulJobWBroadcastStateMigrationITCase extends SnapshotMigrationTestBase { private static final int NUM_SOURCE_ELEMENTS = 4; - // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints - // TODO Note: You should generate the savepoint based on the release branch instead of the + // TODO increase this to newer version to create and test snapshot migration for newer versions + private static final FlinkVersion currentVersion = FlinkVersion.v1_14; + + // TODO change this to CREATE_SNAPSHOT to (re)create binary snapshots + // TODO Note: You should generate the snapshot based on the release branch instead of the // master. 
- private final StatefulJobSavepointMigrationITCase.ExecutionMode executionMode = - StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT; - - @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}") - public static Collection<Tuple2<FlinkVersion, String>> parameters() { - return Arrays.asList( - Tuple2.of(FlinkVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_6, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_6, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_7, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_7, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_8, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_8, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_9, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_9, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_10, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_10, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_11, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_11, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_12, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_12, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_13, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_13, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_14, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_14, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME)); + private static final ExecutionMode executionMode = ExecutionMode.VERIFY_SNAPSHOT; + + 
@Parameterized.Parameters(name = "Test snapshot: {0}") + public static Collection<SnapshotSpec> parameters() { + Collection<SnapshotSpec> parameters = new LinkedList<>(); + parameters.addAll( + SnapshotSpec.withVersions( + StateBackendLoader.MEMORY_STATE_BACKEND_NAME, + SnapshotType.SAVEPOINT_CANONICAL, + FlinkVersion.rangeOf(FlinkVersion.v1_5, FlinkVersion.v1_14))); + parameters.addAll( + SnapshotSpec.withVersions( + StateBackendLoader.HASHMAP_STATE_BACKEND_NAME, + SnapshotType.SAVEPOINT_CANONICAL, + FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion))); + parameters.addAll( + SnapshotSpec.withVersions( + StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME, + SnapshotType.SAVEPOINT_CANONICAL, + FlinkVersion.rangeOf(FlinkVersion.v1_5, currentVersion))); + parameters.addAll( + SnapshotSpec.withVersions( + StateBackendLoader.HASHMAP_STATE_BACKEND_NAME, + SnapshotType.SAVEPOINT_NATIVE, + FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion))); + parameters.addAll( + SnapshotSpec.withVersions( + StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME, + SnapshotType.SAVEPOINT_NATIVE, + FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion))); + parameters.addAll( + SnapshotSpec.withVersions( + StateBackendLoader.HASHMAP_STATE_BACKEND_NAME, + SnapshotType.CHECKPOINT, + FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion))); + parameters.addAll( + SnapshotSpec.withVersions( + StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME, + SnapshotType.CHECKPOINT, + FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion))); + if (executionMode == ExecutionMode.CREATE_SNAPSHOT) { + parameters = + parameters.stream() + .filter(x -> x.getFlinkVersion().equals(currentVersion)) + .collect(Collectors.toList()); + } + return parameters; } - private final FlinkVersion testMigrateVersion; - private final String testStateBackend; + private final SnapshotSpec snapshotSpec; - public StatefulJobWBroadcastStateMigrationITCase( - Tuple2<FlinkVersion, String> testMigrateVersionAndBackend) throws 
Exception { - this.testMigrateVersion = testMigrateVersionAndBackend.f0; - this.testStateBackend = testMigrateVersionAndBackend.f1; + public StatefulJobWBroadcastStateMigrationITCase(SnapshotSpec snapshotSpec) throws Exception { + this.snapshotSpec = snapshotSpec; } @Test @@ -103,13 +128,16 @@ public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigratio final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRestartStrategy(RestartStrategies.noRestart()); - switch (testStateBackend) { + switch (snapshotSpec.getStateBackendType()) { case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME: - env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend())); + env.setStateBackend(new EmbeddedRocksDBStateBackend()); break; case StateBackendLoader.MEMORY_STATE_BACKEND_NAME: env.setStateBackend(new MemoryStateBackend()); break; + case StateBackendLoader.HASHMAP_STATE_BACKEND_NAME: + env.setStateBackend(new HashMapStateBackend()); + break; default: throw new UnsupportedOperationException(); } @@ -148,7 +176,7 @@ public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigratio expectedThirdState.put(2L, "2"); expectedThirdState.put(3L, "3"); - if (executionMode == StatefulJobSavepointMigrationITCase.ExecutionMode.PERFORM_SAVEPOINT) { + if (executionMode == ExecutionMode.CREATE_SNAPSHOT) { nonParallelSource = new MigrationTestUtils.CheckpointingNonParallelSourceWithListState( NUM_SOURCE_ELEMENTS); @@ -163,8 +191,7 @@ public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigratio NUM_SOURCE_ELEMENTS); firstBroadcastFunction = new CheckpointingKeyedBroadcastFunction(); secondBroadcastFunction = new CheckpointingKeyedSingleBroadcastFunction(); - } else if (executionMode - == StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT) { + } else if (executionMode == ExecutionMode.VERIFY_SNAPSHOT) { nonParallelSource = new 
MigrationTestUtils.CheckingNonParallelSourceWithListState( NUM_SOURCE_ELEMENTS); @@ -252,19 +279,18 @@ public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigratio .uid("BrProcess2") .addSink(new MigrationTestUtils.AccumulatorCountingSink<>()); - if (executionMode == StatefulJobSavepointMigrationITCase.ExecutionMode.PERFORM_SAVEPOINT) { - executeAndSavepoint( + if (executionMode == ExecutionMode.CREATE_SNAPSHOT) { + executeAndSnapshot( env, - "src/test/resources/" - + getBroadcastSavepointPath(testMigrateVersion, testStateBackend), + "src/test/resources/" + getSnapshotPath(snapshotSpec), + snapshotSpec.getSnapshotType(), new Tuple2<>( MigrationTestUtils.AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, 2 * NUM_SOURCE_ELEMENTS)); } else { restoreAndExecute( env, - getResourceFilename( - getBroadcastSavepointPath(testMigrateVersion, testStateBackend)), + getResourceFilename(getSnapshotPath(snapshotSpec)), new Tuple2<>( MigrationTestUtils.CheckingNonParallelSourceWithListState .SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, @@ -279,19 +305,8 @@ public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigratio } } - private String getBroadcastSavepointPath(FlinkVersion savepointVersion, String backendType) { - switch (backendType) { - case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME: - return "new-stateful-broadcast-udf-migration-itcase-flink" - + savepointVersion - + "-rocksdb-savepoint"; - case StateBackendLoader.MEMORY_STATE_BACKEND_NAME: - return "new-stateful-broadcast-udf-migration-itcase-flink" - + savepointVersion - + "-savepoint"; - default: - throw new UnsupportedOperationException(); - } + private String getSnapshotPath(SnapshotSpec snapshotSpec) { + return "new-stateful-broadcast-udf-migration-itcase-" + snapshotSpec; } /** diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/MigrationTestUtils.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/MigrationTestUtils.java index 
85ff84c..9fd021e 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/MigrationTestUtils.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/MigrationTestUtils.java @@ -110,7 +110,7 @@ public class MigrationTestUtils { private static final long serialVersionUID = 1L; - static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = + public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = CheckingNonParallelSourceWithListState.class + "_RESTORE_CHECK"; private volatile boolean isRunning = true; @@ -251,7 +251,7 @@ public class MigrationTestUtils { private static final long serialVersionUID = 1L; - static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = + public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = CheckingParallelSourceWithUnionListState.class + "_RESTORE_CHECK"; private volatile boolean isRunning = true; diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SnapshotMigrationTestBase.java similarity index 53% rename from flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java rename to flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SnapshotMigrationTestBase.java index f17f59c..57a4894 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SnapshotMigrationTestBase.java @@ -18,6 +18,7 @@ package org.apache.flink.test.checkpointing.utils; +import org.apache.flink.FlinkVersion; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.time.Deadline; @@ -33,6 +34,7 @@ import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.execution.SavepointFormatType; import 
org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.state.StateBackendLoader; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.test.util.MiniClusterWithClientResource; @@ -46,9 +48,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; +import java.io.Serializable; import java.net.URI; import java.net.URL; import java.time.Duration; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -58,18 +64,154 @@ import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.fail; /** Test savepoint migration. */ -public abstract class SavepointMigrationTestBase extends TestBaseUtils { + +/** + * Base for testing snapshot migration. The base test supports snapshots types as defined in {@link + * SnapshotType}. + */ +public abstract class SnapshotMigrationTestBase extends TestBaseUtils { @ClassRule public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder(); @Rule public final MiniClusterWithClientResource miniClusterResource; - private static final Logger LOG = LoggerFactory.getLogger(SavepointMigrationTestBase.class); + private static final Logger LOG = LoggerFactory.getLogger(SnapshotMigrationTestBase.class); protected static final int DEFAULT_PARALLELISM = 4; + /** + * Modes for migration test execution. This enum is supposed to serve as a switch between two + * modes of test execution: 1) create snapshots and 2) verify snapshots: + */ + public enum ExecutionMode { + /** Create binary snapshot(s), i.e. run the checkpointing functions. */ + CREATE_SNAPSHOT, + /** Verify snapshot(s), i.e, restore snapshot and check execution result. 
*/ + VERIFY_SNAPSHOT + } + + /** Types of snapshot supported by this base test. */ + public enum SnapshotType { + /** Savepoints with Flink canonical format. */ + SAVEPOINT_CANONICAL, + /** Savepoint with native format of respective state backend. */ + SAVEPOINT_NATIVE, + /** Checkpoint. */ + CHECKPOINT + } + + /** + * A snapshot specification (immutable) for migration tests that consists of {@link + * FlinkVersion} that the snapshot has been created with, {@link + * SnapshotMigrationTestBase.SnapshotType}, and state backend type that the snapshot has been + * ctreated from. + */ + public static class SnapshotSpec implements Serializable { + private final FlinkVersion flinkVersion; + private final String stateBackendType; + private final SnapshotMigrationTestBase.SnapshotType snapshotType; + + /** + * Creates a {@link SnapshotSpec} with specified parameters. + * + * @param flinkVersion Specifies the {@link FlinkVersion}. + * @param stateBackendType Specifies the state backend type. + * @param snapshotType Specifies the {@link SnapshotMigrationTestBase.SnapshotType}. + */ + public SnapshotSpec( + FlinkVersion flinkVersion, + String stateBackendType, + SnapshotMigrationTestBase.SnapshotType snapshotType) { + this.flinkVersion = flinkVersion; + this.stateBackendType = stateBackendType; + this.snapshotType = snapshotType; + } + + /** + * Gets the {@link FlinkVersion} that the snapshot has been created with. + * + * @return {@link FlinkVersion} + */ + public FlinkVersion getFlinkVersion() { + return flinkVersion; + } + + /** + * Gets the state backend type that the snapshot has been created from. + * + * @return State backend type. + */ + public String getStateBackendType() { + return stateBackendType; + } + + /** + * Gets the {@link SnapshotMigrationTestBase.SnapshotType}. 
+ * + * @return {@link SnapshotMigrationTestBase.SnapshotType} + */ + public SnapshotMigrationTestBase.SnapshotType getSnapshotType() { + return snapshotType; + } + + /** + * Creates a collection of {@link SnapshotSpec} for a given collection of {@link + * FlinkVersion} with the same parameters but different {@link FlinkVersion}. + * + * @param stateBackendType Specifies the state backend type. + * @param snapshotType Specifies the snapshot type. + * @param flinkVersions A collection of {@link FlinkVersion}. + * @return A collection of {@link SnapshotSpec} that differ only by means of {@link + * FlinkVersion} FlinkVersion}. + */ + public static Collection<SnapshotSpec> withVersions( + String stateBackendType, + SnapshotMigrationTestBase.SnapshotType snapshotType, + Collection<FlinkVersion> flinkVersions) { + List<SnapshotSpec> snapshotSpecCollection = new LinkedList<>(); + for (FlinkVersion version : flinkVersions) { + snapshotSpecCollection.add( + new SnapshotSpec(version, stateBackendType, snapshotType)); + } + return snapshotSpecCollection; + } + + @Override + public String toString() { + StringBuilder str = new StringBuilder("flink" + flinkVersion); + switch (stateBackendType) { + case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME: + str.append("-rocksdb"); + break; + case StateBackendLoader.MEMORY_STATE_BACKEND_NAME: + // This is implicit due to backwards compatibility with legacy artifact names. + break; + case StateBackendLoader.HASHMAP_STATE_BACKEND_NAME: + str.append("-hashmap"); + break; + default: + throw new UnsupportedOperationException("State backend type not supported."); + } + switch (snapshotType) { + case SAVEPOINT_CANONICAL: + str.append("-savepoint"); + // Canonical implicit due to backwards compatibility with legacy artifact names. 
+ break; + case SAVEPOINT_NATIVE: + str.append("-savepoint-native"); + break; + case CHECKPOINT: + str.append("-checkpoint"); + break; + default: + throw new UnsupportedOperationException("Snapshot type not supported."); + } + return str.toString(); + } + } + protected static String getResourceFilename(String filename) { - ClassLoader cl = SavepointMigrationTestBase.class.getClassLoader(); + ClassLoader cl = SnapshotMigrationTestBase.class.getClassLoader(); URL resource = cl.getResource(filename); if (resource == null) { throw new NullPointerException("Missing snapshot resource."); @@ -77,7 +219,7 @@ public abstract class SavepointMigrationTestBase extends TestBaseUtils { return resource.getFile(); } - protected SavepointMigrationTestBase() throws Exception { + protected SnapshotMigrationTestBase() throws Exception { miniClusterResource = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() @@ -115,10 +257,22 @@ public abstract class SavepointMigrationTestBase extends TestBaseUtils { return config; } + @Deprecated @SafeVarargs protected final void executeAndSavepoint( StreamExecutionEnvironment env, - String savepointPath, + String snapshotPath, + Tuple2<String, Integer>... expectedAccumulators) + throws Exception { + executeAndSnapshot( + env, snapshotPath, SnapshotType.SAVEPOINT_CANONICAL, expectedAccumulators); + } + + @SafeVarargs + protected final void executeAndSnapshot( + StreamExecutionEnvironment env, + String snapshotPath, + SnapshotType snapshotType, Tuple2<String, Integer>... 
expectedAccumulators) throws Exception { @@ -162,27 +316,41 @@ public abstract class SavepointMigrationTestBase extends TestBaseUtils { fail("Did not see the expected accumulator results within time limit."); } - LOG.info("Triggering savepoint."); + LOG.info("Triggering snapshot."); - CompletableFuture<String> savepointPathFuture = - client.triggerSavepoint(jobID, null, SavepointFormatType.CANONICAL); + CompletableFuture<String> snapshotPathFuture; + switch (snapshotType) { + case SAVEPOINT_CANONICAL: + snapshotPathFuture = + client.triggerSavepoint(jobID, null, SavepointFormatType.CANONICAL); + break; + case SAVEPOINT_NATIVE: + snapshotPathFuture = + client.triggerSavepoint(jobID, null, SavepointFormatType.NATIVE); + break; + case CHECKPOINT: + snapshotPathFuture = miniClusterResource.getMiniCluster().triggerCheckpoint(jobID); + break; + default: + throw new UnsupportedOperationException("Snapshot type not supported/implemented."); + } - String jobmanagerSavepointPath = - savepointPathFuture.get(deadLine.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + String jobmanagerSnapshotPath = + snapshotPathFuture.get(deadLine.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - File jobManagerSavepoint = new File(new URI(jobmanagerSavepointPath).getPath()); + File jobManagerSnapshot = new File(new URI(jobmanagerSnapshotPath).getPath()); // savepoints were changed to be directories in Flink 1.3 - if (jobManagerSavepoint.isDirectory()) { - FileUtils.moveDirectory(jobManagerSavepoint, new File(savepointPath)); + if (jobManagerSnapshot.isDirectory()) { + FileUtils.moveDirectory(jobManagerSnapshot, new File(snapshotPath)); } else { - FileUtils.moveFile(jobManagerSavepoint, new File(savepointPath)); + FileUtils.moveFile(jobManagerSnapshot, new File(snapshotPath)); } } @SafeVarargs protected final void restoreAndExecute( StreamExecutionEnvironment env, - String savepointPath, + String snapshotPath, Tuple2<String, Integer>... 
expectedAccumulators) throws Exception { @@ -193,7 +361,7 @@ public abstract class SavepointMigrationTestBase extends TestBaseUtils { // Submit the job JobGraph jobGraph = env.getStreamGraph().getJobGraph(); - jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath)); + jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(snapshotPath)); JobID jobID = client.submitJob(jobGraph).get(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java index aa22407..96d607b 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java @@ -29,104 +29,117 @@ import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.runtime.state.StateBackendLoader; +import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.test.checkpointing.utils.MigrationTestUtils; -import org.apache.flink.test.checkpointing.utils.SavepointMigrationTestBase; +import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase; import 
org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import java.io.IOException; -import java.util.Arrays; import java.util.Collection; +import java.util.LinkedList; +import java.util.stream.Collectors; /** * Migration IT cases for upgrading a legacy {@link TypeSerializerConfigSnapshot} that is written in * checkpoints to {@link TypeSerializerSnapshot} interface. * - * <p>The savepoints used by this test were written with a serializer snapshot class that extends + * <p>The snapshots used by this test were written with a serializer snapshot class that extends * {@link TypeSerializerConfigSnapshot}, as can be seen in the commented out code at the end of this * class. On restore, we change the snapshot to implement directly a {@link TypeSerializerSnapshot}. */ @RunWith(Parameterized.class) -public class TypeSerializerSnapshotMigrationITCase extends SavepointMigrationTestBase { +public class TypeSerializerSnapshotMigrationITCase extends SnapshotMigrationTestBase { private static final int NUM_SOURCE_ELEMENTS = 4; - /** - * This test runs in either of two modes: 1) we want to generate the binary savepoint, i.e. we - * have to run the checkpointing functions 2) we want to verify restoring, so we have to run the - * checking functions. - */ - public enum ExecutionMode { - PERFORM_SAVEPOINT, - VERIFY_SAVEPOINT - } + // TODO increase this to newer version to create and test snapshot migration for newer versions + private static final FlinkVersion currentVersion = FlinkVersion.v1_14; - // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints - // TODO Note: You should generate the savepoint based on the release branch instead of the + // TODO change this to CREATE_SNAPSHOT to (re)create binary snapshots + // TODO Note: You should generate the snapshot based on the release branch instead of the // master. 
- private final ExecutionMode executionMode = ExecutionMode.VERIFY_SAVEPOINT; - - @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}") - public static Collection<Tuple2<FlinkVersion, String>> parameters() { - return Arrays.asList( - Tuple2.of(FlinkVersion.v1_3, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_3, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_4, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_4, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_6, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_6, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_7, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_7, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_8, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_8, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_9, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_9, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_10, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_10, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_11, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_11, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_12, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_12, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_13, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_13, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), - 
Tuple2.of(FlinkVersion.v1_14, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - Tuple2.of(FlinkVersion.v1_14, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME)); + private static final ExecutionMode executionMode = ExecutionMode.VERIFY_SNAPSHOT; + + @Parameterized.Parameters(name = "Test snapshot: {0}") + public static Collection<SnapshotSpec> parameters() { + Collection<SnapshotSpec> parameters = new LinkedList<>(); + parameters.addAll( + SnapshotSpec.withVersions( + StateBackendLoader.MEMORY_STATE_BACKEND_NAME, + SnapshotType.SAVEPOINT_CANONICAL, + FlinkVersion.rangeOf(FlinkVersion.v1_3, FlinkVersion.v1_14))); + parameters.addAll( + SnapshotSpec.withVersions( + StateBackendLoader.HASHMAP_STATE_BACKEND_NAME, + SnapshotType.SAVEPOINT_CANONICAL, + FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion))); + parameters.addAll( + SnapshotSpec.withVersions( + StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME, + SnapshotType.SAVEPOINT_CANONICAL, + FlinkVersion.rangeOf(FlinkVersion.v1_3, currentVersion))); + parameters.addAll( + SnapshotSpec.withVersions( + StateBackendLoader.HASHMAP_STATE_BACKEND_NAME, + SnapshotType.SAVEPOINT_NATIVE, + FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion))); + parameters.addAll( + SnapshotSpec.withVersions( + StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME, + SnapshotType.SAVEPOINT_NATIVE, + FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion))); + parameters.addAll( + SnapshotSpec.withVersions( + StateBackendLoader.HASHMAP_STATE_BACKEND_NAME, + SnapshotType.CHECKPOINT, + FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion))); + parameters.addAll( + SnapshotSpec.withVersions( + StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME, + SnapshotType.CHECKPOINT, + FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion))); + if (executionMode == ExecutionMode.CREATE_SNAPSHOT) { + parameters = + parameters.stream() + .filter(x -> x.getFlinkVersion().equals(currentVersion)) + .collect(Collectors.toList()); + } + return parameters; } - private 
final FlinkVersion testMigrateVersion; - private final String testStateBackend; + private final SnapshotSpec snapshotSpec; - public TypeSerializerSnapshotMigrationITCase( - Tuple2<FlinkVersion, String> testMigrateVersionAndBackend) throws Exception { - this.testMigrateVersion = testMigrateVersionAndBackend.f0; - this.testStateBackend = testMigrateVersionAndBackend.f1; + public TypeSerializerSnapshotMigrationITCase(SnapshotSpec snapshotSpec) throws Exception { + this.snapshotSpec = snapshotSpec; } @Test - public void testSavepoint() throws Exception { + public void testSnapshot() throws Exception { final int parallelism = 1; final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRestartStrategy(RestartStrategies.noRestart()); - switch (testStateBackend) { + switch (snapshotSpec.getStateBackendType()) { case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME: - env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend())); + env.setStateBackend(new EmbeddedRocksDBStateBackend()); break; case StateBackendLoader.MEMORY_STATE_BACKEND_NAME: env.setStateBackend(new MemoryStateBackend()); break; + case StateBackendLoader.HASHMAP_STATE_BACKEND_NAME: + env.setStateBackend(new HashMapStateBackend()); + break; default: throw new UnsupportedOperationException(); } @@ -145,36 +158,30 @@ public class TypeSerializerSnapshotMigrationITCase extends SavepointMigrationTes .map(new TestMapFunction()) .addSink(new MigrationTestUtils.AccumulatorCountingSink<>()); - if (executionMode == ExecutionMode.PERFORM_SAVEPOINT) { - executeAndSavepoint( + final String snapshotPath = getSnapshotPath(snapshotSpec); + + if (executionMode == ExecutionMode.CREATE_SNAPSHOT) { + executeAndSnapshot( env, - "src/test/resources/" + getSavepointPath(testMigrateVersion, testStateBackend), - new Tuple2<>( + "src/test/resources/" + snapshotPath, + snapshotSpec.getSnapshotType(), + Tuple2.of( MigrationTestUtils.AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, 
NUM_SOURCE_ELEMENTS)); - } else { + } else if (executionMode == ExecutionMode.VERIFY_SNAPSHOT) { restoreAndExecute( env, - getResourceFilename(getSavepointPath(testMigrateVersion, testStateBackend)), - new Tuple2<>( + getResourceFilename(snapshotPath), + Tuple2.of( MigrationTestUtils.AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS)); + } else { + throw new IllegalStateException("Unknown ExecutionMode " + executionMode); } } - private String getSavepointPath(FlinkVersion savepointVersion, String backendType) { - switch (backendType) { - case "rocksdb": - return "type-serializer-snapshot-migration-itcase-flink" - + savepointVersion - + "-rocksdb-savepoint"; - case "jobmanager": - return "type-serializer-snapshot-migration-itcase-flink" - + savepointVersion - + "-savepoint"; - default: - throw new UnsupportedOperationException(); - } + private String getSnapshotPath(SnapshotSpec snapshotSpec) { + return "type-serializer-snapshot-migration-itcase-" + snapshotSpec; } private static class TestMapFunction diff --git a/pom.xml b/pom.xml index 14de924..d471a8e 100644 --- a/pom.xml +++ b/pom.xml @@ -1494,7 +1494,9 @@ under the License. <exclude>**/src/test/resources/**/test-data</exclude> <exclude>**/src/test/resources/*-snapshot</exclude> <exclude>**/src/test/resources/*.snapshot</exclude> - <exclude>**/src/test/resources/*-savepoint</exclude> + <exclude>**/src/test/resources/*-savepoint/**</exclude> + <exclude>**/src/test/resources/*-savepoint-native/**</exclude> + <exclude>**/src/test/resources/*-checkpoint/**</exclude> <exclude>flink-core/src/test/resources/serialized-kryo-serializer-1.3</exclude> <exclude>flink-core/src/test/resources/type-without-avro-serialized-using-kryo</exclude> <exclude>flink-formats/flink-avro/src/test/resources/flink-1.4-serializer-java-serialized</exclude>