This is an automated email from the ASF dual-hosted git repository.
panyuepeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 3d7e4f319ae [FLINK-39124][JUnit5 Migration] Migrate the JUnit5 for
Module: flink-tests (#27665)
3d7e4f319ae is described below
commit 3d7e4f319ae935bdd9f5aa6d49f09a51662b561e
Author: Mate Czagany <[email protected]>
AuthorDate: Wed Mar 18 11:01:25 2026 +0100
[FLINK-39124][JUnit5 Migration] Migrate the JUnit5 for Module: flink-tests
(#27665)
[FLINK-39124][test] Migrate CancellingTestBase and
StreamFaultToleranceTestBase to JUnit 5
---
.../flink/test/cancelling/CancelingTestBase.java | 62 +++++++++++++---------
.../flink/test/cancelling/JobCancelingITCase.java | 37 +++++++------
.../flink/test/cancelling/JoinCancelingITCase.java | 20 +++----
.../flink/test/cancelling/MapCancelingITCase.java | 12 ++---
.../ContinuousFileProcessingCheckpointITCase.java | 45 ++++++++--------
.../checkpointing/StateCheckpointedITCase.java | 20 +++----
.../checkpointing/StreamCheckpointingITCase.java | 17 +++---
.../StreamFaultToleranceTestBase.java | 52 +++++++++---------
.../UdfStreamOperatorCheckpointingITCase.java | 15 +++---
9 files changed, 147 insertions(+), 133 deletions(-)
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
index f1aeb7f4192..dfa90e8f975 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
@@ -28,22 +28,26 @@ import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.test.junit5.InjectClusterClient;
+import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
-import org.junit.Assert;
-import org.junit.ClassRule;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
import java.util.concurrent.TimeUnit;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
/** Base class for testing job cancellation. */
-public abstract class CancelingTestBase extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)
+abstract class CancelingTestBase {
private static final int MINIMUM_HEAP_SIZE_MB = 192;
@@ -51,28 +55,36 @@ public abstract class CancelingTestBase extends TestLogger {
private static final Configuration configuration = getConfiguration();
+ protected ClusterClient<?> clusterClient;
+
//
--------------------------------------------------------------------------------------------
- @ClassRule
- public static final MiniClusterWithClientResource CLUSTER =
- new MiniClusterWithClientResource(
+ @RegisterExtension
+ private static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
+ new MiniClusterExtension(
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(configuration)
.setNumberTaskManagers(2)
.setNumberSlotsPerTaskManager(4)
.build());
+ @BeforeEach
+ void getClusterClient(@InjectClusterClient ClusterClient<?> clusterClient)
{
+ this.clusterClient = clusterClient;
+ }
+
//
--------------------------------------------------------------------------------------------
private static void verifyJvmOptions() {
final long heap = Runtime.getRuntime().maxMemory() >> 20;
- Assert.assertTrue(
- "Insufficient java heap space "
- + heap
- + "mb - set JVM option: -Xmx"
- + MINIMUM_HEAP_SIZE_MB
- + "m",
- heap > MINIMUM_HEAP_SIZE_MB - 50);
+ assertThat(heap)
+ .as(
+ "Insufficient java heap space "
+ + heap
+ + "mb - set JVM option: -Xmx"
+ + MINIMUM_HEAP_SIZE_MB
+ + "m")
+ .isGreaterThan(MINIMUM_HEAP_SIZE_MB - 50);
}
private static Configuration getConfiguration() {
@@ -93,34 +105,34 @@ public abstract class CancelingTestBase extends TestLogger
{
// submit job
final long rpcTimeout =
configuration.get(RpcOptions.ASK_TIMEOUT_DURATION).toMillis();
- ClusterClient<?> client = CLUSTER.getClusterClient();
- JobID jobID = client.submitJob(jobGraph).get();
+ JobID jobID = clusterClient.submitJob(jobGraph).get();
Deadline submissionDeadLine = new FiniteDuration(2,
TimeUnit.MINUTES).fromNow();
- JobStatus jobStatus = client.getJobStatus(jobID).get(rpcTimeout,
TimeUnit.MILLISECONDS);
+ JobStatus jobStatus =
+ clusterClient.getJobStatus(jobID).get(rpcTimeout,
TimeUnit.MILLISECONDS);
while (jobStatus != JobStatus.RUNNING &&
submissionDeadLine.hasTimeLeft()) {
Thread.sleep(50);
- jobStatus = client.getJobStatus(jobID).get(rpcTimeout,
TimeUnit.MILLISECONDS);
+ jobStatus = clusterClient.getJobStatus(jobID).get(rpcTimeout,
TimeUnit.MILLISECONDS);
}
if (jobStatus != JobStatus.RUNNING) {
- Assert.fail("Job not in state RUNNING.");
+ fail("Job not in state RUNNING.");
}
Thread.sleep(msecsTillCanceling);
- client.cancel(jobID).get();
+ clusterClient.cancel(jobID).get();
Deadline cancelDeadline =
new FiniteDuration(maxTimeTillCanceled,
TimeUnit.MILLISECONDS).fromNow();
JobStatus jobStatusAfterCancel =
- client.getJobStatus(jobID).get(rpcTimeout,
TimeUnit.MILLISECONDS);
+ clusterClient.getJobStatus(jobID).get(rpcTimeout,
TimeUnit.MILLISECONDS);
while (jobStatusAfterCancel != JobStatus.CANCELED &&
cancelDeadline.hasTimeLeft()) {
Thread.sleep(50);
jobStatusAfterCancel =
- client.getJobStatus(jobID).get(rpcTimeout,
TimeUnit.MILLISECONDS);
+ clusterClient.getJobStatus(jobID).get(rpcTimeout,
TimeUnit.MILLISECONDS);
}
- assertEquals(JobStatus.CANCELED, jobStatusAfterCancel);
+ assertThat(jobStatusAfterCancel).isEqualTo(JobStatus.CANCELED);
}
}
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/cancelling/JobCancelingITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/cancelling/JobCancelingITCase.java
index 9a76e1049ff..96f3016675b 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/cancelling/JobCancelingITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/cancelling/JobCancelingITCase.java
@@ -24,6 +24,7 @@ import
org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -32,37 +33,40 @@ import
org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.test.junit5.InjectClusterClient;
+import org.apache.flink.test.junit5.InjectMiniCluster;
+import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.util.Collector;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
import java.time.Duration;
-import static junit.framework.TestCase.assertEquals;
import static
org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
+import static org.assertj.core.api.Assertions.assertThat;
/** Tests for canceling the job. */
-@SuppressWarnings("serial")
-public class JobCancelingITCase extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)
+class JobCancelingITCase {
private static final int PARALLELISM = 4;
- @ClassRule public static final TemporaryFolder TMP_FOLDER = new
TemporaryFolder();
-
- @ClassRule
- public static final MiniClusterWithClientResource MINI_CLUSTER =
- new MiniClusterWithClientResource(
+ @RegisterExtension
+ private static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
+ new MiniClusterExtension(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(PARALLELISM)
.build());
@Test
- public void testCancelingWhileBackPressured() throws Exception {
+ void testCancelingWhileBackPressured(
+ @InjectMiniCluster MiniCluster miniCluster,
+ @InjectClusterClient ClusterClient<?> client)
+ throws Exception {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(PARALLELISM);
env.getConfig().enableObjectReuse();
@@ -94,16 +98,15 @@ public class JobCancelingITCase extends TestLogger {
StreamGraph streamGraph = env.getStreamGraph();
JobGraph jobGraph = streamGraph.getJobGraph();
- ClusterClient<?> client = MINI_CLUSTER.getClusterClient();
JobID jobID = client.submitJob(jobGraph).get();
- waitForAllTaskRunning(MINI_CLUSTER.getMiniCluster(), jobID, false);
+ waitForAllTaskRunning(miniCluster, jobID, false);
// give a bit of time of back pressure to build up
Thread.sleep(100);
client.cancel(jobID).get();
while (!client.getJobStatus(jobID).get().isTerminalState()) {}
- assertEquals(JobStatus.CANCELED, client.getJobStatus(jobID).get());
+
assertThat(client.getJobStatus(jobID).get()).isEqualTo(JobStatus.CANCELED);
}
private static class InfiniteLongSourceFunction implements
SourceFunction<Long> {
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java
index 90c9231ab8d..71890d62bff 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java
@@ -29,12 +29,12 @@ import
org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.test.util.InfiniteIntegerTupleInputFormat;
import org.apache.flink.test.util.UniformIntTupleGeneratorInputFormat;
-import org.junit.Ignore;
-import org.junit.Test;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
/** Test job cancellation from within a JoinFunction. */
-@Ignore("Takes too long.")
-public class JoinCancelingITCase extends CancelingTestBase {
+@Disabled("Takes too long.")
+class JoinCancelingITCase extends CancelingTestBase {
// --------------- Test Sort Matches that are canceled while still reading
/ sorting
// -----------------
@@ -77,17 +77,17 @@ public class JoinCancelingITCase extends CancelingTestBase {
}
@Test
- public void testCancelWhileReadingSlowInputs() throws Exception {
+ void testCancelWhileReadingSlowInputs() throws Exception {
executeTask(new SimpleMatcher<Integer>(), true);
}
@Test
- public void testCancelWhileReadingFastInputs() throws Exception {
+ void testCancelWhileReadingFastInputs() throws Exception {
executeTask(new SimpleMatcher<Integer>(), false);
}
@Test
- public void testCancelPriorToFirstRecordReading() throws Exception {
+ void testCancelPriorToFirstRecordReading() throws Exception {
executeTask(new StuckInOpenMatcher<Integer>(), false);
}
@@ -123,12 +123,12 @@ public class JoinCancelingITCase extends
CancelingTestBase {
// -----------------
@Test
- public void testCancelWhileJoining() throws Exception {
+ void testCancelWhileJoining() throws Exception {
executeTaskWithGenerator(new DelayingMatcher<Integer>(), 500, 3, 10 *
1000, 20 * 1000);
}
@Test
- public void testCancelWithLongCancellingResponse() throws Exception {
+ void testCancelWithLongCancellingResponse() throws Exception {
executeTaskWithGenerator(
new LongCancelTimeMatcher<Integer>(), 500, 3, 10 * 1000, 10 *
1000);
}
@@ -137,7 +137,7 @@ public class JoinCancelingITCase extends CancelingTestBase {
// ---------------------------------
@Test
- public void testCancelWithHighparallelism() throws Exception {
+ void testCancelWithHighparallelism() throws Exception {
executeTask(new SimpleMatcher<Integer>(), false, 64);
}
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java
index 87cd8e25bb6..9b5fa4146a7 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java
@@ -25,28 +25,28 @@ import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.test.util.InfiniteIntegerInputFormat;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
/** Test job cancellation from within a MapFunction. */
-public class MapCancelingITCase extends CancelingTestBase {
+class MapCancelingITCase extends CancelingTestBase {
@Test
- public void testMapCancelling() throws Exception {
+ void testMapCancelling() throws Exception {
executeTask(new IdentityMapper<Integer>());
}
@Test
- public void testSlowMapCancelling() throws Exception {
+ void testSlowMapCancelling() throws Exception {
executeTask(new DelayingIdentityMapper<Integer>());
}
@Test
- public void testMapWithLongCancellingResponse() throws Exception {
+ void testMapWithLongCancellingResponse() throws Exception {
executeTask(new LongCancelTimeIdentityMapper<Integer>());
}
@Test
- public void testMapPriorToFirstRecordReading() throws Exception {
+ void testMapPriorToFirstRecordReading() throws Exception {
executeTask(new StuckInOpenIdentityMapper<Integer>());
}
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
index 857847f566c..3722e2f8ad5 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
@@ -38,10 +38,8 @@ import org.apache.flink.util.Collector;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.AssumptionViolatedException;
-import org.junit.Before;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import java.io.File;
import java.io.IOException;
@@ -55,11 +53,12 @@ import java.util.Map;
import java.util.Random;
import java.util.Set;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
+import static org.junit.jupiter.api.Assertions.fail;
/** Test checkpointing while sourcing a continuous file processor. */
-public class ContinuousFileProcessingCheckpointITCase extends
StreamFaultToleranceTestBase {
+class ContinuousFileProcessingCheckpointITCase extends
StreamFaultToleranceTestBase {
private static final int NO_OF_FILES = 5;
private static final int LINES_PER_FILE = 150;
@@ -72,15 +71,15 @@ public class ContinuousFileProcessingCheckpointITCase
extends StreamFaultToleran
private static Map<Integer, Set<String>> actualCollectedContent = new
HashMap<>();
- @Before
- public void createHDFS() throws IOException {
- if
(failoverStrategy.equals(FailoverStrategy.RestartPipelinedRegionFailoverStrategy))
{
- // TODO the 'NO_OF_RETRIES' is useless for current
RestartPipelinedRegionStrategy,
- // for this ContinuousFileProcessingCheckpointITCase, using
- // RestartPipelinedRegionStrategy would result in endless running.
- throw new AssumptionViolatedException(
- "ignored ContinuousFileProcessingCheckpointITCase when
using RestartPipelinedRegionStrategy");
- }
+ @BeforeEach
+ void createHDFS() throws IOException {
+ // TODO the 'NO_OF_RETRIES' is useless for current
RestartPipelinedRegionStrategy,
+ // for this ContinuousFileProcessingCheckpointITCase, using
+ // RestartPipelinedRegionStrategy would result in endless running.
+ assumeThat(failoverStrategy)
+ .as(
+ "ignored ContinuousFileProcessingCheckpointITCase when
using RestartPipelinedRegionStrategy")
+
.isNotEqualTo(FailoverStrategy.RestartPipelinedRegionFailoverStrategy);
baseDir = new File("./target/localfs/fs_tests").getAbsoluteFile();
FileUtil.fullyDelete(baseDir);
@@ -91,8 +90,8 @@ public class ContinuousFileProcessingCheckpointITCase extends
StreamFaultToleran
localFs = new
org.apache.hadoop.fs.Path(localFsURI).getFileSystem(hdConf);
}
- @After
- public void destroyHDFS() {
+ @AfterEach
+ void destroyHDFS() {
if (baseDir != null) {
FileUtil.fullyDelete(baseDir);
}
@@ -136,10 +135,10 @@ public class ContinuousFileProcessingCheckpointITCase
extends StreamFaultToleran
fc.join();
Map<Integer, Set<String>> collected = actualCollectedContent;
- Assert.assertEquals(collected.size(), fc.getFileContent().size());
+ assertThat(fc.getFileContent()).hasSameSizeAs(collected);
for (Integer fileIdx : fc.getFileContent().keySet()) {
- Assert.assertTrue(collected.keySet().contains(fileIdx));
+ assertThat(collected).containsKey(fileIdx);
List<String> cntnt = new ArrayList<>(collected.get(fileIdx));
Collections.sort(
@@ -155,7 +154,7 @@ public class ContinuousFileProcessingCheckpointITCase
extends StreamFaultToleran
for (String line : cntnt) {
cntntStr.append(line);
}
- Assert.assertEquals(fc.getFileContent().get(fileIdx),
cntntStr.toString());
+ assertThat(cntntStr).hasToString(fc.getFileContent().get(fileIdx));
}
collected.clear();
@@ -193,7 +192,7 @@ public class ContinuousFileProcessingCheckpointITCase
extends StreamFaultToleran
@Override
public void open(OpenContext openContext) throws Exception {
// this sink can only work with DOP 1
- assertEquals(1,
getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks());
+
assertThat(getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks()).isOne();
long failurePosMin = (long) (0.4 * LINES_PER_FILE);
long failurePosMax = (long) (0.7 * LINES_PER_FILE);
@@ -315,7 +314,7 @@ public class ContinuousFileProcessingCheckpointITCase
extends StreamFaultToleran
org.apache.hadoop.fs.Path file =
new org.apache.hadoop.fs.Path(localFsURI + "/file"
+ i);
localFs.rename(tmpFile.f0, file);
- Assert.assertTrue(localFs.exists(file));
+ assertThat(localFs.exists(file)).isTrue();
filesCreated.add(file);
fileContents.put(i, tmpFile.f1);
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java
index 07e80f5bea7..29220eb6353 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java
@@ -41,8 +41,7 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
/**
* A simple test that runs a streaming topology with checkpointing enabled.
@@ -55,8 +54,7 @@ import static org.junit.Assert.assertTrue;
* the recovery does not fall back to "square one" (which would naturally lead
to correct results
* without testing the checkpointing).
*/
-@SuppressWarnings("serial")
-public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
+class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
private static final Logger LOG =
LoggerFactory.getLogger(StateCheckpointedITCase.class);
@@ -71,7 +69,7 @@ public class StateCheckpointedITCase extends
StreamFaultToleranceTestBase {
*/
@Override
public void testProgram(StreamExecutionEnvironment env) {
- assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
+ assertThat(NUM_STRINGS % 40).as("Broken test setup").isZero();
final long failurePosMin = (long) (0.4 * NUM_STRINGS / PARALLELISM);
final long failurePosMax = (long) (0.7 * NUM_STRINGS / PARALLELISM);
@@ -103,7 +101,9 @@ public class StateCheckpointedITCase extends
StreamFaultToleranceTestBase {
@Override
public void postSubmit() {
- // assertTrue("Test inconclusive: failure occurred before first
checkpoint",
+ // assertThat(failureOccurredBeforeFirstCheckpoint)
+ // .as("Test inconclusive: failure occurred before first
checkpoint")
+ // .isTrue();
// OnceFailingAggregator.wasCheckpointedBeforeFailure);
if (!OnceFailingAggregator.wasCheckpointedBeforeFailure) {
LOG.warn("Test inconclusive: failure occurred before first
checkpoint");
@@ -125,13 +125,13 @@ public class StateCheckpointedITCase extends
StreamFaultToleranceTestBase {
}
// verify that we counted exactly right
- assertEquals(NUM_STRINGS, filterSum);
- assertEquals(NUM_STRINGS, mapSum);
- assertEquals(NUM_STRINGS, countSum);
+ assertThat(filterSum).isEqualTo(NUM_STRINGS);
+ assertThat(mapSum).isEqualTo(NUM_STRINGS);
+ assertThat(countSum).isEqualTo(NUM_STRINGS);
for (Map<Character, Long> map : ValidatingSink.maps) {
for (Long count : map.values()) {
- assertEquals(NUM_STRINGS / 40, count.longValue());
+ assertThat(count).isEqualTo(NUM_STRINGS / 40L);
}
}
}
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
index 16ae7ab6a8c..80f9d32f1a4 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -37,7 +37,7 @@ import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
/**
* A simple test that runs a streaming topology with checkpointing enabled.
@@ -45,10 +45,9 @@ import static org.junit.Assert.assertEquals;
* <p>The test triggers a failure after a while and verifies that, after
completion, the state
* defined with the {@link ListCheckpointed} interface reflects the "exactly
once" semantics.
*/
-@SuppressWarnings("serial")
-public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
+class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
- static final long NUM_STRINGS = 10_000_000L;
+ private static final long NUM_STRINGS = 10_000_000L;
/**
* Runs the following program.
@@ -106,13 +105,13 @@ public class StreamCheckpointingITCase extends
StreamFaultToleranceTestBase {
reduceInputCount += l;
}
- assertEquals(NUM_STRINGS, filterSum);
- assertEquals(NUM_STRINGS, mapSum);
- assertEquals(NUM_STRINGS, countSum);
- assertEquals(NUM_STRINGS, reduceInputCount);
+ assertThat(filterSum).isEqualTo(NUM_STRINGS);
+ assertThat(mapSum).isEqualTo(NUM_STRINGS);
+ assertThat(countSum).isEqualTo(NUM_STRINGS);
+ assertThat(reduceInputCount).isEqualTo(NUM_STRINGS);
// verify that we counted exactly right
for (Long count : OnceFailingPrefixCounter.prefixCounts.values()) {
- assertEquals(new Long(NUM_STRINGS / 40), count);
+ assertThat(count).isEqualTo(NUM_STRINGS / 40L);
}
}
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
index 61de82d4139..db076573ea2 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
@@ -27,54 +27,56 @@ import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.RestartStrategyUtils;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import java.io.File;
import java.io.Serializable;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult;
+import static org.assertj.core.api.Assertions.assertThat;
/** Test base for fault tolerant streaming programs. */
-@RunWith(Parameterized.class)
-public abstract class StreamFaultToleranceTestBase extends TestLogger {
+@ExtendWith({TestLoggerExtension.class, ParameterizedTestExtension.class})
+abstract class StreamFaultToleranceTestBase {
- @Parameterized.Parameters(name = "FailoverStrategy: {0}")
- public static Collection<FailoverStrategy> parameters() {
+ @Parameters(name = "FailoverStrategy: {0}")
+ private static Collection<FailoverStrategy> parameters() {
return Arrays.asList(
FailoverStrategy.RestartAllFailoverStrategy,
FailoverStrategy.RestartPipelinedRegionFailoverStrategy);
}
/** The failover strategy to use. */
- public enum FailoverStrategy {
+ protected enum FailoverStrategy {
RestartAllFailoverStrategy,
RestartPipelinedRegionFailoverStrategy
}
- @Parameterized.Parameter public FailoverStrategy failoverStrategy;
+ @Parameter protected FailoverStrategy failoverStrategy;
protected static final int NUM_TASK_MANAGERS = 3;
protected static final int NUM_TASK_SLOTS = 4;
protected static final int PARALLELISM = NUM_TASK_MANAGERS *
NUM_TASK_SLOTS;
- private static MiniClusterWithClientResource cluster;
+ private MiniClusterWithClientResource cluster;
- @ClassRule public static TemporaryFolder tempFolder = new
TemporaryFolder();
+ @TempDir private static File tempFolder;
- @Before
- public void setup() throws Exception {
+ @BeforeEach
+ void setup() throws Exception {
Configuration configuration = new Configuration();
switch (failoverStrategy) {
case RestartPipelinedRegionFailoverStrategy:
@@ -89,7 +91,7 @@ public abstract class StreamFaultToleranceTestBase extends
TestLogger {
// Doing it on cluster level unconditionally as randomization
currently happens on the job
// level (environment); while this factory can only be set on the
cluster level.
FsStateChangelogStorageFactory.configure(
- configuration, tempFolder.newFolder(), Duration.ofMinutes(1),
10);
+ configuration, tempFolder, Duration.ofMinutes(1), 10);
cluster =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
@@ -100,8 +102,8 @@ public abstract class StreamFaultToleranceTestBase extends
TestLogger {
cluster.before();
}
- @After
- public void shutDownExistingCluster() {
+ @AfterEach
+ void shutDownExistingCluster() {
if (cluster != null) {
cluster.after();
cluster = null;
@@ -121,8 +123,8 @@ public abstract class StreamFaultToleranceTestBase extends
TestLogger {
* Runs the following program the test program defined in {@link
* #testProgram(StreamExecutionEnvironment)} followed by the checks in
{@link #postSubmit}.
*/
- @Test
- public void runCheckpointedProgram() throws Exception {
+ @TestTemplate
+ void runCheckpointedProgram() throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(PARALLELISM);
env.enableCheckpointing(500);
@@ -135,7 +137,7 @@ public abstract class StreamFaultToleranceTestBase extends
TestLogger {
submitJobAndWaitForResult(
cluster.getClusterClient(), jobGraph,
getClass().getClassLoader());
} catch (Exception e) {
- Assert.assertTrue(ExceptionUtils.findThrowable(e,
SuccessException.class).isPresent());
+ assertThat(ExceptionUtils.findThrowable(e,
SuccessException.class)).isPresent();
}
postSubmit();
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
index 5f69b472529..e69012d7d77 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
@@ -32,13 +32,13 @@ import
org.apache.flink.streaming.api.operators.StreamGroupedReduceOperator;
import org.apache.flink.shaded.guava33.com.google.common.collect.EvictingQueue;
-import org.junit.Assert;
-
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.Random;
+import static org.assertj.core.api.Assertions.assertThat;
+
/**
* Integration test ensuring that the persistent state defined by the
implementations of {@link
* AbstractUdfStreamOperator} is correctly restored in case of recovery from a
failure.
@@ -46,8 +46,7 @@ import java.util.Random;
* <p>The topology currently tests the proper behaviour of the {@link
StreamGroupedReduceOperator}
* operator.
*/
-@SuppressWarnings("serial")
-public class UdfStreamOperatorCheckpointingITCase extends
StreamFaultToleranceTestBase {
+class UdfStreamOperatorCheckpointingITCase extends
StreamFaultToleranceTestBase {
private static final long NUM_INPUT = 500_000L;
private static final int NUM_OUTPUT = 1_000;
@@ -94,7 +93,7 @@ public class UdfStreamOperatorCheckpointingITCase extends
StreamFaultToleranceTe
// Checking the result of the built-in aggregate
for (int i = 0; i < PARALLELISM; i++) {
for (Long value : MinEvictingQueueSink.queues[i]) {
- Assert.assertTrue("Value different from 1 found, was " + value
+ ".", value == 1);
+ assertThat(value).as("Value different from 1 found, was " +
value + ".").isOne();
}
}
@@ -105,9 +104,9 @@ public class UdfStreamOperatorCheckpointingITCase extends
StreamFaultToleranceTe
while (!SumEvictingQueueSink.queues[i].isEmpty()) {
sum += ++prevCount;
Long value = SumEvictingQueueSink.queues[i].remove();
- Assert.assertTrue(
- "Unexpected reduce value " + value + " instead of " +
sum + ".",
- value == sum);
+ assertThat(value)
+ .as("Unexpected reduce value " + value + " instead of
" + sum + ".")
+ .isEqualTo(sum);
}
}
}