gaoyunhaii commented on code in PR #21736:
URL: https://github.com/apache/flink/pull/21736#discussion_r1181138638


##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/StatefulJobSnapshotMigrationITCase.java:
##########
@@ -60,62 +64,80 @@
  * previous Flink versions, as well as for different state backends.
  */
 @RunWith(Parameterized.class)
-public class StatefulJobSnapshotMigrationITCase extends 
SnapshotMigrationTestBase {
+public class StatefulJobSnapshotMigrationITCase extends 
SnapshotMigrationTestBase
+        implements MigrationTest {
 
     private static final int NUM_SOURCE_ELEMENTS = 4;
 
-    // TODO increase this to newer version to create and test snapshot 
migration for newer versions
-    private static final FlinkVersion currentVersion = FlinkVersion.v1_16;
+    @Parameterized.Parameters(name = "Test snapshot: {0}")
+    public static Collection<SnapshotSpec> createSpecsForTestRuns() {
+        return internalParameters(null);
+    }
+
+    public static Collection<SnapshotSpec> createSpecsForTestDataGeneration(
+            FlinkVersion targetVersion) {
+        return internalParameters(targetVersion);
+    }
 
-    // TODO change this to CREATE_SNAPSHOT to (re)create binary snapshots
-    // TODO Note: You should generate the snapshot based on the release branch 
instead of the
-    // master.
-    private static final ExecutionMode executionMode = 
ExecutionMode.VERIFY_SNAPSHOT;
+    private static Collection<SnapshotSpec> internalParameters(
+            @Nullable FlinkVersion targetGeneratingVersion) {
+        BiFunction<FlinkVersion, FlinkVersion, Collection<FlinkVersion>> 
getFlinkVersions =
+                (minInclVersion, maxInclVersion) -> {
+                    if (targetGeneratingVersion != null) {
+                        return Collections.singleton(targetGeneratingVersion);
+                    } else {
+                        return FlinkVersion.rangeOf(minInclVersion, 
maxInclVersion);
+                    }
+                };
 
-    @Parameterized.Parameters(name = "Test snapshot: {0}")
-    public static Collection<SnapshotSpec> parameters() {
         Collection<SnapshotSpec> parameters = new LinkedList<>();
         parameters.addAll(
                 SnapshotSpec.withVersions(
                         StateBackendLoader.MEMORY_STATE_BACKEND_NAME,
                         SnapshotType.SAVEPOINT_CANONICAL,
-                        FlinkVersion.rangeOf(FlinkVersion.v1_8, 
FlinkVersion.v1_14)));
+                        getFlinkVersions.apply(FlinkVersion.v1_8, 
FlinkVersion.v1_14)));

Review Comment:
   That's indeed the case, I changed the implementation of `getFlinkVersions`. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to