This is an automated email from the ASF dual-hosted git repository.

panyuepeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 2837c20a10b [FLINK-39124][JUnit5 Migration] Migrate the JUnit5 for Module: flink-tests (#27666)
2837c20a10b is described below

commit 2837c20a10bd226188d3491b4a7bc4fddf2ea9cb
Author: Mate Czagany <[email protected]>
AuthorDate: Wed Mar 18 13:24:13 2026 +0100

    [FLINK-39124][JUnit5 Migration] Migrate the JUnit5 for Module: flink-tests (#27666)
    
    * [FLINK-39124][test] Migrate operator restore tests to JUnit 5
    
    * [FLINK-39124] Remove visibility modifiers from all test methods and classes
    
    * [FLINK-39124] Improve scopes and assertions
---
 .../restore/AbstractOperatorRestoreTestBase.java   | 87 +++++++++----------
 .../restore/StreamOperatorSnapshotRestoreTest.java | 98 ++++++++++------------
 .../AbstractKeyedOperatorRestoreTestBase.java      | 12 +--
 .../restore/keyed/KeyedComplexChainTest.java       |  6 +-
 .../state/operator/restore/keyed/KeyedJob.java     | 33 ++++----
 .../AbstractNonKeyedOperatorRestoreTestBase.java   | 13 +--
 .../operator/restore/unkeyed/ChainBreakTest.java   |  8 +-
 .../restore/unkeyed/ChainLengthDecreaseTest.java   |  9 +-
 .../restore/unkeyed/ChainLengthIncreaseTest.java   |  9 +-
 .../unkeyed/ChainLengthStatelessDecreaseTest.java  |  9 +-
 .../operator/restore/unkeyed/ChainOrderTest.java   |  9 +-
 .../operator/restore/unkeyed/ChainUnionTest.java   |  9 +-
 .../operator/restore/unkeyed/NonKeyedJob.java      | 27 +++---
 13 files changed, 135 insertions(+), 194 deletions(-)

diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
index 99403d6bb08..612953f4379 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
@@ -33,21 +33,25 @@ import 
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
 import org.apache.flink.streaming.util.CheckpointStorageUtils;
 import org.apache.flink.streaming.util.RestartStrategyUtils;
 import org.apache.flink.streaming.util.StateBackendUtils;
+import org.apache.flink.test.junit5.InjectClusterClient;
+import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.test.util.MigrationTest;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
 import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.concurrent.ScheduledExecutor;
 import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
 
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
 import java.net.URI;
@@ -65,8 +69,7 @@ import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /**
  * Abstract class to verify that it is possible to migrate a savepoint across 
upgraded Flink
@@ -79,19 +82,23 @@ import static org.junit.Assert.assertNotNull;
  * <p>The savepoint _metadata file for the current branch is stored in the 
savepointPath in {@link
  * AbstractOperatorRestoreTestBase#migrateJob}.
  */
-public abstract class AbstractOperatorRestoreTestBase extends TestLogger 
implements MigrationTest {
+@ExtendWith({TestLoggerExtension.class, ParameterizedTestExtension.class})
+public abstract class AbstractOperatorRestoreTestBase implements MigrationTest 
{
 
-    private final FlinkVersion flinkVersion;
+    @Parameter private FlinkVersion flinkVersion;
 
-    @Parameterized.Parameters(name = "Migrate Savepoint: {0}")
-    public static Collection<FlinkVersion> parameters() {
+    @Parameters(name = "Migrate Savepoint: {0}")
+    private static Collection<FlinkVersion> parameters() {
         return FlinkVersion.rangeOf(
                 FlinkVersion.v1_8, 
MigrationTest.getMostRecentlyPublishedVersion());
     }
 
-    @ClassRule
-    public static final TestExecutorResource<ScheduledExecutorService> 
EXECUTOR_RESOURCE =
-            TestingUtils.defaultExecutorResource();
+    @RegisterExtension
+    private static final TestExecutorExtension<ScheduledExecutorService> 
EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorExtension();
+
+    private final ScheduledExecutor scheduledExecutor =
+            new 
ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor());
 
     private static final int NUM_TMS = 1;
     private static final int NUM_SLOTS_PER_TM = 4;
@@ -110,25 +117,24 @@ public abstract class AbstractOperatorRestoreTestBase 
extends TestLogger impleme
                             
.map(AbstractOperatorRestoreTestBase::escapeRegexCharacters)
                             .collect(Collectors.joining(")|(", "(", ")")));
 
-    @Rule public final TemporaryFolder tmpFolder = new TemporaryFolder();
+    @TempDir private File tmpFolder;
 
-    @Rule
-    public final MiniClusterWithClientResource cluster =
-            new MiniClusterWithClientResource(
+    @RegisterExtension
+    private static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
+            new MiniClusterExtension(
                     new MiniClusterResourceConfiguration.Builder()
                             .setNumberTaskManagers(NUM_TMS)
                             .setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
                             .build());
 
-    private final ScheduledExecutor scheduledExecutor =
-            new 
ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor());
+    private ClusterClient<?> clusterClient;
 
-    protected AbstractOperatorRestoreTestBase(FlinkVersion flinkVersion) {
-        this.flinkVersion = flinkVersion;
+    @BeforeEach
+    void setClusterClient(@InjectClusterClient ClusterClient<?> clusterClient) 
{
+        this.clusterClient = clusterClient;
     }
 
     protected void internalGenerateSnapshots(FlinkVersion targetVersion) 
throws Exception {
-        ClusterClient<?> clusterClient = cluster.getClusterClient();
         final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
 
         // Submit job with old version savepoint and create a migrated 
savepoint in the new version.
@@ -149,9 +155,8 @@ public abstract class AbstractOperatorRestoreTestBase 
extends TestLogger impleme
         }
     }
 
-    @Test
-    public void testMigrationAndRestore() throws Throwable {
-        ClusterClient<?> clusterClient = cluster.getClusterClient();
+    @TestTemplate
+    void testMigrationAndRestore() throws Throwable {
         final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
 
         // submit job with old version savepoint and create a migrated 
savepoint in the new version
@@ -176,7 +181,7 @@ public abstract class AbstractOperatorRestoreTestBase 
extends TestLogger impleme
         jobToMigrate.setSavepointRestoreSettings(
                 SavepointRestoreSettings.forPath(savepointResource.getFile()));
 
-        assertNotNull(jobToMigrate.getJobID());
+        assertThat(jobToMigrate.getJobID()).isNotNull();
 
         clusterClient.submitJob(jobToMigrate).get();
 
@@ -187,12 +192,10 @@ public abstract class AbstractOperatorRestoreTestBase 
extends TestLogger impleme
                         deadline,
                         (jobStatus) -> jobStatus == JobStatus.RUNNING,
                         scheduledExecutor);
-        assertEquals(
-                JobStatus.RUNNING,
-                jobRunningFuture.get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS));
+        assertThat(jobRunningFuture.get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS))
+                .isEqualTo(JobStatus.RUNNING);
 
         // Trigger savepoint
-        File targetDirectory = tmpFolder.newFolder();
         String savepointPath = null;
 
         // FLINK-6918: Retry cancel with savepoint message in case that 
StreamTasks were not running
@@ -204,7 +207,7 @@ public abstract class AbstractOperatorRestoreTestBase 
extends TestLogger impleme
                         clusterClient
                                 .cancelWithSavepoint(
                                         jobToMigrate.getJobID(),
-                                        targetDirectory.getAbsolutePath(),
+                                        tmpFolder.getAbsolutePath(),
                                         SavepointFormatType.CANONICAL)
                                 .get();
             } catch (Exception e) {
@@ -217,7 +220,7 @@ public abstract class AbstractOperatorRestoreTestBase 
extends TestLogger impleme
             }
         }
 
-        assertNotNull("Could not take savepoint.", savepointPath);
+        assertThat(savepointPath).as("Could not take savepoint.").isNotNull();
 
         CompletableFuture<JobStatus> jobCanceledFuture =
                 FutureUtils.retrySuccessfulWithDelay(
@@ -226,9 +229,8 @@ public abstract class AbstractOperatorRestoreTestBase 
extends TestLogger impleme
                         deadline,
                         (jobStatus) -> jobStatus == JobStatus.CANCELED,
                         scheduledExecutor);
-        assertEquals(
-                JobStatus.CANCELED,
-                jobCanceledFuture.get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS));
+        assertThat(jobCanceledFuture.get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS))
+                .isEqualTo(JobStatus.CANCELED);
 
         return savepointPath;
     }
@@ -239,7 +241,7 @@ public abstract class AbstractOperatorRestoreTestBase 
extends TestLogger impleme
         jobToRestore.setSavepointRestoreSettings(
                 SavepointRestoreSettings.forPath(savepointPath, true));
 
-        assertNotNull("Job doesn't have a JobID.", jobToRestore.getJobID());
+        assertThat(jobToRestore.getJobID()).as("Job doesn't have a 
JobID.").isNotNull();
 
         clusterClient.submitJob(jobToRestore).get();
 
@@ -250,9 +252,8 @@ public abstract class AbstractOperatorRestoreTestBase 
extends TestLogger impleme
                         deadline,
                         (jobStatus) -> jobStatus == JobStatus.FINISHED,
                         scheduledExecutor);
-        assertEquals(
-                JobStatus.FINISHED,
-                jobStatusFuture.get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS));
+        assertThat(jobStatusFuture.get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS))
+                .isEqualTo(JobStatus.FINISHED);
     }
 
     private JobGraph createJobGraph(ExecutionMode mode) {
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/StreamOperatorSnapshotRestoreTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/StreamOperatorSnapshotRestoreTest.java
index ae8c12b2819..5c606000fbb 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/StreamOperatorSnapshotRestoreTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/StreamOperatorSnapshotRestoreTest.java
@@ -64,25 +64,23 @@ import 
org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.util.TernaryBoolean;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
 
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.Arrays;
 import java.util.BitSet;
-import java.util.Collection;
+
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link StreamOperator} snapshot restoration. */
-@RunWith(Parameterized.class)
-public class StreamOperatorSnapshotRestoreTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)
+class StreamOperatorSnapshotRestoreTest {
 
     private static final int ONLY_JM_RECOVERY = 0;
     private static final int TM_AND_JM_RECOVERY = 1;
@@ -91,42 +89,27 @@ public class StreamOperatorSnapshotRestoreTest extends 
TestLogger {
 
     private static final int MAX_PARALLELISM = 10;
 
-    protected static TemporaryFolder temporaryFolder;
-
-    @Parameterized.Parameter public StateBackendEnum stateBackendEnum;
+    @TempDir private static File temporaryFolder;
 
-    enum StateBackendEnum {
+    private enum StateBackendEnum {
         FILE,
         ROCKSDB_FULLY_ASYNC,
         ROCKSDB_INCREMENTAL
     }
 
-    @Parameterized.Parameters(name = "statebackend type ={0}")
-    public static Collection<StateBackendEnum> parameter() {
-        return Arrays.asList(StateBackendEnum.values());
-    }
-
-    @BeforeClass
-    public static void beforeClass() throws IOException {
-        temporaryFolder = new TemporaryFolder();
-        temporaryFolder.create();
-    }
-
-    @AfterClass
-    public static void afterClass() {
-        temporaryFolder.delete();
-    }
-
     /** Test restoring an operator from a snapshot (local recovery 
deactivated). */
-    @Test
-    public void testOperatorStatesSnapshotRestore() throws Exception {
-        testOperatorStatesSnapshotRestoreInternal(ONLY_JM_RECOVERY);
+    @ParameterizedTest
+    @EnumSource(value = StateBackendEnum.class)
+    void testOperatorStatesSnapshotRestore(StateBackendEnum stateBackendEnum) 
throws Exception {
+        testOperatorStatesSnapshotRestoreInternal(ONLY_JM_RECOVERY, 
stateBackendEnum);
     }
 
     /** Test restoring an operator from a snapshot (local recovery activated). 
*/
-    @Test
-    public void testOperatorStatesSnapshotRestoreWithLocalState() throws 
Exception {
-        testOperatorStatesSnapshotRestoreInternal(TM_AND_JM_RECOVERY);
+    @ParameterizedTest
+    @EnumSource(value = StateBackendEnum.class)
+    void testOperatorStatesSnapshotRestoreWithLocalState(StateBackendEnum 
stateBackendEnum)
+            throws Exception {
+        testOperatorStatesSnapshotRestoreInternal(TM_AND_JM_RECOVERY, 
stateBackendEnum);
     }
 
     /**
@@ -135,9 +118,11 @@ public class StreamOperatorSnapshotRestoreTest extends 
TestLogger {
      * <p>This case does not really simulate a practical scenario, but we make 
sure that restore
      * happens from the local state here because we discard the JM state.
      */
-    @Test
-    public void testOperatorStatesSnapshotRestoreWithLocalStateDeletedJM() 
throws Exception {
-        testOperatorStatesSnapshotRestoreInternal(TM_REMOVE_JM_RECOVERY);
+    @ParameterizedTest
+    @EnumSource(value = StateBackendEnum.class)
+    void 
testOperatorStatesSnapshotRestoreWithLocalStateDeletedJM(StateBackendEnum 
stateBackendEnum)
+            throws Exception {
+        testOperatorStatesSnapshotRestoreInternal(TM_REMOVE_JM_RECOVERY, 
stateBackendEnum);
     }
 
     /**
@@ -147,12 +132,15 @@ public class StreamOperatorSnapshotRestoreTest extends 
TestLogger {
      * <p>This tests discards the local state, to simulate corruption and 
checks that we still
      * recover from the fallback JM state.
      */
-    @Test
-    public void testOperatorStatesSnapshotRestoreWithLocalStateDeletedTM() 
throws Exception {
-        testOperatorStatesSnapshotRestoreInternal(JM_REMOVE_TM_RECOVERY);
+    @ParameterizedTest
+    @EnumSource(value = StateBackendEnum.class)
+    void 
testOperatorStatesSnapshotRestoreWithLocalStateDeletedTM(StateBackendEnum 
stateBackendEnum)
+            throws Exception {
+        testOperatorStatesSnapshotRestoreInternal(JM_REMOVE_TM_RECOVERY, 
stateBackendEnum);
     }
 
-    private void testOperatorStatesSnapshotRestoreInternal(final int mode) 
throws Exception {
+    private void testOperatorStatesSnapshotRestoreInternal(
+            final int mode, StateBackendEnum stateBackendEnum) throws 
Exception {
 
         // 
-------------------------------------------------------------------------- 
snapshot
 
@@ -183,7 +171,7 @@ public class StreamOperatorSnapshotRestoreTest extends 
TestLogger {
                 mode == ONLY_JM_RECOVERY
                         ? null
                         : new LocalSnapshotDirectoryProviderImpl(
-                                temporaryFolder.newFolder(), jobID, 
jobVertexID, subtaskIdx);
+                                temporaryFolder, jobID, jobVertexID, 
subtaskIdx);
 
         LocalRecoveryConfig localRecoveryConfig =
                 (directoryProvider == null)
@@ -259,8 +247,8 @@ public class StreamOperatorSnapshotRestoreTest extends 
TestLogger {
         OperatorSubtaskState taskLocalState = 
snapshotWithLocalState.getTaskLocalState();
 
         // We check if local state was created when we enabled local recovery
-        Assert.assertTrue(
-                mode > ONLY_JM_RECOVERY == (taskLocalState != null && 
taskLocalState.hasState()));
+        assertThat(taskLocalState != null && taskLocalState.hasState())
+                .isEqualTo(mode > ONLY_JM_RECOVERY);
 
         if (mode == TM_REMOVE_JM_RECOVERY) {
             jobManagerOwnedState.getManagedKeyedState().discardState();
@@ -302,7 +290,7 @@ public class StreamOperatorSnapshotRestoreTest extends 
TestLogger {
                 // check restored managed keyed state
                 long exp = element.getValue() + 1;
                 long act = keyedState.value();
-                Assert.assertEquals(exp, act);
+                assertThat(act).isEqualTo(exp);
             } else {
                 // write managed keyed state that goes into snapshot
                 keyedState.update(element.getValue() + 1);
@@ -328,7 +316,7 @@ public class StreamOperatorSnapshotRestoreTest extends 
TestLogger {
                 ++count;
             }
 
-            Assert.assertEquals(MAX_PARALLELISM, count);
+            assertThat(count).isEqualTo(MAX_PARALLELISM);
 
             // write raw operator state that goes into snapshot
             OperatorStateCheckpointOutputStream outOp = 
context.getRawOperatorStateOutput();
@@ -342,7 +330,7 @@ public class StreamOperatorSnapshotRestoreTest extends 
TestLogger {
         @Override
         public void initializeState(StateInitializationContext context) throws 
Exception {
 
-            Assert.assertEquals(verifyRestore, context.isRestored());
+            assertThat(context.isRestored()).isEqualTo(verifyRestore);
 
             keyedState =
                     context.getKeyedStateStore()
@@ -362,11 +350,11 @@ public class StreamOperatorSnapshotRestoreTest extends 
TestLogger {
                         context.getRawKeyedStateInputs()) {
                     try (InputStream in = streamProvider.getStream()) {
                         DataInputView div = new DataInputViewStreamWrapper(in);
-                        Assert.assertEquals(streamProvider.getKeyGroupId() + 
2, div.readInt());
+                        
assertThat(div.readInt()).isEqualTo(streamProvider.getKeyGroupId() + 2);
                         ++count;
                     }
                 }
-                Assert.assertEquals(MAX_PARALLELISM, count);
+                assertThat(count).isEqualTo(MAX_PARALLELISM);
 
                 // check restored managed operator state
                 BitSet check = new BitSet(10);
@@ -374,7 +362,7 @@ public class StreamOperatorSnapshotRestoreTest extends 
TestLogger {
                     check.set(v);
                 }
 
-                Assert.assertEquals(10, check.cardinality());
+                assertThat(check.cardinality()).isEqualTo(10);
 
                 // check restored raw operator state
                 check = new BitSet(13);
@@ -385,7 +373,7 @@ public class StreamOperatorSnapshotRestoreTest extends 
TestLogger {
                         check.set(div.readInt() - 42);
                     }
                 }
-                Assert.assertEquals(13, check.cardinality());
+                assertThat(check.cardinality()).isEqualTo(13);
             }
         }
     }
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/AbstractKeyedOperatorRestoreTestBase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/AbstractKeyedOperatorRestoreTestBase.java
index fdc85b298b5..be58b958964 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/AbstractKeyedOperatorRestoreTestBase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/AbstractKeyedOperatorRestoreTestBase.java
@@ -25,19 +25,11 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.test.state.operator.restore.AbstractOperatorRestoreTestBase;
 import org.apache.flink.test.state.operator.restore.ExecutionMode;
 
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
 /** Base class for all keyed operator restore tests. */
-@RunWith(Parameterized.class)
-public abstract class AbstractKeyedOperatorRestoreTestBase extends 
AbstractOperatorRestoreTestBase {
-
-    public AbstractKeyedOperatorRestoreTestBase(FlinkVersion flinkVersion) {
-        super(flinkVersion);
-    }
+abstract class AbstractKeyedOperatorRestoreTestBase extends 
AbstractOperatorRestoreTestBase {
 
     @Override
-    public void createMigrationJob(StreamExecutionEnvironment env) {
+    protected void createMigrationJob(StreamExecutionEnvironment env) {
         /** Source -> keyBy -> C(Window -> StatefulMap1 -> StatefulMap2) */
         SingleOutputStreamOperator<Tuple2<Integer, Integer>> source =
                 KeyedJob.createIntegerTupleSource(env, ExecutionMode.MIGRATE);
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedComplexChainTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedComplexChainTest.java
index 98150aca70c..79be50b4f14 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedComplexChainTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedComplexChainTest.java
@@ -25,11 +25,7 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.state.operator.restore.ExecutionMode;
 
 /** Test state restoration for a keyed operator restore tests. */
-public class KeyedComplexChainTest extends 
AbstractKeyedOperatorRestoreTestBase {
-
-    public KeyedComplexChainTest(FlinkVersion flinkVersion) {
-        super(flinkVersion);
-    }
+class KeyedComplexChainTest extends AbstractKeyedOperatorRestoreTestBase {
 
     @SnapshotsGenerator
     public void generateSnapshots(FlinkVersion targetVersion) throws Exception 
{
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
index ea39fbc458b..58a47f5bafa 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
@@ -43,12 +43,12 @@ import 
org.apache.flink.test.state.operator.restore.ExecutionMode;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.ParameterTool;
 
-import org.junit.Assert;
-
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 /**
  * Savepoint generator to create the savepoint used by the {@link
  * AbstractKeyedOperatorRestoreTestBase}. Switch to specific version branches 
and run this job to
@@ -206,15 +206,15 @@ public class KeyedJob {
                     while (input.hasNext() && restored.hasNext()) {
                         Tuple2<Integer, Integer> value = input.next();
                         Integer rValue = restored.next();
-                        Assert.assertEquals(rValue, value.f1);
+                        assertThat(value.f1).isEqualTo(rValue);
                     }
-                    Assert.assertEquals(restored.hasNext(), input.hasNext());
+                    assertThat(restored.hasNext()).isEqualTo(input.hasNext());
             }
         }
 
         @Override
         public void close() {
-            Assert.assertTrue("Apply was never called.", applyCalled);
+            assertThat(applyCalled).as("Apply was never called.").isTrue();
         }
     }
 
@@ -248,17 +248,18 @@ public class KeyedJob {
                     break;
                 case MIGRATE:
                 case RESTORE:
-                    Assert.assertEquals(
-                            "Failed for "
-                                    + valueToStore
-                                    + 
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
-                            1,
-                            state.size());
-                    String value = state.get(0);
-                    Assert.assertEquals(
-                            valueToStore
-                                    + 
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
-                            value);
+                    assertThat(state)
+                            .as(
+                                    "Failed for "
+                                            + valueToStore
+                                            + getRuntimeContext()
+                                                    .getTaskInfo()
+                                                    .getIndexOfThisSubtask())
+                            .containsExactly(
+                                    valueToStore
+                                            + getRuntimeContext()
+                                                    .getTaskInfo()
+                                                    .getIndexOfThisSubtask());
             }
         }
     }
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java
index eb5af92c2d4..998f6ead3c7 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java
@@ -25,9 +25,6 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.test.state.operator.restore.AbstractOperatorRestoreTestBase;
 import org.apache.flink.test.state.operator.restore.ExecutionMode;
 
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
 import static 
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createFirstStatefulMap;
 import static 
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSecondStatefulMap;
 import static 
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSource;
@@ -35,16 +32,10 @@ import static 
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.c
 import static 
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createThirdStatefulMap;
 
 /** Base class for all non-keyed operator restore tests. */
-@RunWith(Parameterized.class)
-public abstract class AbstractNonKeyedOperatorRestoreTestBase
-        extends AbstractOperatorRestoreTestBase {
-
-    protected AbstractNonKeyedOperatorRestoreTestBase(FlinkVersion 
flinkVersion) {
-        super(flinkVersion);
-    }
+abstract class AbstractNonKeyedOperatorRestoreTestBase extends 
AbstractOperatorRestoreTestBase {
 
     @Override
-    public void createMigrationJob(StreamExecutionEnvironment env) {
+    protected void createMigrationJob(StreamExecutionEnvironment env) {
         /** Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> 
StatefulMap3) */
         DataStream<Integer> source = createSource(env, ExecutionMode.MIGRATE);
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainBreakTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainBreakTest.java
index f49805ca344..d9a6f85e2a5 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainBreakTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainBreakTest.java
@@ -31,11 +31,7 @@ import static 
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.c
 import static 
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createThirdStatefulMap;
 
 /** Verifies that the state of all operators is restored if a topology change 
breaks up a chain. */
-public class ChainBreakTest extends AbstractNonKeyedOperatorRestoreTestBase {
-
-    public ChainBreakTest(FlinkVersion flinkVersion) {
-        super(flinkVersion);
-    }
+class ChainBreakTest extends AbstractNonKeyedOperatorRestoreTestBase {
 
     @SnapshotsGenerator
     public void generateSnapshots(FlinkVersion targetVersion) throws Exception 
{
@@ -43,7 +39,7 @@ public class ChainBreakTest extends 
AbstractNonKeyedOperatorRestoreTestBase {
     }
 
     @Override
-    public void createRestoredJob(StreamExecutionEnvironment env) {
+    protected void createRestoredJob(StreamExecutionEnvironment env) {
         /**
          * Original job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map 
-> StatefulMap3)
          * Modified job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map) 
-> StatefulMap3
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthDecreaseTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthDecreaseTest.java
index 516450c54ed..d83091e3955 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthDecreaseTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthDecreaseTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.test.state.operator.restore.unkeyed;
 
-import org.apache.flink.FlinkVersion;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -35,14 +34,10 @@ import static 
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.c
  *
  * <p>The test shares the snapshots generated by {@link ChainBreakTest}.
  */
-public class ChainLengthDecreaseTest extends 
AbstractNonKeyedOperatorRestoreTestBase {
-
-    public ChainLengthDecreaseTest(FlinkVersion flinkVersion) {
-        super(flinkVersion);
-    }
+class ChainLengthDecreaseTest extends AbstractNonKeyedOperatorRestoreTestBase {
 
     @Override
-    public void createRestoredJob(StreamExecutionEnvironment env) {
+    protected void createRestoredJob(StreamExecutionEnvironment env) {
         /**
          * Original job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map 
-> StatefulMap3)
          * Modified job: Source -> StatefulMap1 -> CHAIN(Map -> StatefulMap3)
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthIncreaseTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthIncreaseTest.java
index 6239564975e..2cd02967488 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthIncreaseTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthIncreaseTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.test.state.operator.restore.unkeyed;
 
-import org.apache.flink.FlinkVersion;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -36,14 +35,10 @@ import static 
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.c
  *
  * <p>The test shares the snapshots generated by {@link ChainBreakTest}.
  */
-public class ChainLengthIncreaseTest extends 
AbstractNonKeyedOperatorRestoreTestBase {
-
-    public ChainLengthIncreaseTest(FlinkVersion flinkVersion) {
-        super(flinkVersion);
-    }
+class ChainLengthIncreaseTest extends AbstractNonKeyedOperatorRestoreTestBase {
 
     @Override
-    public void createRestoredJob(StreamExecutionEnvironment env) {
+    protected void createRestoredJob(StreamExecutionEnvironment env) {
         /**
          * Original job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map 
-> StatefulMap3)
          * Modified job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map 
-> StatefulMap3 ->
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthStatelessDecreaseTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthStatelessDecreaseTest.java
index 771e3caa53b..93f4b4b4fbd 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthStatelessDecreaseTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthStatelessDecreaseTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.test.state.operator.restore.unkeyed;
 
-import org.apache.flink.FlinkVersion;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -38,14 +37,10 @@ import static 
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.c
  *
  * <p>The test shares the snapshots generated by {@link ChainBreakTest}.
  */
-public class ChainLengthStatelessDecreaseTest extends 
AbstractNonKeyedOperatorRestoreTestBase {
-
-    public ChainLengthStatelessDecreaseTest(FlinkVersion flinkVersion) {
-        super(flinkVersion);
-    }
+class ChainLengthStatelessDecreaseTest extends 
AbstractNonKeyedOperatorRestoreTestBase {
 
     @Override
-    public void createRestoredJob(StreamExecutionEnvironment env) {
+    protected void createRestoredJob(StreamExecutionEnvironment env) {
         /*
          * Original job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map 
-> StatefulMap3)
          * Modified job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> 
StatefulMap3)
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainOrderTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainOrderTest.java
index f15b5217f4a..7c12d51eb23 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainOrderTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainOrderTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.test.state.operator.restore.unkeyed;
 
-import org.apache.flink.FlinkVersion;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -36,14 +35,10 @@ import static 
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.c
  *
  * <p>The test shares the snapshots generated by {@link ChainBreakTest}.
  */
-public class ChainOrderTest extends AbstractNonKeyedOperatorRestoreTestBase {
-
-    public ChainOrderTest(FlinkVersion flinkVersion) {
-        super(flinkVersion);
-    }
+class ChainOrderTest extends AbstractNonKeyedOperatorRestoreTestBase {
 
     @Override
-    public void createRestoredJob(StreamExecutionEnvironment env) {
+    protected void createRestoredJob(StreamExecutionEnvironment env) {
         /**
          * Original job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map 
-> StatefulMap3)
          * Modified job: Source -> StatefulMap1 -> CHAIN(StatefulMap3 -> Map 
-> StatefulMap2)
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainUnionTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainUnionTest.java
index 769a07fc08c..c7072b40b67 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainUnionTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainUnionTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.test.state.operator.restore.unkeyed;
 
-import org.apache.flink.FlinkVersion;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -35,14 +34,10 @@ import static 
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.c
  *
  * <p>The test shares the snapshots generated by {@link ChainBreakTest}.
  */
-public class ChainUnionTest extends AbstractNonKeyedOperatorRestoreTestBase {
-
-    public ChainUnionTest(FlinkVersion flinkVersion) {
-        super(flinkVersion);
-    }
+class ChainUnionTest extends AbstractNonKeyedOperatorRestoreTestBase {
 
     @Override
-    public void createRestoredJob(StreamExecutionEnvironment env) {
+    protected void createRestoredJob(StreamExecutionEnvironment env) {
         /**
          * Original job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map 
-> StatefulMap3)
          * Modified job: Source -> CHAIN(StatefulMap1 -> StatefulMap2 -> Map 
-> StatefulMap3)
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java
 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java
index f7cf09604b6..c815349b8d8 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java
@@ -34,11 +34,11 @@ import org.apache.flink.streaming.util.StateBackendUtils;
 import org.apache.flink.test.state.operator.restore.ExecutionMode;
 import org.apache.flink.util.ParameterTool;
 
-import org.junit.Assert;
-
 import java.util.Arrays;
 import java.util.List;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 /**
  * Savepoint generator to create the savepoint used by the {@link
  * AbstractNonKeyedOperatorRestoreTestBase}. Switch to specific version 
branches and run this job to
@@ -148,17 +148,18 @@ public class NonKeyedJob {
                     break;
                 case MIGRATE:
                 case RESTORE:
-                    Assert.assertEquals(
-                            "Failed for "
-                                    + valueToStore
-                                    + 
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
-                            1,
-                            state.size());
-                    String value = state.get(0);
-                    Assert.assertEquals(
-                            valueToStore
-                                    + 
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
-                            value);
+                    assertThat(state)
+                            .as(
+                                    "Failed for "
+                                            + valueToStore
+                                            + getRuntimeContext()
+                                                    .getTaskInfo()
+                                                    .getIndexOfThisSubtask())
+                            .containsExactly(
+                                    valueToStore
+                                            + getRuntimeContext()
+                                                    .getTaskInfo()
+                                                    .getIndexOfThisSubtask());
             }
         }
     }


Reply via email to