gaoyunhaii commented on code in PR #21736: URL: https://github.com/apache/flink/pull/21736#discussion_r1181138638
########## flink-tests/src/test/java/org/apache/flink/test/checkpointing/StatefulJobSnapshotMigrationITCase.java: ########## @@ -60,62 +64,80 @@ * previous Flink versions, as well as for different state backends. */ @RunWith(Parameterized.class) -public class StatefulJobSnapshotMigrationITCase extends SnapshotMigrationTestBase { +public class StatefulJobSnapshotMigrationITCase extends SnapshotMigrationTestBase + implements MigrationTest { private static final int NUM_SOURCE_ELEMENTS = 4; - // TODO increase this to newer version to create and test snapshot migration for newer versions - private static final FlinkVersion currentVersion = FlinkVersion.v1_16; + @Parameterized.Parameters(name = "Test snapshot: {0}") + public static Collection<SnapshotSpec> createSpecsForTestRuns() { + return internalParameters(null); + } + + public static Collection<SnapshotSpec> createSpecsForTestDataGeneration( + FlinkVersion targetVersion) { + return internalParameters(targetVersion); + } - // TODO change this to CREATE_SNAPSHOT to (re)create binary snapshots - // TODO Note: You should generate the snapshot based on the release branch instead of the - // master. - private static final ExecutionMode executionMode = ExecutionMode.VERIFY_SNAPSHOT; + private static Collection<SnapshotSpec> internalParameters( + @Nullable FlinkVersion targetGeneratingVersion) { + BiFunction<FlinkVersion, FlinkVersion, Collection<FlinkVersion>> getFlinkVersions = + (minInclVersion, maxInclVersion) -> { + if (targetGeneratingVersion != null) { + return Collections.singleton(targetGeneratingVersion); + } else { + return FlinkVersion.rangeOf(minInclVersion, maxInclVersion); + } + }; - @Parameterized.Parameters(name = "Test snapshot: {0}") - public static Collection<SnapshotSpec> parameters() { Collection<SnapshotSpec> parameters = new LinkedList<>(); parameters.addAll( SnapshotSpec.withVersions( StateBackendLoader.MEMORY_STATE_BACKEND_NAME, SnapshotType.SAVEPOINT_CANONICAL, - FlinkVersion.rangeOf(FlinkVersion.v1_8, FlinkVersion.v1_14))); + getFlinkVersions.apply(FlinkVersion.v1_8, FlinkVersion.v1_14))); Review Comment: That's indeed the case, I changed the implementation of `getFlinkVersions`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org