This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c88189f0245630b7b16e7e5dd63b8b7fbaecd92a
Author: Sebastian Mattheis <sebast...@ververica.com>
AuthorDate: Fri Feb 18 10:18:03 2022 +0100

    [FLINK-26146] Adapt scala tests to cover native snapshots migration
    
    Adapt Scala savepoint migration tests to cover native savepoints and checkpoints.
    
    This closes #18850
---
 .../utils/SnapshotMigrationTestBase.java           |  11 ---
 .../StatefulJobSavepointMigrationITCase.scala      | 103 ++++++++++++++-------
 ...StatefulJobWBroadcastStateMigrationITCase.scala | 101 +++++++++++++-------
 3 files changed, 136 insertions(+), 79 deletions(-)

diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SnapshotMigrationTestBase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SnapshotMigrationTestBase.java
index 57a4894..2cb6b6a 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SnapshotMigrationTestBase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SnapshotMigrationTestBase.java
@@ -257,17 +257,6 @@ public abstract class SnapshotMigrationTestBase extends 
TestBaseUtils {
         return config;
     }
 
-    @Deprecated
-    @SafeVarargs
-    protected final void executeAndSavepoint(
-            StreamExecutionEnvironment env,
-            String snapshotPath,
-            Tuple2<String, Integer>... expectedAccumulators)
-            throws Exception {
-        executeAndSnapshot(
-                env, snapshotPath, SnapshotType.SAVEPOINT_CANONICAL, 
expectedAccumulators);
-    }
-
     @SafeVarargs
     protected final void executeAndSnapshot(
             StreamExecutionEnvironment env,
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
index 3ff91ee..cf88f39 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
@@ -41,7 +41,7 @@ import org.apache.flink.api.java.tuple.Tuple2
 import org.apache.flink.runtime.state.{FunctionInitializationContext, 
FunctionSnapshotContext, StateBackendLoader}
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.migration.CustomEnum.CustomEnum
-import 
org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase.ExecutionMode
+import 
org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase.{ExecutionMode,
 SnapshotSpec, SnapshotType}
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.Test
@@ -59,31 +59,54 @@ object StatefulJobSavepointMigrationITCase {
   // master.
   val executionMode = ExecutionMode.VERIFY_SNAPSHOT
 
-  @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
-  def parameters: util.Collection[(FlinkVersion, String)] = {
-    var parameters = util.Arrays.asList(
-      //      (FlinkVersion.v1_3, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      //      (FlinkVersion.v1_4, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      //      (FlinkVersion.v1_6, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      //      (FlinkVersion.v1_7, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      // Note: It is not safe to restore savepoints created in a Scala 
applications with Flink
-      // version 1.7 or below. The reason is that up to version 1.7 the 
underlying Scala serializer
-      // used names of anonymous classes that depend on the relative 
position/order in code, e.g.,
-      // if two anonymous classes, instantiated inside the same class and from 
the same base class,
-      // change order in the code their names are switched.
-      // As a consequence, changes in code may result in restore failures.
-      // This was fixed in version 1.8, see: 
https://issues.apache.org/jira/browse/FLINK-10493
-      (FlinkVersion.v1_8, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_9, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_10, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_11, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_12, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_13, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_14, StateBackendLoader.HASHMAP_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_15, StateBackendLoader.HASHMAP_STATE_BACKEND_NAME),
-    )
+  @Parameterized.Parameters(name = "Test snapshot: {0}")
+  def parameters: util.Collection[SnapshotSpec] = {
+    // Note: It is not safe to restore savepoints created in Scala applications with Flink
+    // version 1.7 or below. The reason is that up to version 1.7 the 
underlying Scala serializer
+    // used names of anonymous classes that depend on the relative 
position/order in code, e.g.,
+    // if two anonymous classes, instantiated inside the same class and from 
the same base class,
+    // change order in the code their names are switched.
+    // As a consequence, changes in code may result in restore failures.
+    // This was fixed in version 1.8, see: 
https://issues.apache.org/jira/browse/FLINK-10493
+    var parameters: util.List[SnapshotSpec] = new 
util.LinkedList[SnapshotSpec]()
+    parameters.addAll(
+      SnapshotSpec.withVersions(
+        StateBackendLoader.MEMORY_STATE_BACKEND_NAME,
+        SnapshotType.SAVEPOINT_CANONICAL,
+        FlinkVersion.rangeOf(FlinkVersion.v1_8, FlinkVersion.v1_13)))
+    parameters.addAll(
+      SnapshotSpec.withVersions(
+        StateBackendLoader.HASHMAP_STATE_BACKEND_NAME,
+        SnapshotType.SAVEPOINT_CANONICAL,
+        FlinkVersion.rangeOf(FlinkVersion.v1_14, currentVersion)))
+    parameters.addAll(
+      SnapshotSpec.withVersions(
+        StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME,
+        SnapshotType.SAVEPOINT_CANONICAL,
+        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)))
+    parameters.addAll(
+      SnapshotSpec.withVersions(
+        StateBackendLoader.HASHMAP_STATE_BACKEND_NAME,
+        SnapshotType.SAVEPOINT_NATIVE,
+        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)))
+    parameters.addAll(
+      SnapshotSpec.withVersions(
+        StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME,
+        SnapshotType.SAVEPOINT_NATIVE,
+        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)))
+    parameters.addAll(
+      SnapshotSpec.withVersions(
+        StateBackendLoader.HASHMAP_STATE_BACKEND_NAME,
+        SnapshotType.CHECKPOINT,
+        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)))
+    parameters.addAll(
+      SnapshotSpec.withVersions(
+        StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME,
+        SnapshotType.CHECKPOINT,
+        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)))
     if (executionMode == ExecutionMode.CREATE_SNAPSHOT) {
-      parameters = parameters.stream().filter(x => x._1 == currentVersion)
+      parameters = parameters.stream()
+        .filter(x => x.getFlinkVersion().equals(currentVersion))
         .collect(Collectors.toList())
     }
     parameters
@@ -91,10 +114,20 @@ object StatefulJobSavepointMigrationITCase {
 
   val NUM_ELEMENTS = 4
 
-  def getSnapshotPath(migrationVersionAndBackend: (FlinkVersion, String)): 
String = {
-    s"stateful-scala-udf-migration-itcase" +
-      s"-flink${migrationVersionAndBackend._1}" +
-      s"-${migrationVersionAndBackend._2}-savepoint"
+  def getSnapshotPath(snapshotSpec: SnapshotSpec): String = {
+    val path = new StringBuilder(s"stateful-scala-udf-migration-itcase")
+    path ++= s"-flink${snapshotSpec.getFlinkVersion()}"
+    path ++= s"-${snapshotSpec.getStateBackendType()}"
+    snapshotSpec.getSnapshotType() match {
+      case SnapshotType.SAVEPOINT_CANONICAL =>
+        path ++= "-savepoint"
+      case SnapshotType.SAVEPOINT_NATIVE =>
+        path ++= "-savepoint-native"
+      case SnapshotType.CHECKPOINT =>
+        path ++= "-checkpoint"
+      case _ => throw new UnsupportedOperationException
+    }
+    path.toString()
   }
 }
 
@@ -102,8 +135,7 @@ object StatefulJobSavepointMigrationITCase {
  * ITCase for migration Scala state types across different Flink versions.
  */
 @RunWith(classOf[Parameterized])
-class StatefulJobSavepointMigrationITCase(
-                                           migrationVersionAndBackend: 
(FlinkVersion, String))
+class StatefulJobSavepointMigrationITCase(snapshotSpec: SnapshotSpec)
   extends SnapshotMigrationTestBase with Serializable {
 
   @Test
@@ -111,7 +143,7 @@ class StatefulJobSavepointMigrationITCase(
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 
-    migrationVersionAndBackend._2 match {
+    snapshotSpec.getStateBackendType match {
       case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME =>
         env.setStateBackend(new EmbeddedRocksDBStateBackend())
       case StateBackendLoader.MEMORY_STATE_BACKEND_NAME =>
@@ -138,10 +170,11 @@ class StatefulJobSavepointMigrationITCase(
       .addSink(new AccumulatorCountingSink)
 
     if (StatefulJobSavepointMigrationITCase.executionMode == 
ExecutionMode.CREATE_SNAPSHOT) {
-      executeAndSavepoint(
+      executeAndSnapshot(
         env,
         s"src/test/resources/"
-          + 
StatefulJobSavepointMigrationITCase.getSnapshotPath(migrationVersionAndBackend),
+          + StatefulJobSavepointMigrationITCase.getSnapshotPath(snapshotSpec),
+        snapshotSpec.getSnapshotType(),
         new Tuple2(
           AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
           StatefulJobSavepointMigrationITCase.NUM_ELEMENTS
@@ -152,7 +185,7 @@ class StatefulJobSavepointMigrationITCase(
       restoreAndExecute(
         env,
         SnapshotMigrationTestBase.getResourceFilename(
-          
StatefulJobSavepointMigrationITCase.getSnapshotPath(migrationVersionAndBackend)),
+          StatefulJobSavepointMigrationITCase.getSnapshotPath(snapshotSpec)),
         new Tuple2(
           AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
           StatefulJobSavepointMigrationITCase.NUM_ELEMENTS)
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala
index 47dc3ae..dc9b540 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala
@@ -42,7 +42,7 @@ import 
org.apache.flink.streaming.api.functions.sink.RichSinkFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase
-import 
org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase.ExecutionMode
+import 
org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase.{ExecutionMode,
 SnapshotSpec, SnapshotType}
 import org.apache.flink.util.Collector
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
@@ -61,39 +61,73 @@ object StatefulJobWBroadcastStateMigrationITCase {
   // master.
   val executionMode = ExecutionMode.VERIFY_SNAPSHOT
 
-  @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
-  def parameters: util.Collection[(FlinkVersion, String)] = {
-    var parameters = util.Arrays.asList(
-      //      (FlinkVersion.v1_5, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      //      (FlinkVersion.v1_6, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      //      (FlinkVersion.v1_7, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      // Note: It is not safe to restore savepoints created in a Scala 
applications with Flink
-      // version 1.7 or below. The reason is that up to version 1.7 the 
underlying Scala serializer
-      // used names of anonymous classes that depend on the relative 
position/order in code, e.g.,
-      // if two anonymous classes, instantiated inside the same class and from 
the same base class,
-      // change order in the code their names are switched.
-      // As a consequence, changes in code may result in restore failures.
-      // This was fixed in version 1.8, see: 
https://issues.apache.org/jira/browse/FLINK-10493
-      (FlinkVersion.v1_8, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_9, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_10, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_11, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_12, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_13, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_14, StateBackendLoader.HASHMAP_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_15, StateBackendLoader.HASHMAP_STATE_BACKEND_NAME),
-    )
+  @Parameterized.Parameters(name = "Test snapshot: {0}")
+  def parameters: util.Collection[SnapshotSpec] = {
+    // Note: It is not safe to restore savepoints created in Scala applications with Flink
+    // version 1.7 or below. The reason is that up to version 1.7 the 
underlying Scala serializer
+    // used names of anonymous classes that depend on the relative 
position/order in code, e.g.,
+    // if two anonymous classes, instantiated inside the same class and from 
the same base class,
+    // change order in the code their names are switched.
+    // As a consequence, changes in code may result in restore failures.
+    // This was fixed in version 1.8, see: 
https://issues.apache.org/jira/browse/FLINK-10493
+    var parameters: util.List[SnapshotSpec] = new 
util.LinkedList[SnapshotSpec]()
+    parameters.addAll(
+      SnapshotSpec.withVersions(
+        StateBackendLoader.MEMORY_STATE_BACKEND_NAME,
+        SnapshotType.SAVEPOINT_CANONICAL,
+        FlinkVersion.rangeOf(FlinkVersion.v1_8, FlinkVersion.v1_13)))
+    parameters.addAll(
+      SnapshotSpec.withVersions(
+        StateBackendLoader.HASHMAP_STATE_BACKEND_NAME,
+        SnapshotType.SAVEPOINT_CANONICAL,
+        FlinkVersion.rangeOf(FlinkVersion.v1_14, currentVersion)))
+    parameters.addAll(
+      SnapshotSpec.withVersions(
+        StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME,
+        SnapshotType.SAVEPOINT_CANONICAL,
+        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)))
+    parameters.addAll(
+      SnapshotSpec.withVersions(
+        StateBackendLoader.HASHMAP_STATE_BACKEND_NAME,
+        SnapshotType.SAVEPOINT_NATIVE,
+        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)))
+    parameters.addAll(
+      SnapshotSpec.withVersions(
+        StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME,
+        SnapshotType.SAVEPOINT_NATIVE,
+        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)))
+    parameters.addAll(
+      SnapshotSpec.withVersions(
+        StateBackendLoader.HASHMAP_STATE_BACKEND_NAME,
+        SnapshotType.CHECKPOINT,
+        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)))
+    parameters.addAll(
+      SnapshotSpec.withVersions(
+        StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME,
+        SnapshotType.CHECKPOINT,
+        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)))
     if (executionMode == ExecutionMode.CREATE_SNAPSHOT) {
-      parameters = parameters.stream().filter(x => x._1 == currentVersion)
+      parameters = parameters.stream()
+        .filter(x => x.getFlinkVersion().equals(currentVersion))
         .collect(Collectors.toList())
     }
     parameters
   }
 
-  def getSnapshotPath(migrationVersionAndBackend: (FlinkVersion, String)): 
String = {
-    s"stateful-scala-with-broadcast-udf-migration-itcase" +
-      s"-flink${migrationVersionAndBackend._1}" +
-      s"-${migrationVersionAndBackend._2}-savepoint"
+  def getSnapshotPath(snapshotSpec: SnapshotSpec): String = {
+    val path = new 
StringBuilder(s"stateful-scala-with-broadcast-udf-migration-itcase")
+    path ++= s"-flink${snapshotSpec.getFlinkVersion()}"
+    path ++= s"-${snapshotSpec.getStateBackendType()}"
+    snapshotSpec.getSnapshotType() match {
+      case SnapshotType.SAVEPOINT_CANONICAL =>
+        path ++= "-savepoint"
+      case SnapshotType.SAVEPOINT_NATIVE =>
+        path ++= "-savepoint-native"
+      case SnapshotType.CHECKPOINT =>
+        path ++= "-checkpoint"
+      case _ => throw new UnsupportedOperationException
+    }
+    path.toString()
   }
 
   val NUM_ELEMENTS = 4
@@ -103,7 +137,7 @@ object StatefulJobWBroadcastStateMigrationITCase {
  * ITCase for migration Scala state types across different Flink versions.
  */
 @RunWith(classOf[Parameterized])
-class StatefulJobWBroadcastStateMigrationITCase(migrationVersionAndBackend: 
(FlinkVersion, String))
+class StatefulJobWBroadcastStateMigrationITCase(snapshotSpec: SnapshotSpec)
   extends SnapshotMigrationTestBase with Serializable {
 
   @Test
@@ -112,7 +146,7 @@ class 
StatefulJobWBroadcastStateMigrationITCase(migrationVersionAndBackend: (Fli
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 
-    migrationVersionAndBackend._2 match {
+    snapshotSpec.getStateBackendType match {
       case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME =>
         env.setStateBackend(new EmbeddedRocksDBStateBackend())
       case StateBackendLoader.MEMORY_STATE_BACKEND_NAME =>
@@ -163,10 +197,11 @@ class 
StatefulJobWBroadcastStateMigrationITCase(migrationVersionAndBackend: (Fli
         .process(new TestBroadcastProcessFunction)
         .addSink(new AccumulatorCountingSink)
 
-      executeAndSavepoint(
+      executeAndSnapshot(
         env,
         s"src/test/resources/"
-          + 
StatefulJobWBroadcastStateMigrationITCase.getSnapshotPath(migrationVersionAndBackend),
+          + 
StatefulJobWBroadcastStateMigrationITCase.getSnapshotPath(snapshotSpec),
+        snapshotSpec.getSnapshotType(),
         new Tuple2(
           AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
           StatefulJobWBroadcastStateMigrationITCase.NUM_ELEMENTS
@@ -187,7 +222,7 @@ class 
StatefulJobWBroadcastStateMigrationITCase(migrationVersionAndBackend: (Fli
       restoreAndExecute(
         env,
         SnapshotMigrationTestBase.getResourceFilename(
-          
StatefulJobWBroadcastStateMigrationITCase.getSnapshotPath(migrationVersionAndBackend)),
+          
StatefulJobWBroadcastStateMigrationITCase.getSnapshotPath(snapshotSpec)),
         new Tuple2(
           AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
           StatefulJobWBroadcastStateMigrationITCase.NUM_ELEMENTS)

Reply via email to