This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 97f1213b6c0 [FLINK-39229][test] Migrate tests of flink-protobuf to JUnit5
97f1213b6c0 is described below

commit 97f1213b6c03cf88cd3d87f3dcc3a5bdde6c9d07
Author: Zhanghao Chen <[email protected]>
AuthorDate: Sat Mar 21 01:19:44 2026 +0800

    [FLINK-39229][test] Migrate tests of flink-protobuf to JUnit5
---
 .../formats/protobuf/BigPbProtoToRowTest.java      |  89 ++++++++--------
 .../formats/protobuf/BigPbRowToProtoTest.java      | 109 ++++++++++---------
 .../flink/formats/protobuf/MapProtoToRowTest.java  |  27 +++--
 .../flink/formats/protobuf/MapRowToProtoTest.java  |  23 +++--
 .../protobuf/MetaNoMultiProtoToRowTest.java        |   8 +-
 .../flink/formats/protobuf/MetaOuterMultiTest.java |   6 +-
 .../formats/protobuf/MetaOuterNoMultiTest.java     |   6 +-
 .../protobuf/MultiLevelMessageProtoToRowTest.java  |  17 ++-
 .../protobuf/MultiLevelMessageRowToProtoTest.java  |  21 ++--
 .../protobuf/NoJavaPackageProtoToRowTest.java      |   6 +-
 .../formats/protobuf/NullValueToProtoTest.java     | 115 ++++++++++-----------
 .../formats/protobuf/OneofProtoToRowTest.java      |  13 ++-
 .../formats/protobuf/OneofRowToProtoTest.java      |  13 ++-
 .../flink/formats/protobuf/Pb3ToRowTest.java       |  86 ++++++++-------
 .../flink/formats/protobuf/ProtobufSQLITCase.java  |  73 +++++++------
 .../protobuf/RepeatedMessageProtoToRowTest.java    |  16 +--
 .../protobuf/RepeatedMessageRowToProtoTest.java    |  18 ++--
 .../formats/protobuf/RepeatedProtoToRowTest.java   |  18 ++--
 .../formats/protobuf/RepeatedRowToProtoTest.java   |  24 ++---
 .../protobuf/SameOuterClassNameProtoToRowTest.java |  16 +--
 .../protobuf/SameOuterClassNameRowToProtoTest.java |  18 ++--
 .../formats/protobuf/SimpleProtoToRowTest.java     |  86 ++++++++-------
 .../formats/protobuf/SimpleRowToProtoTest.java     |  48 +++++----
 .../protobuf/TimestampMultiProtoToRowTest.java     |  12 +--
 .../protobuf/TimestampMultiRowToProtoTest.java     |  12 +--
 .../protobuf/TimestampNoMultiProtoToRowTest.java   |  12 +--
 .../protobuf/TimestampNoMultiRowToProtoTest.java   |  12 +--
 .../TimestampOuterMultiProtoToRowTest.java         |  12 +--
 .../TimestampOuterMultiRowToProtoTest.java         |  12 +--
 .../TimestampOuterNoMultiProtoToRowTest.java       |  12 +--
 .../TimestampOuterNoMultiRowToProtoTest.java       |  12 +--
 .../protobuf/VeryBigPbProtoToRowITCase.java        |   6 +-
 .../protobuf/VeryBigPbRowToProtoITCase.java        |   6 +-
 33 files changed, 471 insertions(+), 493 deletions(-)

diff --git 
a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/BigPbProtoToRowTest.java
 
b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/BigPbProtoToRowTest.java
index ad0ee2efae3..ba8d1bd4bfc 100644
--- 
a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/BigPbProtoToRowTest.java
+++ 
b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/BigPbProtoToRowTest.java
@@ -26,21 +26,18 @@ import 
org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
 
 import com.google.protobuf.ByteString;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /**
  * Test for huge proto definition, which may trigger some special optimizations such as code
  * splitting.
  */
-public class BigPbProtoToRowTest {
+class BigPbProtoToRowTest {
 
     @Test
-    public void testSimple() throws Exception {
+    void testSimple() throws Exception {
         BigPbClass.BigPbMessage bigPbMessage =
                 BigPbClass.BigPbMessage.newBuilder()
                         .setIntField1(5)
@@ -86,47 +83,47 @@ public class BigPbProtoToRowTest {
                 ProtobufTestHelper.pbBytesToRow(
                         BigPbClass.BigPbMessage.class, 
bigPbMessage.toByteArray());
 
-        assertEquals(5, row.getInt(0));
-        assertFalse(row.getBoolean(1));
-        assertEquals("test1", row.getString(2).toString());
-        assertArrayEquals(new byte[] {1, 2, 3}, row.getBinary(3));
-        assertEquals(2.5, row.getDouble(4), 0.0);
-        assertEquals(1.5F, row.getFloat(5), 0.0);
-        assertEquals(3, row.getInt(6));
-        assertEquals(7L, row.getLong(7));
-        assertEquals(9L, row.getLong(8));
-        assertArrayEquals(new byte[] {4, 5, 6}, row.getBinary(9));
-        assertEquals(6.5, row.getDouble(10), 0.0);
-        assertArrayEquals(new byte[] {7, 8, 9}, row.getBinary(11));
-        assertTrue(row.getBoolean(12));
-        assertEquals("test2", row.getString(13).toString());
-        assertEquals(3.5F, row.getFloat(14), 0.0);
-        assertEquals(8, row.getInt(15));
-        assertArrayEquals(new byte[] {10, 11, 12}, row.getBinary(16));
-        assertTrue(row.getBoolean(17));
-        assertEquals("test3", row.getString(18).toString());
-        assertEquals(4.5F, row.getFloat(19), 0.0);
-        assertEquals(1, row.getInt(20));
-        assertEquals(2L, row.getLong(21));
-        assertEquals(3, row.getInt(22));
-        assertEquals(4L, row.getLong(23));
-        assertEquals(5.5, row.getDouble(24), 0.0);
-        assertEquals(6, row.getInt(25));
-        assertEquals(7L, row.getLong(26));
-        assertTrue(row.getBoolean(27));
-        assertEquals("value1", row.getArray(28).getString(0).toString());
-        assertEquals("value2", row.getArray(28).getString(1).toString());
-        assertEquals("value3", row.getArray(28).getString(2).toString());
-        assertEquals(8.5F, row.getFloat(29), 0.0);
-        assertEquals("test4", row.getString(30).toString());
-        assertArrayEquals(new byte[] {13, 14, 15}, 
row.getMap(31).valueArray().getBinary(0));
-        assertArrayEquals(new byte[] {16, 17, 18}, 
row.getMap(31).valueArray().getBinary(1));
-        assertEquals("value1", 
row.getMap(32).valueArray().getString(0).toString());
-        assertEquals("value2", 
row.getMap(32).valueArray().getString(1).toString());
+        assertThat(row.getInt(0)).isEqualTo(5);
+        assertThat(row.getBoolean(1)).isFalse();
+        assertThat(row.getString(2).toString()).isEqualTo("test1");
+        assertThat(row.getBinary(3)).isEqualTo(new byte[] {1, 2, 3});
+        assertThat(row.getDouble(4)).isEqualTo(2.5);
+        assertThat(row.getFloat(5)).isEqualTo(1.5F);
+        assertThat(row.getInt(6)).isEqualTo(3);
+        assertThat(row.getLong(7)).isEqualTo(7L);
+        assertThat(row.getLong(8)).isEqualTo(9L);
+        assertThat(row.getBinary(9)).isEqualTo(new byte[] {4, 5, 6});
+        assertThat(row.getDouble(10)).isEqualTo(6.5);
+        assertThat(row.getBinary(11)).isEqualTo(new byte[] {7, 8, 9});
+        assertThat(row.getBoolean(12)).isTrue();
+        assertThat(row.getString(13).toString()).isEqualTo("test2");
+        assertThat(row.getFloat(14)).isEqualTo(3.5F);
+        assertThat(row.getInt(15)).isEqualTo(8);
+        assertThat(row.getBinary(16)).isEqualTo(new byte[] {10, 11, 12});
+        assertThat(row.getBoolean(17)).isTrue();
+        assertThat(row.getString(18).toString()).isEqualTo("test3");
+        assertThat(row.getFloat(19)).isEqualTo(4.5F);
+        assertThat(row.getInt(20)).isEqualTo(1);
+        assertThat(row.getLong(21)).isEqualTo(2L);
+        assertThat(row.getInt(22)).isEqualTo(3);
+        assertThat(row.getLong(23)).isEqualTo(4L);
+        assertThat(row.getDouble(24)).isEqualTo(5.5);
+        assertThat(row.getInt(25)).isEqualTo(6);
+        assertThat(row.getLong(26)).isEqualTo(7L);
+        assertThat(row.getBoolean(27)).isTrue();
+        
assertThat(row.getArray(28).getString(0).toString()).isEqualTo("value1");
+        
assertThat(row.getArray(28).getString(1).toString()).isEqualTo("value2");
+        
assertThat(row.getArray(28).getString(2).toString()).isEqualTo("value3");
+        assertThat(row.getFloat(29)).isEqualTo(8.5F);
+        assertThat(row.getString(30).toString()).isEqualTo("test4");
+        assertThat(row.getMap(31).valueArray().getBinary(0)).isEqualTo(new 
byte[] {13, 14, 15});
+        assertThat(row.getMap(31).valueArray().getBinary(1)).isEqualTo(new 
byte[] {16, 17, 18});
+        
assertThat(row.getMap(32).valueArray().getString(0).toString()).isEqualTo("value1");
+        
assertThat(row.getMap(32).valueArray().getString(1).toString()).isEqualTo("value2");
     }
 
     @Test
-    public void testSplitInDeserialization() throws Exception {
+    void testSplitInDeserialization() throws Exception {
         RowType rowType = 
PbToRowTypeUtil.generateRowType(BigPbClass.BigPbMessage.getDescriptor());
         PbFormatConfig formatConfig =
                 new PbFormatConfig(BigPbClass.BigPbMessage.class.getName(), 
false, false, "");
@@ -135,6 +132,6 @@ public class BigPbProtoToRowTest {
                         rowType, InternalTypeInfo.of(rowType), formatConfig);
         pbRowDataDeserializationSchema.open(null);
         // make sure code is split
-        assertTrue(pbRowDataDeserializationSchema.isCodeSplit());
+        assertThat(pbRowDataDeserializationSchema.isCodeSplit()).isTrue();
     }
 }
diff --git 
a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/BigPbRowToProtoTest.java
 
b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/BigPbRowToProtoTest.java
index 2c9094db384..bc450475234 100644
--- 
a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/BigPbRowToProtoTest.java
+++ 
b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/BigPbRowToProtoTest.java
@@ -28,23 +28,21 @@ import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.types.logical.RowType;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.HashMap;
 import java.util.Map;
 
-import static org.junit.Assert.assertTrue;
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /**
  * Test for huge proto definition, which may trigger some special optimizations such as code
  * splitting.
  */
-public class BigPbRowToProtoTest {
+class BigPbRowToProtoTest {
 
     @Test
-    public void testSimple() throws Exception {
+    void testSimple() throws Exception {
         GenericRowData rowData = new GenericRowData(33);
         rowData.setField(0, 20);
         rowData.setField(1, false);
@@ -98,62 +96,59 @@ public class BigPbRowToProtoTest {
         byte[] bytes = ProtobufTestHelper.rowToPbBytes(rowData, 
BigPbClass.BigPbMessage.class);
 
         BigPbClass.BigPbMessage bigPbMessage = 
BigPbClass.BigPbMessage.parseFrom(bytes);
-        assertEquals(rowData.getField(0), bigPbMessage.getIntField1());
-        assertEquals(rowData.getField(1), bigPbMessage.getBoolField2());
-        assertEquals(rowData.getField(2).toString(), 
bigPbMessage.getStringField3());
-        assertArrayEquals(
-                ((byte[]) rowData.getField(3)), 
bigPbMessage.getBytesField4().toByteArray());
-        assertEquals(rowData.getField(4), bigPbMessage.getDoubleField5());
-        assertEquals(rowData.getField(5), bigPbMessage.getFloatField6());
-        assertEquals(rowData.getField(6), bigPbMessage.getUint32Field7());
-        assertEquals(rowData.getField(7), bigPbMessage.getInt64Field8());
-        assertEquals(rowData.getField(8), bigPbMessage.getUint64Field9());
-        assertArrayEquals(
-                ((byte[]) rowData.getField(9)), 
bigPbMessage.getBytesField10().toByteArray());
-        assertEquals(rowData.getField(10), bigPbMessage.getDoubleField11());
-        assertArrayEquals(
-                ((byte[]) rowData.getField(11)), 
bigPbMessage.getBytesField12().toByteArray());
-        assertEquals(rowData.getField(12), bigPbMessage.getBoolField13());
-        assertEquals(rowData.getField(13).toString(), 
bigPbMessage.getStringField14());
-        assertEquals(rowData.getField(14), bigPbMessage.getFloatField15());
-        assertEquals(rowData.getField(15), bigPbMessage.getInt32Field16());
-        assertArrayEquals(
-                ((byte[]) rowData.getField(16)), 
bigPbMessage.getBytesField17().toByteArray());
-        assertEquals(rowData.getField(17), bigPbMessage.getBoolField18());
-        assertEquals(rowData.getField(18).toString(), 
bigPbMessage.getStringField19());
-        assertEquals(rowData.getField(19), bigPbMessage.getFloatField20());
-        assertEquals(rowData.getField(20), bigPbMessage.getFixed32Field21());
-        assertEquals(rowData.getField(21), bigPbMessage.getFixed64Field22());
-        assertEquals(rowData.getField(22), bigPbMessage.getSfixed32Field23());
-        assertEquals(rowData.getField(23), bigPbMessage.getSfixed64Field24());
-        assertEquals(rowData.getField(24), bigPbMessage.getDoubleField25());
-        assertEquals(rowData.getField(25), bigPbMessage.getUint32Field26());
-        assertEquals(rowData.getField(26), bigPbMessage.getUint64Field27());
-        assertEquals(rowData.getField(27), bigPbMessage.getBoolField28());
-        assertEquals(
-                ((GenericArrayData) 
rowData.getField(28)).getString(0).toString(),
-                bigPbMessage.getField29List().get(0));
-        assertEquals(
-                ((GenericArrayData) 
rowData.getField(28)).getString(1).toString(),
-                bigPbMessage.getField29List().get(1));
-        assertEquals(
-                ((GenericArrayData) 
rowData.getField(28)).getString(2).toString(),
-                bigPbMessage.getField29List().get(2));
-        assertEquals(rowData.getField(29), bigPbMessage.getFloatField30());
-        assertEquals(rowData.getField(30).toString(), 
bigPbMessage.getStringField31());
+        assertThat(bigPbMessage.getIntField1()).isEqualTo(rowData.getField(0));
+        
assertThat(bigPbMessage.getBoolField2()).isEqualTo(rowData.getField(1));
+        
assertThat(bigPbMessage.getStringField3()).isEqualTo(rowData.getField(2).toString());
+        assertThat(bigPbMessage.getBytesField4().toByteArray())
+                .isEqualTo(((byte[]) rowData.getField(3)));
+        
assertThat(bigPbMessage.getDoubleField5()).isEqualTo(rowData.getField(4));
+        
assertThat(bigPbMessage.getFloatField6()).isEqualTo(rowData.getField(5));
+        
assertThat(bigPbMessage.getUint32Field7()).isEqualTo(rowData.getField(6));
+        
assertThat(bigPbMessage.getInt64Field8()).isEqualTo(rowData.getField(7));
+        
assertThat(bigPbMessage.getUint64Field9()).isEqualTo(rowData.getField(8));
+        assertThat(bigPbMessage.getBytesField10().toByteArray())
+                .isEqualTo(((byte[]) rowData.getField(9)));
+        
assertThat(bigPbMessage.getDoubleField11()).isEqualTo(rowData.getField(10));
+        assertThat(bigPbMessage.getBytesField12().toByteArray())
+                .isEqualTo(((byte[]) rowData.getField(11)));
+        
assertThat(bigPbMessage.getBoolField13()).isEqualTo(rowData.getField(12));
+        
assertThat(bigPbMessage.getStringField14()).isEqualTo(rowData.getField(13).toString());
+        
assertThat(bigPbMessage.getFloatField15()).isEqualTo(rowData.getField(14));
+        
assertThat(bigPbMessage.getInt32Field16()).isEqualTo(rowData.getField(15));
+        assertThat(bigPbMessage.getBytesField17().toByteArray())
+                .isEqualTo(((byte[]) rowData.getField(16)));
+        
assertThat(bigPbMessage.getBoolField18()).isEqualTo(rowData.getField(17));
+        
assertThat(bigPbMessage.getStringField19()).isEqualTo(rowData.getField(18).toString());
+        
assertThat(bigPbMessage.getFloatField20()).isEqualTo(rowData.getField(19));
+        
assertThat(bigPbMessage.getFixed32Field21()).isEqualTo(rowData.getField(20));
+        
assertThat(bigPbMessage.getFixed64Field22()).isEqualTo(rowData.getField(21));
+        
assertThat(bigPbMessage.getSfixed32Field23()).isEqualTo(rowData.getField(22));
+        
assertThat(bigPbMessage.getSfixed64Field24()).isEqualTo(rowData.getField(23));
+        
assertThat(bigPbMessage.getDoubleField25()).isEqualTo(rowData.getField(24));
+        
assertThat(bigPbMessage.getUint32Field26()).isEqualTo(rowData.getField(25));
+        
assertThat(bigPbMessage.getUint64Field27()).isEqualTo(rowData.getField(26));
+        
assertThat(bigPbMessage.getBoolField28()).isEqualTo(rowData.getField(27));
+        assertThat(bigPbMessage.getField29List().get(0))
+                .isEqualTo(((GenericArrayData) 
rowData.getField(28)).getString(0).toString());
+        assertThat(bigPbMessage.getField29List().get(1))
+                .isEqualTo(((GenericArrayData) 
rowData.getField(28)).getString(1).toString());
+        assertThat(bigPbMessage.getField29List().get(2))
+                .isEqualTo(((GenericArrayData) 
rowData.getField(28)).getString(2).toString());
+        
assertThat(bigPbMessage.getFloatField30()).isEqualTo(rowData.getField(29));
+        
assertThat(bigPbMessage.getStringField31()).isEqualTo(rowData.getField(30).toString());
 
         ArrayData keySet = rowData.getMap(32).keyArray();
         ArrayData valueSet = rowData.getMap(32).valueArray();
-        assertEquals(keySet.getString(0).toString(), "key2");
-        assertEquals(keySet.getString(1).toString(), "key3");
-        assertEquals(keySet.getString(2).toString(), "key1");
-        assertEquals(valueSet.getString(0).toString(), "value2");
-        assertEquals(valueSet.getString(1).toString(), "value3");
-        assertEquals(valueSet.getString(2).toString(), "value1");
+        assertThat(keySet.getString(0).toString()).isEqualTo("key2");
+        assertThat(keySet.getString(1).toString()).isEqualTo("key3");
+        assertThat(keySet.getString(2).toString()).isEqualTo("key1");
+        assertThat(valueSet.getString(0).toString()).isEqualTo("value2");
+        assertThat(valueSet.getString(1).toString()).isEqualTo("value3");
+        assertThat(valueSet.getString(2).toString()).isEqualTo("value1");
     }
 
     @Test
-    public void testSplitInSerialization() throws Exception {
+    void testSplitInSerialization() throws Exception {
         RowType rowType = 
PbToRowTypeUtil.generateRowType(BigPbClass.BigPbMessage.getDescriptor());
         PbFormatConfig formatConfig =
                 new PbFormatConfig(BigPbClass.BigPbMessage.class.getName(), 
false, false, "");
@@ -161,6 +156,6 @@ public class BigPbRowToProtoTest {
                 new PbRowDataSerializationSchema(rowType, formatConfig);
         pbRowDataSerializationSchema.open(null);
         // make sure code is split
-        assertTrue(pbRowDataSerializationSchema.isCodeSplit());
+        assertThat(pbRowDataSerializationSchema.isCodeSplit()).isTrue();
     }
 }
diff --git 
a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MapProtoToRowTest.java
 
b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MapProtoToRowTest.java
index 44c4588d7ab..7952229dab3 100644
--- 
a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MapProtoToRowTest.java
+++ 
b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MapProtoToRowTest.java
@@ -23,15 +23,14 @@ import org.apache.flink.table.data.MapData;
 import org.apache.flink.table.data.RowData;
 
 import com.google.protobuf.ByteString;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test conversion of proto map data to flink internal data. */
-public class MapProtoToRowTest {
+class MapProtoToRowTest {
     @Test
-    public void testMessage() throws Exception {
+    void testMessage() throws Exception {
         MapTest.InnerMessageTest innerMessageTest =
                 MapTest.InnerMessageTest.newBuilder().setA(1).setB(2).build();
         MapTest mapTest =
@@ -45,20 +44,20 @@ public class MapProtoToRowTest {
         RowData row = ProtobufTestHelper.pbBytesToRow(MapTest.class, 
mapTest.toByteArray());
 
         MapData map1 = row.getMap(1);
-        assertEquals("a", map1.keyArray().getString(0).toString());
-        assertEquals("b", map1.valueArray().getString(0).toString());
-        assertEquals("c", map1.keyArray().getString(1).toString());
-        assertEquals("d", map1.valueArray().getString(1).toString());
+        assertThat(map1.keyArray().getString(0).toString()).isEqualTo("a");
+        assertThat(map1.valueArray().getString(0).toString()).isEqualTo("b");
+        assertThat(map1.keyArray().getString(1).toString()).isEqualTo("c");
+        assertThat(map1.valueArray().getString(1).toString()).isEqualTo("d");
 
         MapData map2 = row.getMap(2);
-        assertEquals("f", map2.keyArray().getString(0).toString());
+        assertThat(map2.keyArray().getString(0).toString()).isEqualTo("f");
         RowData rowData2 = map2.valueArray().getRow(0, 2);
 
-        assertEquals(1, rowData2.getInt(0));
-        assertEquals(2L, rowData2.getLong(1));
+        assertThat(rowData2.getInt(0)).isEqualTo(1);
+        assertThat(rowData2.getLong(1)).isEqualTo(2L);
 
         MapData map3 = row.getMap(3);
-        assertEquals("e", map3.keyArray().getString(0).toString());
-        assertArrayEquals(new byte[] {1, 2, 3}, 
map3.valueArray().getBinary(0));
+        assertThat(map3.keyArray().getString(0).toString()).isEqualTo("e");
+        assertThat(map3.valueArray().getBinary(0)).isEqualTo(new byte[] {1, 2, 
3});
     }
 }
diff --git 
a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MapRowToProtoTest.java
 
b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MapRowToProtoTest.java
index ec120adc828..c2325add42b 100644
--- 
a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MapRowToProtoTest.java
+++ 
b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MapRowToProtoTest.java
@@ -25,17 +25,17 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 
 import com.google.protobuf.ByteString;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.HashMap;
 import java.util.Map;
 
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test conversion of flink internal map data to proto data. */
-public class MapRowToProtoTest {
+class MapRowToProtoTest {
     @Test
-    public void testSimple() throws Exception {
+    void testSimple() throws Exception {
         Map<StringData, StringData> map1 = new HashMap<>();
         map1.put(StringData.fromString("a"), StringData.fromString("b"));
         Map<StringData, RowData> map2 = new HashMap<>();
@@ -52,20 +52,21 @@ public class MapRowToProtoTest {
         byte[] bytes = ProtobufTestHelper.rowToPbBytes(row, MapTest.class);
 
         MapTest mapTest = MapTest.parseFrom(bytes);
-        assertEquals(1, mapTest.getA());
-        assertEquals("b", mapTest.getMap1Map().get("a"));
+        assertThat(mapTest.getA()).isEqualTo(1);
+        assertThat(mapTest.getMap1Map().get("a")).isEqualTo("b");
         MapTest.InnerMessageTest innerMessageTest = 
mapTest.getMap2Map().get("c");
-        assertEquals(1, innerMessageTest.getA());
-        assertEquals(2L, innerMessageTest.getB());
-        assertEquals(ByteString.copyFrom(new byte[] {1, 2, 3}), 
mapTest.getMap3Map().get("e"));
+        assertThat(innerMessageTest.getA()).isEqualTo(1);
+        assertThat(innerMessageTest.getB()).isEqualTo(2L);
+        assertThat(mapTest.getMap3Map().get("e"))
+                .isEqualTo(ByteString.copyFrom(new byte[] {1, 2, 3}));
     }
 
     @Test
-    public void testNull() throws Exception {
+    void testNull() throws Exception {
         RowData row = GenericRowData.of(1, null, null, null);
         byte[] bytes = ProtobufTestHelper.rowToPbBytes(row, MapTest.class);
         MapTest mapTest = MapTest.parseFrom(bytes);
         Map<String, String> map = mapTest.getMap1Map();
-        assertEquals(0, map.size());
+        assertThat(map.size()).isEqualTo(0);
     }
 }
diff --git 
a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MetaNoMultiProtoToRowTest.java
 
b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MetaNoMultiProtoToRowTest.java
index bb83169d96c..2b5a3a5df40 100644
--- 
a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MetaNoMultiProtoToRowTest.java
+++ 
b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MetaNoMultiProtoToRowTest.java
@@ -25,7 +25,7 @@ import org.apache.flink.formats.protobuf.util.PbToRowTypeUtil;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 /**
  * Test class for below case
@@ -39,9 +39,9 @@ import org.junit.Test;
  *
  * <p>It is valid proto definition.
  */
-public class MetaNoMultiProtoToRowTest {
+class MetaNoMultiProtoToRowTest {
     @Test
-    public void testSimple() throws Exception {
+    void testSimple() throws Exception {
         RowType rowType =
                 PbToRowTypeUtil.generateRowType(
                         TestSimpleNomulti.SimpleTestNoMulti.getDescriptor());
@@ -56,7 +56,7 @@ public class MetaNoMultiProtoToRowTest {
     }
 
     @Test
-    public void testOuterClassName() throws Exception {
+    void testOuterClassName() throws Exception {
         RowType rowType =
                 PbToRowTypeUtil.generateRowType(
                         TestSimpleNomulti.SimpleTestNoMulti.getDescriptor());
diff --git 
a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MetaOuterMultiTest.java
 
b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MetaOuterMultiTest.java
index 932964804e2..a78c6df5584 100644
--- 
a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MetaOuterMultiTest.java
+++ 
b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MetaOuterMultiTest.java
@@ -25,7 +25,7 @@ import org.apache.flink.formats.protobuf.util.PbToRowTypeUtil;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 /**
  * Test class for below case
@@ -41,9 +41,9 @@ import org.junit.Test;
  *
  * <p>It is valid proto definition.
  */
-public class MetaOuterMultiTest {
+class MetaOuterMultiTest {
     @Test
-    public void testSimple() throws Exception {
+    void testSimple() throws Exception {
         RowType rowType = 
PbToRowTypeUtil.generateRowType(SimpleTestOuterMulti.getDescriptor());
         PbFormatConfig formatConfig =
                 new PbFormatConfig(SimpleTestOuterMulti.class.getName(), 
false, false, "");
diff --git 
a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MetaOuterNoMultiTest.java
 
b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MetaOuterNoMultiTest.java
index afcd3498bb0..e8f70f2a215 100644
--- 
a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MetaOuterNoMultiTest.java
+++ 
b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MetaOuterNoMultiTest.java
@@ -25,7 +25,7 @@ import org.apache.flink.formats.protobuf.util.PbToRowTypeUtil;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 /**
  * Test class for below case
@@ -40,9 +40,9 @@ import org.junit.Test;
  *
  * <p>It is valid proto definition.
  */
-public class MetaOuterNoMultiTest {
+class MetaOuterNoMultiTest {
     @Test
-    public void testSimple() throws Exception {
+    void testSimple() throws Exception {
         RowType rowType =
                 PbToRowTypeUtil.generateRowType(
                         
SimpleTestOuterNomultiProto.SimpleTestOuterNoMulti.getDescriptor());
diff --git 
a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MultiLevelMessageProtoToRowTest.java
 
b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MultiLevelMessageProtoToRowTest.java
index bea0c3be378..23bf866f736 100644
--- 
a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MultiLevelMessageProtoToRowTest.java
+++ 
b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MultiLevelMessageProtoToRowTest.java
@@ -21,15 +21,14 @@ package org.apache.flink.formats.protobuf;
 import org.apache.flink.formats.protobuf.testproto.MultipleLevelMessageTest;
 import org.apache.flink.table.data.RowData;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test conversion of multiple level of proto nested message data to flink internal data. */
-public class MultiLevelMessageProtoToRowTest {
+class MultiLevelMessageProtoToRowTest {
     @Test
-    public void testMessage() throws Exception {
+    void testMessage() throws Exception {
         MultipleLevelMessageTest.InnerMessageTest1.InnerMessageTest2 
innerMessageTest2 =
                 
MultipleLevelMessageTest.InnerMessageTest1.InnerMessageTest2.newBuilder()
                         .setA(1)
@@ -47,12 +46,12 @@ public class MultiLevelMessageProtoToRowTest {
                 ProtobufTestHelper.pbBytesToRow(
                         MultipleLevelMessageTest.class, 
multipleLevelMessageTest.toByteArray());
 
-        assertEquals(4, row.getArity());
+        assertThat(row.getArity()).isEqualTo(4);
         RowData subRow = (RowData) row.getRow(3, 2);
-        assertFalse(subRow.getBoolean(1));
+        assertThat(subRow.getBoolean(1)).isFalse();
 
         RowData subSubRow = (RowData) subRow.getRow(0, 2);
-        assertEquals(1, subSubRow.getInt(0));
-        assertEquals(2L, subSubRow.getLong(1));
+        assertThat(subSubRow.getInt(0)).isEqualTo(1);
+        assertThat(subSubRow.getLong(1)).isEqualTo(2L);
     }
 }
diff --git 
a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MultiLevelMessageRowToProtoTest.java
 
b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MultiLevelMessageRowToProtoTest.java
index 97c672f1a58..b5f9f36627e 100644
--- 
a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MultiLevelMessageRowToProtoTest.java
+++ 
b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/MultiLevelMessageRowToProtoTest.java
@@ -22,15 +22,14 @@ import 
org.apache.flink.formats.protobuf.testproto.MultipleLevelMessageTest;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test conversion of flink internal nested row data to proto data. */
-public class MultiLevelMessageRowToProtoTest {
+class MultiLevelMessageRowToProtoTest {
     @Test
-    public void testMultipleLevelMessage() throws Exception {
+    void testMultipleLevelMessage() throws Exception {
         RowData subSubRow = GenericRowData.of(1, 2L);
         RowData subRow = GenericRowData.of(subSubRow, false);
         RowData row = GenericRowData.of(1, 2L, false, subRow);
@@ -38,14 +37,14 @@ public class MultiLevelMessageRowToProtoTest {
         byte[] bytes = ProtobufTestHelper.rowToPbBytes(row, 
MultipleLevelMessageTest.class);
         MultipleLevelMessageTest test = 
MultipleLevelMessageTest.parseFrom(bytes);
 
-        assertFalse(test.getD().getC());
-        assertEquals(1, test.getD().getA().getA());
-        assertEquals(2L, test.getD().getA().getB());
-        assertEquals(1, test.getA());
+        assertThat(test.getD().getC()).isFalse();
+        assertThat(test.getD().getA().getA()).isEqualTo(1);
+        assertThat(test.getD().getA().getB()).isEqualTo(2L);
+        assertThat(test.getA()).isEqualTo(1);
     }
 
     @Test
-    public void testNull() throws Exception {
+    void testNull() throws Exception {
         RowData row = GenericRowData.of(1, 2L, false, null);
         byte[] bytes = ProtobufTestHelper.rowToPbBytes(row, 
MultipleLevelMessageTest.class);
 
@@ -53,6 +52,6 @@ public class MultiLevelMessageRowToProtoTest {
 
         MultipleLevelMessageTest.InnerMessageTest1 empty =
                 
MultipleLevelMessageTest.InnerMessageTest1.newBuilder().build();
-        assertEquals(empty, test.getD());
+        assertThat(test.getD()).isEqualTo(empty);
     }
 }
diff --git 
a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/NoJavaPackageProtoToRowTest.java
 
b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/NoJavaPackageProtoToRowTest.java
index b5499b82103..935d92eb213 100644
--- 
a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/NoJavaPackageProtoToRowTest.java
+++ 
b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/NoJavaPackageProtoToRowTest.java
@@ -20,12 +20,12 @@ package org.apache.flink.formats.protobuf;
 
 import org.apache.flink.formats.protobuf.proto.SimpleTestNoJavaPackage;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 /** test no java_package. */
-public class NoJavaPackageProtoToRowTest {
+class NoJavaPackageProtoToRowTest {
     @Test
-    public void testMessage() throws Exception {
+    void testMessage() throws Exception {
         SimpleTestNoJavaPackage simple = 
SimpleTestNoJavaPackage.newBuilder().build();
         ProtobufTestHelper.pbBytesToRow(SimpleTestNoJavaPackage.class, 
simple.toByteArray());
     }
diff --git 
a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/NullValueToProtoTest.java
 
b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/NullValueToProtoTest.java
index bb9ef49e579..f0493f8bd9a 100644
--- 
a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/NullValueToProtoTest.java
+++ 
b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/NullValueToProtoTest.java
@@ -26,19 +26,18 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 
 import com.google.protobuf.ByteString;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import static org.apache.flink.formats.protobuf.ProtobufTestHelper.mapOf;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /**
  * Test conversion of null values from flink internal data to proto data. 
Proto data does not permit
  * null values in array/map data.
  */
-public class NullValueToProtoTest {
+class NullValueToProtoTest {
     @Test
-    public void testSimple() throws Exception {
+    void testSimple() throws Exception {
         RowData row =
                 GenericRowData.of(
                         // string
@@ -90,80 +89,78 @@ public class NullValueToProtoTest {
                         false);
         NullTest nullTest = NullTest.parseFrom(bytes);
         // string map
-        assertEquals(2, nullTest.getStringMapCount());
-        assertTrue(nullTest.getStringMapMap().containsKey(""));
-        assertTrue(nullTest.getStringMapMap().containsKey("key"));
-        assertEquals("value", nullTest.getStringMapMap().get(""));
-        assertEquals("", nullTest.getStringMapMap().get("key"));
+        assertThat(nullTest.getStringMapCount()).isEqualTo(2);
+        assertThat(nullTest.getStringMapMap().containsKey("")).isTrue();
+        assertThat(nullTest.getStringMapMap().containsKey("key")).isTrue();
+        assertThat(nullTest.getStringMapMap().get("")).isEqualTo("value");
+        assertThat(nullTest.getStringMapMap().get("key")).isEqualTo("");
         // int32 map
-        assertEquals(2, nullTest.getIntMapCount());
-        assertTrue(nullTest.getIntMapMap().containsKey(0));
-        assertTrue(nullTest.getIntMapMap().containsKey(1));
-        assertEquals(Integer.valueOf(1), nullTest.getIntMapMap().get(0));
-        assertEquals(Integer.valueOf(0), nullTest.getIntMapMap().get(1));
+        assertThat(nullTest.getIntMapCount()).isEqualTo(2);
+        assertThat(nullTest.getIntMapMap().containsKey(0)).isTrue();
+        assertThat(nullTest.getIntMapMap().containsKey(1)).isTrue();
+        
assertThat(nullTest.getIntMapMap().get(0)).isEqualTo(Integer.valueOf(1));
+        
assertThat(nullTest.getIntMapMap().get(1)).isEqualTo(Integer.valueOf(0));
         // int64 map
-        assertEquals(2, nullTest.getIntMapCount());
-        assertTrue(nullTest.getLongMapMap().containsKey(0L));
-        assertTrue(nullTest.getLongMapMap().containsKey(1L));
-        assertEquals(Long.valueOf(1L), nullTest.getLongMapMap().get(0L));
-        assertEquals(Long.valueOf(0L), nullTest.getLongMapMap().get(1L));
+        assertThat(nullTest.getIntMapCount()).isEqualTo(2);
+        assertThat(nullTest.getLongMapMap().containsKey(0L)).isTrue();
+        assertThat(nullTest.getLongMapMap().containsKey(1L)).isTrue();
+        
assertThat(nullTest.getLongMapMap().get(0L)).isEqualTo(Long.valueOf(1L));
+        
assertThat(nullTest.getLongMapMap().get(1L)).isEqualTo(Long.valueOf(0L));
         // bool map
-        assertEquals(2, nullTest.getBooleanMapCount());
-        assertTrue(nullTest.getBooleanMapMap().containsKey(false));
-        assertTrue(nullTest.getBooleanMapMap().containsKey(true));
-        assertEquals(Boolean.TRUE, nullTest.getBooleanMapMap().get(false));
-        assertEquals(Boolean.FALSE, nullTest.getBooleanMapMap().get(true));
+        assertThat(nullTest.getBooleanMapCount()).isEqualTo(2);
+        assertThat(nullTest.getBooleanMapMap().containsKey(false)).isTrue();
+        assertThat(nullTest.getBooleanMapMap().containsKey(true)).isTrue();
+        
assertThat(nullTest.getBooleanMapMap().get(false)).isEqualTo(Boolean.TRUE);
+        
assertThat(nullTest.getBooleanMapMap().get(true)).isEqualTo(Boolean.FALSE);
         // float map
-        assertEquals(1, nullTest.getFloatMapCount());
-        assertEquals(Float.valueOf(0.0f), 
nullTest.getFloatMapMap().get("key"));
+        assertThat(nullTest.getFloatMapCount()).isEqualTo(1);
+        
assertThat(nullTest.getFloatMapMap().get("key")).isEqualTo(Float.valueOf(0.0f));
         // double map
-        assertEquals(1, nullTest.getDoubleMapCount());
-        assertEquals(Double.valueOf(0.0), 
nullTest.getDoubleMapMap().get("key"));
+        assertThat(nullTest.getDoubleMapCount()).isEqualTo(1);
+        
assertThat(nullTest.getDoubleMapMap().get("key")).isEqualTo(Double.valueOf(0.0));
         // enum map
-        assertEquals(1, nullTest.getEnumMapCount());
-        assertEquals(NullTest.Corpus.UNIVERSAL, 
nullTest.getEnumMapMap().get("key"));
+        assertThat(nullTest.getEnumMapCount()).isEqualTo(1);
+        
assertThat(nullTest.getEnumMapMap().get("key")).isEqualTo(NullTest.Corpus.UNIVERSAL);
         // message map
-        assertEquals(1, nullTest.getMessageMapCount());
-        assertEquals(
-                NullTest.InnerMessageTest.getDefaultInstance(),
-                nullTest.getMessageMapMap().get("key"));
+        assertThat(nullTest.getMessageMapCount()).isEqualTo(1);
+        assertThat(nullTest.getMessageMapMap().get("key"))
+                .isEqualTo(NullTest.InnerMessageTest.getDefaultInstance());
         // bytes map
-        assertEquals(1, nullTest.getBytesMapCount());
-        assertEquals(ByteString.EMPTY, nullTest.getBytesMapMap().get("key"));
+        assertThat(nullTest.getBytesMapCount()).isEqualTo(1);
+        
assertThat(nullTest.getBytesMapMap().get("key")).isEqualTo(ByteString.EMPTY);
 
         // string array
-        assertEquals(1, nullTest.getStringArrayCount());
-        assertEquals("", nullTest.getStringArrayList().get(0));
+        assertThat(nullTest.getStringArrayCount()).isEqualTo(1);
+        assertThat(nullTest.getStringArrayList().get(0)).isEqualTo("");
         // int array
-        assertEquals(1, nullTest.getIntArrayCount());
-        assertEquals(Integer.valueOf(0), nullTest.getIntArrayList().get(0));
+        assertThat(nullTest.getIntArrayCount()).isEqualTo(1);
+        
assertThat(nullTest.getIntArrayList().get(0)).isEqualTo(Integer.valueOf(0));
         // long array
-        assertEquals(1, nullTest.getLongArrayCount());
-        assertEquals(Long.valueOf(0L), nullTest.getLongArrayList().get(0));
+        assertThat(nullTest.getLongArrayCount()).isEqualTo(1);
+        
assertThat(nullTest.getLongArrayList().get(0)).isEqualTo(Long.valueOf(0L));
         // float array
-        assertEquals(1, nullTest.getFloatArrayCount());
-        assertEquals(Float.valueOf(0), nullTest.getFloatArrayList().get(0));
+        assertThat(nullTest.getFloatArrayCount()).isEqualTo(1);
+        
assertThat(nullTest.getFloatArrayList().get(0)).isEqualTo(Float.valueOf(0));
         // double array
-        assertEquals(1, nullTest.getDoubleArrayCount());
-        assertEquals(Double.valueOf(0), nullTest.getDoubleArrayList().get(0));
+        assertThat(nullTest.getDoubleArrayCount()).isEqualTo(1);
+        
assertThat(nullTest.getDoubleArrayList().get(0)).isEqualTo(Double.valueOf(0));
         // boolean array
-        assertEquals(1, nullTest.getBooleanArrayCount());
-        assertEquals(Boolean.FALSE, nullTest.getBooleanArrayList().get(0));
+        assertThat(nullTest.getBooleanArrayCount()).isEqualTo(1);
+        
assertThat(nullTest.getBooleanArrayList().get(0)).isEqualTo(Boolean.FALSE);
         // enum array
-        assertEquals(1, nullTest.getEnumArrayCount());
-        assertEquals(NullTest.Corpus.UNIVERSAL, 
nullTest.getEnumArrayList().get(0));
+        assertThat(nullTest.getEnumArrayCount()).isEqualTo(1);
+        
assertThat(nullTest.getEnumArrayList().get(0)).isEqualTo(NullTest.Corpus.UNIVERSAL);
         // message array
-        assertEquals(1, nullTest.getMessageArrayCount());
-        assertEquals(
-                NullTest.InnerMessageTest.getDefaultInstance(),
-                nullTest.getMessageArrayList().get(0));
+        assertThat(nullTest.getMessageArrayCount()).isEqualTo(1);
+        assertThat(nullTest.getMessageArrayList().get(0))
+                .isEqualTo(NullTest.InnerMessageTest.getDefaultInstance());
         // bytes array
-        assertEquals(1, nullTest.getBytesArrayCount());
-        assertEquals(ByteString.EMPTY, nullTest.getBytesArrayList().get(0));
+        assertThat(nullTest.getBytesArrayCount()).isEqualTo(1);
+        
assertThat(nullTest.getBytesArrayList().get(0)).isEqualTo(ByteString.EMPTY);
     }
 
     @Test
-    public void testNullStringLiteral() throws Exception {
+    void testNullStringLiteral() throws Exception {
         RowData row =
                 GenericRowData.of(
                         // string
@@ -214,6 +211,6 @@ public class NullValueToProtoTest {
                         new PbFormatConfig(NullTest.class.getName(), false, 
false, "NULL"),
                         false);
         NullTest nullTest = NullTest.parseFrom(bytes);
-        assertEquals("NULL", nullTest.getStringMapMap().get("key"));
+        assertThat(nullTest.getStringMapMap().get("key")).isEqualTo("NULL");
     }
 }
diff --git 
a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/OneofProtoToRowTest.java
 
b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/OneofProtoToRowTest.java
index 04d29c42eb8..545c72470cc 100644
--- 
a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/OneofProtoToRowTest.java
+++ 
b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/OneofProtoToRowTest.java
@@ -21,18 +21,17 @@ package org.apache.flink.formats.protobuf;
 import org.apache.flink.formats.protobuf.testproto.OneofTest;
 import org.apache.flink.table.data.RowData;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test conversion of proto one_of data to flink internal data. */
-public class OneofProtoToRowTest {
+class OneofProtoToRowTest {
     @Test
-    public void testSimple() throws Exception {
+    void testSimple() throws Exception {
         OneofTest oneofTest = OneofTest.newBuilder().setA(1).setB(2).build();
         RowData row = ProtobufTestHelper.pbBytesToRow(OneofTest.class, 
oneofTest.toByteArray());
-        assertTrue(row.isNullAt(0));
-        assertEquals(2, row.getInt(1));
+        assertThat(row.isNullAt(0)).isTrue();
+        assertThat(row.getInt(1)).isEqualTo(2);
     }
 }
diff --git 
a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/OneofRowToProtoTest.java
 
b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/OneofRowToProtoTest.java
index 3b63e7f695d..f4f3bfcd7d1 100644
--- 
a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/OneofRowToProtoTest.java
+++ 
b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/OneofRowToProtoTest.java
@@ -22,20 +22,19 @@ import 
org.apache.flink.formats.protobuf.testproto.OneofTest;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test conversion of flink internal map data to one_of proto data. */
-public class OneofRowToProtoTest {
+class OneofRowToProtoTest {
     @Test
-    public void testSimple() throws Exception {
+    void testSimple() throws Exception {
         RowData row = GenericRowData.of(1, 2);
 
         byte[] bytes = ProtobufTestHelper.rowToPbBytes(row, OneofTest.class);
         OneofTest oneofTest = OneofTest.parseFrom(bytes);
-        assertFalse(oneofTest.hasA());
-        assertEquals(2, oneofTest.getB());
+        assertThat(oneofTest.hasA()).isFalse();
+        assertThat(oneofTest.getB()).isEqualTo(2);
     }
 }
diff --git 
a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/Pb3ToRowTest.java
 
b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/Pb3ToRowTest.java
index 7dea8ccdda4..3fac1de6903 100644
--- 
a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/Pb3ToRowTest.java
+++ 
b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/Pb3ToRowTest.java
@@ -24,19 +24,17 @@ import org.apache.flink.table.data.MapData;
 import org.apache.flink.table.data.RowData;
 
 import com.google.protobuf.ByteString;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /**
  * Test conversion of proto3 data to flink internal data. Default values after 
conversion is tested
  * especially.
  */
-public class Pb3ToRowTest {
+class Pb3ToRowTest {
     @Test
-    public void testDeserialization() throws Exception {
+    void testDeserialization() throws Exception {
         Pb3Test.InnerMessageTest innerMessageTest =
                 Pb3Test.InnerMessageTest.newBuilder().setA(1).setB(2).build();
         Pb3Test mapTest =
@@ -57,62 +55,62 @@ public class Pb3ToRowTest {
 
         RowData row = ProtobufTestHelper.pbBytesToRow(Pb3Test.class, 
mapTest.toByteArray());
 
-        assertEquals(1, row.getInt(0));
-        assertEquals(2L, row.getLong(1));
-        assertEquals("haha", row.getString(2).toString());
-        assertEquals(Float.valueOf(1.1f), Float.valueOf(row.getFloat(3)));
-        assertEquals(Double.valueOf(1.2), Double.valueOf(row.getDouble(4)));
-        assertEquals("IMAGES", row.getString(5).toString());
+        assertThat(row.getInt(0)).isEqualTo(1);
+        assertThat(row.getLong(1)).isEqualTo(2L);
+        assertThat(row.getString(2).toString()).isEqualTo("haha");
+        assertThat(row.getFloat(3)).isEqualTo(1.1f);
+        assertThat(row.getDouble(4)).isEqualTo(1.2);
+        assertThat(row.getString(5).toString()).isEqualTo("IMAGES");
 
         RowData rowData = row.getRow(6, 2);
-        assertEquals(1, rowData.getInt(0));
-        assertEquals(2L, rowData.getInt(1));
+        assertThat(rowData.getInt(0)).isEqualTo(1);
+        assertThat(rowData.getInt(1)).isEqualTo(2L);
 
         rowData = row.getArray(7).getRow(0, 2);
-        assertEquals(1, rowData.getInt(0));
-        assertEquals(2L, rowData.getInt(1));
+        assertThat(rowData.getInt(0)).isEqualTo(1);
+        assertThat(rowData.getInt(1)).isEqualTo(2L);
 
-        assertEquals(100, row.getBinary(8)[0]);
+        assertThat(row.getBinary(8)[0]).isEqualTo((byte) 100);
 
         MapData map1 = row.getMap(9);
-        assertEquals("a", map1.keyArray().getString(0).toString());
-        assertEquals("b", map1.valueArray().getString(0).toString());
-        assertEquals("c", map1.keyArray().getString(1).toString());
-        assertEquals("d", map1.valueArray().getString(1).toString());
+        assertThat(map1.keyArray().getString(0).toString()).isEqualTo("a");
+        assertThat(map1.valueArray().getString(0).toString()).isEqualTo("b");
+        assertThat(map1.keyArray().getString(1).toString()).isEqualTo("c");
+        assertThat(map1.valueArray().getString(1).toString()).isEqualTo("d");
 
         MapData map2 = row.getMap(10);
-        assertEquals("f", map2.keyArray().getString(0).toString());
+        assertThat(map2.keyArray().getString(0).toString()).isEqualTo("f");
         rowData = map2.valueArray().getRow(0, 2);
 
-        assertEquals(1, rowData.getInt(0));
-        assertEquals(2L, rowData.getLong(1));
+        assertThat(rowData.getInt(0)).isEqualTo(1);
+        assertThat(rowData.getLong(1)).isEqualTo(2L);
     }
 
     @Test
-    public void testReadDefaultValues() throws Exception {
+    void testReadDefaultValues() throws Exception {
         Pb3Test pb3Test = Pb3Test.newBuilder().build();
         RowData row = ProtobufTestHelper.pbBytesToRow(Pb3Test.class, 
pb3Test.toByteArray());
 
         // primitive types should have default values
-        assertFalse(row.isNullAt(0));
-        assertFalse(row.isNullAt(1));
-        assertFalse(row.isNullAt(2));
-        assertFalse(row.isNullAt(3));
-        assertFalse(row.isNullAt(4));
-        assertFalse(row.isNullAt(5));
-        assertFalse(row.isNullAt(8));
-
-        assertEquals(0, row.getInt(0));
-        assertEquals(0L, row.getLong(1));
-        assertEquals("", row.getString(2).toString());
-        assertEquals(Float.valueOf(0.0f), Float.valueOf(row.getFloat(3)));
-        assertEquals(Double.valueOf(0.0d), Double.valueOf(row.getDouble(4)));
-        assertEquals("UNIVERSAL", row.getString(5).toString());
-        assertEquals(0, row.getBinary(8).length);
+        assertThat(row.isNullAt(0)).isFalse();
+        assertThat(row.isNullAt(1)).isFalse();
+        assertThat(row.isNullAt(2)).isFalse();
+        assertThat(row.isNullAt(3)).isFalse();
+        assertThat(row.isNullAt(4)).isFalse();
+        assertThat(row.isNullAt(5)).isFalse();
+        assertThat(row.isNullAt(8)).isFalse();
+
+        assertThat(row.getInt(0)).isEqualTo(0);
+        assertThat(row.getLong(1)).isEqualTo(0L);
+        assertThat(row.getString(2).toString()).isEqualTo("");
+        assertThat(row.getFloat(3)).isEqualTo(0.0f);
+        assertThat(row.getDouble(4)).isEqualTo(0.0d);
+        assertThat(row.getString(5).toString()).isEqualTo("UNIVERSAL");
+        assertThat(row.getBinary(8).length).isEqualTo(0);
         // non-primitive types should be null
-        assertTrue(row.isNullAt(6));
-        assertTrue(row.isNullAt(7));
-        assertTrue(row.isNullAt(9));
-        assertTrue(row.isNullAt(10));
+        assertThat(row.isNullAt(6)).isTrue();
+        assertThat(row.isNullAt(7)).isTrue();
+        assertThat(row.isNullAt(9)).isTrue();
+        assertThat(row.isNullAt(10)).isTrue();
     }
 }
diff --git 
a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/ProtobufSQLITCase.java
 
b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/ProtobufSQLITCase.java
index 422574115e8..3429e2bffe2 100644
--- 
a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/ProtobufSQLITCase.java
+++ 
b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/ProtobufSQLITCase.java
@@ -33,14 +33,11 @@ import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
 import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
 
 /** Integration SQL test for protobuf. */
-public class ProtobufSQLITCase extends BatchTestBase {
+class ProtobufSQLITCase extends BatchTestBase {
 
     private MapTest getProtoTestObject() {
         MapTest.InnerMessageTest innerMessageTest =
@@ -56,7 +53,7 @@ public class ProtobufSQLITCase extends BatchTestBase {
     }
 
     @Test
-    public void testSource() {
+    void testSource() {
         TestProtobufTestStore.sourcePbInputs.clear();
         
TestProtobufTestStore.sourcePbInputs.add(getProtoTestObject().toByteArray());
 
@@ -74,18 +71,18 @@ public class ProtobufSQLITCase extends BatchTestBase {
         tEnv().executeSql(sql);
         TableResult result = tEnv().executeSql("select * from bigdata_source");
         Row row = result.collect().next();
-        assertEquals(1, (int) row.getField(0));
+        assertThat((int) row.getField(0)).isEqualTo(1);
         Map<String, String> map1 = (Map<String, String>) row.getField(1);
-        assertEquals("b", map1.get("a"));
-        assertEquals("d", map1.get("c"));
+        assertThat(map1.get("a")).isEqualTo("b");
+        assertThat(map1.get("c")).isEqualTo("d");
         Map<String, Row> map2 = (Map<String, Row>) row.getField(2);
         Row innerRow = map2.get("f");
-        assertEquals(1, innerRow.getField(0));
-        assertEquals(2L, innerRow.getField(1));
+        assertThat(innerRow.getField(0)).isEqualTo(1);
+        assertThat(innerRow.getField(1)).isEqualTo(2L);
     }
 
     @Test
-    public void testSourceNotIgnoreParseError() throws InterruptedException {
+    void testSourceNotIgnoreParseError() throws InterruptedException {
         TestProtobufTestStore.sourcePbInputs.clear();
         // pass an incompatible bytes
         TestProtobufTestStore.sourcePbInputs.add(new byte[] {127, 127, 127, 
127, 127});
@@ -108,11 +105,11 @@ public class ProtobufSQLITCase extends BatchTestBase {
         } catch (Exception ex) {
             return;
         }
-        fail("executeSql should raise exception");
+        assertThat(false).withFailMessage("executeSql should raise 
exception").isTrue();
     }
 
     @Test
-    public void testSourceIgnoreParseError() throws InterruptedException, 
ExecutionException {
+    void testSourceIgnoreParseError() throws InterruptedException, 
ExecutionException {
         TestProtobufTestStore.sourcePbInputs.clear();
         // pass an incompatible bytes
         TestProtobufTestStore.sourcePbInputs.add(new byte[] {127, 127, 127, 
127, 127});
@@ -132,11 +129,11 @@ public class ProtobufSQLITCase extends BatchTestBase {
         tEnv().executeSql(sql);
         TableResult result = tEnv().executeSql("select * from bigdata_source");
         CloseableIterator<Row> iterator = result.collect();
-        assertFalse(iterator.hasNext());
+        assertThat(iterator.hasNext()).isFalse();
     }
 
     @Test
-    public void testSourceWithDefaultValueOfPb2WhenTrue() {
+    void testSourceWithDefaultValueOfPb2WhenTrue() {
         MapTest mapTest = MapTest.newBuilder().build();
 
         TestProtobufTestStore.sourcePbInputs.clear();
@@ -157,11 +154,11 @@ public class ProtobufSQLITCase extends BatchTestBase {
         tEnv().executeSql(sql);
         TableResult result = tEnv().executeSql("select * from bigdata_source");
         Row row = result.collect().next();
-        assertEquals(0, (int) row.getField(0));
+        assertThat((int) row.getField(0)).isEqualTo(0);
     }
 
     @Test
-    public void testSourceWithDefaultValueOfPb2WhenFalse() {
+    void testSourceWithDefaultValueOfPb2WhenFalse() {
         MapTest mapTest = MapTest.newBuilder().build();
 
         TestProtobufTestStore.sourcePbInputs.clear();
@@ -182,11 +179,11 @@ public class ProtobufSQLITCase extends BatchTestBase {
         tEnv().executeSql(sql);
         TableResult result = tEnv().executeSql("select * from bigdata_source");
         Row row = result.collect().next();
-        assertNull(row.getField(0));
+        assertThat(row.getField(0)).isNull();
     }
 
     @Test
-    public void testSourceWithDefaultValueOfPb3WhenTrue() {
+    void testSourceWithDefaultValueOfPb3WhenTrue() {
         Pb3Test pb3Test = Pb3Test.newBuilder().build();
 
         TestProtobufTestStore.sourcePbInputs.clear();
@@ -207,11 +204,11 @@ public class ProtobufSQLITCase extends BatchTestBase {
         tEnv().executeSql(sql);
         TableResult result = tEnv().executeSql("select * from bigdata_source");
         Row row = result.collect().next();
-        assertEquals(0, (int) row.getField(0));
+        assertThat((int) row.getField(0)).isEqualTo(0);
     }
 
     @Test
-    public void testSourceWithDefaultValueOfPb3WhenFalse() {
+    void testSourceWithDefaultValueOfPb3WhenFalse() {
         Pb3Test pb3Test = Pb3Test.newBuilder().build();
 
         TestProtobufTestStore.sourcePbInputs.clear();
@@ -232,11 +229,11 @@ public class ProtobufSQLITCase extends BatchTestBase {
         tEnv().executeSql(sql);
         TableResult result = tEnv().executeSql("select * from bigdata_source");
         Row row = result.collect().next();
-        assertEquals(0, (int) row.getField(0));
+        assertThat((int) row.getField(0)).isEqualTo(0);
     }
 
     @Test
-    public void testSink() throws Exception {
+    void testSink() throws Exception {
         TestProtobufTestStore.sourcePbInputs.clear();
         
TestProtobufTestStore.sourcePbInputs.add(getProtoTestObject().toByteArray());
         TestProtobufTestStore.sinkResults.clear();
@@ -260,16 +257,16 @@ public class ProtobufSQLITCase extends BatchTestBase {
 
         byte[] bytes = TestProtobufTestStore.sinkResults.get(0);
         MapTest mapTest = MapTest.parseFrom(bytes);
-        assertEquals(1, mapTest.getA());
-        assertEquals("b", mapTest.getMap1Map().get("a"));
-        assertEquals("d", mapTest.getMap1Map().get("c"));
+        assertThat(mapTest.getA()).isEqualTo(1);
+        assertThat(mapTest.getMap1Map().get("a")).isEqualTo("b");
+        assertThat(mapTest.getMap1Map().get("c")).isEqualTo("d");
         MapTest.InnerMessageTest innerMessageTest = 
mapTest.getMap2Map().get("f");
-        assertEquals(1, innerMessageTest.getA());
-        assertEquals(2L, innerMessageTest.getB());
+        assertThat(innerMessageTest.getA()).isEqualTo(1);
+        assertThat(innerMessageTest.getB()).isEqualTo(2L);
     }
 
     @Test
-    public void testSinkWithNullLiteral() throws Exception {
+    void testSinkWithNullLiteral() throws Exception {
         TestProtobufTestStore.sourcePbInputs.clear();
         
TestProtobufTestStore.sourcePbInputs.add(getProtoTestObject().toByteArray());
         TestProtobufTestStore.sinkResults.clear();
@@ -294,14 +291,14 @@ public class ProtobufSQLITCase extends BatchTestBase {
 
         byte[] bytes = TestProtobufTestStore.sinkResults.get(0);
         MapTest mapTest = MapTest.parseFrom(bytes);
-        assertEquals(1, mapTest.getA());
-        assertEquals("NULL", mapTest.getMap1Map().get("a"));
+        assertThat(mapTest.getA()).isEqualTo(1);
+        assertThat(mapTest.getMap1Map().get("a")).isEqualTo("NULL");
         MapTest.InnerMessageTest innerMessageTest = 
mapTest.getMap2Map().get("b");
-        assertEquals(MapTest.InnerMessageTest.getDefaultInstance(), 
innerMessageTest);
+        
assertThat(innerMessageTest).isEqualTo(MapTest.InnerMessageTest.getDefaultInstance());
     }
 
     @Test
-    public void testSinkWithNullLiteralWithEscape() throws Exception {
+    void testSinkWithNullLiteralWithEscape() throws Exception {
         TestProtobufTestStore.sourcePbInputs.clear();
         
TestProtobufTestStore.sourcePbInputs.add(getProtoTestObject().toByteArray());
         TestProtobufTestStore.sinkResults.clear();
@@ -326,14 +323,14 @@ public class ProtobufSQLITCase extends BatchTestBase {
 
         byte[] bytes = TestProtobufTestStore.sinkResults.get(0);
         MapTest mapTest = MapTest.parseFrom(bytes);
-        assertEquals(1, mapTest.getA());
-        assertEquals("\"NULL\"", mapTest.getMap1Map().get("a"));
+        assertThat(mapTest.getA()).isEqualTo(1);
+        assertThat(mapTest.getMap1Map().get("a")).isEqualTo("\"NULL\"");
         MapTest.InnerMessageTest innerMessageTest = 
mapTest.getMap2Map().get("b");
-        assertEquals(MapTest.InnerMessageTest.getDefaultInstance(), 
innerMessageTest);
+        
assertThat(innerMessageTest).isEqualTo(MapTest.InnerMessageTest.getDefaultInstance());
     }
 
     @Test
-    public void testUnsupportedBulkFilesystemSink() {
+    void testUnsupportedBulkFilesystemSink() {
         env().setParallelism(1);
         String sql =
                 "create table bigdata_sink ( "
diff --git 
a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/RepeatedMessageProtoToRowTest.java
 
b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/RepeatedMessageProtoToRowTest.java
index 0c2ab740bf4..66664870939 100644
--- 
a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/RepeatedMessageProtoToRowTest.java
+++ 
b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/RepeatedMessageProtoToRowTest.java
@@ -22,14 +22,14 @@ import 
org.apache.flink.formats.protobuf.testproto.RepeatedMessageTest;
 import org.apache.flink.table.data.ArrayData;
 import org.apache.flink.table.data.RowData;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test conversion of proto repeated message data to flink internal data. */
-public class RepeatedMessageProtoToRowTest {
+class RepeatedMessageProtoToRowTest {
     @Test
-    public void testRepeatedMessage() throws Exception {
+    void testRepeatedMessage() throws Exception {
         RepeatedMessageTest.InnerMessageTest innerMessageTest =
                 
RepeatedMessageTest.InnerMessageTest.newBuilder().setA(1).setB(2L).build();
 
@@ -48,10 +48,10 @@ public class RepeatedMessageProtoToRowTest {
 
         ArrayData objs = row.getArray(0);
         RowData subRow = objs.getRow(0, 2);
-        assertEquals(1, subRow.getInt(0));
-        assertEquals(2L, subRow.getLong(1));
+        assertThat(subRow.getInt(0)).isEqualTo(1);
+        assertThat(subRow.getLong(1)).isEqualTo(2L);
         subRow = objs.getRow(1, 2);
-        assertEquals(3, subRow.getInt(0));
-        assertEquals(4L, subRow.getLong(1));
+        assertThat(subRow.getInt(0)).isEqualTo(3);
+        assertThat(subRow.getLong(1)).isEqualTo(4L);
     }
 }
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/RepeatedMessageRowToProtoTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/RepeatedMessageRowToProtoTest.java
index af18a014591..c7eb0964208 100644
--- a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/RepeatedMessageRowToProtoTest.java
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/RepeatedMessageRowToProtoTest.java
@@ -24,14 +24,14 @@ import org.apache.flink.table.data.GenericArrayData;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test conversion of flink internal array of row to proto data. */
-public class RepeatedMessageRowToProtoTest {
+class RepeatedMessageRowToProtoTest {
     @Test
-    public void testRepeatedMessage() throws Exception {
+    void testRepeatedMessage() throws Exception {
         RowData subRow = GenericRowData.of(1, 2L);
         RowData subRow2 = GenericRowData.of(3, 4L);
         ArrayData tmp = new GenericArrayData(new Object[] {subRow, subRow2});
@@ -40,11 +40,11 @@ public class RepeatedMessageRowToProtoTest {
         byte[] bytes = ProtobufTestHelper.rowToPbBytes(row, RepeatedMessageTest.class);
         RepeatedMessageTest repeatedMessageTest = RepeatedMessageTest.parseFrom(bytes);
 
-        assertEquals(2, repeatedMessageTest.getDCount());
+        assertThat(repeatedMessageTest.getDCount()).isEqualTo(2);
 
-        assertEquals(1, repeatedMessageTest.getD(0).getA());
-        assertEquals(2L, repeatedMessageTest.getD(0).getB());
-        assertEquals(3, repeatedMessageTest.getD(1).getA());
-        assertEquals(4L, repeatedMessageTest.getD(1).getB());
+        assertThat(repeatedMessageTest.getD(0).getA()).isEqualTo(1);
+        assertThat(repeatedMessageTest.getD(0).getB()).isEqualTo(2L);
+        assertThat(repeatedMessageTest.getD(1).getA()).isEqualTo(3);
+        assertThat(repeatedMessageTest.getD(1).getB()).isEqualTo(4L);
     }
 }
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/RepeatedProtoToRowTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/RepeatedProtoToRowTest.java
index 8d541db9596..4a84d1b9d59 100644
--- a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/RepeatedProtoToRowTest.java
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/RepeatedProtoToRowTest.java
@@ -22,22 +22,22 @@ import org.apache.flink.formats.protobuf.testproto.RepeatedTest;
 import org.apache.flink.table.data.ArrayData;
 import org.apache.flink.table.data.RowData;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test conversion of proto list of primitive data to flink internal data. */
-public class RepeatedProtoToRowTest {
+class RepeatedProtoToRowTest {
     @Test
-    public void testRepeated() throws Exception {
+    void testRepeated() throws Exception {
         RepeatedTest simple = RepeatedTest.newBuilder().setA(1).addB(1).addB(2).build();
         RowData row = ProtobufTestHelper.pbBytesToRow(RepeatedTest.class, simple.toByteArray());
 
-        assertEquals(6, row.getArity());
-        assertEquals(1, row.getInt(0));
+        assertThat(row.getArity()).isEqualTo(6);
+        assertThat(row.getInt(0)).isEqualTo(1);
         ArrayData arr = row.getArray(1);
-        assertEquals(2, arr.size());
-        assertEquals(1L, arr.getLong(0));
-        assertEquals(2L, arr.getLong(1));
+        assertThat(arr.size()).isEqualTo(2);
+        assertThat(arr.getLong(0)).isEqualTo(1L);
+        assertThat(arr.getLong(1)).isEqualTo(2L);
     }
 }
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/RepeatedRowToProtoTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/RepeatedRowToProtoTest.java
index 4dd0cd9b97f..6cf11c9e945 100644
--- a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/RepeatedRowToProtoTest.java
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/RepeatedRowToProtoTest.java
@@ -24,14 +24,14 @@ import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test conversion of flink internal array of primitive data to proto data. */
-public class RepeatedRowToProtoTest {
+class RepeatedRowToProtoTest {
     @Test
-    public void testSimple() throws Exception {
+    void testSimple() throws Exception {
         RowData row =
                 GenericRowData.of(
                         1,
@@ -43,14 +43,14 @@ public class RepeatedRowToProtoTest {
 
         byte[] bytes = ProtobufTestHelper.rowToPbBytes(row, RepeatedTest.class);
         RepeatedTest repeatedTest = RepeatedTest.parseFrom(bytes);
-        assertEquals(3, repeatedTest.getBCount());
-        assertEquals(1L, repeatedTest.getB(0));
-        assertEquals(2L, repeatedTest.getB(1));
-        assertEquals(3L, repeatedTest.getB(2));
+        assertThat(repeatedTest.getBCount()).isEqualTo(3);
+        assertThat(repeatedTest.getB(0)).isEqualTo(1L);
+        assertThat(repeatedTest.getB(1)).isEqualTo(2L);
+        assertThat(repeatedTest.getB(2)).isEqualTo(3L);
     }
 
     @Test
-    public void testEmptyArray() throws Exception {
+    void testEmptyArray() throws Exception {
         RowData row =
                 GenericRowData.of(
                         1,
@@ -62,14 +62,14 @@ public class RepeatedRowToProtoTest {
 
         byte[] bytes = ProtobufTestHelper.rowToPbBytes(row, RepeatedTest.class);
         RepeatedTest repeatedTest = RepeatedTest.parseFrom(bytes);
-        assertEquals(0, repeatedTest.getBCount());
+        assertThat(repeatedTest.getBCount()).isEqualTo(0);
     }
 
     @Test
-    public void testNull() throws Exception {
+    void testNull() throws Exception {
         RowData row = GenericRowData.of(1, null, false, 0.1f, 0.01, StringData.fromString("hello"));
         byte[] bytes = ProtobufTestHelper.rowToPbBytes(row, RepeatedTest.class);
         RepeatedTest repeatedTest = RepeatedTest.parseFrom(bytes);
-        assertEquals(0, repeatedTest.getBCount());
+        assertThat(repeatedTest.getBCount()).isEqualTo(0);
     }
 }
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SameOuterClassNameProtoToRowTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SameOuterClassNameProtoToRowTest.java
index b049f2fb907..a76b4090c31 100644
--- a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SameOuterClassNameProtoToRowTest.java
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SameOuterClassNameProtoToRowTest.java
@@ -21,15 +21,15 @@ package org.apache.flink.formats.protobuf;
 import org.apache.flink.formats.protobuf.testproto.TestSameOuterClassNameOuterClass;
 import org.apache.flink.table.data.RowData;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test conversion of proto same outer class name data to flink internal data. */
-public class SameOuterClassNameProtoToRowTest {
+class SameOuterClassNameProtoToRowTest {
 
     @Test
-    public void testSimple() throws Exception {
+    void testSimple() throws Exception {
         TestSameOuterClassNameOuterClass.TestSameOuterClassName testSameOuterClassName =
                 TestSameOuterClassNameOuterClass.TestSameOuterClassName.newBuilder()
                         .setA(1)
@@ -40,12 +40,12 @@ public class SameOuterClassNameProtoToRowTest {
                         TestSameOuterClassNameOuterClass.TestSameOuterClassName.class,
                         testSameOuterClassName.toByteArray());
 
-        assertEquals(1, row.getInt(0));
-        assertEquals("BAR", row.getString(1).toString());
+        assertThat(row.getInt(0)).isEqualTo(1);
+        assertThat(row.getString(1).toString()).isEqualTo("BAR");
     }
 
     @Test
-    public void testIntEnum() throws Exception {
+    void testIntEnum() throws Exception {
         TestSameOuterClassNameOuterClass.TestSameOuterClassName testSameOuterClassName =
                 TestSameOuterClassNameOuterClass.TestSameOuterClassName.newBuilder()
                         .setB(TestSameOuterClassNameOuterClass.FooBar.BAR)
@@ -56,6 +56,6 @@ public class SameOuterClassNameProtoToRowTest {
                         TestSameOuterClassNameOuterClass.TestSameOuterClassName.class,
                         testSameOuterClassName.toByteArray(),
                         true);
-        assertEquals(1, row.getInt(1));
+        assertThat(row.getInt(1)).isEqualTo(1);
     }
 }
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SameOuterClassNameRowToProtoTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SameOuterClassNameRowToProtoTest.java
index 9e9e3f7fe69..d973b71f7c3 100644
--- a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SameOuterClassNameRowToProtoTest.java
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SameOuterClassNameRowToProtoTest.java
@@ -23,14 +23,14 @@ import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test conversion of flink internal primitive data to same outer class name proto data. */
-public class SameOuterClassNameRowToProtoTest {
+class SameOuterClassNameRowToProtoTest {
     @Test
-    public void testSimple() throws Exception {
+    void testSimple() throws Exception {
         RowData row = GenericRowData.of(1, StringData.fromString("BAR"));
 
         byte[] bytes =
@@ -38,12 +38,13 @@ public class SameOuterClassNameRowToProtoTest {
                         row, TestSameOuterClassNameOuterClass.TestSameOuterClassName.class);
         TestSameOuterClassNameOuterClass.TestSameOuterClassName testSameOuterClassName =
                 TestSameOuterClassNameOuterClass.TestSameOuterClassName.parseFrom(bytes);
-        assertEquals(1, testSameOuterClassName.getA());
-        assertEquals(TestSameOuterClassNameOuterClass.FooBar.BAR, testSameOuterClassName.getB());
+        assertThat(testSameOuterClassName.getA()).isEqualTo(1);
+        assertThat(testSameOuterClassName.getB())
+                .isEqualTo(TestSameOuterClassNameOuterClass.FooBar.BAR);
     }
 
     @Test
-    public void testEnumAsInt() throws Exception {
+    void testEnumAsInt() throws Exception {
         RowData row = GenericRowData.of(1, 1);
 
         byte[] bytes =
@@ -51,6 +52,7 @@ public class SameOuterClassNameRowToProtoTest {
                         row, TestSameOuterClassNameOuterClass.TestSameOuterClassName.class, true);
         TestSameOuterClassNameOuterClass.TestSameOuterClassName testSameOuterClassName =
                 TestSameOuterClassNameOuterClass.TestSameOuterClassName.parseFrom(bytes);
-        assertEquals(TestSameOuterClassNameOuterClass.FooBar.BAR, testSameOuterClassName.getB());
+        assertThat(testSameOuterClassName.getB())
+                .isEqualTo(TestSameOuterClassNameOuterClass.FooBar.BAR);
     }
 }
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SimpleProtoToRowTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SimpleProtoToRowTest.java
index 2409e60db7e..6250b02a8b4 100644
--- a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SimpleProtoToRowTest.java
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SimpleProtoToRowTest.java
@@ -23,17 +23,14 @@ import org.apache.flink.formats.protobuf.testproto.Status;
 import org.apache.flink.table.data.RowData;
 
 import com.google.protobuf.ByteString;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test conversion of proto primitive data to flink internal data. */
-public class SimpleProtoToRowTest {
+class SimpleProtoToRowTest {
     @Test
-    public void testSimple() throws Exception {
+    void testSimple() throws Exception {
         SimpleTestMulti simple =
                 SimpleTestMulti.newBuilder()
                         .setA(1)
@@ -51,22 +48,22 @@ public class SimpleProtoToRowTest {
 
         RowData row = ProtobufTestHelper.pbBytesToRow(SimpleTestMulti.class, simple.toByteArray());
 
-        assertEquals(11, row.getArity());
-        assertEquals(1, row.getInt(0));
-        assertEquals(2L, row.getLong(1));
-        assertFalse(row.getBoolean(2));
-        assertEquals(Float.valueOf(0.1f), Float.valueOf(row.getFloat(3)));
-        assertEquals(Double.valueOf(0.01d), Double.valueOf(row.getDouble(4)));
-        assertEquals("haha", row.getString(5).toString());
-        assertEquals(1, (row.getBinary(6))[0]);
-        assertEquals("IMAGES", row.getString(7).toString());
-        assertEquals("FINISHED", row.getString(8).toString());
-        assertEquals(1, row.getInt(9));
-        assertEquals(2, row.getInt(10));
+        assertThat(row.getArity()).isEqualTo(11);
+        assertThat(row.getInt(0)).isEqualTo(1);
+        assertThat(row.getLong(1)).isEqualTo(2L);
+        assertThat(row.getBoolean(2)).isFalse();
+        assertThat(row.getFloat(3)).isEqualTo(0.1f);
+        assertThat(row.getDouble(4)).isEqualTo(0.01d);
+        assertThat(row.getString(5).toString()).isEqualTo("haha");
+        assertThat(row.getBinary(6)[0]).isEqualTo((byte) 1);
+        assertThat(row.getString(7).toString()).isEqualTo("IMAGES");
+        assertThat(row.getString(8).toString()).isEqualTo("FINISHED");
+        assertThat(row.getInt(9)).isEqualTo(1);
+        assertThat(row.getInt(10)).isEqualTo(2);
     }
 
     @Test
-    public void testNotExistsValueIgnoringDefault() throws Exception {
+    void testNotExistsValueIgnoringDefault() throws Exception {
         SimpleTestMulti simple =
                 SimpleTestMulti.newBuilder()
                         .setB(2L)
@@ -78,12 +75,12 @@ public class SimpleProtoToRowTest {
 
         RowData row = ProtobufTestHelper.pbBytesToRow(SimpleTestMulti.class, simple.toByteArray());
 
-        assertTrue(row.isNullAt(0));
-        assertFalse(row.isNullAt(1));
+        assertThat(row.isNullAt(0)).isTrue();
+        assertThat(row.isNullAt(1)).isFalse();
     }
 
     @Test
-    public void testDefaultValues() throws Exception {
+    void testDefaultValues() throws Exception {
         SimpleTestMulti simple = SimpleTestMulti.newBuilder().build();
 
         RowData row =
@@ -93,28 +90,29 @@ public class SimpleProtoToRowTest {
                         new PbFormatConfig(SimpleTestMulti.class.getName(), false, true, ""),
                         false);
 
-        assertFalse(row.isNullAt(0));
-        assertFalse(row.isNullAt(1));
-        assertFalse(row.isNullAt(2));
-        assertFalse(row.isNullAt(3));
-        assertFalse(row.isNullAt(4));
-        assertFalse(row.isNullAt(5));
-        assertFalse(row.isNullAt(6));
-        assertFalse(row.isNullAt(7));
-        assertFalse(row.isNullAt(8));
-        assertEquals(10, row.getInt(0));
-        assertEquals(100L, row.getLong(1));
-        assertFalse(row.getBoolean(2));
-        assertEquals(0.0f, row.getFloat(3), 0.0001);
-        assertEquals(0.0d, row.getDouble(4), 0.0001);
-        assertEquals("f", row.getString(5).toString());
-        assertArrayEquals(ByteString.EMPTY.toByteArray(), row.getBinary(6));
-        assertEquals(SimpleTestMulti.Corpus.UNIVERSAL.toString(), row.getString(7).toString());
-        assertEquals(Status.UNSPECIFIED.toString(), row.getString(8).toString());
+        assertThat(row.isNullAt(0)).isFalse();
+        assertThat(row.isNullAt(1)).isFalse();
+        assertThat(row.isNullAt(2)).isFalse();
+        assertThat(row.isNullAt(3)).isFalse();
+        assertThat(row.isNullAt(4)).isFalse();
+        assertThat(row.isNullAt(5)).isFalse();
+        assertThat(row.isNullAt(6)).isFalse();
+        assertThat(row.isNullAt(7)).isFalse();
+        assertThat(row.isNullAt(8)).isFalse();
+        assertThat(row.getInt(0)).isEqualTo(10);
+        assertThat(row.getLong(1)).isEqualTo(100L);
+        assertThat(row.getBoolean(2)).isFalse();
+        assertThat(row.getFloat(3)).isEqualTo(0.0f);
+        assertThat(row.getDouble(4)).isEqualTo(0.0d);
+        assertThat(row.getString(5).toString()).isEqualTo("f");
+        assertThat(row.getBinary(6)).isEqualTo(ByteString.EMPTY.toByteArray());
+        assertThat(row.getString(7).toString())
+                .isEqualTo(SimpleTestMulti.Corpus.UNIVERSAL.toString());
+        assertThat(row.getString(8).toString()).isEqualTo(Status.UNSPECIFIED.toString());
     }
 
     @Test
-    public void testIntEnum() throws Exception {
+    void testIntEnum() throws Exception {
         SimpleTestMulti simple =
                 SimpleTestMulti.newBuilder()
                         .setH(SimpleTestMulti.Corpus.IMAGES)
@@ -122,7 +120,7 @@ public class SimpleProtoToRowTest {
                         .build();
         RowData row =
                 ProtobufTestHelper.pbBytesToRow(SimpleTestMulti.class, simple.toByteArray(), true);
-        assertEquals(2, row.getInt(7));
-        assertEquals(1, row.getInt(8));
+        assertThat(row.getInt(7)).isEqualTo(2);
+        assertThat(row.getInt(8)).isEqualTo(1);
     }
 }
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SimpleRowToProtoTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SimpleRowToProtoTest.java
index 04b059b8bcc..3ebe20461f9 100644
--- a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SimpleRowToProtoTest.java
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/SimpleRowToProtoTest.java
@@ -24,16 +24,14 @@ import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test conversion of flink internal primitive data to proto data. */
-public class SimpleRowToProtoTest {
+class SimpleRowToProtoTest {
     @Test
-    public void testSimple() throws Exception {
+    void testSimple() throws Exception {
         RowData row =
                 GenericRowData.of(
                         1,
@@ -50,21 +48,21 @@ public class SimpleRowToProtoTest {
 
         byte[] bytes = ProtobufTestHelper.rowToPbBytes(row, SimpleTestMulti.class);
         SimpleTestMulti simpleTestMulti = SimpleTestMulti.parseFrom(bytes);
-        assertTrue(simpleTestMulti.hasA());
-        assertEquals(1, simpleTestMulti.getA());
-        assertEquals(2L, simpleTestMulti.getB());
-        assertFalse(simpleTestMulti.getC());
-        assertEquals(Float.valueOf(0.1f), Float.valueOf(simpleTestMulti.getD()));
-        assertEquals(Double.valueOf(0.01d), Double.valueOf(simpleTestMulti.getE()));
-        assertEquals("hello", simpleTestMulti.getF());
-        assertEquals(1, simpleTestMulti.getG().byteAt(0));
-        assertEquals(SimpleTestMulti.Corpus.IMAGES, simpleTestMulti.getH());
-        assertEquals(Status.FINISHED, simpleTestMulti.getI());
-        assertEquals(1, simpleTestMulti.getFAbc7D());
+        assertThat(simpleTestMulti.hasA()).isTrue();
+        assertThat(simpleTestMulti.getA()).isEqualTo(1);
+        assertThat(simpleTestMulti.getB()).isEqualTo(2L);
+        assertThat(simpleTestMulti.getC()).isFalse();
+        assertThat(simpleTestMulti.getD()).isEqualTo(0.1f);
+        assertThat(simpleTestMulti.getE()).isEqualTo(0.01d);
+        assertThat(simpleTestMulti.getF()).isEqualTo("hello");
+        assertThat(simpleTestMulti.getG().byteAt(0)).isEqualTo((byte) 1);
+        assertThat(simpleTestMulti.getH()).isEqualTo(SimpleTestMulti.Corpus.IMAGES);
+        assertThat(simpleTestMulti.getI()).isEqualTo(Status.FINISHED);
+        assertThat(simpleTestMulti.getFAbc7D()).isEqualTo(1);
     }
 
     @Test
-    public void testNull() throws Exception {
+    void testNull() throws Exception {
         RowData row =
                 GenericRowData.of(
                         null,
@@ -81,14 +79,14 @@ public class SimpleRowToProtoTest {
 
         byte[] bytes = ProtobufTestHelper.rowToPbBytes(row, SimpleTestMulti.class);
         SimpleTestMulti simpleTestMulti = SimpleTestMulti.parseFrom(bytes);
-        assertFalse(simpleTestMulti.hasA());
-        assertFalse(simpleTestMulti.hasG());
-        assertFalse(simpleTestMulti.hasH());
-        assertFalse(simpleTestMulti.hasI());
+        assertThat(simpleTestMulti.hasA()).isFalse();
+        assertThat(simpleTestMulti.hasG()).isFalse();
+        assertThat(simpleTestMulti.hasH()).isFalse();
+        assertThat(simpleTestMulti.hasI()).isFalse();
     }
 
     @Test
-    public void testEnumAsInt() throws Exception {
+    void testEnumAsInt() throws Exception {
         RowData row =
                 GenericRowData.of(
                         null, null, null, null, null, null, null, 2, // CORPUS: IMAGE
@@ -97,7 +95,7 @@ public class SimpleRowToProtoTest {
 
         byte[] bytes = ProtobufTestHelper.rowToPbBytes(row, SimpleTestMulti.class, true);
         SimpleTestMulti simpleTestMulti = SimpleTestMulti.parseFrom(bytes);
-        assertEquals(SimpleTestMulti.Corpus.IMAGES, simpleTestMulti.getH());
-        assertEquals(Status.STARTED, simpleTestMulti.getI());
+        assertThat(simpleTestMulti.getH()).isEqualTo(SimpleTestMulti.Corpus.IMAGES);
+        assertThat(simpleTestMulti.getI()).isEqualTo(Status.STARTED);
     }
 }
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampMultiProtoToRowTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampMultiProtoToRowTest.java
index e567a789da1..ed8df4f1bc8 100644
--- a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampMultiProtoToRowTest.java
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampMultiProtoToRowTest.java
@@ -22,15 +22,15 @@ import org.apache.flink.formats.protobuf.testproto.TimestampTestMulti;
 import org.apache.flink.table.data.RowData;
 
 import com.google.protobuf.Timestamp;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test conversion of proto timestamp data with multiple_files options to flink internal data. */
-public class TimestampMultiProtoToRowTest {
+class TimestampMultiProtoToRowTest {
 
     @Test
-    public void testSimple() throws Exception {
+    void testSimple() throws Exception {
         TimestampTestMulti timestampTestMulti =
                 TimestampTestMulti.newBuilder()
                         .setTs(Timestamp.newBuilder().setSeconds(1672498800).setNanos(123))
@@ -40,7 +40,7 @@ public class TimestampMultiProtoToRowTest {
                         TimestampTestMulti.class, timestampTestMulti.toByteArray());
 
         RowData rowData = row.getRow(0, 2);
-        assertEquals(1672498800, rowData.getLong(0));
-        assertEquals(123, rowData.getInt(1));
+        assertThat(rowData.getLong(0)).isEqualTo(1672498800);
+        assertThat(rowData.getInt(1)).isEqualTo(123);
     }
 }
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampMultiRowToProtoTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampMultiRowToProtoTest.java
index 213f42661a2..056d0a8959b 100644
--- a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampMultiRowToProtoTest.java
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampMultiRowToProtoTest.java
@@ -22,23 +22,23 @@ import org.apache.flink.formats.protobuf.testproto.TimestampTestMulti;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /**
  * Test conversion of flink internal primitive data to proto timestamp data with multiple_files
  * options.
  */
-public class TimestampMultiRowToProtoTest {
+class TimestampMultiRowToProtoTest {
 
     @Test
-    public void testSimple() throws Exception {
+    void testSimple() throws Exception {
         RowData row = GenericRowData.of(GenericRowData.of(1672498800L, 123));
 
         byte[] bytes = ProtobufTestHelper.rowToPbBytes(row, TimestampTestMulti.class);
         TimestampTestMulti timestampTestMulti = TimestampTestMulti.parseFrom(bytes);
-        assertEquals(1672498800, timestampTestMulti.getTs().getSeconds());
-        assertEquals(123, timestampTestMulti.getTs().getNanos());
+        assertThat(timestampTestMulti.getTs().getSeconds()).isEqualTo(1672498800);
+        assertThat(timestampTestMulti.getTs().getNanos()).isEqualTo(123);
     }
 }
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampNoMultiProtoToRowTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampNoMultiProtoToRowTest.java
index 55917933bc2..aa1f0431af1 100644
--- a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampNoMultiProtoToRowTest.java
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampNoMultiProtoToRowTest.java
@@ -22,15 +22,15 @@ import org.apache.flink.formats.protobuf.testproto.TestTimestampNomulti;
 import org.apache.flink.table.data.RowData;
 
 import com.google.protobuf.Timestamp;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test conversion of proto timestamp data to flink internal data. */
-public class TimestampNoMultiProtoToRowTest {
+class TimestampNoMultiProtoToRowTest {
 
     @Test
-    public void testSimple() throws Exception {
+    void testSimple() throws Exception {
         TestTimestampNomulti.TimestampTestNoMulti timestampTestNoMulti =
                 TestTimestampNomulti.TimestampTestNoMulti.newBuilder()
                         .setTs(Timestamp.newBuilder().setSeconds(1672498800).setNanos(123))
@@ -41,7 +41,7 @@ public class TimestampNoMultiProtoToRowTest {
                         timestampTestNoMulti.toByteArray());
 
         RowData rowData = row.getRow(0, 2);
-        assertEquals(1672498800, rowData.getLong(0));
-        assertEquals(123, rowData.getInt(1));
+        assertThat(rowData.getLong(0)).isEqualTo(1672498800);
+        assertThat(rowData.getInt(1)).isEqualTo(123);
     }
 }
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampNoMultiRowToProtoTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampNoMultiRowToProtoTest.java
index 65cd877b2bf..38bb55a0c5f 100644
--- a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampNoMultiRowToProtoTest.java
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampNoMultiRowToProtoTest.java
@@ -22,15 +22,15 @@ import org.apache.flink.formats.protobuf.testproto.TestTimestampNomulti;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test conversion of flink internal primitive data to proto timestamp data. */
-public class TimestampNoMultiRowToProtoTest {
+class TimestampNoMultiRowToProtoTest {
 
     @Test
-    public void testSimple() throws Exception {
+    void testSimple() throws Exception {
         RowData row = GenericRowData.of(GenericRowData.of(1672498800L, 123));
 
         byte[] bytes =
@@ -38,7 +38,7 @@ public class TimestampNoMultiRowToProtoTest {
                         row, TestTimestampNomulti.TimestampTestNoMulti.class);
         TestTimestampNomulti.TimestampTestNoMulti timestampTestNoMulti =
                 TestTimestampNomulti.TimestampTestNoMulti.parseFrom(bytes);
-        assertEquals(1672498800, timestampTestNoMulti.getTs().getSeconds());
-        assertEquals(123, timestampTestNoMulti.getTs().getNanos());
+        assertThat(timestampTestNoMulti.getTs().getSeconds()).isEqualTo(1672498800);
+        assertThat(timestampTestNoMulti.getTs().getNanos()).isEqualTo(123);
     }
 }
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterMultiProtoToRowTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterMultiProtoToRowTest.java
index 935c17c169b..5b4a285fa23 100644
--- a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterMultiProtoToRowTest.java
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterMultiProtoToRowTest.java
@@ -22,18 +22,18 @@ import org.apache.flink.formats.protobuf.testproto.TimestampTestOuterMulti;
 import org.apache.flink.table.data.RowData;
 
 import com.google.protobuf.Timestamp;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /**
  * Test conversion of proto timestamp data with multiple_files and outer_classname options to flink
  * internal data.
  */
-public class TimestampOuterMultiProtoToRowTest {
+class TimestampOuterMultiProtoToRowTest {
 
     @Test
-    public void testSimple() throws Exception {
+    void testSimple() throws Exception {
         TimestampTestOuterMulti timestampTestOuterMulti =
                 TimestampTestOuterMulti.newBuilder()
                         .setTs(Timestamp.newBuilder().setSeconds(1672498800).setNanos(123))
@@ -43,7 +43,7 @@ public class TimestampOuterMultiProtoToRowTest {
                         TimestampTestOuterMulti.class, timestampTestOuterMulti.toByteArray());
 
         RowData rowData = row.getRow(0, 2);
-        assertEquals(1672498800, rowData.getLong(0));
-        assertEquals(123, rowData.getInt(1));
+        assertThat(rowData.getLong(0)).isEqualTo(1672498800);
+        assertThat(rowData.getInt(1)).isEqualTo(123);
     }
 }
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterMultiRowToProtoTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterMultiRowToProtoTest.java
index 1f27ed60f0f..5b96b8f8a08 100644
--- a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterMultiRowToProtoTest.java
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterMultiRowToProtoTest.java
@@ -22,23 +22,23 @@ import org.apache.flink.formats.protobuf.testproto.TimestampTestOuterMulti;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /**
  * Test conversion of flink internal primitive data to proto timestamp data with multiple_files and
  * outer_classname options.
  */
-public class TimestampOuterMultiRowToProtoTest {
+class TimestampOuterMultiRowToProtoTest {
 
     @Test
-    public void testSimple() throws Exception {
+    void testSimple() throws Exception {
         RowData row = GenericRowData.of(GenericRowData.of(1672498800L, 123));
 
         byte[] bytes = ProtobufTestHelper.rowToPbBytes(row, TimestampTestOuterMulti.class);
         TimestampTestOuterMulti timestampTestOuterMulti = TimestampTestOuterMulti.parseFrom(bytes);
-        assertEquals(1672498800, timestampTestOuterMulti.getTs().getSeconds());
-        assertEquals(123, timestampTestOuterMulti.getTs().getNanos());
+        assertThat(timestampTestOuterMulti.getTs().getSeconds()).isEqualTo(1672498800);
+        assertThat(timestampTestOuterMulti.getTs().getNanos()).isEqualTo(123);
     }
 }
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterNoMultiProtoToRowTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterNoMultiProtoToRowTest.java
index 5c2c08fe435..bf2186db8ae 100644
--- a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterNoMultiProtoToRowTest.java
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterNoMultiProtoToRowTest.java
@@ -22,15 +22,15 @@ import org.apache.flink.formats.protobuf.testproto.TimestampTestOuterNomultiProt
 import org.apache.flink.table.data.RowData;
 
 import com.google.protobuf.Timestamp;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test conversion of proto timestamp data with outer_classname options to flink internal data. */
-public class TimestampOuterNoMultiProtoToRowTest {
+class TimestampOuterNoMultiProtoToRowTest {
 
     @Test
-    public void testSimple() throws Exception {
+    void testSimple() throws Exception {
         TimestampTestOuterNomultiProto.TimestampTestOuterNoMulti timestampTestOuterNoMulti =
                 TimestampTestOuterNomultiProto.TimestampTestOuterNoMulti.newBuilder()
                         .setTs(Timestamp.newBuilder().setSeconds(1672498800).setNanos(123))
@@ -41,7 +41,7 @@ public class TimestampOuterNoMultiProtoToRowTest {
                         timestampTestOuterNoMulti.toByteArray());
 
         RowData rowData = row.getRow(0, 2);
-        assertEquals(1672498800, rowData.getLong(0));
-        assertEquals(123, rowData.getInt(1));
+        assertThat(rowData.getLong(0)).isEqualTo(1672498800);
+        assertThat(rowData.getInt(1)).isEqualTo(123);
     }
 }
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterNoMultiRowToProtoTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterNoMultiRowToProtoTest.java
index 208b49ab780..6b73bcb0106 100644
--- a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterNoMultiRowToProtoTest.java
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/TimestampOuterNoMultiRowToProtoTest.java
@@ -22,18 +22,18 @@ import org.apache.flink.formats.protobuf.testproto.TimestampTestOuterNomultiProt
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /**
  * Test conversion of flink internal primitive data to proto timestamp data with outer_classname
  * options.
  */
-public class TimestampOuterNoMultiRowToProtoTest {
+class TimestampOuterNoMultiRowToProtoTest {
 
     @Test
-    public void testSimple() throws Exception {
+    void testSimple() throws Exception {
         RowData row = GenericRowData.of(GenericRowData.of(1672498800L, 123));
 
         byte[] bytes =
@@ -41,7 +41,7 @@ public class TimestampOuterNoMultiRowToProtoTest {
                         row, TimestampTestOuterNomultiProto.TimestampTestOuterNoMulti.class);
         TimestampTestOuterNomultiProto.TimestampTestOuterNoMulti timestampTestOuterNoMulti =
                 TimestampTestOuterNomultiProto.TimestampTestOuterNoMulti.parseFrom(bytes);
-        assertEquals(1672498800, timestampTestOuterNoMulti.getTs().getSeconds());
-        assertEquals(123, timestampTestOuterNoMulti.getTs().getNanos());
+        assertThat(timestampTestOuterNoMulti.getTs().getSeconds()).isEqualTo(1672498800);
+        assertThat(timestampTestOuterNoMulti.getTs().getNanos()).isEqualTo(123);
     }
 }
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/VeryBigPbProtoToRowITCase.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/VeryBigPbProtoToRowITCase.java
index ca729995a90..27988edf604 100644
--- a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/VeryBigPbProtoToRowITCase.java
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/VeryBigPbProtoToRowITCase.java
@@ -20,7 +20,7 @@ package org.apache.flink.formats.protobuf;
 
 import org.apache.flink.formats.protobuf.testproto.VeryBigPbClass;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 /**
  * Test for very huge proto definition, which may trigger some special optimizations such as code
@@ -29,10 +29,10 @@ import org.junit.Test;
  * <p>Implementing this test as an {@code ITCase} enables larger heap size for the test execution.
  * The current unit test execution configuration would cause {@code OutOfMemoryErrors}.
  */
-public class VeryBigPbProtoToRowITCase {
+class VeryBigPbProtoToRowITCase {
 
     @Test
-    public void testSimple() throws Exception {
+    void testSimple() throws Exception {
         VeryBigPbClass.VeryBigPbMessage veryBigPbMessage =
                 VeryBigPbClass.VeryBigPbMessage.newBuilder().build();
         // test generated code can be compiled
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/VeryBigPbRowToProtoITCase.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/VeryBigPbRowToProtoITCase.java
index 74c448a0cb0..d614567c78d 100644
--- a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/VeryBigPbRowToProtoITCase.java
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/VeryBigPbRowToProtoITCase.java
@@ -21,7 +21,7 @@ package org.apache.flink.formats.protobuf;
 import org.apache.flink.formats.protobuf.testproto.VeryBigPbClass;
 import org.apache.flink.table.data.GenericRowData;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 /**
  * Test for very huge proto definition, which may trigger some special optimizations such as code
@@ -30,10 +30,10 @@ import org.junit.Test;
  * <p>Implementing this test as an {@code ITCase} enables larger heap size for the test execution.
  * The current unit test execution configuration would cause {@code OutOfMemoryErrors}.
  */
-public class VeryBigPbRowToProtoITCase {
+class VeryBigPbRowToProtoITCase {
 
     @Test
-    public void testSimple() throws Exception {
+    void testSimple() throws Exception {
         GenericRowData rowData = new GenericRowData(4);
 
         // test generated code can be compiled

Reply via email to