This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit c88189f0245630b7b16e7e5dd63b8b7fbaecd92a Author: Sebastian Mattheis <sebast...@ververica.com> AuthorDate: Fri Feb 18 10:18:03 2022 +0100 [FLINK-26146] Adapt scala tests to cover native snapshots migration Adapt scala savepoint migration tests to cover native savepoints and checkpoints. This closes #18850 --- .../utils/SnapshotMigrationTestBase.java | 11 --- .../StatefulJobSavepointMigrationITCase.scala | 103 ++++++++++++++------- ...StatefulJobWBroadcastStateMigrationITCase.scala | 101 +++++++++++++------- 3 files changed, 136 insertions(+), 79 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SnapshotMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SnapshotMigrationTestBase.java index 57a4894..2cb6b6a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SnapshotMigrationTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SnapshotMigrationTestBase.java @@ -257,17 +257,6 @@ public abstract class SnapshotMigrationTestBase extends TestBaseUtils { return config; } - @Deprecated - @SafeVarargs - protected final void executeAndSavepoint( - StreamExecutionEnvironment env, - String snapshotPath, - Tuple2<String, Integer>... 
expectedAccumulators) - throws Exception { - executeAndSnapshot( - env, snapshotPath, SnapshotType.SAVEPOINT_CANONICAL, expectedAccumulators); - } - @SafeVarargs protected final void executeAndSnapshot( StreamExecutionEnvironment env, diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala index 3ff91ee..cf88f39 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala @@ -41,7 +41,7 @@ import org.apache.flink.api.java.tuple.Tuple2 import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext, StateBackendLoader} import org.apache.flink.api.scala._ import org.apache.flink.api.scala.migration.CustomEnum.CustomEnum -import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase.ExecutionMode +import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase.{ExecutionMode, SnapshotSpec, SnapshotType} import org.junit.runner.RunWith import org.junit.runners.Parameterized import org.junit.Test @@ -59,31 +59,54 @@ object StatefulJobSavepointMigrationITCase { // master. 
val executionMode = ExecutionMode.VERIFY_SNAPSHOT - @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}") - def parameters: util.Collection[(FlinkVersion, String)] = { - var parameters = util.Arrays.asList( - // (FlinkVersion.v1_3, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - // (FlinkVersion.v1_4, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - // (FlinkVersion.v1_6, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - // (FlinkVersion.v1_7, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - // Note: It is not safe to restore savepoints created in a Scala applications with Flink - // version 1.7 or below. The reason is that up to version 1.7 the underlying Scala serializer - // used names of anonymous classes that depend on the relative position/order in code, e.g., - // if two anonymous classes, instantiated inside the same class and from the same base class, - // change order in the code their names are switched. - // As a consequence, changes in code may result in restore failures. - // This was fixed in version 1.8, see: https://issues.apache.org/jira/browse/FLINK-10493 - (FlinkVersion.v1_8, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - (FlinkVersion.v1_9, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - (FlinkVersion.v1_10, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - (FlinkVersion.v1_11, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - (FlinkVersion.v1_12, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - (FlinkVersion.v1_13, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - (FlinkVersion.v1_14, StateBackendLoader.HASHMAP_STATE_BACKEND_NAME), - (FlinkVersion.v1_15, StateBackendLoader.HASHMAP_STATE_BACKEND_NAME), - ) + @Parameterized.Parameters(name = "Test snapshot: {0}") + def parameters: util.Collection[SnapshotSpec] = { + // Note: It is not safe to restore savepoints created in Scala applications with Flink + // version 1.7 or below.
The reason is that up to version 1.7 the underlying Scala serializer + // used names of anonymous classes that depend on the relative position/order in code, e.g., + // if two anonymous classes, instantiated inside the same class and from the same base class, + // change order in the code their names are switched. + // As a consequence, changes in code may result in restore failures. + // This was fixed in version 1.8, see: https://issues.apache.org/jira/browse/FLINK-10493 + var parameters: util.List[SnapshotSpec] = new util.LinkedList[SnapshotSpec]() + parameters.addAll( + SnapshotSpec.withVersions( + StateBackendLoader.MEMORY_STATE_BACKEND_NAME, + SnapshotType.SAVEPOINT_CANONICAL, + FlinkVersion.rangeOf(FlinkVersion.v1_8, FlinkVersion.v1_13))) + parameters.addAll( + SnapshotSpec.withVersions( + StateBackendLoader.HASHMAP_STATE_BACKEND_NAME, + SnapshotType.SAVEPOINT_CANONICAL, + FlinkVersion.rangeOf(FlinkVersion.v1_14, currentVersion))) + parameters.addAll( + SnapshotSpec.withVersions( + StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME, + SnapshotType.SAVEPOINT_CANONICAL, + FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion))) + parameters.addAll( + SnapshotSpec.withVersions( + StateBackendLoader.HASHMAP_STATE_BACKEND_NAME, + SnapshotType.SAVEPOINT_NATIVE, + FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion))) + parameters.addAll( + SnapshotSpec.withVersions( + StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME, + SnapshotType.SAVEPOINT_NATIVE, + FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion))) + parameters.addAll( + SnapshotSpec.withVersions( + StateBackendLoader.HASHMAP_STATE_BACKEND_NAME, + SnapshotType.CHECKPOINT, + FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion))) + parameters.addAll( + SnapshotSpec.withVersions( + StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME, + SnapshotType.CHECKPOINT, + FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion))) if (executionMode == ExecutionMode.CREATE_SNAPSHOT) { - parameters = 
parameters.stream().filter(x => x._1 == currentVersion) + parameters = parameters.stream() + .filter(x => x.getFlinkVersion().equals(currentVersion)) .collect(Collectors.toList()) } parameters @@ -91,10 +114,20 @@ object StatefulJobSavepointMigrationITCase { val NUM_ELEMENTS = 4 - def getSnapshotPath(migrationVersionAndBackend: (FlinkVersion, String)): String = { - s"stateful-scala-udf-migration-itcase" + - s"-flink${migrationVersionAndBackend._1}" + - s"-${migrationVersionAndBackend._2}-savepoint" + def getSnapshotPath(snapshotSpec: SnapshotSpec): String = { + val path = new StringBuilder(s"stateful-scala-udf-migration-itcase") + path ++= s"-flink${snapshotSpec.getFlinkVersion()}" + path ++= s"-${snapshotSpec.getStateBackendType()}" + snapshotSpec.getSnapshotType() match { + case SnapshotType.SAVEPOINT_CANONICAL => + path ++= "-savepoint" + case SnapshotType.SAVEPOINT_NATIVE => + path ++= "-savepoint-native" + case SnapshotType.CHECKPOINT => + path ++= "-checkpoint" + case _ => throw new UnsupportedOperationException + } + path.toString() } } @@ -102,8 +135,7 @@ object StatefulJobSavepointMigrationITCase { * ITCase for migration Scala state types across different Flink versions. 
*/ @RunWith(classOf[Parameterized]) -class StatefulJobSavepointMigrationITCase( - migrationVersionAndBackend: (FlinkVersion, String)) +class StatefulJobSavepointMigrationITCase(snapshotSpec: SnapshotSpec) extends SnapshotMigrationTestBase with Serializable { @Test @@ -111,7 +143,7 @@ class StatefulJobSavepointMigrationITCase( val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) - migrationVersionAndBackend._2 match { + snapshotSpec.getStateBackendType match { case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME => env.setStateBackend(new EmbeddedRocksDBStateBackend()) case StateBackendLoader.MEMORY_STATE_BACKEND_NAME => @@ -138,10 +170,11 @@ class StatefulJobSavepointMigrationITCase( .addSink(new AccumulatorCountingSink) if (StatefulJobSavepointMigrationITCase.executionMode == ExecutionMode.CREATE_SNAPSHOT) { - executeAndSavepoint( + executeAndSnapshot( env, s"src/test/resources/" - + StatefulJobSavepointMigrationITCase.getSnapshotPath(migrationVersionAndBackend), + + StatefulJobSavepointMigrationITCase.getSnapshotPath(snapshotSpec), + snapshotSpec.getSnapshotType(), new Tuple2( AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, StatefulJobSavepointMigrationITCase.NUM_ELEMENTS @@ -152,7 +185,7 @@ class StatefulJobSavepointMigrationITCase( restoreAndExecute( env, SnapshotMigrationTestBase.getResourceFilename( - StatefulJobSavepointMigrationITCase.getSnapshotPath(migrationVersionAndBackend)), + StatefulJobSavepointMigrationITCase.getSnapshotPath(snapshotSpec)), new Tuple2( AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, StatefulJobSavepointMigrationITCase.NUM_ELEMENTS) diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala index 47dc3ae..dc9b540 100644 --- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala @@ -42,7 +42,7 @@ import org.apache.flink.streaming.api.functions.sink.RichSinkFunction import org.apache.flink.streaming.api.functions.source.SourceFunction import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase -import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase.ExecutionMode +import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase.{ExecutionMode, SnapshotSpec, SnapshotType} import org.apache.flink.util.Collector import org.junit.runner.RunWith import org.junit.runners.Parameterized @@ -61,39 +61,73 @@ object StatefulJobWBroadcastStateMigrationITCase { // master. val executionMode = ExecutionMode.VERIFY_SNAPSHOT - @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}") - def parameters: util.Collection[(FlinkVersion, String)] = { - var parameters = util.Arrays.asList( - // (FlinkVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - // (FlinkVersion.v1_6, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - // (FlinkVersion.v1_7, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - // Note: It is not safe to restore savepoints created in a Scala applications with Flink - // version 1.7 or below. The reason is that up to version 1.7 the underlying Scala serializer - // used names of anonymous classes that depend on the relative position/order in code, e.g., - // if two anonymous classes, instantiated inside the same class and from the same base class, - // change order in the code their names are switched. - // As a consequence, changes in code may result in restore failures. 
- // This was fixed in version 1.8, see: https://issues.apache.org/jira/browse/FLINK-10493 - (FlinkVersion.v1_8, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - (FlinkVersion.v1_9, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - (FlinkVersion.v1_10, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - (FlinkVersion.v1_11, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - (FlinkVersion.v1_12, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - (FlinkVersion.v1_13, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - (FlinkVersion.v1_14, StateBackendLoader.HASHMAP_STATE_BACKEND_NAME), - (FlinkVersion.v1_15, StateBackendLoader.HASHMAP_STATE_BACKEND_NAME), - ) + @Parameterized.Parameters(name = "Test snapshot: {0}") + def parameters: util.Collection[SnapshotSpec] = { + // Note: It is not safe to restore savepoints created in Scala applications with Flink + // version 1.7 or below. The reason is that up to version 1.7 the underlying Scala serializer + // used names of anonymous classes that depend on the relative position/order in code, e.g., + // if two anonymous classes, instantiated inside the same class and from the same base class, + // change order in the code, their names are switched. + // As a consequence, changes in code may result in restore failures.
+ // This was fixed in version 1.8, see: https://issues.apache.org/jira/browse/FLINK-10493 + var parameters: util.List[SnapshotSpec] = new util.LinkedList[SnapshotSpec]() + parameters.addAll( + SnapshotSpec.withVersions( + StateBackendLoader.MEMORY_STATE_BACKEND_NAME, + SnapshotType.SAVEPOINT_CANONICAL, + FlinkVersion.rangeOf(FlinkVersion.v1_8, FlinkVersion.v1_13))) + parameters.addAll( + SnapshotSpec.withVersions( + StateBackendLoader.HASHMAP_STATE_BACKEND_NAME, + SnapshotType.SAVEPOINT_CANONICAL, + FlinkVersion.rangeOf(FlinkVersion.v1_14, currentVersion))) + parameters.addAll( + SnapshotSpec.withVersions( + StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME, + SnapshotType.SAVEPOINT_CANONICAL, + FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion))) + parameters.addAll( + SnapshotSpec.withVersions( + StateBackendLoader.HASHMAP_STATE_BACKEND_NAME, + SnapshotType.SAVEPOINT_NATIVE, + FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion))) + parameters.addAll( + SnapshotSpec.withVersions( + StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME, + SnapshotType.SAVEPOINT_NATIVE, + FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion))) + parameters.addAll( + SnapshotSpec.withVersions( + StateBackendLoader.HASHMAP_STATE_BACKEND_NAME, + SnapshotType.CHECKPOINT, + FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion))) + parameters.addAll( + SnapshotSpec.withVersions( + StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME, + SnapshotType.CHECKPOINT, + FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion))) if (executionMode == ExecutionMode.CREATE_SNAPSHOT) { - parameters = parameters.stream().filter(x => x._1 == currentVersion) + parameters = parameters.stream() + .filter(x => x.getFlinkVersion().equals(currentVersion)) .collect(Collectors.toList()) } parameters } - def getSnapshotPath(migrationVersionAndBackend: (FlinkVersion, String)): String = { - s"stateful-scala-with-broadcast-udf-migration-itcase" + - s"-flink${migrationVersionAndBackend._1}" + - 
s"-${migrationVersionAndBackend._2}-savepoint" + def getSnapshotPath(snapshotSpec: SnapshotSpec): String = { + val path = new StringBuilder(s"stateful-scala-with-broadcast-udf-migration-itcase") + path ++= s"-flink${snapshotSpec.getFlinkVersion()}" + path ++= s"-${snapshotSpec.getStateBackendType()}" + snapshotSpec.getSnapshotType() match { + case SnapshotType.SAVEPOINT_CANONICAL => + path ++= "-savepoint" + case SnapshotType.SAVEPOINT_NATIVE => + path ++= "-savepoint-native" + case SnapshotType.CHECKPOINT => + path ++= "-checkpoint" + case _ => throw new UnsupportedOperationException + } + path.toString() } val NUM_ELEMENTS = 4 @@ -103,7 +137,7 @@ object StatefulJobWBroadcastStateMigrationITCase { * ITCase for migration Scala state types across different Flink versions. */ @RunWith(classOf[Parameterized]) -class StatefulJobWBroadcastStateMigrationITCase(migrationVersionAndBackend: (FlinkVersion, String)) +class StatefulJobWBroadcastStateMigrationITCase(snapshotSpec: SnapshotSpec) extends SnapshotMigrationTestBase with Serializable { @Test @@ -112,7 +146,7 @@ class StatefulJobWBroadcastStateMigrationITCase(migrationVersionAndBackend: (Fli val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) - migrationVersionAndBackend._2 match { + snapshotSpec.getStateBackendType match { case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME => env.setStateBackend(new EmbeddedRocksDBStateBackend()) case StateBackendLoader.MEMORY_STATE_BACKEND_NAME => @@ -163,10 +197,11 @@ class StatefulJobWBroadcastStateMigrationITCase(migrationVersionAndBackend: (Fli .process(new TestBroadcastProcessFunction) .addSink(new AccumulatorCountingSink) - executeAndSavepoint( + executeAndSnapshot( env, s"src/test/resources/" - + StatefulJobWBroadcastStateMigrationITCase.getSnapshotPath(migrationVersionAndBackend), + + StatefulJobWBroadcastStateMigrationITCase.getSnapshotPath(snapshotSpec), + snapshotSpec.getSnapshotType(), new Tuple2( 
AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, StatefulJobWBroadcastStateMigrationITCase.NUM_ELEMENTS @@ -187,7 +222,7 @@ class StatefulJobWBroadcastStateMigrationITCase(migrationVersionAndBackend: (Fli restoreAndExecute( env, SnapshotMigrationTestBase.getResourceFilename( - StatefulJobWBroadcastStateMigrationITCase.getSnapshotPath(migrationVersionAndBackend)), + StatefulJobWBroadcastStateMigrationITCase.getSnapshotPath(snapshotSpec)), new Tuple2( AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, StatefulJobWBroadcastStateMigrationITCase.NUM_ELEMENTS)