This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 92eef24d4cc531d6474252ef909fc6d431285dd9
Author: Roc Marshal <flin...@126.com>
AuthorDate: Mon Jun 26 23:03:04 2023 +0800

    [FLINK-29050][JUnit5 Migration] Module: flink-hadoop-compatibility
---
 .../java/hadoop/mapred/HadoopInputFormatTest.java  | 44 ++++++-----
 .../java/hadoop/mapred/HadoopOutputFormatTest.java | 20 ++---
 .../mapred/wrapper/HadoopInputSplitTest.java       | 35 ++++-----
 .../hadoop/mapreduce/HadoopInputFormatTest.java    | 53 +++++++------
 .../hadoop/mapreduce/HadoopOutputFormatTest.java   | 18 ++---
 .../api/java/typeutils/WritableExtractionTest.java | 86 +++++++++++-----------
 .../typeutils/runtime/WritableSerializerTest.java  |  6 +-
 .../mapred/HadoopIOFormatsITCase.java              | 57 +++++++++-----
 .../mapred/HadoopMapFunctionITCase.java            | 40 +++++-----
 .../mapred/HadoopMapredITCase.java                 | 18 +++--
 .../mapred/HadoopReduceCombineFunctionITCase.java  | 51 ++++++-------
 .../mapred/HadoopReduceFunctionITCase.java         | 40 +++++-----
 .../mapred/WordCountMapredITCase.java              | 24 +++---
 .../wrapper/HadoopTupleUnwrappingIteratorTest.java | 69 ++++++++---------
 .../mapreduce/HadoopInputOutputITCase.java         | 21 +++---
 .../mapreduce/WordCountMapreduceITCase.java        | 21 +++---
 .../org.junit.jupiter.api.extension.Extension      | 16 ++++
 .../scala/ScalaWritableTypeInfoTest.scala          | 11 ++-
 .../scala/WordCountMapredITCase.scala              | 13 ++--
 .../scala/WordCountMapreduceITCase.scala           | 13 ++--
 20 files changed, 336 insertions(+), 320 deletions(-)

diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java
index 2012d6ddcf1..a5d07569cc2 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java
@@ -35,13 +35,11 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobConfigurable;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.nullable;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyInt;
@@ -52,10 +50,10 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /** Tests for {@link HadoopInputFormat}. */
-public class HadoopInputFormatTest {
+class HadoopInputFormatTest {
 
     @Test
-    public void testConfigureWithConfigurableInstance() {
+    void testConfigureWithConfigurableInstance() {
         ConfigurableDummyInputFormat inputFormat = mock(ConfigurableDummyInputFormat.class);
 
         HadoopInputFormat<String, Long> hadoopInputFormat =
@@ -67,7 +65,7 @@ public class HadoopInputFormatTest {
     }
 
     @Test
-    public void testConfigureWithJobConfigurableInstance() {
+    void testConfigureWithJobConfigurableInstance() {
         JobConfigurableDummyInputFormat inputFormat = mock(JobConfigurableDummyInputFormat.class);
 
         HadoopInputFormat<String, Long> hadoopInputFormat =
@@ -79,7 +77,7 @@ public class HadoopInputFormatTest {
     }
 
     @Test
-    public void testOpenClose() throws Exception {
+    void testOpenClose() throws Exception {
         DummyRecordReader recordReader = mock(DummyRecordReader.class);
         DummyInputFormat inputFormat = mock(DummyInputFormat.class);
         when(inputFormat.getRecordReader(
@@ -95,14 +93,14 @@ public class HadoopInputFormatTest {
         verify(recordReader, times(1)).createKey();
         verify(recordReader, times(1)).createValue();
 
-        assertThat(hadoopInputFormat.fetched, is(false));
+        assertThat(hadoopInputFormat.fetched).isFalse();
 
         hadoopInputFormat.close();
         verify(recordReader, times(1)).close();
     }
 
     @Test
-    public void testOpenWithConfigurableReader() throws Exception {
+    void testOpenWithConfigurableReader() throws Exception {
         ConfigurableDummyRecordReader recordReader = mock(ConfigurableDummyRecordReader.class);
         DummyInputFormat inputFormat = mock(DummyInputFormat.class);
         when(inputFormat.getRecordReader(
@@ -119,11 +117,11 @@ public class HadoopInputFormatTest {
         verify(recordReader, times(1)).createKey();
         verify(recordReader, times(1)).createValue();
 
-        assertThat(hadoopInputFormat.fetched, is(false));
+        assertThat(hadoopInputFormat.fetched).isFalse();
     }
 
     @Test
-    public void testCreateInputSplits() throws Exception {
+    void testCreateInputSplits() throws Exception {
 
         FileSplit[] result = new FileSplit[1];
         result[0] = getFileSplit();
@@ -138,7 +136,7 @@ public class HadoopInputFormatTest {
     }
 
     @Test
-    public void testReachedEndWithElementsRemaining() throws IOException {
+    void testReachedEndWithElementsRemaining() throws IOException {
 
         HadoopInputFormat<String, Long> hadoopInputFormat =
                 new HadoopInputFormat<>(
@@ -146,22 +144,22 @@ public class HadoopInputFormatTest {
         hadoopInputFormat.fetched = true;
         hadoopInputFormat.hasNext = true;
 
-        assertThat(hadoopInputFormat.reachedEnd(), is(false));
+        assertThat(hadoopInputFormat.reachedEnd()).isFalse();
     }
 
     @Test
-    public void testReachedEndWithNoElementsRemaining() throws IOException {
+    void testReachedEndWithNoElementsRemaining() throws IOException {
         HadoopInputFormat<String, Long> hadoopInputFormat =
                 new HadoopInputFormat<>(
                         new DummyInputFormat(), String.class, Long.class, new JobConf());
         hadoopInputFormat.fetched = true;
         hadoopInputFormat.hasNext = false;
 
-        assertThat(hadoopInputFormat.reachedEnd(), is(true));
+        assertThat(hadoopInputFormat.reachedEnd()).isTrue();
     }
 
     @Test
-    public void testFetchNext() throws IOException {
+    void testFetchNext() throws IOException {
         DummyRecordReader recordReader = mock(DummyRecordReader.class);
         when(recordReader.next(nullable(String.class), nullable(Long.class))).thenReturn(true);
 
@@ -176,12 +174,12 @@ public class HadoopInputFormatTest {
         hadoopInputFormat.fetchNext();
 
         verify(recordReader, times(1)).next(nullable(String.class), anyLong());
-        assertThat(hadoopInputFormat.hasNext, is(true));
-        assertThat(hadoopInputFormat.fetched, is(true));
+        assertThat(hadoopInputFormat.hasNext).isTrue();
+        assertThat(hadoopInputFormat.fetched).isTrue();
     }
 
     @Test
-    public void checkTypeInformation() throws Exception {
+    void checkTypeInformation() throws Exception {
         HadoopInputFormat<Void, Long> hadoopInputFormat =
                 new HadoopInputFormat<>(
                         new DummyVoidKeyInputFormat<Long>(), Void.class, Long.class, new JobConf());
@@ -190,8 +188,8 @@ public class HadoopInputFormatTest {
         TypeInformation<Tuple2<Void, Long>> expectedType =
                 new TupleTypeInfo<>(BasicTypeInfo.VOID_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
 
-        assertThat(tupleType.isTupleType(), is(true));
-        assertThat(tupleType, is(equalTo(expectedType)));
+        assertThat(tupleType.isTupleType()).isTrue();
+        assertThat(tupleType).isEqualTo(expectedType);
     }
 
     @Test
@@ -212,7 +210,7 @@ public class HadoopInputFormatTest {
 
     private class DummyVoidKeyInputFormat<T> extends FileInputFormat<Void, T> {
 
-        public DummyVoidKeyInputFormat() {}
+        DummyVoidKeyInputFormat() {}
 
         @Override
         public org.apache.hadoop.mapred.RecordReader<Void, T> getRecordReader(
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatTest.java
index 8f006156210..f405a50fcde 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatTest.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatTest.java
@@ -32,14 +32,14 @@ import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.TaskAttemptContext;
 import org.apache.hadoop.util.Progressable;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 import org.mockito.Matchers;
 import org.mockito.Mockito;
 
 import java.io.IOException;
 
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.nullable;
-import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyLong;
 import static org.mockito.Mockito.mock;
@@ -48,10 +48,10 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /** Tests for {@link HadoopOutputFormat}. */
-public class HadoopOutputFormatTest {
+class HadoopOutputFormatTest {
 
     @Test
-    public void testOpen() throws Exception {
+    void testOpen() throws Exception {
 
         OutputFormat<String, Long> dummyOutputFormat = mock(DummyOutputFormat.class);
         DummyOutputCommitter outputCommitter = mock(DummyOutputCommitter.class);
@@ -74,7 +74,7 @@ public class HadoopOutputFormatTest {
     }
 
     @Test
-    public void testConfigureWithConfigurable() {
+    void testConfigureWithConfigurable() {
         ConfigurableDummyOutputFormat dummyOutputFormat = mock(ConfigurableDummyOutputFormat.class);
         JobConf jobConf = mock(JobConf.class);
 
@@ -87,7 +87,7 @@ public class HadoopOutputFormatTest {
     }
 
     @Test
-    public void testConfigureWithJobConfigurable() {
+    void testConfigureWithJobConfigurable() {
         JobConfigurableDummyOutputFormat dummyOutputFormat =
                 mock(JobConfigurableDummyOutputFormat.class);
         JobConf jobConf = mock(JobConf.class);
@@ -101,7 +101,7 @@ public class HadoopOutputFormatTest {
     }
 
     @Test
-    public void testCloseWithTaskCommit() throws Exception {
+    void testCloseWithTaskCommit() throws Exception {
         OutputFormat<String, Long> dummyOutputFormat = mock(DummyOutputFormat.class);
         DummyOutputCommitter outputCommitter = mock(DummyOutputCommitter.class);
         when(outputCommitter.needsTaskCommit(nullable(TaskAttemptContext.class))).thenReturn(true);
@@ -120,7 +120,7 @@ public class HadoopOutputFormatTest {
     }
 
     @Test
-    public void testCloseWithoutTaskCommit() throws Exception {
+    void testCloseWithoutTaskCommit() throws Exception {
         OutputFormat<String, Long> dummyOutputFormat = mock(DummyOutputFormat.class);
         DummyOutputCommitter outputCommitter = mock(DummyOutputCommitter.class);
         when(outputCommitter.needsTaskCommit(any(TaskAttemptContext.class))).thenReturn(false);
@@ -139,7 +139,7 @@ public class HadoopOutputFormatTest {
     }
 
     @Test
-    public void testWriteRecord() throws Exception {
+    void testWriteRecord() throws Exception {
         OutputFormat<String, Long> dummyOutputFormat = mock(DummyOutputFormat.class);
         DummyRecordWriter recordWriter = mock(DummyRecordWriter.class);
         JobConf jobConf = mock(JobConf.class);
@@ -154,7 +154,7 @@ public class HadoopOutputFormatTest {
     }
 
     @Test
-    public void testFinalizeGlobal() throws Exception {
+    void testFinalizeGlobal() throws Exception {
         OutputFormat<String, Long> dummyOutputFormat = mock(DummyOutputFormat.class);
         DummyOutputCommitter outputCommitter = mock(DummyOutputCommitter.class);
         JobConf jobConf = Mockito.spy(new JobConf());
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplitTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplitTest.java
index 741ffd72766..3390987005b 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplitTest.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplitTest.java
@@ -26,9 +26,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobConfigurable;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -37,13 +36,15 @@ import java.util.Arrays;
 import java.util.Objects;
 import java.util.function.Consumer;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 /** Test for {@link HadoopInputSplit}. */
-public class HadoopInputSplitTest {
+class HadoopInputSplitTest {
 
     private JobConf conf;
 
-    @Before
-    public void before() {
+    @BeforeEach
+    void before() {
         Configuration configuration = new Configuration();
         for (int i = 0; i < 10000; i++) {
             configuration.set("key-" + i, "value-" + i);
@@ -62,22 +63,22 @@ public class HadoopInputSplitTest {
         serializeSizeChecker.accept(bytes.length);
 
         split = InstantiationUtil.deserializeObject(bytes, split.getClass().getClassLoader());
-        Assert.assertEquals(5, split.getSplitNumber());
-        Assert.assertArrayEquals(new String[] {"host0"}, split.getHostnames());
+        assertThat(split.getSplitNumber()).isEqualTo(5);
+        assertThat(split.getHostnames()).containsExactly("host0");
         splitChecker.accept(split.getHadoopInputSplit());
     }
 
     @Test
-    public void testFileSplit() throws IOException, ClassNotFoundException {
+    void testFileSplit() throws IOException, ClassNotFoundException {
         FileSplit fileSplit = new FileSplit(new Path("/test"), 0, 100, new String[] {"host0"});
         testInner(
                 fileSplit,
-                i -> Assert.assertTrue(i < 10000),
-                split -> Assert.assertEquals(fileSplit, split));
+                i -> assertThat(i).isLessThan(10000),
+                split -> assertThat(split).isEqualTo(fileSplit));
     }
 
     @Test
-    public void testConfigurable() throws IOException, ClassNotFoundException {
+    void testConfigurable() throws IOException, ClassNotFoundException {
         ConfigurableFileSplit fileSplit =
                 new ConfigurableFileSplit(new Path("/test"), 0, 100, new String[] {"host0"});
         testInner(
@@ -85,13 +86,13 @@ public class HadoopInputSplitTest {
                 i -> {},
                 inputSplit -> {
                     ConfigurableFileSplit split = (ConfigurableFileSplit) inputSplit;
-                    Assert.assertNotNull(split.getConf());
-                    Assert.assertEquals(fileSplit, split);
+                    assertThat(split.getConf()).isNotNull();
+                    assertThat(split).isEqualTo(fileSplit);
                 });
     }
 
     @Test
-    public void testJobConfigurable() throws IOException, ClassNotFoundException {
+    void testJobConfigurable() throws IOException, ClassNotFoundException {
         JobConfigurableFileSplit fileSplit =
                 new JobConfigurableFileSplit(new Path("/test"), 0, 100, new String[] {"host0"});
         testInner(
@@ -99,8 +100,8 @@ public class HadoopInputSplitTest {
                 i -> {},
                 inputSplit -> {
                     JobConfigurableFileSplit split = (JobConfigurableFileSplit) inputSplit;
-                    Assert.assertNotNull(split.getConf());
-                    Assert.assertEquals(fileSplit, split);
+                    assertThat(split.getConf()).isNotNull();
+                    assertThat(split).isEqualTo(fileSplit);
                 });
     }
 
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java
index 387783f3d1e..d2e21d23712 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java
@@ -33,16 +33,13 @@ import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.util.List;
 
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.nullable;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.mock;
@@ -51,12 +48,10 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /** Tests for {@link HadoopInputFormat}. */
-public class HadoopInputFormatTest {
-
-    @Rule public final ExpectedException exception = ExpectedException.none();
+class HadoopInputFormatTest {
 
     @Test
-    public void testConfigure() throws Exception {
+    void testConfigure() throws Exception {
 
         ConfigurableDummyInputFormat inputFormat = mock(ConfigurableDummyInputFormat.class);
 
@@ -68,7 +63,7 @@ public class HadoopInputFormatTest {
     }
 
     @Test
-    public void testCreateInputSplits() throws Exception {
+    void testCreateInputSplits() throws Exception {
         DummyInputFormat inputFormat = mock(DummyInputFormat.class);
 
         HadoopInputFormat<String, Long> hadoopInputFormat =
@@ -79,7 +74,7 @@ public class HadoopInputFormatTest {
     }
 
     @Test
-    public void testOpen() throws Exception {
+    void testOpen() throws Exception {
         DummyInputFormat inputFormat = mock(DummyInputFormat.class);
         when(inputFormat.createRecordReader(
                         nullable(InputSplit.class), any(TaskAttemptContext.class)))
@@ -92,11 +87,11 @@ public class HadoopInputFormatTest {
 
         verify(inputFormat, times(1))
                 .createRecordReader(nullable(InputSplit.class), any(TaskAttemptContext.class));
-        assertThat(hadoopInputFormat.fetched, is(false));
+        assertThat(hadoopInputFormat.fetched).isFalse();
     }
 
     @Test
-    public void testClose() throws Exception {
+    void testClose() throws Exception {
 
         DummyRecordReader recordReader = mock(DummyRecordReader.class);
 
@@ -108,7 +103,7 @@ public class HadoopInputFormatTest {
     }
 
     @Test
-    public void testCloseWithoutOpen() throws Exception {
+    void testCloseWithoutOpen() throws Exception {
         HadoopInputFormat<String, Long> hadoopInputFormat =
                 new HadoopInputFormat<>(
                         new DummyInputFormat(), String.class, Long.class, Job.getInstance());
@@ -116,19 +111,19 @@ public class HadoopInputFormatTest {
     }
 
     @Test
-    public void testFetchNextInitialState() throws Exception {
+    void testFetchNextInitialState() throws Exception {
         DummyRecordReader recordReader = new DummyRecordReader();
 
         HadoopInputFormat<String, Long> hadoopInputFormat =
                 setupHadoopInputFormat(new DummyInputFormat(), Job.getInstance(), recordReader);
         hadoopInputFormat.fetchNext();
 
-        assertThat(hadoopInputFormat.fetched, is(true));
-        assertThat(hadoopInputFormat.hasNext, is(false));
+        assertThat(hadoopInputFormat.fetched).isTrue();
+        assertThat(hadoopInputFormat.hasNext).isFalse();
     }
 
     @Test
-    public void testFetchNextRecordReaderHasNewValue() throws Exception {
+    void testFetchNextRecordReaderHasNewValue() throws Exception {
 
         DummyRecordReader recordReader = mock(DummyRecordReader.class);
         when(recordReader.nextKeyValue()).thenReturn(true);
@@ -137,12 +132,12 @@ public class HadoopInputFormatTest {
                 setupHadoopInputFormat(new DummyInputFormat(), Job.getInstance(), recordReader);
         hadoopInputFormat.fetchNext();
 
-        assertThat(hadoopInputFormat.fetched, is(true));
-        assertThat(hadoopInputFormat.hasNext, is(true));
+        assertThat(hadoopInputFormat.fetched).isTrue();
+        assertThat(hadoopInputFormat.hasNext).isTrue();
     }
 
     @Test
-    public void testFetchNextRecordReaderThrowsException() throws Exception {
+    void testFetchNextRecordReaderThrowsException() throws Exception {
 
         DummyRecordReader recordReader = mock(DummyRecordReader.class);
         when(recordReader.nextKeyValue()).thenThrow(new InterruptedException());
@@ -150,14 +145,16 @@ public class HadoopInputFormatTest {
         HadoopInputFormat<String, Long> hadoopInputFormat =
                 setupHadoopInputFormat(new DummyInputFormat(), Job.getInstance(), recordReader);
 
-        exception.expect(IOException.class);
-        hadoopInputFormat.fetchNext();
+        assertThatThrownBy(hadoopInputFormat::fetchNext)
+                .isInstanceOf(IOException.class)
+                .hasCauseInstanceOf(InterruptedException.class);
 
-        assertThat(hadoopInputFormat.hasNext, is(true));
+        assertThat(hadoopInputFormat.hasNext).isFalse();
+        assertThat(hadoopInputFormat.fetched).isTrue();
     }
 
     @Test
-    public void checkTypeInformation() throws Exception {
+    void checkTypeInformation() throws Exception {
 
         HadoopInputFormat<Void, Long> hadoopInputFormat =
                 new HadoopInputFormat<>(
@@ -170,8 +167,8 @@ public class HadoopInputFormatTest {
         TypeInformation<Tuple2<Void, Long>> expectedType =
                 new TupleTypeInfo<>(BasicTypeInfo.VOID_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
 
-        assertThat(tupleType.isTupleType(), is(true));
-        assertThat(tupleType, is(equalTo(expectedType)));
+        assertThat(tupleType.isTupleType()).isTrue();
+        assertThat(tupleType).isEqualTo(expectedType);
     }
 
     private HadoopInputFormat<String, Long> setupHadoopInputFormat(
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatTest.java
index 7a9a1cd8716..5a81e8fbe56 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatTest.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatTest.java
@@ -28,13 +28,13 @@ import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
 import java.io.IOException;
 
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.nullable;
-import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -42,13 +42,13 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /** Tests for {@link HadoopOutputFormat}. */
-public class HadoopOutputFormatTest {
+class HadoopOutputFormatTest {
 
     private static final String MAPRED_OUTPUT_PATH = "an/ignored/file/";
     private static final String MAPRED_OUTPUT_DIR_KEY = "mapred.output.dir";
 
     @Test
-    public void testWriteRecord() throws Exception {
+    void testWriteRecord() throws Exception {
 
         RecordWriter<String, Long> recordWriter = mock(DummyRecordWriter.class);
         HadoopOutputFormat<String, Long> hadoopOutputFormat =
@@ -65,7 +65,7 @@ public class HadoopOutputFormatTest {
     }
 
     @Test
-    public void testOpen() throws Exception {
+    void testOpen() throws Exception {
 
         OutputFormat<String, Long> dummyOutputFormat = mock(DummyOutputFormat.class);
         OutputCommitter outputCommitter = setupOutputCommitter(true);
@@ -88,7 +88,7 @@ public class HadoopOutputFormatTest {
     }
 
     @Test
-    public void testCloseWithNeedsTaskCommitTrue() throws Exception {
+    void testCloseWithNeedsTaskCommitTrue() throws Exception {
 
         RecordWriter<String, Long> recordWriter = Mockito.mock(DummyRecordWriter.class);
         OutputCommitter outputCommitter = setupOutputCommitter(true);
@@ -108,7 +108,7 @@ public class HadoopOutputFormatTest {
     }
 
     @Test
-    public void testCloseWithNeedsTaskCommitFalse() throws Exception {
+    void testCloseWithNeedsTaskCommitFalse() throws Exception {
 
         RecordWriter<String, Long> recordWriter = Mockito.mock(DummyRecordWriter.class);
         OutputCommitter outputCommitter = setupOutputCommitter(false);
@@ -128,7 +128,7 @@ public class HadoopOutputFormatTest {
     }
 
     @Test
-    public void testConfigure() throws Exception {
+    void testConfigure() throws Exception {
 
         ConfigurableDummyOutputFormat outputFormat = mock(ConfigurableDummyOutputFormat.class);
 
@@ -142,7 +142,7 @@ public class HadoopOutputFormatTest {
     }
 
     @Test
-    public void testFinalizedGlobal() throws Exception {
+    void testFinalizedGlobal() throws Exception {
 
         HadoopOutputFormat<String, Long> hadoopOutputFormat =
                 setupHadoopOutputFormat(
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableExtractionTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableExtractionTest.java
index f2e52347eea..27743ebd32f 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableExtractionTest.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableExtractionTest.java
@@ -25,55 +25,53 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparator;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.List;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Fail.fail;
 
 /** Tests for the type extraction of {@link Writable}. */
-@SuppressWarnings("serial")
-public class WritableExtractionTest {
+class WritableExtractionTest {
 
     @Test
-    public void testDetectWritable() {
+    void testDetectWritable() {
         // writable interface itself must not be writable
-        assertFalse(TypeExtractor.isHadoopWritable(Writable.class));
+        assertThat(TypeExtractor.isHadoopWritable(Writable.class)).isFalse();
 
         // various forms of extension
-        assertTrue(TypeExtractor.isHadoopWritable(DirectWritable.class));
-        assertTrue(TypeExtractor.isHadoopWritable(ViaInterfaceExtension.class));
-        assertTrue(TypeExtractor.isHadoopWritable(ViaAbstractClassExtension.class));
+        assertThat(TypeExtractor.isHadoopWritable(DirectWritable.class)).isTrue();
+        assertThat(TypeExtractor.isHadoopWritable(ViaInterfaceExtension.class)).isTrue();
+        assertThat(TypeExtractor.isHadoopWritable(ViaAbstractClassExtension.class)).isTrue();
 
         // some non-writables
-        assertFalse(TypeExtractor.isHadoopWritable(String.class));
-        assertFalse(TypeExtractor.isHadoopWritable(List.class));
-        assertFalse(TypeExtractor.isHadoopWritable(WritableComparator.class));
+        assertThat(TypeExtractor.isHadoopWritable(String.class)).isFalse();
+        assertThat(TypeExtractor.isHadoopWritable(List.class)).isFalse();
+        assertThat(TypeExtractor.isHadoopWritable(WritableComparator.class)).isFalse();
     }
 
     @Test
-    public void testCreateWritableInfo() {
+    void testCreateWritableInfo() {
         TypeInformation<DirectWritable> info1 =
                 TypeExtractor.createHadoopWritableTypeInfo(DirectWritable.class);
-        assertEquals(DirectWritable.class, info1.getTypeClass());
+        assertThat(info1.getTypeClass()).isEqualTo(DirectWritable.class);
 
         TypeInformation<ViaInterfaceExtension> info2 =
                 TypeExtractor.createHadoopWritableTypeInfo(ViaInterfaceExtension.class);
-        assertEquals(ViaInterfaceExtension.class, info2.getTypeClass());
+        assertThat(info2.getTypeClass()).isEqualTo(ViaInterfaceExtension.class);
 
         TypeInformation<ViaAbstractClassExtension> info3 =
                 TypeExtractor.createHadoopWritableTypeInfo(ViaAbstractClassExtension.class);
-        assertEquals(ViaAbstractClassExtension.class, info3.getTypeClass());
+        assertThat(info3.getTypeClass()).isEqualTo(ViaAbstractClassExtension.class);
     }
 
     @Test
-    public void testValidateTypeInfo() {
+    void testValidateTypeInfo() {
         // validate unrelated type info
         TypeExtractor.validateIfWritable(BasicTypeInfo.STRING_TYPE_INFO, String.class);
 
@@ -87,26 +85,27 @@ public class WritableExtractionTest {
                 ViaAbstractClassExtension.class);
 
         // incorrect case: not writable at all
-        try {
-            TypeExtractor.validateIfWritable(
-                    new WritableTypeInfo<>(DirectWritable.class), String.class);
-            fail("should have failed with an exception");
-        } catch (InvalidTypesException e) {
-            // expected
-        }
+        assertThatThrownBy(
+                        () -> {
+                            TypeExtractor.validateIfWritable(
+                                    new WritableTypeInfo<>(DirectWritable.class), String.class);
+                        })
+                .as("should have failed with an exception")
+                .isInstanceOf(InvalidTypesException.class);
 
         // incorrect case: wrong writable
-        try {
-            TypeExtractor.validateIfWritable(
-                    new WritableTypeInfo<>(ViaInterfaceExtension.class), DirectWritable.class);
-            fail("should have failed with an exception");
-        } catch (InvalidTypesException e) {
-            // expected
-        }
+        assertThatThrownBy(
+                        () -> {
+                            TypeExtractor.validateIfWritable(
+                                    new WritableTypeInfo<>(ViaInterfaceExtension.class),
+                                    DirectWritable.class);
+                        })
+                .as("should have failed with an exception")
+                .isInstanceOf(InvalidTypesException.class);
     }
 
     @Test
-    public void testExtractFromFunction() {
+    void testExtractFromFunction() {
         RichMapFunction<DirectWritable, DirectWritable> function =
                 new RichMapFunction<DirectWritable, DirectWritable>() {
                     @Override
@@ -119,12 +118,12 @@ public class WritableExtractionTest {
                 TypeExtractor.getMapReturnTypes(
                         function, new WritableTypeInfo<>(DirectWritable.class));
 
-        assertTrue(outType instanceof WritableTypeInfo);
-        assertEquals(DirectWritable.class, outType.getTypeClass());
+        assertThat(outType).isInstanceOf(WritableTypeInfo.class);
+        assertThat(outType.getTypeClass()).isEqualTo(DirectWritable.class);
     }
 
     @Test
-    public void testExtractAsPartOfPojo() {
+    void testExtractAsPartOfPojo() {
         PojoTypeInfo<PojoWithWritable> pojoInfo =
                 (PojoTypeInfo<PojoWithWritable>) TypeExtractor.getForClass(PojoWithWritable.class);
 
@@ -138,17 +137,18 @@ public class WritableExtractionTest {
                     fail("already seen");
                 }
                 foundWritable = true;
-                assertEquals(
-                        new WritableTypeInfo<>(DirectWritable.class), field.getTypeInformation());
-                assertEquals(DirectWritable.class, field.getTypeInformation().getTypeClass());
+                assertThat(field.getTypeInformation())
+                        .isEqualTo(new WritableTypeInfo<>(DirectWritable.class));
+                assertThat(field.getTypeInformation().getTypeClass())
+                        .isEqualTo(DirectWritable.class);
             }
         }
 
-        assertTrue("missed the writable type", foundWritable);
+        assertThat(foundWritable).as("missed the writable type").isTrue();
     }
 
     @Test
-    public void testInputValidationError() {
+    void testInputValidationError() {
 
         RichMapFunction<Writable, String> function =
                 new RichMapFunction<Writable, String>() {
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java
index 924300d9269..d2e60be9d2b 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java
@@ -23,13 +23,13 @@ import org.apache.flink.api.common.typeutils.SerializerTestInstance;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.WritableTypeInfo;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 /** Tests for the {@link WritableSerializer}. */
-public class WritableSerializerTest {
+class WritableSerializerTest {
 
     @Test
-    public void testStringArrayWritable() {
+    void testStringArrayWritable() {
         StringArrayWritable[] data =
                 new StringArrayWritable[] {
                     new StringArrayWritable(new String[] {}),
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
index 9fcc1dfbe10..c7f15cc2a2b 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
@@ -23,7 +23,10 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.test.util.JavaProgramTestBaseJUnit4;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
 import org.apache.flink.util.OperatingSystem;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -35,11 +38,9 @@ import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.io.File;
 import java.net.URI;
@@ -47,28 +48,44 @@ import java.util.ArrayList;
 import java.util.Collection;
 
 import static org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 /** Integration tests for Hadoop IO formats. */
-@RunWith(Parameterized.class)
-public class HadoopIOFormatsITCase extends JavaProgramTestBaseJUnit4 {
+@ExtendWith(ParameterizedTestExtension.class)
+public class HadoopIOFormatsITCase extends JavaProgramTestBase {
 
     private static final int NUM_PROGRAMS = 2;
 
-    private final int curProgId;
+    @Parameter private int curProgId;
     private String[] resultPath;
     private String[] expectedResult;
     private String sequenceFileInPath;
     private String sequenceFileInPathNull;
 
-    public HadoopIOFormatsITCase(int curProgId) {
-        this.curProgId = curProgId;
+    @BeforeEach
+    void checkOperatingSystem() {
+        // FLINK-5164 - see https://wiki.apache.org/hadoop/WindowsProblems
+        assumeThat(OperatingSystem.isWindows())
+                .as("This test can't run successfully on Windows.")
+                .isFalse();
     }
 
-    @Before
-    public void checkOperatingSystem() {
-        // FLINK-5164 - see https://wiki.apache.org/hadoop/WindowsProblems
-        Assume.assumeTrue(
-                "This test can't run successfully on Windows.", 
!OperatingSystem.isWindows());
+    @Override
+    @TestTemplate
+    public void testJobWithObjectReuse() throws Exception {
+        super.testJobWithObjectReuse();
+    }
+
+    @Override
+    @TestTemplate
+    public void testJobWithoutObjectReuse() throws Exception {
+        super.testJobWithoutObjectReuse();
+    }
+
+    @Override
+    @TestTemplate
+    public void testJobCollectionExecution() throws Exception {
+        super.testJobCollectionExecution();
     }
 
     @Override
@@ -143,13 +160,13 @@ public class HadoopIOFormatsITCase extends JavaProgramTestBaseJUnit4 {
         }
     }
 
-    @Parameters
-    public static Collection<Object[]> getConfigurations() {
+    @Parameters(name = "curProgId = {0}")
+    public static Collection<Integer> getConfigurations() {
 
-        Collection<Object[]> programIds = new ArrayList<>(NUM_PROGRAMS);
+        Collection<Integer> programIds = new ArrayList<>(NUM_PROGRAMS);
 
         for (int i = 1; i <= NUM_PROGRAMS; i++) {
-            programIds.add(new Object[] {i});
+            programIds.add(i);
         }
 
         return programIds;
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java
index dba5c49f536..9875b49e33b 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java
@@ -23,7 +23,8 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
-import org.apache.flink.test.util.MultipleProgramsTestBaseJUnit4;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
 
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
@@ -31,28 +32,21 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
+import java.nio.file.Path;
 
 import static org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory;
 
 /** IT cases for the {@link HadoopMapFunction}. */
-@RunWith(Parameterized.class)
-public class HadoopMapFunctionITCase extends MultipleProgramsTestBaseJUnit4 {
+@ExtendWith(ParameterizedTestExtension.class)
+class HadoopMapFunctionITCase extends MultipleProgramsTestBase {
 
-    public HadoopMapFunctionITCase(TestExecutionMode mode) {
-        super(mode);
-    }
-
-    @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
-
-    @Test
-    public void testNonPassingMapper() throws Exception {
+    @TestTemplate
+    void testNonPassingMapper(@TempDir Path tempFolder) throws Exception {
         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
         DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env);
@@ -61,7 +55,7 @@ public class HadoopMapFunctionITCase extends MultipleProgramsTestBaseJUnit4 {
                         new HadoopMapFunction<IntWritable, Text, IntWritable, Text>(
                                 new NonPassingMapper()));
 
-        String resultPath = tempFolder.newFile().toURI().toString();
+        String resultPath = tempFolder.toUri().toString();
 
         nonPassingFlatMapDs.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
         env.execute();
@@ -69,8 +63,8 @@ public class HadoopMapFunctionITCase extends MultipleProgramsTestBaseJUnit4 {
         compareResultsByLinesInMemory("\n", resultPath);
     }
 
-    @Test
-    public void testDataDuplicatingMapper() throws Exception {
+    @TestTemplate
+    void testDataDuplicatingMapper(@TempDir Path tempFolder) throws Exception {
         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
         DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env);
@@ -79,7 +73,7 @@ public class HadoopMapFunctionITCase extends MultipleProgramsTestBaseJUnit4 {
                         new HadoopMapFunction<IntWritable, Text, IntWritable, Text>(
                                 new DuplicatingMapper()));
 
-        String resultPath = tempFolder.newFile().toURI().toString();
+        String resultPath = tempFolder.toUri().toString();
 
         duplicatingFlatMapDs.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
         env.execute();
@@ -131,8 +125,8 @@ public class HadoopMapFunctionITCase extends MultipleProgramsTestBaseJUnit4 {
         compareResultsByLinesInMemory(expected, resultPath);
     }
 
-    @Test
-    public void testConfigurableMapper() throws Exception {
+    @TestTemplate
+    void testConfigurableMapper(@TempDir Path tempFolder) throws Exception {
         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
         JobConf conf = new JobConf();
@@ -144,7 +138,7 @@ public class HadoopMapFunctionITCase extends MultipleProgramsTestBaseJUnit4 {
                         new HadoopMapFunction<IntWritable, Text, IntWritable, Text>(
                                 new ConfigurableMapper(), conf));
 
-        String resultPath = tempFolder.newFile().toURI().toString();
+        String resultPath = tempFolder.toUri().toString();
 
         hellos.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
         env.execute();
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
index 268df224da9..0f7e284ba1e 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
@@ -20,25 +20,29 @@ package org.apache.flink.test.hadoopcompatibility.mapred;
 
 import org.apache.flink.test.hadoopcompatibility.mapred.example.HadoopMapredCompatWordCount;
 import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.test.util.JavaProgramTestBaseJUnit4;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
 import org.apache.flink.util.OperatingSystem;
 
-import org.junit.Assume;
-import org.junit.Before;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import static org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 /** IT cases for mapred. */
-public class HadoopMapredITCase extends JavaProgramTestBaseJUnit4 {
+@ExtendWith(ParameterizedTestExtension.class)
+public class HadoopMapredITCase extends JavaProgramTestBase {
 
     protected String textPath;
     protected String resultPath;
 
-    @Before
+    @BeforeEach
     public void checkOperatingSystem() {
         // FLINK-5164 - see https://wiki.apache.org/hadoop/WindowsProblems
-        Assume.assumeTrue(
-                "This test can't run successfully on Windows.", 
!OperatingSystem.isWindows());
+        assumeThat(OperatingSystem.isWindows())
+                .as("This test can't run successfully on Windows.")
+                .isFalse();
     }
 
     @Override
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java
index c3231337275..3d7376c6172 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java
@@ -24,7 +24,8 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;
 import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
-import org.apache.flink.test.util.MultipleProgramsTestBaseJUnit4;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
 
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
@@ -32,30 +33,23 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
-import org.hamcrest.core.IsEqual;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
+import java.nio.file.Path;
 import java.util.Iterator;
 
 import static org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 /** IT case for the {@link HadoopReduceCombineFunction}. */
-@RunWith(Parameterized.class)
-public class HadoopReduceCombineFunctionITCase extends MultipleProgramsTestBaseJUnit4 {
+@ExtendWith(ParameterizedTestExtension.class)
+class HadoopReduceCombineFunctionITCase extends MultipleProgramsTestBase {
 
-    public HadoopReduceCombineFunctionITCase(TestExecutionMode mode) {
-        super(mode);
-    }
-
-    @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
-
-    @Test
-    public void testStandardCountingWithCombiner() throws Exception {
+    @TestTemplate
+    void testStandardCountingWithCombiner(@TempDir Path tempFolder) throws Exception {
         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
         DataSet<Tuple2<IntWritable, IntWritable>> ds =
@@ -68,7 +62,7 @@ public class HadoopReduceCombineFunctionITCase extends MultipleProgramsTestBaseJ
                                         IntWritable, IntWritable, IntWritable, IntWritable>(
                                         new SumReducer(), new SumReducer()));
 
-        String resultPath = tempFolder.newFile().toURI().toString();
+        String resultPath = tempFolder.toUri().toString();
 
         counts.writeAsText(resultPath);
         env.execute();
@@ -78,8 +72,8 @@ public class HadoopReduceCombineFunctionITCase extends MultipleProgramsTestBaseJ
         compareResultsByLinesInMemory(expected, resultPath);
     }
 
-    @Test
-    public void testUngroupedHadoopReducer() throws Exception {
+    @TestTemplate
+    void testUngroupedHadoopReducer(@TempDir Path tempFolder) throws Exception {
         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
         DataSet<Tuple2<IntWritable, IntWritable>> ds =
@@ -91,7 +85,7 @@ public class HadoopReduceCombineFunctionITCase extends MultipleProgramsTestBaseJ
                                 IntWritable, IntWritable, IntWritable, IntWritable>(
                                 new SumReducer(), new SumReducer()));
 
-        String resultPath = tempFolder.newFile().toURI().toString();
+        String resultPath = tempFolder.toUri().toString();
 
         sum.writeAsText(resultPath);
         env.execute();
@@ -101,10 +95,9 @@ public class HadoopReduceCombineFunctionITCase extends MultipleProgramsTestBaseJ
         compareResultsByLinesInMemory(expected, resultPath);
     }
 
-    @Test
-    public void testCombiner() throws Exception {
-        org.junit.Assume.assumeThat(
-                mode, new IsEqual<TestExecutionMode>(TestExecutionMode.CLUSTER));
+    @TestTemplate
+    void testCombiner(@TempDir Path tempFolder) throws Exception {
+        assumeThat(mode).isEqualTo(TestExecutionMode.CLUSTER);
         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
         DataSet<Tuple2<IntWritable, IntWritable>> ds =
@@ -117,7 +110,7 @@ public class HadoopReduceCombineFunctionITCase extends MultipleProgramsTestBaseJ
                                         IntWritable, IntWritable, IntWritable, IntWritable>(
                                         new SumReducer(), new KeyChangingReducer()));
 
-        String resultPath = tempFolder.newFile().toURI().toString();
+        String resultPath = tempFolder.toUri().toString();
 
         counts.writeAsText(resultPath);
         env.execute();
@@ -127,8 +120,8 @@ public class HadoopReduceCombineFunctionITCase extends MultipleProgramsTestBaseJ
         compareResultsByLinesInMemory(expected, resultPath);
     }
 
-    @Test
-    public void testConfigurationViaJobConf() throws Exception {
+    @TestTemplate
+    void testConfigurationViaJobConf(@TempDir Path tempFolder) throws Exception {
         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
         JobConf conf = new JobConf();
@@ -144,7 +137,7 @@ public class HadoopReduceCombineFunctionITCase extends MultipleProgramsTestBaseJ
                                         IntWritable, Text, IntWritable, IntWritable>(
                                         new ConfigurableCntReducer(), conf));
 
-        String resultPath = tempFolder.newFile().toURI().toString();
+        String resultPath = tempFolder.toUri().toString();
 
         hellos.writeAsText(resultPath);
         env.execute();
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java
index 390568251ee..4e991f16118 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java
@@ -23,7 +23,8 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
-import org.apache.flink.test.util.MultipleProgramsTestBaseJUnit4;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
 
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
@@ -31,29 +32,22 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
+import java.nio.file.Path;
 import java.util.Iterator;
 
 import static org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory;
 
 /** IT cases for the {@link HadoopReduceFunction}. */
-@RunWith(Parameterized.class)
-public class HadoopReduceFunctionITCase extends MultipleProgramsTestBaseJUnit4 {
+@ExtendWith(ParameterizedTestExtension.class)
+class HadoopReduceFunctionITCase extends MultipleProgramsTestBase {
 
-    public HadoopReduceFunctionITCase(TestExecutionMode mode) {
-        super(mode);
-    }
-
-    @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
-
-    @Test
-    public void testStandardGrouping() throws Exception {
+    @TestTemplate
+    void testStandardGrouping(@TempDir Path tempFolder) throws Exception {
         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
         DataSet<Tuple2<IntWritable, Text>> ds =
@@ -66,7 +60,7 @@ public class HadoopReduceFunctionITCase extends MultipleProgramsTestBaseJUnit4 {
                                         IntWritable, Text, IntWritable, IntWritable>(
                                         new CommentCntReducer()));
 
-        String resultPath = tempFolder.newFile().toURI().toString();
+        String resultPath = tempFolder.toUri().toString();
 
         commentCnts.writeAsText(resultPath);
         env.execute();
@@ -76,8 +70,8 @@ public class HadoopReduceFunctionITCase extends MultipleProgramsTestBaseJUnit4 {
         compareResultsByLinesInMemory(expected, resultPath);
     }
 
-    @Test
-    public void testUngroupedHadoopReducer() throws Exception {
+    @TestTemplate
+    void testUngroupedHadoopReducer(@TempDir Path tempFolder) throws Exception {
         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
         DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env);
@@ -87,7 +81,7 @@ public class HadoopReduceFunctionITCase extends MultipleProgramsTestBaseJUnit4 {
                        new HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(
                                 new AllCommentCntReducer()));
 
-        String resultPath = tempFolder.newFile().toURI().toString();
+        String resultPath = tempFolder.toUri().toString();
 
         commentCnts.writeAsText(resultPath);
         env.execute();
@@ -97,8 +91,8 @@ public class HadoopReduceFunctionITCase extends MultipleProgramsTestBaseJUnit4 {
         compareResultsByLinesInMemory(expected, resultPath);
     }
 
-    @Test
-    public void testConfigurationViaJobConf() throws Exception {
+    @TestTemplate
+    void testConfigurationViaJobConf(@TempDir Path tempFolder) throws Exception {
         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
         JobConf conf = new JobConf();
@@ -114,7 +108,7 @@ public class HadoopReduceFunctionITCase extends MultipleProgramsTestBaseJUnit4 {
                                        IntWritable, Text, IntWritable, IntWritable>(
                                         new ConfigurableCntReducer(), conf));
 
-        String resultPath = tempFolder.newFile().toURI().toString();
+        String resultPath = tempFolder.toUri().toString();
 
         helloCnts.writeAsText(resultPath);
         env.execute();
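
For reference, the file above shows the migration shape used throughout this patch: JUnit 4's @RunWith(Parameterized.class), the mode-passing constructor, and the TemporaryFolder rule are replaced by Flink's ParameterizedTestExtension, @TestTemplate, and a method-level @TempDir. A minimal standalone sketch of that shape, with hypothetical class/test names and assuming the @Parameters/@Parameter companions from the same testutils package:

    import java.nio.file.Path;
    import java.util.Arrays;
    import java.util.Collection;

    import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
    import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
    import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
    import org.junit.jupiter.api.TestTemplate;
    import org.junit.jupiter.api.extension.ExtendWith;
    import org.junit.jupiter.api.io.TempDir;

    @ExtendWith(ParameterizedTestExtension.class)
    class MigrationShapeSketch {

        // Injected once per parameter set returned by the @Parameters method.
        @Parameter public String mode;

        @Parameters
        public static Collection<String> modes() {
            return Arrays.asList("CLUSTER", "COLLECTION");
        }

        // @TestTemplate runs once per parameter set; @TempDir replaces the
        // JUnit 4 TemporaryFolder rule (tempFolder.toUri() instead of
        // tempFolder.newFile().toURI()).
        @TestTemplate
        void writesToTempDir(@TempDir Path tempFolder) {
            String resultPath = tempFolder.toUri().toString();
            // ... run the job and compare results against resultPath ...
        }
    }
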
diff --git 
a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/WordCountMapredITCase.java
 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/WordCountMapredITCase.java
index f5aa573aa39..9ab1b95ff8b 100644
--- 
a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/WordCountMapredITCase.java
+++ 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/WordCountMapredITCase.java
@@ -26,7 +26,8 @@ import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.hadoopcompatibility.HadoopInputs;
 import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.test.util.JavaProgramTestBaseJUnit4;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.OperatingSystem;
 
@@ -36,22 +37,25 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.mapred.TextOutputFormat;
-import org.junit.Assume;
-import org.junit.Before;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import static org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 /** Test WordCount with Hadoop input and output "mapred" (legacy) formats. */
-public class WordCountMapredITCase extends JavaProgramTestBaseJUnit4 {
+@ExtendWith(ParameterizedTestExtension.class)
+class WordCountMapredITCase extends JavaProgramTestBase {
 
-    protected String textPath;
-    protected String resultPath;
+    private String textPath;
+    private String resultPath;
 
-    @Before
-    public void checkOperatingSystem() {
+    @BeforeEach
+    void checkOperatingSystem() {
         // FLINK-5164 - see https://wiki.apache.org/hadoop/WindowsProblems
-        Assume.assumeTrue(
-                "This test can't run successfully on Windows.", 
!OperatingSystem.isWindows());
+        assumeThat(OperatingSystem.isWindows())
+                .as("This test can't run successfully on Windows.")
+                .isFalse();
     }
 
     @Override
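
The Assume-to-AssertJ switch above keeps the same abort-on-Windows semantics across all of the migrated IT cases. A standalone sketch with a hypothetical class name, using the same imports as the patch:

    import org.apache.flink.util.OperatingSystem;

    import org.junit.jupiter.api.BeforeEach;
    import org.junit.jupiter.api.Test;

    import static org.assertj.core.api.Assumptions.assumeThat;

    class WindowsAssumptionSketch {

        @BeforeEach
        void checkOperatingSystem() {
            // Aborts (rather than fails) the test on Windows, mirroring the old
            // Assume.assumeTrue("...", !OperatingSystem.isWindows()) call.
            assumeThat(OperatingSystem.isWindows())
                    .as("This test can't run successfully on Windows.")
                    .isFalse();
        }

        @Test
        void runsOnlyOffWindows() {}
    }
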
diff --git 
a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java
 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java
index 43bebd77db9..e54b0acb833 100644
--- 
a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java
+++ 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java
@@ -23,17 +23,19 @@ import org.apache.flink.api.java.typeutils.runtime.WritableSerializer;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
 
 import org.apache.hadoop.io.IntWritable;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.NoSuchElementException;
 
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
 /** Tests for the {@link HadoopTupleUnwrappingIterator}. */
-public class HadoopTupleUnwrappingIteratorTest {
+class HadoopTupleUnwrappingIteratorTest {
 
     @Test
-    public void testValueIterator() {
+    void testValueIterator() {
 
         HadoopTupleUnwrappingIterator<IntWritable, IntWritable> valIt =
                 new HadoopTupleUnwrappingIterator<IntWritable, IntWritable>(
@@ -56,16 +58,14 @@ public class HadoopTupleUnwrappingIteratorTest {
         int[] expectedValues = new int[] {1, 2, 3, 4, 5, 6, 7, 8};
 
         valIt.set(tList.iterator());
-        Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+        assertThat(valIt.getCurrentKey().get()).isEqualTo(expectedKey);
         for (int expectedValue : expectedValues) {
-            Assert.assertTrue(valIt.hasNext());
-            Assert.assertTrue(valIt.hasNext());
-            Assert.assertTrue(valIt.next().get() == expectedValue);
-            Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+            assertThat(valIt.hasNext()).isTrue();
+            assertThat(valIt.next().get()).isEqualTo(expectedValue);
+            assertThat(valIt.getCurrentKey().get()).isEqualTo(expectedKey);
         }
-        Assert.assertFalse(valIt.hasNext());
-        Assert.assertFalse(valIt.hasNext());
-        Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+        assertThat(valIt.hasNext()).isFalse();
+        assertThat(valIt.getCurrentKey().get()).isEqualTo(expectedKey);
 
         // one value
 
@@ -76,16 +76,14 @@ public class HadoopTupleUnwrappingIteratorTest {
         expectedValues = new int[] {10};
 
         valIt.set(tList.iterator());
-        Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+        assertThat(valIt.getCurrentKey().get()).isEqualTo(expectedKey);
         for (int expectedValue : expectedValues) {
-            Assert.assertTrue(valIt.hasNext());
-            Assert.assertTrue(valIt.hasNext());
-            Assert.assertTrue(valIt.next().get() == expectedValue);
-            Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+            assertThat(valIt.hasNext()).isTrue();
+            assertThat(valIt.next().get()).isEqualTo(expectedValue);
+            assertThat(valIt.getCurrentKey().get()).isEqualTo(expectedKey);
         }
-        Assert.assertFalse(valIt.hasNext());
-        Assert.assertFalse(valIt.hasNext());
-        Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+        assertThat(valIt.hasNext()).isFalse();
+        assertThat(valIt.getCurrentKey().get()).isEqualTo(expectedKey);
 
         // more values
 
@@ -100,17 +98,15 @@ public class HadoopTupleUnwrappingIteratorTest {
         expectedValues = new int[] {10, 4, 7, 9, 21};
 
         valIt.set(tList.iterator());
-        Assert.assertTrue(valIt.hasNext());
-        Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+        assertThat(valIt.hasNext()).isTrue();
+        assertThat(valIt.getCurrentKey().get()).isEqualTo(expectedKey);
         for (int expectedValue : expectedValues) {
-            Assert.assertTrue(valIt.hasNext());
-            Assert.assertTrue(valIt.hasNext());
-            Assert.assertTrue(valIt.next().get() == expectedValue);
-            Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+            assertThat(valIt.hasNext()).isTrue();
+            assertThat(valIt.next().get()).isEqualTo(expectedValue);
+            assertThat(valIt.getCurrentKey().get()).isEqualTo(expectedKey);
         }
-        Assert.assertFalse(valIt.hasNext());
-        Assert.assertFalse(valIt.hasNext());
-        Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+        assertThat(valIt.hasNext()).isFalse();
+        assertThat(valIt.getCurrentKey().get()).isEqualTo(expectedKey);
 
         // no has next calls
 
@@ -125,17 +121,12 @@ public class HadoopTupleUnwrappingIteratorTest {
         expectedValues = new int[] {5, 8, 42, -1, 0};
 
         valIt.set(tList.iterator());
-        Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+        assertThat(valIt.getCurrentKey().get()).isEqualTo(expectedKey);
         for (int expectedValue : expectedValues) {
-            Assert.assertTrue(valIt.next().get() == expectedValue);
-        }
-        try {
-            valIt.next();
-            Assert.fail();
-        } catch (NoSuchElementException nsee) {
-            // expected
+            assertThat(valIt.next().get()).isEqualTo(expectedValue);
         }
-        Assert.assertFalse(valIt.hasNext());
-        Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+        assertThatThrownBy(() -> valIt.next()).isInstanceOf(NoSuchElementException.class);
+        assertThat(valIt.hasNext()).isFalse();
+        assertThat(valIt.getCurrentKey().get()).isEqualTo(expectedKey);
     }
 }
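
The exception check above collapses the JUnit 4 try/fail/catch idiom into a single assertThatThrownBy call; the same pattern applies to any exhausted iterator. An illustrative snippet, not part of the patch:

    import java.util.Collections;
    import java.util.Iterator;
    import java.util.NoSuchElementException;

    import static org.assertj.core.api.Assertions.assertThatThrownBy;

    class ExhaustedIteratorSketch {

        static void assertExhausted(Iterator<?> it) {
            // Replaces: try { it.next(); fail(); } catch (NoSuchElementException expected) { }
            assertThatThrownBy(it::next).isInstanceOf(NoSuchElementException.class);
        }

        public static void main(String[] args) {
            assertExhausted(Collections.emptyIterator());
        }
    }
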
diff --git 
a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
index 9e7231b639a..1eb8cc5c568 100644
--- 
a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
+++ 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
@@ -22,25 +22,26 @@ import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
 import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
 import org.apache.flink.test.hadoopcompatibility.mapreduce.example.WordCount;
 import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.test.util.JavaProgramTestBaseJUnit4;
+import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.util.OperatingSystem;
 
-import org.junit.Assume;
-import org.junit.Before;
+import org.junit.jupiter.api.BeforeEach;
 
 import static org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 /** IT cases for both the {@link HadoopInputFormat} and {@link HadoopOutputFormat}. */
-public class HadoopInputOutputITCase extends JavaProgramTestBaseJUnit4 {
+class HadoopInputOutputITCase extends JavaProgramTestBase {
 
-    protected String textPath;
-    protected String resultPath;
+    private String textPath;
+    private String resultPath;
 
-    @Before
-    public void checkOperatingSystem() {
+    @BeforeEach
+    void checkOperatingSystem() {
         // FLINK-5164 - see https://wiki.apache.org/hadoop/WindowsProblems
-        Assume.assumeTrue(
-                "This test can't run successfully on Windows.", 
!OperatingSystem.isWindows());
+        assumeThat(OperatingSystem.isWindows())
+                .as("This test can't run successfully on Windows.")
+                .isFalse();
     }
 
     @Override
diff --git 
a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/WordCountMapreduceITCase.java
 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/WordCountMapreduceITCase.java
index 0318a20fada..716c4d16ed0 100644
--- 
a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/WordCountMapreduceITCase.java
+++ 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/WordCountMapreduceITCase.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.hadoopcompatibility.HadoopInputs;
 import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.test.util.JavaProgramTestBaseJUnit4;
+import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.OperatingSystem;
 
@@ -36,22 +36,23 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.junit.Assume;
-import org.junit.Before;
+import org.junit.jupiter.api.BeforeEach;
 
 import static org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 /** Test WordCount with Hadoop input and output "mapreduce" (modern) formats. */
-public class WordCountMapreduceITCase extends JavaProgramTestBaseJUnit4 {
+class WordCountMapreduceITCase extends JavaProgramTestBase {
 
-    protected String textPath;
-    protected String resultPath;
+    private String textPath;
+    private String resultPath;
 
-    @Before
-    public void checkOperatingSystem() {
+    @BeforeEach
+    void checkOperatingSystem() {
         // FLINK-5164 - see https://wiki.apache.org/hadoop/WindowsProblems
-        Assume.assumeTrue(
-                "This test can't run successfully on Windows.", 
!OperatingSystem.isWindows());
+        assumeThat(OperatingSystem.isWindows())
+                .as("This test can't run successfully on Windows.")
+                .isFalse();
     }
 
     @Override
diff --git 
a/flink-connectors/flink-hadoop-compatibility/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
 
b/flink-connectors/flink-hadoop-compatibility/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
new file mode 100644
index 00000000000..0b74fd4603c
--- /dev/null
+++ 
b/flink-connectors/flink-hadoop-compatibility/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.util.TestLoggerExtension
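
This new service file registers TestLoggerExtension for every Jupiter test in the module through the ServiceLoader mechanism; JUnit 5 only picks such entries up when extension auto-detection is enabled (junit.jupiter.extensions.autodetection.enabled=true, typically set in junit-platform.properties or the surefire configuration). It replaces the JUnit 4 TestLogger base class dropped elsewhere in this patch; the explicit per-class equivalent would look like this hypothetical example:

    import org.apache.flink.util.TestLoggerExtension;

    import org.junit.jupiter.api.Test;
    import org.junit.jupiter.api.extension.ExtendWith;

    // Explicit registration, equivalent to the ServiceLoader entry above.
    @ExtendWith(TestLoggerExtension.class)
    class SomeHadoopCompatibilityTest {

        @Test
        void logsTestLifecycle() {
            // TestLoggerExtension logs the test lifecycle, replacing "extends TestLogger".
        }
    }
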
diff --git 
a/flink-connectors/flink-hadoop-compatibility/src/test/scala/org/apache/flink/api/hadoopcompatibility/scala/ScalaWritableTypeInfoTest.scala
 
b/flink-connectors/flink-hadoop-compatibility/src/test/scala/org/apache/flink/api/hadoopcompatibility/scala/ScalaWritableTypeInfoTest.scala
index b54fef4f1a1..a6b4f9e0dcf 100644
--- 
a/flink-connectors/flink-hadoop-compatibility/src/test/scala/org/apache/flink/api/hadoopcompatibility/scala/ScalaWritableTypeInfoTest.scala
+++ 
b/flink-connectors/flink-hadoop-compatibility/src/test/scala/org/apache/flink/api/hadoopcompatibility/scala/ScalaWritableTypeInfoTest.scala
@@ -19,19 +19,18 @@ package org.apache.flink.api.hadoopcompatibility.scala
 
 import org.apache.flink.api.java.typeutils.WritableTypeInfo
 import org.apache.flink.api.scala._
-import org.apache.flink.util.TestLogger
 
 import org.apache.hadoop.io.Text
-import org.junit.Assert._
-import org.junit.Test
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.jupiter.api.Test
 
-class ScalaWritableTypeInfoTest extends TestLogger {
+class ScalaWritableTypeInfoTest {
 
   @Test
   def testWritableTypeInfo = {
     val writableTypeInfo = createTypeInformation[Text]
 
-    assertTrue(writableTypeInfo.isInstanceOf[WritableTypeInfo[Text]])
-    assertEquals(classOf[Text], writableTypeInfo.getTypeClass)
+    assertThat(writableTypeInfo).isInstanceOf(classOf[WritableTypeInfo[Text]])
+    assertThat(classOf[Text]).isEqualTo(writableTypeInfo.getTypeClass)
   }
 }
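
AssertJ assertions read actual-first; the type-info check above can equivalently be written with the actual value as the assertThat argument. An illustrative Java sketch, not part of the patch:

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.WritableTypeInfo;

    import org.apache.hadoop.io.Text;

    import static org.assertj.core.api.Assertions.assertThat;

    class WritableTypeInfoAssertionSketch {

        static void check(TypeInformation<Text> typeInfo) {
            // Actual value first, expected value inside the matcher.
            assertThat(typeInfo).isInstanceOf(WritableTypeInfo.class);
            assertThat(typeInfo.getTypeClass()).isEqualTo(Text.class);
        }
    }
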
diff --git 
a/flink-connectors/flink-hadoop-compatibility/src/test/scala/org/apache/flink/api/hadoopcompatibility/scala/WordCountMapredITCase.scala
 
b/flink-connectors/flink-hadoop-compatibility/src/test/scala/org/apache/flink/api/hadoopcompatibility/scala/WordCountMapredITCase.scala
index 8e0a61e980d..73fab54a942 100644
--- 
a/flink-connectors/flink-hadoop-compatibility/src/test/scala/org/apache/flink/api/hadoopcompatibility/scala/WordCountMapredITCase.scala
+++ 
b/flink-connectors/flink-hadoop-compatibility/src/test/scala/org/apache/flink/api/hadoopcompatibility/scala/WordCountMapredITCase.scala
@@ -21,22 +21,25 @@ import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat
 import org.apache.flink.hadoopcompatibility.scala.HadoopInputs
 import org.apache.flink.test.testdata.WordCountData
-import org.apache.flink.test.util.{JavaProgramTestBaseJUnit4, TestBaseUtils}
+import org.apache.flink.test.util.{JavaProgramTestBase, TestBaseUtils}
 import org.apache.flink.util.OperatingSystem
 
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.{LongWritable, Text}
 import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextInputFormat, TextOutputFormat}
-import org.junit.{Assume, Before}
+import org.assertj.core.api.Assumptions.assumeThat
+import org.junit.jupiter.api.BeforeEach
 
-class WordCountMapredITCase extends JavaProgramTestBaseJUnit4 {
+class WordCountMapredITCase extends JavaProgramTestBase {
   protected var textPath: String = null
   protected var resultPath: String = null
 
-  @Before
+  @BeforeEach
   def checkOperatingSystem() {
     // FLINK-5164 - see https://wiki.apache.org/hadoop/WindowsProblems
-    Assume.assumeTrue("This test can't run successfully on Windows.", 
!OperatingSystem.isWindows)
+    assumeThat(OperatingSystem.isWindows)
+      .as("This test can't run successfully on Windows.")
+      .isFalse()
   }
 
   override protected def preSubmit() {
diff --git 
a/flink-connectors/flink-hadoop-compatibility/src/test/scala/org/apache/flink/api/hadoopcompatibility/scala/WordCountMapreduceITCase.scala
 
b/flink-connectors/flink-hadoop-compatibility/src/test/scala/org/apache/flink/api/hadoopcompatibility/scala/WordCountMapreduceITCase.scala
index 0d64a10079a..f5cdfc52d0b 100644
--- 
a/flink-connectors/flink-hadoop-compatibility/src/test/scala/org/apache/flink/api/hadoopcompatibility/scala/WordCountMapreduceITCase.scala
+++ 
b/flink-connectors/flink-hadoop-compatibility/src/test/scala/org/apache/flink/api/hadoopcompatibility/scala/WordCountMapreduceITCase.scala
@@ -21,7 +21,7 @@ import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat
 import org.apache.flink.hadoopcompatibility.scala.HadoopInputs
 import org.apache.flink.test.testdata.WordCountData
-import org.apache.flink.test.util.{JavaProgramTestBaseJUnit4, TestBaseUtils}
+import org.apache.flink.test.util.{JavaProgramTestBase, TestBaseUtils}
 import org.apache.flink.util.OperatingSystem
 
 import org.apache.hadoop.fs.Path
@@ -29,16 +29,19 @@ import org.apache.hadoop.io.{LongWritable, Text}
 import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
-import org.junit.{Assume, Before}
+import org.assertj.core.api.Assumptions.assumeThat
+import org.junit.jupiter.api.BeforeEach
 
-class WordCountMapreduceITCase extends JavaProgramTestBaseJUnit4 {
+class WordCountMapreduceITCase extends JavaProgramTestBase {
   protected var textPath: String = null
   protected var resultPath: String = null
 
-  @Before
+  @BeforeEach
   def checkOperatingSystem() {
     // FLINK-5164 - see https://wiki.apache.org/hadoop/WindowsProblems
-    Assume.assumeTrue("This test can't run successfully on Windows.", 
!OperatingSystem.isWindows)
+    assumeThat(OperatingSystem.isWindows)
+      .as("This test can't run successfully on Windows.")
+      .isFalse()
   }
 
   override protected def preSubmit() {
