This is an automated email from the ASF dual-hosted git repository.

panyuepeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d7e4f319ae [FLINK-39124][JUnit5 Migration] Migrate the JUnit5 for 
Module: flink-tests (#27665)
3d7e4f319ae is described below

commit 3d7e4f319ae935bdd9f5aa6d49f09a51662b561e
Author: Mate Czagany <[email protected]>
AuthorDate: Wed Mar 18 11:01:25 2026 +0100

    [FLINK-39124][JUnit5 Migration] Migrate the JUnit5 for Module: flink-tests 
(#27665)
    
    [FLINK-39124][test] Migrate CancelingTestBase and 
StreamFaultToleranceTestBase to JUnit 5
---
 .../flink/test/cancelling/CancelingTestBase.java   | 62 +++++++++++++---------
 .../flink/test/cancelling/JobCancelingITCase.java  | 37 +++++++------
 .../flink/test/cancelling/JoinCancelingITCase.java | 20 +++----
 .../flink/test/cancelling/MapCancelingITCase.java  | 12 ++---
 .../ContinuousFileProcessingCheckpointITCase.java  | 45 ++++++++--------
 .../checkpointing/StateCheckpointedITCase.java     | 20 +++----
 .../checkpointing/StreamCheckpointingITCase.java   | 17 +++---
 .../StreamFaultToleranceTestBase.java              | 52 +++++++++---------
 .../UdfStreamOperatorCheckpointingITCase.java      | 15 +++---
 9 files changed, 147 insertions(+), 133 deletions(-)

diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
index f1aeb7f4192..dfa90e8f975 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
@@ -28,22 +28,26 @@ import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.test.junit5.InjectClusterClient;
+import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
 
-import org.junit.Assert;
-import org.junit.ClassRule;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.util.concurrent.TimeUnit;
 
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
 
 /** Base class for testing job cancellation. */
-public abstract class CancelingTestBase extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)
+abstract class CancelingTestBase {
 
     private static final int MINIMUM_HEAP_SIZE_MB = 192;
 
@@ -51,28 +55,36 @@ public abstract class CancelingTestBase extends TestLogger {
 
     private static final Configuration configuration = getConfiguration();
 
+    protected ClusterClient<?> clusterClient;
+
     // 
--------------------------------------------------------------------------------------------
 
-    @ClassRule
-    public static final MiniClusterWithClientResource CLUSTER =
-            new MiniClusterWithClientResource(
+    @RegisterExtension
+    private static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
+            new MiniClusterExtension(
                     new MiniClusterResourceConfiguration.Builder()
                             .setConfiguration(configuration)
                             .setNumberTaskManagers(2)
                             .setNumberSlotsPerTaskManager(4)
                             .build());
 
+    @BeforeEach
+    void getClusterClient(@InjectClusterClient ClusterClient<?> clusterClient) 
{
+        this.clusterClient = clusterClient;
+    }
+
     // 
--------------------------------------------------------------------------------------------
 
     private static void verifyJvmOptions() {
         final long heap = Runtime.getRuntime().maxMemory() >> 20;
-        Assert.assertTrue(
-                "Insufficient java heap space "
-                        + heap
-                        + "mb - set JVM option: -Xmx"
-                        + MINIMUM_HEAP_SIZE_MB
-                        + "m",
-                heap > MINIMUM_HEAP_SIZE_MB - 50);
+        assertThat(heap)
+                .as(
+                        "Insufficient java heap space "
+                                + heap
+                                + "mb - set JVM option: -Xmx"
+                                + MINIMUM_HEAP_SIZE_MB
+                                + "m")
+                .isGreaterThan(MINIMUM_HEAP_SIZE_MB - 50);
     }
 
     private static Configuration getConfiguration() {
@@ -93,34 +105,34 @@ public abstract class CancelingTestBase extends TestLogger 
{
         // submit job
         final long rpcTimeout = 
configuration.get(RpcOptions.ASK_TIMEOUT_DURATION).toMillis();
 
-        ClusterClient<?> client = CLUSTER.getClusterClient();
-        JobID jobID = client.submitJob(jobGraph).get();
+        JobID jobID = clusterClient.submitJob(jobGraph).get();
 
         Deadline submissionDeadLine = new FiniteDuration(2, 
TimeUnit.MINUTES).fromNow();
 
-        JobStatus jobStatus = client.getJobStatus(jobID).get(rpcTimeout, 
TimeUnit.MILLISECONDS);
+        JobStatus jobStatus =
+                clusterClient.getJobStatus(jobID).get(rpcTimeout, 
TimeUnit.MILLISECONDS);
         while (jobStatus != JobStatus.RUNNING && 
submissionDeadLine.hasTimeLeft()) {
             Thread.sleep(50);
-            jobStatus = client.getJobStatus(jobID).get(rpcTimeout, 
TimeUnit.MILLISECONDS);
+            jobStatus = clusterClient.getJobStatus(jobID).get(rpcTimeout, 
TimeUnit.MILLISECONDS);
         }
         if (jobStatus != JobStatus.RUNNING) {
-            Assert.fail("Job not in state RUNNING.");
+            fail("Job not in state RUNNING.");
         }
 
         Thread.sleep(msecsTillCanceling);
 
-        client.cancel(jobID).get();
+        clusterClient.cancel(jobID).get();
 
         Deadline cancelDeadline =
                 new FiniteDuration(maxTimeTillCanceled, 
TimeUnit.MILLISECONDS).fromNow();
 
         JobStatus jobStatusAfterCancel =
-                client.getJobStatus(jobID).get(rpcTimeout, 
TimeUnit.MILLISECONDS);
+                clusterClient.getJobStatus(jobID).get(rpcTimeout, 
TimeUnit.MILLISECONDS);
         while (jobStatusAfterCancel != JobStatus.CANCELED && 
cancelDeadline.hasTimeLeft()) {
             Thread.sleep(50);
             jobStatusAfterCancel =
-                    client.getJobStatus(jobID).get(rpcTimeout, 
TimeUnit.MILLISECONDS);
+                    clusterClient.getJobStatus(jobID).get(rpcTimeout, 
TimeUnit.MILLISECONDS);
         }
-        assertEquals(JobStatus.CANCELED, jobStatusAfterCancel);
+        assertThat(jobStatusAfterCancel).isEqualTo(JobStatus.CANCELED);
     }
 }
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/cancelling/JobCancelingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/cancelling/JobCancelingITCase.java
index 9a76e1049ff..96f3016675b 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/cancelling/JobCancelingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/cancelling/JobCancelingITCase.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -32,37 +33,40 @@ import 
org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
 import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.test.junit5.InjectClusterClient;
+import org.apache.flink.test.junit5.InjectMiniCluster;
+import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
 
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.time.Duration;
 
-import static junit.framework.TestCase.assertEquals;
 import static 
org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for canceling the job. */
-@SuppressWarnings("serial")
-public class JobCancelingITCase extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)
+class JobCancelingITCase {
 
     private static final int PARALLELISM = 4;
 
-    @ClassRule public static final TemporaryFolder TMP_FOLDER = new 
TemporaryFolder();
-
-    @ClassRule
-    public static final MiniClusterWithClientResource MINI_CLUSTER =
-            new MiniClusterWithClientResource(
+    @RegisterExtension
+    private static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
+            new MiniClusterExtension(
                     new MiniClusterResourceConfiguration.Builder()
                             .setNumberTaskManagers(1)
                             .setNumberSlotsPerTaskManager(PARALLELISM)
                             .build());
 
     @Test
-    public void testCancelingWhileBackPressured() throws Exception {
+    void testCancelingWhileBackPressured(
+            @InjectMiniCluster MiniCluster miniCluster,
+            @InjectClusterClient ClusterClient<?> client)
+            throws Exception {
         final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(PARALLELISM);
         env.getConfig().enableObjectReuse();
@@ -94,16 +98,15 @@ public class JobCancelingITCase extends TestLogger {
         StreamGraph streamGraph = env.getStreamGraph();
         JobGraph jobGraph = streamGraph.getJobGraph();
 
-        ClusterClient<?> client = MINI_CLUSTER.getClusterClient();
         JobID jobID = client.submitJob(jobGraph).get();
-        waitForAllTaskRunning(MINI_CLUSTER.getMiniCluster(), jobID, false);
+        waitForAllTaskRunning(miniCluster, jobID, false);
         // give a bit of time of back pressure to build up
         Thread.sleep(100);
 
         client.cancel(jobID).get();
         while (!client.getJobStatus(jobID).get().isTerminalState()) {}
 
-        assertEquals(JobStatus.CANCELED, client.getJobStatus(jobID).get());
+        
assertThat(client.getJobStatus(jobID).get()).isEqualTo(JobStatus.CANCELED);
     }
 
     private static class InfiniteLongSourceFunction implements 
SourceFunction<Long> {
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java
index 90c9231ab8d..71890d62bff 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java
@@ -29,12 +29,12 @@ import 
org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
 import org.apache.flink.test.util.InfiniteIntegerTupleInputFormat;
 import org.apache.flink.test.util.UniformIntTupleGeneratorInputFormat;
 
-import org.junit.Ignore;
-import org.junit.Test;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
 
 /** Test job cancellation from within a JoinFunction. */
-@Ignore("Takes too long.")
-public class JoinCancelingITCase extends CancelingTestBase {
+@Disabled("Takes too long.")
+class JoinCancelingITCase extends CancelingTestBase {
 
     // --------------- Test Sort Matches that are canceled while still reading 
/ sorting
     // -----------------
@@ -77,17 +77,17 @@ public class JoinCancelingITCase extends CancelingTestBase {
     }
 
     @Test
-    public void testCancelWhileReadingSlowInputs() throws Exception {
+    void testCancelWhileReadingSlowInputs() throws Exception {
         executeTask(new SimpleMatcher<Integer>(), true);
     }
 
     @Test
-    public void testCancelWhileReadingFastInputs() throws Exception {
+    void testCancelWhileReadingFastInputs() throws Exception {
         executeTask(new SimpleMatcher<Integer>(), false);
     }
 
     @Test
-    public void testCancelPriorToFirstRecordReading() throws Exception {
+    void testCancelPriorToFirstRecordReading() throws Exception {
         executeTask(new StuckInOpenMatcher<Integer>(), false);
     }
 
@@ -123,12 +123,12 @@ public class JoinCancelingITCase extends 
CancelingTestBase {
     // -----------------
 
     @Test
-    public void testCancelWhileJoining() throws Exception {
+    void testCancelWhileJoining() throws Exception {
         executeTaskWithGenerator(new DelayingMatcher<Integer>(), 500, 3, 10 * 
1000, 20 * 1000);
     }
 
     @Test
-    public void testCancelWithLongCancellingResponse() throws Exception {
+    void testCancelWithLongCancellingResponse() throws Exception {
         executeTaskWithGenerator(
                 new LongCancelTimeMatcher<Integer>(), 500, 3, 10 * 1000, 10 * 
1000);
     }
@@ -137,7 +137,7 @@ public class JoinCancelingITCase extends CancelingTestBase {
     // ---------------------------------
 
     @Test
-    public void testCancelWithHighparallelism() throws Exception {
+    void testCancelWithHighparallelism() throws Exception {
         executeTask(new SimpleMatcher<Integer>(), false, 64);
     }
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java
index 87cd8e25bb6..9b5fa4146a7 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java
@@ -25,28 +25,28 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
 import org.apache.flink.test.util.InfiniteIntegerInputFormat;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 /** Test job cancellation from within a MapFunction. */
-public class MapCancelingITCase extends CancelingTestBase {
+class MapCancelingITCase extends CancelingTestBase {
 
     @Test
-    public void testMapCancelling() throws Exception {
+    void testMapCancelling() throws Exception {
         executeTask(new IdentityMapper<Integer>());
     }
 
     @Test
-    public void testSlowMapCancelling() throws Exception {
+    void testSlowMapCancelling() throws Exception {
         executeTask(new DelayingIdentityMapper<Integer>());
     }
 
     @Test
-    public void testMapWithLongCancellingResponse() throws Exception {
+    void testMapWithLongCancellingResponse() throws Exception {
         executeTask(new LongCancelTimeIdentityMapper<Integer>());
     }
 
     @Test
-    public void testMapPriorToFirstRecordReading() throws Exception {
+    void testMapPriorToFirstRecordReading() throws Exception {
         executeTask(new StuckInOpenIdentityMapper<Integer>());
     }
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
index 857847f566c..3722e2f8ad5 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
@@ -38,10 +38,8 @@ import org.apache.flink.util.Collector;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.AssumptionViolatedException;
-import org.junit.Before;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 
 import java.io.File;
 import java.io.IOException;
@@ -55,11 +53,12 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
+import static org.junit.jupiter.api.Assertions.fail;
 
 /** Test checkpointing while sourcing a continuous file processor. */
-public class ContinuousFileProcessingCheckpointITCase extends 
StreamFaultToleranceTestBase {
+class ContinuousFileProcessingCheckpointITCase extends 
StreamFaultToleranceTestBase {
 
     private static final int NO_OF_FILES = 5;
     private static final int LINES_PER_FILE = 150;
@@ -72,15 +71,15 @@ public class ContinuousFileProcessingCheckpointITCase 
extends StreamFaultToleran
 
     private static Map<Integer, Set<String>> actualCollectedContent = new 
HashMap<>();
 
-    @Before
-    public void createHDFS() throws IOException {
-        if 
(failoverStrategy.equals(FailoverStrategy.RestartPipelinedRegionFailoverStrategy))
 {
-            // TODO the 'NO_OF_RETRIES' is useless for current 
RestartPipelinedRegionStrategy,
-            // for this ContinuousFileProcessingCheckpointITCase, using
-            // RestartPipelinedRegionStrategy would result in endless running.
-            throw new AssumptionViolatedException(
-                    "ignored ContinuousFileProcessingCheckpointITCase when 
using RestartPipelinedRegionStrategy");
-        }
+    @BeforeEach
+    void createHDFS() throws IOException {
+        // TODO the 'NO_OF_RETRIES' is useless for current 
RestartPipelinedRegionStrategy,
+        // for this ContinuousFileProcessingCheckpointITCase, using
+        // RestartPipelinedRegionStrategy would result in endless running.
+        assumeThat(failoverStrategy)
+                .as(
+                        "ignored ContinuousFileProcessingCheckpointITCase when 
using RestartPipelinedRegionStrategy")
+                
.isNotEqualTo(FailoverStrategy.RestartPipelinedRegionFailoverStrategy);
 
         baseDir = new File("./target/localfs/fs_tests").getAbsoluteFile();
         FileUtil.fullyDelete(baseDir);
@@ -91,8 +90,8 @@ public class ContinuousFileProcessingCheckpointITCase extends 
StreamFaultToleran
         localFs = new 
org.apache.hadoop.fs.Path(localFsURI).getFileSystem(hdConf);
     }
 
-    @After
-    public void destroyHDFS() {
+    @AfterEach
+    void destroyHDFS() {
         if (baseDir != null) {
             FileUtil.fullyDelete(baseDir);
         }
@@ -136,10 +135,10 @@ public class ContinuousFileProcessingCheckpointITCase 
extends StreamFaultToleran
         fc.join();
 
         Map<Integer, Set<String>> collected = actualCollectedContent;
-        Assert.assertEquals(collected.size(), fc.getFileContent().size());
+        assertThat(fc.getFileContent()).hasSameSizeAs(collected);
 
         for (Integer fileIdx : fc.getFileContent().keySet()) {
-            Assert.assertTrue(collected.keySet().contains(fileIdx));
+            assertThat(collected).containsKey(fileIdx);
 
             List<String> cntnt = new ArrayList<>(collected.get(fileIdx));
             Collections.sort(
@@ -155,7 +154,7 @@ public class ContinuousFileProcessingCheckpointITCase 
extends StreamFaultToleran
             for (String line : cntnt) {
                 cntntStr.append(line);
             }
-            Assert.assertEquals(fc.getFileContent().get(fileIdx), 
cntntStr.toString());
+            assertThat(cntntStr).hasToString(fc.getFileContent().get(fileIdx));
         }
 
         collected.clear();
@@ -193,7 +192,7 @@ public class ContinuousFileProcessingCheckpointITCase 
extends StreamFaultToleran
         @Override
         public void open(OpenContext openContext) throws Exception {
             // this sink can only work with DOP 1
-            assertEquals(1, 
getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks());
+            
assertThat(getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks()).isOne();
 
             long failurePosMin = (long) (0.4 * LINES_PER_FILE);
             long failurePosMax = (long) (0.7 * LINES_PER_FILE);
@@ -315,7 +314,7 @@ public class ContinuousFileProcessingCheckpointITCase 
extends StreamFaultToleran
                     org.apache.hadoop.fs.Path file =
                             new org.apache.hadoop.fs.Path(localFsURI + "/file" 
+ i);
                     localFs.rename(tmpFile.f0, file);
-                    Assert.assertTrue(localFs.exists(file));
+                    assertThat(localFs.exists(file)).isTrue();
 
                     filesCreated.add(file);
                     fileContents.put(i, tmpFile.f1);
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java
index 07e80f5bea7..29220eb6353 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java
@@ -41,8 +41,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /**
  * A simple test that runs a streaming topology with checkpointing enabled.
@@ -55,8 +54,7 @@ import static org.junit.Assert.assertTrue;
  * the recovery does not fall back to "square one" (which would naturally lead 
to correct results
  * without testing the checkpointing).
  */
-@SuppressWarnings("serial")
-public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
+class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(StateCheckpointedITCase.class);
 
@@ -71,7 +69,7 @@ public class StateCheckpointedITCase extends 
StreamFaultToleranceTestBase {
      */
     @Override
     public void testProgram(StreamExecutionEnvironment env) {
-        assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
+        assertThat(NUM_STRINGS % 40).as("Broken test setup").isZero();
 
         final long failurePosMin = (long) (0.4 * NUM_STRINGS / PARALLELISM);
         final long failurePosMax = (long) (0.7 * NUM_STRINGS / PARALLELISM);
@@ -103,7 +101,9 @@ public class StateCheckpointedITCase extends 
StreamFaultToleranceTestBase {
     @Override
     public void postSubmit() {
 
-        // assertTrue("Test inconclusive: failure occurred before first 
checkpoint",
+        // assertThat(failureOccurredBeforeFirstCheckpoint)
+        //         .as("Test inconclusive: failure occurred before first 
checkpoint")
+        //         .isTrue();
         //             OnceFailingAggregator.wasCheckpointedBeforeFailure);
         if (!OnceFailingAggregator.wasCheckpointedBeforeFailure) {
             LOG.warn("Test inconclusive: failure occurred before first 
checkpoint");
@@ -125,13 +125,13 @@ public class StateCheckpointedITCase extends 
StreamFaultToleranceTestBase {
         }
 
         // verify that we counted exactly right
-        assertEquals(NUM_STRINGS, filterSum);
-        assertEquals(NUM_STRINGS, mapSum);
-        assertEquals(NUM_STRINGS, countSum);
+        assertThat(filterSum).isEqualTo(NUM_STRINGS);
+        assertThat(mapSum).isEqualTo(NUM_STRINGS);
+        assertThat(countSum).isEqualTo(NUM_STRINGS);
 
         for (Map<Character, Long> map : ValidatingSink.maps) {
             for (Long count : map.values()) {
-                assertEquals(NUM_STRINGS / 40, count.longValue());
+                assertThat(count).isEqualTo(NUM_STRINGS / 40L);
             }
         }
     }
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
index 16ae7ab6a8c..80f9d32f1a4 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -37,7 +37,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /**
  * A simple test that runs a streaming topology with checkpointing enabled.
@@ -45,10 +45,9 @@ import static org.junit.Assert.assertEquals;
  * <p>The test triggers a failure after a while and verifies that, after 
completion, the state
  * defined with the {@link ListCheckpointed} interface reflects the "exactly 
once" semantics.
  */
-@SuppressWarnings("serial")
-public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
+class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 
-    static final long NUM_STRINGS = 10_000_000L;
+    private static final long NUM_STRINGS = 10_000_000L;
 
     /**
      * Runs the following program.
@@ -106,13 +105,13 @@ public class StreamCheckpointingITCase extends 
StreamFaultToleranceTestBase {
             reduceInputCount += l;
         }
 
-        assertEquals(NUM_STRINGS, filterSum);
-        assertEquals(NUM_STRINGS, mapSum);
-        assertEquals(NUM_STRINGS, countSum);
-        assertEquals(NUM_STRINGS, reduceInputCount);
+        assertThat(filterSum).isEqualTo(NUM_STRINGS);
+        assertThat(mapSum).isEqualTo(NUM_STRINGS);
+        assertThat(countSum).isEqualTo(NUM_STRINGS);
+        assertThat(reduceInputCount).isEqualTo(NUM_STRINGS);
         // verify that we counted exactly right
         for (Long count : OnceFailingPrefixCounter.prefixCounts.values()) {
-            assertEquals(new Long(NUM_STRINGS / 40), count);
+            assertThat(count).isEqualTo(NUM_STRINGS / 40L);
         }
     }
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
index 61de82d4139..db076573ea2 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
@@ -27,54 +27,56 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.RestartStrategyUtils;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
 
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
 
+import java.io.File;
 import java.io.Serializable;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collection;
 
 import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test base for fault tolerant streaming programs. */
-@RunWith(Parameterized.class)
-public abstract class StreamFaultToleranceTestBase extends TestLogger {
+@ExtendWith({TestLoggerExtension.class, ParameterizedTestExtension.class})
+abstract class StreamFaultToleranceTestBase {
 
-    @Parameterized.Parameters(name = "FailoverStrategy: {0}")
-    public static Collection<FailoverStrategy> parameters() {
+    @Parameters(name = "FailoverStrategy: {0}")
+    private static Collection<FailoverStrategy> parameters() {
         return Arrays.asList(
                 FailoverStrategy.RestartAllFailoverStrategy,
                 FailoverStrategy.RestartPipelinedRegionFailoverStrategy);
     }
 
     /** The failover strategy to use. */
-    public enum FailoverStrategy {
+    protected enum FailoverStrategy {
         RestartAllFailoverStrategy,
         RestartPipelinedRegionFailoverStrategy
     }
 
-    @Parameterized.Parameter public FailoverStrategy failoverStrategy;
+    @Parameter protected FailoverStrategy failoverStrategy;
 
     protected static final int NUM_TASK_MANAGERS = 3;
     protected static final int NUM_TASK_SLOTS = 4;
     protected static final int PARALLELISM = NUM_TASK_MANAGERS * 
NUM_TASK_SLOTS;
 
-    private static MiniClusterWithClientResource cluster;
+    private MiniClusterWithClientResource cluster;
 
-    @ClassRule public static TemporaryFolder tempFolder = new 
TemporaryFolder();
+    @TempDir private static File tempFolder;
 
-    @Before
-    public void setup() throws Exception {
+    @BeforeEach
+    void setup() throws Exception {
         Configuration configuration = new Configuration();
         switch (failoverStrategy) {
             case RestartPipelinedRegionFailoverStrategy:
@@ -89,7 +91,7 @@ public abstract class StreamFaultToleranceTestBase extends 
TestLogger {
         // Doing it on cluster level unconditionally as randomization 
currently happens on the job
         // level (environment); while this factory can only be set on the 
cluster level.
         FsStateChangelogStorageFactory.configure(
-                configuration, tempFolder.newFolder(), Duration.ofMinutes(1), 
10);
+                configuration, tempFolder, Duration.ofMinutes(1), 10);
         cluster =
                 new MiniClusterWithClientResource(
                         new MiniClusterResourceConfiguration.Builder()
@@ -100,8 +102,8 @@ public abstract class StreamFaultToleranceTestBase extends 
TestLogger {
         cluster.before();
     }
 
-    @After
-    public void shutDownExistingCluster() {
+    @AfterEach
+    void shutDownExistingCluster() {
         if (cluster != null) {
             cluster.after();
             cluster = null;
@@ -121,8 +123,8 @@ public abstract class StreamFaultToleranceTestBase extends 
TestLogger {
      * Runs the following program the test program defined in {@link
      * #testProgram(StreamExecutionEnvironment)} followed by the checks in 
{@link #postSubmit}.
      */
-    @Test
-    public void runCheckpointedProgram() throws Exception {
+    @TestTemplate
+    void runCheckpointedProgram() throws Exception {
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(PARALLELISM);
         env.enableCheckpointing(500);
@@ -135,7 +137,7 @@ public abstract class StreamFaultToleranceTestBase extends 
TestLogger {
             submitJobAndWaitForResult(
                     cluster.getClusterClient(), jobGraph, 
getClass().getClassLoader());
         } catch (Exception e) {
-            Assert.assertTrue(ExceptionUtils.findThrowable(e, 
SuccessException.class).isPresent());
+            assertThat(ExceptionUtils.findThrowable(e, 
SuccessException.class)).isPresent();
         }
 
         postSubmit();
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
index 5f69b472529..e69012d7d77 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
@@ -32,13 +32,13 @@ import 
org.apache.flink.streaming.api.operators.StreamGroupedReduceOperator;
 
 import org.apache.flink.shaded.guava33.com.google.common.collect.EvictingQueue;
 
-import org.junit.Assert;
-
 import java.util.Collections;
 import java.util.List;
 import java.util.Queue;
 import java.util.Random;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 /**
  * Integration test ensuring that the persistent state defined by the 
implementations of {@link
  * AbstractUdfStreamOperator} is correctly restored in case of recovery from a 
failure.
@@ -46,8 +46,7 @@ import java.util.Random;
  * <p>The topology currently tests the proper behaviour of the {@link 
StreamGroupedReduceOperator}
  * operator.
  */
-@SuppressWarnings("serial")
-public class UdfStreamOperatorCheckpointingITCase extends 
StreamFaultToleranceTestBase {
+class UdfStreamOperatorCheckpointingITCase extends 
StreamFaultToleranceTestBase {
 
     private static final long NUM_INPUT = 500_000L;
     private static final int NUM_OUTPUT = 1_000;
@@ -94,7 +93,7 @@ public class UdfStreamOperatorCheckpointingITCase extends 
StreamFaultToleranceTe
         // Checking the result of the built-in aggregate
         for (int i = 0; i < PARALLELISM; i++) {
             for (Long value : MinEvictingQueueSink.queues[i]) {
-                Assert.assertTrue("Value different from 1 found, was " + value 
+ ".", value == 1);
+                assertThat(value).as("Value different from 1 found, was " + 
value + ".").isOne();
             }
         }
 
@@ -105,9 +104,9 @@ public class UdfStreamOperatorCheckpointingITCase extends 
StreamFaultToleranceTe
             while (!SumEvictingQueueSink.queues[i].isEmpty()) {
                 sum += ++prevCount;
                 Long value = SumEvictingQueueSink.queues[i].remove();
-                Assert.assertTrue(
-                        "Unexpected reduce value " + value + " instead of " + 
sum + ".",
-                        value == sum);
+                assertThat(value)
+                        .as("Unexpected reduce value " + value + " instead of 
" + sum + ".")
+                        .isEqualTo(sum);
             }
         }
     }


Reply via email to