This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 11f61d23b80ba639922e43830667eb2010fffdfb
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Wed Dec 3 15:15:24 2025 +0100

    [FLINK-38766][table] Migrate `TableEnvironmentTest` to assertJ
---
 .../flink/table/api/TableEnvironmentTest.scala     | 1207 ++++++++++----------
 1 file changed, 592 insertions(+), 615 deletions(-)

diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index 30b36f1f476..225843946f1 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -46,8 +46,7 @@ import _root_.java.util
 import _root_.scala.collection.JavaConverters._
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.sql.SqlExplainLevel
-import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, 
assertTrue, fail}
+import org.assertj.core.api.Assertions.{assertThat, assertThatExceptionOfType, 
assertThatList, assertThatObject}
 import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.io.TempDir
 import org.junit.jupiter.params.ParameterizedTest
@@ -68,9 +67,9 @@ class TableEnvironmentTest {
 
   @Test
   def testScanNonExistTable(): Unit = {
-    assertThatThrownBy(() => tableEnv.from("MyTable"))
-      .hasMessageContaining("Table `MyTable` was not found")
-      .isInstanceOf[ValidationException]
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => tableEnv.from("MyTable"))
+      .withMessageContaining("Table `MyTable` was not found")
   }
 
   @Test
@@ -83,17 +82,17 @@ class TableEnvironmentTest {
     val relNode = TableTestUtil.toRelNode(scanTable)
     val actual = RelOptUtil.toString(relNode)
     val expected = "LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])\n"
-    assertEquals(expected, actual)
-
-    assertThatThrownBy(
-      () =>
-        tableEnv.createTemporaryView(
-          "MyTable",
-          StreamingEnvUtil
-            .fromElements[(Int, Long)](env)))
-      .hasMessageContaining(
+    assertThat(actual).isEqualTo(expected)
+
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(
+        () =>
+          tableEnv.createTemporaryView(
+            "MyTable",
+            StreamingEnvUtil
+              .fromElements[(Int, Long)](env)))
+      .withMessageContaining(
         "Temporary table '`default_catalog`.`default_database`.`MyTable`' 
already exists")
-      .isInstanceOf[ValidationException]
   }
 
   @Test
@@ -107,39 +106,39 @@ class TableEnvironmentTest {
     val actual = RelOptUtil.toString(relNode, SqlExplainLevel.NO_ATTRIBUTES)
     val expected = "LogicalProject\n" +
       "  LogicalTableScan\n"
-    assertEquals(expected, actual)
+    assertThat(actual).isEqualTo(expected)
   }
 
   @Test
   def testCreateTableWithEnforcedMode(): Unit = {
     // check column constraint
-    assertThatThrownBy(() => tableEnv.executeSql("""
-                                                   |CREATE TABLE MyTable (
-                                                   |  a bigint primary key,
-                                                   |  b int,
-                                                   |  c varchar
-                                                   |) with (
-                                                   |  'connector' = 
'COLLECTION',
-                                                   |  'is-bounded' = 'false'
-                                                   |)
+    assertThatExceptionOfType(classOf[SqlValidateException])
+      .isThrownBy(() => tableEnv.executeSql("""
+                                              |CREATE TABLE MyTable (
+                                              |  a bigint primary key,
+                                              |  b int,
+                                              |  c varchar
+                                              |) with (
+                                              |  'connector' = 'COLLECTION',
+                                              |  'is-bounded' = 'false'
+                                              |)
     """.stripMargin))
-      .hasMessageContaining("Flink doesn't support ENFORCED mode for PRIMARY 
KEY constraint.")
-      .isInstanceOf[ValidationException]
+      .withMessageContaining("Flink doesn't support ENFORCED mode for PRIMARY 
KEY constraint.")
 
     // check table constraint
-    assertThatThrownBy(() => tableEnv.executeSql("""
-                                                   |CREATE TABLE MyTable (
-                                                   |  a bigint,
-                                                   |  b int,
-                                                   |  c varchar,
-                                                   |  primary key(a)
-                                                   |) with (
-                                                   |  'connector' = 
'COLLECTION',
-                                                   |  'is-bounded' = 'false'
-                                                   |)
+    assertThatExceptionOfType(classOf[SqlValidateException])
+      .isThrownBy(() => tableEnv.executeSql("""
+                                              |CREATE TABLE MyTable (
+                                              |  a bigint,
+                                              |  b int,
+                                              |  c varchar,
+                                              |  primary key(a)
+                                              |) with (
+                                              |  'connector' = 'COLLECTION',
+                                              |  'is-bounded' = 'false'
+                                              |)
     """.stripMargin))
-      .hasMessageContaining("Flink doesn't support ENFORCED mode for PRIMARY 
KEY constraint.")
-      .isInstanceOf[ValidationException]
+      .withMessageContaining("Flink doesn't support ENFORCED mode for PRIMARY 
KEY constraint.")
   }
 
   @Test
@@ -158,7 +157,8 @@ class TableEnvironmentTest {
 
     val expected = 
TableTestUtil.readFromResource("/explain/testStreamTableEnvironmentExplain.out")
     val actual = tEnv.explainSql("insert into MySink select first from 
MyTable")
-    assertEquals(TableTestUtil.replaceStageId(expected), 
TableTestUtil.replaceStageId(actual))
+    assertThat(TableTestUtil.replaceStageId(actual))
+      .isEqualTo(TableTestUtil.replaceStageId(expected))
   }
 
   @Test
@@ -177,7 +177,8 @@ class TableEnvironmentTest {
 
     val expected = 
TableTestUtil.readFromResource("/explain/testStreamTableEnvironmentExplain.out")
     val actual = tEnv.explainSql("execute insert into MySink select first from 
MyTable")
-    assertEquals(TableTestUtil.replaceStageId(expected), 
TableTestUtil.replaceStageId(actual))
+    assertThat(TableTestUtil.replaceStageId(actual))
+      .isEqualTo(TableTestUtil.replaceStageId(expected))
   }
 
   @Test
@@ -214,12 +215,8 @@ class TableEnvironmentTest {
 
   @Test
   def testAddIllegalJar(): Unit = {
-    try {
-      tableEnv.executeSql(String.format("ADD JAR '%s'", 
"/path/to/illegal.jar"))
-      fail("Should fail.")
-    } catch {
-      case _: TableException => // expected
-    }
+    assertThatExceptionOfType(classOf[TableException])
+      .isThrownBy(() => tableEnv.executeSql(String.format("ADD JAR '%s'", 
"/path/to/illegal.jar")))
   }
 
   private def validateAddJar(useFullPath: Boolean): Unit = {
@@ -240,7 +237,7 @@ class TableEnvironmentTest {
     tableEnv.executeSql(String.format("ADD JAR '%s'", jarPath))
     val tableResult = tableEnv.executeSql("SHOW JARS")
 
-    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult.getResultKind)
+    
assertThatObject(tableResult.getResultKind).isSameAs(ResultKind.SUCCESS_WITH_CONTENT)
     checkData(util.Arrays.asList(Row.of(udfJar.getPath)).iterator(), 
tableResult.collect())
   }
 
@@ -259,10 +256,10 @@ class TableEnvironmentTest {
       "insert into MySink select first from MyTable",
       ExplainDetail.JSON_EXECUTION_PLAN)
 
-    assertEquals(
-      
TableTestUtil.replaceNodeIdInOperator(TableTestUtil.replaceStreamNodeId(expected)),
-      
TableTestUtil.replaceNodeIdInOperator(TableTestUtil.replaceStreamNodeId(actual))
-    )
+    
assertThat(TableTestUtil.replaceNodeIdInOperator(TableTestUtil.replaceStreamNodeId(actual)))
+      .isEqualTo(
+        
TableTestUtil.replaceNodeIdInOperator(TableTestUtil.replaceStreamNodeId(expected))
+      )
   }
 
   @Test
@@ -316,10 +313,10 @@ class TableEnvironmentTest {
       ExplainDetail.JSON_EXECUTION_PLAN
     )
 
-    assertEquals(
-      
TableTestUtil.replaceNodeIdInOperator(TableTestUtil.replaceStreamNodeId(expected)),
-      
TableTestUtil.replaceNodeIdInOperator(TableTestUtil.replaceStreamNodeId(actual))
-    )
+    
assertThat(TableTestUtil.replaceNodeIdInOperator(TableTestUtil.replaceStreamNodeId(actual)))
+      .isEqualTo(
+        
TableTestUtil.replaceNodeIdInOperator(TableTestUtil.replaceStreamNodeId(expected))
+      )
   }
 
   @Test
@@ -335,9 +332,9 @@ class TableEnvironmentTest {
         |)
       """.stripMargin
     tableEnv.executeSql(statement)
-    assertThatThrownBy(() => tableEnv.executeSql("ALTER TABLE MyTable RESET 
()"))
-      .hasMessageContaining("ALTER TABLE RESET does not support empty key")
-      .isInstanceOf[ValidationException]
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => tableEnv.executeSql("ALTER TABLE MyTable RESET ()"))
+      .withMessageContaining("ALTER TABLE RESET does not support empty key")
   }
 
   @Test
@@ -355,9 +352,9 @@ class TableEnvironmentTest {
         |)
       """.stripMargin
     tableEnv.executeSql(statementWithTypo)
-    assertThatThrownBy(
-      () => tableEnv.executeSql("explain plan for select * from MyTable where 
a > 10"))
-      .hasMessageContaining(
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => tableEnv.executeSql("explain plan for select * from 
MyTable where a > 10"))
+      .withMessageContaining(
         "Unable to create a source for reading table " +
           "'default_catalog.default_database.MyTable'.\n\n" +
           "Table options are:\n\n'connector'='datagen'\n" +
@@ -366,18 +363,17 @@ class TableEnvironmentTest {
     // remove invalid key by RESET
     val alterTableResetStatement = "ALTER TABLE MyTable RESET ('invalid-key')"
     val tableResult = tableEnv.executeSql(alterTableResetStatement)
-    assertEquals(ResultKind.SUCCESS, tableResult.getResultKind)
-    assertEquals(
-      Map("connector" -> "datagen").asJava,
+    assertThatObject(tableResult.getResultKind).isSameAs(ResultKind.SUCCESS)
+    assertThat(
       tableEnv
         .getCatalog(tableEnv.getCurrentCatalog)
         .get()
         .getTable(ObjectPath.fromString(s"${tableEnv.getCurrentDatabase}.MyTable"))
         .getOptions
-    )
-    assertEquals(
-      ResultKind.SUCCESS_WITH_CONTENT,
+    ).isEqualTo(util.Map.of("connector", "datagen"))
+    assertThatObject(
       tableEnv.executeSql("explain plan for select * from MyTable where a > 10").getResultKind)
+      .isSameAs(ResultKind.SUCCESS_WITH_CONTENT)
   }
 
   @Test
@@ -398,15 +394,14 @@ class TableEnvironmentTest {
 
     val alterTableResetStatement = "ALTER TABLE MyTable RESET ('is-bounded')"
     val tableResult = tableEnv.executeSql(alterTableResetStatement)
-    assertEquals(ResultKind.SUCCESS, tableResult.getResultKind)
-    assertEquals(
-      Map.apply("connector" -> "COLLECTION").asJava,
+    assertThatObject(tableResult.getResultKind).isSameAs(ResultKind.SUCCESS)
+    assertThat(
       tableEnv
         .getCatalog(tableEnv.getCurrentCatalog)
         .get()
         .getTable(ObjectPath.fromString(s"${tableEnv.getCurrentDatabase}.MyTable"))
         .getOptions
-    )
+    ).isEqualTo(util.Map.of("connector", "COLLECTION"))
     checkTableSource("MyTable", true)
   }
 
@@ -428,22 +423,19 @@ class TableEnvironmentTest {
 
     val alterTableResetStatement = "ALTER TABLE MyTable RESET ('format')"
     val tableResult = tableEnv.executeSql(alterTableResetStatement)
-    assertEquals(ResultKind.SUCCESS, tableResult.getResultKind)
-    assertEquals(
-      Map("connector" -> "filesystem", "path" -> "_invalid").asJava,
+    assertThatObject(tableResult.getResultKind).isSameAs(ResultKind.SUCCESS)
+    assertThat(
       tableEnv
         .getCatalog(tableEnv.getCurrentCatalog)
         .get()
         .getTable(ObjectPath.fromString(s"${tableEnv.getCurrentDatabase}.MyTable"))
         .getOptions
-    )
+    ).isEqualTo(util.Map.of("connector", "filesystem", "path", "_invalid"))
 
-    assertThatThrownBy(
-      () =>
-        tableEnv.executeSql("explain plan for select * from MyTable where a > 10").getResultKind)
-      .hasMessageContaining(
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => tableEnv.executeSql("explain plan for select * from MyTable where a > 10"))
+      .withMessageContaining(
         "Unable to create a source for reading table 'default_catalog.default_database.MyTable'.")
-      .isInstanceOf[ValidationException]
   }
 
   @Test
@@ -462,35 +454,35 @@ class TableEnvironmentTest {
       """.stripMargin
     tableEnv.executeSql(statement)
 
-    assertThatThrownBy(() => tableEnv.executeSql("""
-                                                   |ALTER TABLE MyTable ADD (
-                                                   |  PRIMARY KEY (a) NOT 
ENFORCED
-                                                   |)
-                                                   |""".stripMargin))
-      .hasMessageContaining(
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => tableEnv.executeSql("""
+                                              |ALTER TABLE MyTable ADD (
+                                              |  PRIMARY KEY (a) NOT ENFORCED
+                                              |)
+                                              |""".stripMargin))
+      .withMessageContaining(
         """Failed to execute ALTER TABLE statement.
           |The current table has already defined the primary key constraint 
[`b`]. You might want to drop it before adding a new one.""".stripMargin)
-      .isInstanceOf[ValidationException]
-
-    assertThatThrownBy(() => tableEnv.executeSql("""
-                                                   |ALTER TABLE MyTable ADD (
-                                                   |  a STRING
-                                                   |)
-                                                   |""".stripMargin))
-      .hasMessageContaining("""Failed to execute ALTER TABLE statement.
-                              |Column `a` already exists in the 
table.""".stripMargin)
-      .isInstanceOf[ValidationException]
-
-    assertThatThrownBy(() => tableEnv.executeSql("""
-                                                   |ALTER TABLE MyTable ADD (
-                                                   |  e STRING AFTER h
-                                                   |)
-                                                   |""".stripMargin))
-      .hasMessageContaining(
+
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => tableEnv.executeSql("""
+                                              |ALTER TABLE MyTable ADD (
+                                              |  a STRING
+                                              |)
+                                              |""".stripMargin))
+      .withMessageContaining("""Failed to execute ALTER TABLE statement.
+                               |Column `a` already exists in the 
table.""".stripMargin)
+
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => tableEnv.executeSql("""
+                                              |ALTER TABLE MyTable ADD (
+                                              |  e STRING AFTER h
+                                              |)
+                                              |""".stripMargin))
+      .withMessageContaining(
         """Failed to execute ALTER TABLE statement.
           |Referenced column `h` by 'AFTER' does not exist in the 
table.""".stripMargin
       )
-      .isInstanceOf[ValidationException]
 
     tableEnv.executeSql(
       """
@@ -537,14 +529,14 @@ class TableEnvironmentTest {
       Row.of("g", "ARRAY<INT NOT NULL>", Boolean.box(false), null, null, null, 
null)
     )
     val tableResult = tableEnv.executeSql("DESCRIBE MyTable")
-    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult.getResultKind)
+    
assertThatObject(tableResult.getResultKind).isSameAs(ResultKind.SUCCESS_WITH_CONTENT)
     checkData(expectedResult.iterator(), tableResult.collect())
 
-    assertThatThrownBy(() => tableEnv.executeSql("ALTER TABLE MyTable ADD 
WATERMARK FOR ts AS ts"))
-      .hasMessageContaining(
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => tableEnv.executeSql("ALTER TABLE MyTable ADD WATERMARK 
FOR ts AS ts"))
+      .withMessageContaining(
         """Failed to execute ALTER TABLE statement.
           |The current table has already defined the watermark strategy `d` AS 
`d` - INTERVAL '2' SECOND. You might want to drop it before adding a new 
one.""".stripMargin)
-      .isInstanceOf[ValidationException]
   }
 
   @Test
@@ -563,24 +555,24 @@ class TableEnvironmentTest {
       """.stripMargin
     tableEnv.executeSql(statement)
 
-    assertThatThrownBy(() => tableEnv.executeSql("""
-                                                   |ALTER TABLE MyTable ADD (
-                                                   |  f STRING PRIMARY KEY NOT 
ENFORCED,
-                                                   |  PRIMARY KEY (a) NOT 
ENFORCED
-                                                   |)
-                                                   |""".stripMargin))
-      .hasMessageContaining("Duplicate primary key definition")
-      .isInstanceOf[org.apache.flink.sql.parser.error.SqlValidateException]
-
-    assertThatThrownBy(() => tableEnv.executeSql("""
-                                                   |ALTER TABLE MyTable ADD (
-                                                   |  PRIMARY KEY (c) NOT 
ENFORCED
-                                                   |)
-                                                   |""".stripMargin))
-      .hasMessageContaining(
+    assertThatExceptionOfType(classOf[SqlValidateException])
+      .isThrownBy(() => tableEnv.executeSql("""
+                                              |ALTER TABLE MyTable ADD (
+                                              |  f STRING PRIMARY KEY NOT 
ENFORCED,
+                                              |  PRIMARY KEY (a) NOT ENFORCED
+                                              |)
+                                              |""".stripMargin))
+      .withMessageContaining("Duplicate primary key definition")
+
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => tableEnv.executeSql("""
+                                              |ALTER TABLE MyTable ADD (
+                                              |  PRIMARY KEY (c) NOT ENFORCED
+                                              |)
+                                              |""".stripMargin))
+      .withMessageContaining(
         """Failed to execute ALTER TABLE statement.
           |Invalid primary key 'PK_c'. Column 'c' is not a physical 
column.""".stripMargin)
-      .isInstanceOf[ValidationException]
 
     tableEnv.executeSql("""
                           |ALTER TABLE MyTable ADD (
@@ -595,7 +587,7 @@ class TableEnvironmentTest {
       Row.of("d", "TIMESTAMP(3)", Boolean.box(true), null, null, null)
     )
     val tableResult = tableEnv.executeSql("DESCRIBE MyTable")
-    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult.getResultKind)
+    
assertThatObject(tableResult.getResultKind).isSameAs(ResultKind.SUCCESS_WITH_CONTENT)
     checkData(expectedResult.iterator(), tableResult.collect())
   }
 
@@ -616,15 +608,15 @@ class TableEnvironmentTest {
       """.stripMargin
     tableEnv.executeSql(statement)
 
-    assertThatThrownBy(() => tableEnv.executeSql("""
-                                                   |ALTER TABLE MyTable ADD (
-                                                   |  WATERMARK FOR e.e1 AS 
e.e1
-                                                   |)
-                                                   |""".stripMargin))
-      .hasMessageContaining(
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => tableEnv.executeSql("""
+                                              |ALTER TABLE MyTable ADD (
+                                              |  WATERMARK FOR e.e1 AS e.e1
+                                              |)
+                                              |""".stripMargin))
+      .withMessageContaining(
         """Failed to execute ALTER TABLE statement.
           |Watermark strategy on nested column is not supported 
yet.""".stripMargin)
-      .isInstanceOf[ValidationException]
 
     tableEnv.executeSql("""
                           |ALTER TABLE MyTable ADD (
@@ -640,7 +632,7 @@ class TableEnvironmentTest {
       Row.of("e", "ROW<`e0` STRING, `e1` TIMESTAMP(3)>", Boolean.box(true), 
null, null, null)
     )
     val tableResult = tableEnv.executeSql("DESCRIBE MyTable")
-    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult.getResultKind)
+    
assertThatObject(tableResult.getResultKind).isSameAs(ResultKind.SUCCESS_WITH_CONTENT)
     checkData(expectedResult.iterator(), tableResult.collect())
   }
 
@@ -665,34 +657,34 @@ class TableEnvironmentTest {
       """.stripMargin
     tableEnv.executeSql(statement)
 
-    assertThatThrownBy(() => tableEnv.executeSql("""
-                                                   |ALTER TABLE MyTable MODIFY 
(
-                                                   |  x STRING FIRST
-                                                   |)
-                                                   |""".stripMargin))
-      .hasMessageContaining("""Failed to execute ALTER TABLE statement.
-                              |Column `x` does not exist in the 
table.""".stripMargin)
-      .isInstanceOf[ValidationException]
-
-    assertThatThrownBy(() => tableEnv.executeSql("""
-                                                   |ALTER TABLE MyTable MODIFY 
(
-                                                   |  b INT FIRST,
-                                                   |  a BIGINT AFTER x
-                                                   |)
-                                                   |""".stripMargin))
-      .hasMessageContaining(
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => tableEnv.executeSql("""
+                                              |ALTER TABLE MyTable MODIFY (
+                                              |  x STRING FIRST
+                                              |)
+                                              |""".stripMargin))
+      .withMessageContaining("""Failed to execute ALTER TABLE statement.
+                               |Column `x` does not exist in the 
table.""".stripMargin)
+
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => tableEnv.executeSql("""
+                                              |ALTER TABLE MyTable MODIFY (
+                                              |  b INT FIRST,
+                                              |  a BIGINT AFTER x
+                                              |)
+                                              |""".stripMargin))
+      .withMessageContaining(
         """Failed to execute ALTER TABLE statement.
           |Referenced column `x` by 'AFTER' does not exist in the 
table.""".stripMargin)
-      .isInstanceOf[ValidationException]
 
-    assertThatThrownBy(() => tableEnv.executeSql("""
-                                                   |ALTER TABLE MyTable MODIFY 
(
-                                                   |  b BOOLEAN first
-                                                   |)
-                                                   |""".stripMargin))
-      .hasMessageContaining("""Failed to execute ALTER TABLE statement.
-                              |Invalid expression for computed column 
'e'.""".stripMargin)
-      .isInstanceOf[ValidationException]
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => tableEnv.executeSql("""
+                                              |ALTER TABLE MyTable MODIFY (
+                                              |  b BOOLEAN first
+                                              |)
+                                              |""".stripMargin))
+      .withMessageContaining("""Failed to execute ALTER TABLE statement.
+                               |Invalid expression for computed column 
'e'.""".stripMargin)
 
     tableEnv.executeSql("""
                           |ALTER TABLE MyTable MODIFY (
@@ -716,7 +708,7 @@ class TableEnvironmentTest {
         "`ts2` - INTERVAL '1' SECOND")
     )
     val tableResult1 = tableEnv.executeSql("DESCRIBE MyTable")
-    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult1.getResultKind)
+    
assertThatObject(tableResult1.getResultKind).isSameAs(ResultKind.SUCCESS_WITH_CONTENT)
     checkData(expectedResult1.iterator(), tableResult1.collect())
 
     tableEnv.executeSql("""
@@ -738,7 +730,7 @@ class TableEnvironmentTest {
       Row.of("a", "DOUBLE", Boolean.box(false), null, null, null)
     )
     val tableResult = tableEnv.executeSql("DESCRIBE MyTable")
-    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult.getResultKind)
+    
assertThatObject(tableResult.getResultKind).isSameAs(ResultKind.SUCCESS_WITH_CONTENT)
     checkData(expectedResult2.iterator(), tableResult.collect())
   }
 
@@ -757,15 +749,15 @@ class TableEnvironmentTest {
       """.stripMargin
     tableEnv.executeSql(statement)
 
-    assertThatThrownBy(() => tableEnv.executeSql("""
-                                                   |ALTER TABLE MyTable MODIFY 
(
-                                                   |  PRIMARY KEY (x) NOT 
ENFORCED
-                                                   |)
-                                                   |""".stripMargin))
-      .hasMessageContaining(
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => tableEnv.executeSql("""
+                                              |ALTER TABLE MyTable MODIFY (
+                                              |  PRIMARY KEY (x) NOT ENFORCED
+                                              |)
+                                              |""".stripMargin))
+      .withMessageContaining(
         """Failed to execute ALTER TABLE statement.
           |The current table does not define any primary key constraint. You 
might want to add a new one.""".stripMargin)
-      .isInstanceOf[ValidationException]
 
     tableEnv.executeSql("""
                           |ALTER TABLE MyTable ADD (
@@ -784,7 +776,7 @@ class TableEnvironmentTest {
       Row.of("c", "STRING", Boolean.box(true), null, "METADATA VIRTUAL", null)
     )
     val tableResult = tableEnv.executeSql("DESCRIBE MyTable")
-    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult.getResultKind)
+    
assertThatObject(tableResult.getResultKind).isSameAs(ResultKind.SUCCESS_WITH_CONTENT)
     checkData(expectedResult.iterator(), tableResult.collect())
   }
 
@@ -805,15 +797,15 @@ class TableEnvironmentTest {
       """.stripMargin
     tableEnv.executeSql(statement)
 
-    assertThatThrownBy(() => tableEnv.executeSql("""
-                                                   |ALTER TABLE MyTable MODIFY 
(
-                                                   |  WATERMARK FOR e.e1 AS 
e.e1
-                                                   |)
-                                                   |""".stripMargin))
-      .hasMessageContaining(
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => tableEnv.executeSql("""
+                                              |ALTER TABLE MyTable MODIFY (
+                                              |  WATERMARK FOR e.e1 AS e.e1
+                                              |)
+                                              |""".stripMargin))
+      .withMessageContaining(
         """Failed to execute ALTER TABLE statement.
           |Watermark strategy on nested column is not supported 
yet.""".stripMargin)
-      .isInstanceOf[ValidationException]
 
     tableEnv.executeSql("""
                           |ALTER TABLE MyTable MODIFY (
@@ -828,7 +820,7 @@ class TableEnvironmentTest {
       Row.of("e", "ROW<`e0` STRING, `e1` TIMESTAMP(3)>", Boolean.box(true), 
null, null, null)
     )
     val tableResult = tableEnv.executeSql("DESCRIBE MyTable")
-    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult.getResultKind)
+    
assertThatObject(tableResult.getResultKind).isSameAs(ResultKind.SUCCESS_WITH_CONTENT)
     checkData(expectedResult.iterator(), tableResult.collect())
   }
 
@@ -846,7 +838,7 @@ class TableEnvironmentTest {
                           |ALTER TABLE MyTable RENAME a TO b
                           |""".stripMargin)
     val tableResult = tableEnv.executeSql("DESCRIBE MyTable")
-    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult.getResultKind)
+    
assertThatObject(tableResult.getResultKind).isSameAs(ResultKind.SUCCESS_WITH_CONTENT)
     checkData(
       Collections
         .singletonList(Row.of("b", "BIGINT", Boolean.box(true), null, null, 
null))
@@ -884,7 +876,7 @@ class TableEnvironmentTest {
         "`d` - INTERVAL '1' MINUTE")
     )
     val tableResult = tableEnv.executeSql("DESCRIBE MyTable")
-    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult.getResultKind)
+    
assertThatObject(tableResult.getResultKind).isSameAs(ResultKind.SUCCESS_WITH_CONTENT)
     checkData(expectedResult.iterator(), tableResult.collect())
   }
 
@@ -914,13 +906,13 @@ class TableEnvironmentTest {
       Row.of("e", "ROW<`e0` STRING, `e1` TIMESTAMP(3)>", Boolean.box(true), 
null, null, null)
     )
     val tableResult1 = tableEnv.executeSql("DESCRIBE MyTable")
-    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult1.getResultKind)
+    
assertThatObject(tableResult1.getResultKind).isSameAs(ResultKind.SUCCESS_WITH_CONTENT)
     checkData(expectedResult.iterator(), tableResult1.collect())
 
     tableEnv.executeSql("ALTER TABLE MyTable ADD CONSTRAINT ct PRIMARY KEY(a) 
NOT ENFORCED")
     tableEnv.executeSql("ALTER TABLE MyTable DROP PRIMARY KEY")
     val tableResult2 = tableEnv.executeSql("DESCRIBE MyTable")
-    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult2.getResultKind)
+    
assertThatObject(tableResult2.getResultKind).isSameAs(ResultKind.SUCCESS_WITH_CONTENT)
     checkData(expectedResult.iterator(), tableResult2.collect())
   }
 
@@ -950,7 +942,7 @@ class TableEnvironmentTest {
       Row.of("e", "ROW<`e0` STRING, `e1` TIMESTAMP(3)>", Boolean.box(true), 
null, null, null)
     )
     val tableResult = tableEnv.executeSql("DESCRIBE MyTable")
-    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult.getResultKind)
+    
assertThatObject(tableResult.getResultKind).isSameAs(ResultKind.SUCCESS_WITH_CONTENT)
     checkData(expectedResult.iterator(), tableResult.collect())
   }
 
@@ -988,23 +980,23 @@ class TableEnvironmentTest {
     // drop the partitions
     var tableResult =
       tableEnv.executeSql("alter table tbl drop partition(b='1000', c ='2020-05-01')")
-    assertEquals(ResultKind.SUCCESS, tableResult.getResultKind)
+    assertThatObject(tableResult.getResultKind).isSameAs(ResultKind.SUCCESS)
     assertThat(catalog.listPartitions(tablePath).toString)
       .isEqualTo("[CatalogPartitionSpec{{b=2000, c=2020-01-01}}]")
 
     // drop the partition again with if exists
     tableResult =
       tableEnv.executeSql("alter table tbl drop if exists partition(b='1000', c='2020-05-01')")
-    assertEquals(ResultKind.SUCCESS, tableResult.getResultKind)
+    assertThatObject(tableResult.getResultKind).isSameAs(ResultKind.SUCCESS)
     assertThat(catalog.listPartitions(tablePath).toString)
       .isEqualTo("[CatalogPartitionSpec{{b=2000, c=2020-01-01}}]")
 
     // drop the partition again without if exists,
     // should throw exception then
-    assertThatThrownBy(
-      () => tableEnv.executeSql("alter table tbl drop partition (b=1000,c='2020-05-01')"))
-      .isInstanceOf(classOf[TableException])
-      .hasMessageContaining("Could not execute ALTER TABLE default_catalog.default_database.tbl" +
+    assertThatExceptionOfType(classOf[TableException])
+      .isThrownBy(
+        () => tableEnv.executeSql("alter table tbl drop partition (b=1000,c='2020-05-01')"))
+      .withMessageContaining("Could not execute ALTER TABLE default_catalog.default_database.tbl" +
         " DROP PARTITION (b=1000, c=2020-05-01)")
   }
 
@@ -1023,28 +1015,28 @@ class TableEnvironmentTest {
     tableEnv.executeSql(statement)
     tableEnv.executeSql("CREATE TEMPORARY VIEW my_view AS SELECT a, c FROM MyTable")
 
-    assertThatThrownBy(
-      () => tableEnv.executeSql("SELECT c FROM my_view /*+ OPTIONS('is-bounded' = 'true') */"))
-      .hasMessageContaining("View '`default_catalog`.`default_database`.`my_view`' " +
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(
+        () => tableEnv.executeSql("SELECT c FROM my_view /*+ OPTIONS('is-bounded' = 'true') */"))
+      .withMessageContaining("View '`default_catalog`.`default_database`.`my_view`' " +
         "cannot be enriched with new options. Hints can only be applied to tables.")
-      .isInstanceOf(classOf[ValidationException])
-
-    assertThatThrownBy(
-      () =>
-        tableEnv.executeSql(
-          "CREATE TEMPORARY VIEW your_view AS " +
-            "SELECT c FROM my_view /*+ OPTIONS('is-bounded' = 'true') */"))
-      .hasMessageContaining("View '`default_catalog`.`default_database`.`my_view`' " +
+
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(
+        () =>
+          tableEnv.executeSql(
+            "CREATE TEMPORARY VIEW your_view AS " +
+              "SELECT c FROM my_view /*+ OPTIONS('is-bounded' = 'true') */"))
+      .withMessageContaining("View '`default_catalog`.`default_database`.`my_view`' " +
         "cannot be enriched with new options. Hints can only be applied to tables.")
-      .isInstanceOf(classOf[ValidationException])
 
     tableEnv.executeSql("CREATE TEMPORARY VIEW your_view AS SELECT c FROM my_view ")
 
-    assertThatThrownBy(
-      () => tableEnv.executeSql("SELECT * FROM your_view /*+ OPTIONS('is-bounded' = 'true') */"))
-      .hasMessageContaining("View '`default_catalog`.`default_database`.`your_view`' " +
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(
+        () => tableEnv.executeSql("SELECT * FROM your_view /*+ OPTIONS('is-bounded' = 'true') */"))
+      .withMessageContaining("View '`default_catalog`.`default_database`.`your_view`' " +
         "cannot be enriched with new options. Hints can only be applied to tables.")
-      .isInstanceOf(classOf[ValidationException])
 
   }
 
@@ -1052,30 +1044,30 @@ class TableEnvironmentTest {
   def testExecuteSqlWithCreateAlterDropTable(): Unit = {
     createTableForTests()
 
-    assertTrue(
+    assertThat(
       tableEnv
         .getCatalog(tableEnv.getCurrentCatalog)
         .get()
-        .tableExists(ObjectPath.fromString(s"${tableEnv.getCurrentDatabase}.T1")))
+        .tableExists(ObjectPath.fromString(s"${tableEnv.getCurrentDatabase}.T1"))).isTrue
 
     val tableResult2 = tableEnv.executeSql("ALTER TABLE T1 SET ('k1' = 'a', 'k2' = 'b')")
-    assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
-    assertEquals(
-      Map("connector" -> "COLLECTION", "is-bounded" -> "false", "k1" -> "a", "k2" -> "b").asJava,
+    assertThatObject(tableResult2.getResultKind).isSameAs(ResultKind.SUCCESS)
+    assertThat(
       tableEnv
         .getCatalog(tableEnv.getCurrentCatalog)
         .get()
         .getTable(ObjectPath.fromString(s"${tableEnv.getCurrentDatabase}.T1"))
         .getOptions
-    )
+    ).containsAllEntriesOf(
+      util.Map.of("connector", "COLLECTION", "is-bounded", "false", "k1", "a", "k2", "b"))
 
     val tableResult3 = tableEnv.executeSql("DROP TABLE T1")
-    assertEquals(ResultKind.SUCCESS, tableResult3.getResultKind)
-    assertFalse(
+    assertThatObject(tableResult3.getResultKind).isSameAs(ResultKind.SUCCESS)
+    assertThat(
       tableEnv
         .getCatalog(tableEnv.getCurrentCatalog)
         .get()
-        .tableExists(ObjectPath.fromString(s"${tableEnv.getCurrentDatabase}.T1")))
+        .tableExists(ObjectPath.fromString(s"${tableEnv.getCurrentDatabase}.T1"))).isFalse
   }
 
   @Test
@@ -1093,22 +1085,22 @@ class TableEnvironmentTest {
       """.stripMargin
     // test create table twice
     val tableResult1 = tableEnv.executeSql(createTableStmt)
-    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+    assertThatObject(tableResult1.getResultKind).isSameAs(ResultKind.SUCCESS)
     val tableResult2 = tableEnv.executeSql(createTableStmt)
-    assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
-    assertTrue(
+    assertThatObject(tableResult2.getResultKind).isSameAs(ResultKind.SUCCESS)
+    assertThat(
       tableEnv
         .getCatalog(tableEnv.getCurrentCatalog)
         .get()
-        .tableExists(ObjectPath.fromString(s"${tableEnv.getCurrentDatabase}.tbl1")))
+        .tableExists(ObjectPath.fromString(s"${tableEnv.getCurrentDatabase}.tbl1"))).isTrue
 
     val tableResult3 = tableEnv.executeSql("DROP TABLE IF EXISTS tbl1")
-    assertEquals(ResultKind.SUCCESS, tableResult3.getResultKind)
-    assertFalse(
+    assertThatObject(tableResult3.getResultKind).isSameAs(ResultKind.SUCCESS)
+    assertThat(
       tableEnv
         .getCatalog(tableEnv.getCurrentCatalog)
         .get()
-        .tableExists(ObjectPath.fromString(s"${tableEnv.getCurrentDatabase}.tbl1")))
+        .tableExists(ObjectPath.fromString(s"${tableEnv.getCurrentDatabase}.tbl1"))).isFalse
   }
 
   @Test
@@ -1126,14 +1118,14 @@ class TableEnvironmentTest {
       """.stripMargin
     // test crate table twice
     val tableResult1 = tableEnv.executeSql(createTableStmt)
-    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+    assertThatObject(tableResult1.getResultKind).isSameAs(ResultKind.SUCCESS)
     val tableResult2 = tableEnv.executeSql(createTableStmt)
-    assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
-    assertTrue(tableEnv.listTables().contains("tbl1"))
+    assertThatObject(tableResult2.getResultKind).isSameAs(ResultKind.SUCCESS)
+    assertThat(tableEnv.listTables()).containsExactly("tbl1")
 
     val tableResult3 = tableEnv.executeSql("DROP TEMPORARY TABLE IF EXISTS tbl1")
-    assertEquals(ResultKind.SUCCESS, tableResult3.getResultKind)
-    assertFalse(tableEnv.listTables().contains("tbl1"))
+    assertThatObject(tableResult3.getResultKind).isSameAs(ResultKind.SUCCESS)
+    assertThat(tableEnv.listTables()).doesNotContain("tbl1")
   }
 
   @Test
@@ -1150,12 +1142,12 @@ class TableEnvironmentTest {
         |)
       """.stripMargin
     val tableResult1 = tableEnv.executeSql(createTableStmt)
-    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
-    assert(tableEnv.listTables().sameElements(Array[String]("tbl1")))
+    assertThatObject(tableResult1.getResultKind).isSameAs(ResultKind.SUCCESS)
+    assertThat(tableEnv.listTables()).containsExactly("tbl1")
 
     val tableResult2 = tableEnv.executeSql("DROP TEMPORARY TABLE tbl1")
-    assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
-    assert(tableEnv.listTables().sameElements(Array.empty[String]))
+    assertThatObject(tableResult2.getResultKind).isSameAs(ResultKind.SUCCESS)
+    assertThat(tableEnv.listTables()).isEmpty()
   }
 
   @Test
@@ -1172,16 +1164,16 @@ class TableEnvironmentTest {
         |)
       """.stripMargin
     val tableResult1 = tableEnv.executeSql(createTableStmt)
-    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
-    assert(tableEnv.listTables().sameElements(Array[String]("tbl1")))
+    assertThatObject(tableResult1.getResultKind).isSameAs(ResultKind.SUCCESS)
+    assertThat(tableEnv.listTables()).containsExactly("tbl1")
 
     val tableResult2 = tableEnv.executeSql("DROP TEMPORARY TABLE IF EXISTS tbl1")
-    assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
-    assert(tableEnv.listTables().sameElements(Array.empty[String]))
+    assertThatObject(tableResult2.getResultKind).isSameAs(ResultKind.SUCCESS)
+    assertThat(tableEnv.listTables()).isEmpty()
 
     val tableResult3 = tableEnv.executeSql("DROP TEMPORARY TABLE IF EXISTS tbl1")
-    assertEquals(ResultKind.SUCCESS, tableResult3.getResultKind)
-    assert(tableEnv.listTables().sameElements(Array.empty[String]))
+    assertThatObject(tableResult3.getResultKind).isSameAs(ResultKind.SUCCESS)
+    assertThat(tableEnv.listTables()).isEmpty()
   }
 
   @Test
@@ -1198,16 +1190,16 @@ class TableEnvironmentTest {
         |)
       """.stripMargin
     val tableResult1 = tableEnv.executeSql(createTableStmt)
-    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
-    assert(tableEnv.listTables().sameElements(Array[String]("tbl1")))
+    assertThatObject(tableResult1.getResultKind).isSameAs(ResultKind.SUCCESS)
+    assertThat(tableEnv.listTables()).containsExactly("tbl1")
 
     val tableResult2 = tableEnv.executeSql("DROP TEMPORARY TABLE tbl1")
-    assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
-    assert(tableEnv.listTables().sameElements(Array.empty[String]))
+    assertThatObject(tableResult2.getResultKind).isSameAs(ResultKind.SUCCESS)
+    assertThat(tableEnv.listTables()).isEmpty()
 
     // fail the case
-    assertThatThrownBy(() => tableEnv.executeSql("DROP TEMPORARY TABLE tbl1"))
-      .isInstanceOf[ValidationException]
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => tableEnv.executeSql("DROP TEMPORARY TABLE tbl1"))
   }
 
   @Test
@@ -1224,13 +1216,13 @@ class TableEnvironmentTest {
         |)
       """.stripMargin
     val tableResult1 = tableEnv.executeSql(createTableStmt)
-    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
-    assert(tableEnv.listTables().sameElements(Array[String]("tbl1")))
+    assertThatObject(tableResult1.getResultKind).isSameAs(ResultKind.SUCCESS)
+    assertThat(tableEnv.listTables()).containsExactly("tbl1")
 
     val tableResult2 =
       tableEnv.executeSql("DROP TEMPORARY TABLE default_catalog.default_database.tbl1")
-    assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
-    assert(tableEnv.listTables().sameElements(Array.empty[String]))
+    assertThatObject(tableResult2.getResultKind).isSameAs(ResultKind.SUCCESS)
+    assertThat(tableEnv.listTables()).isEmpty()
   }
 
   @Test
@@ -1247,30 +1239,30 @@ class TableEnvironmentTest {
         |)
       """.stripMargin
     val tableResult1 = tableEnv.executeSql(createTableStmt)
-    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
-    assert(tableEnv.listTables().sameElements(Array[String]("tbl1")))
+    assertThatObject(tableResult1.getResultKind).isSameAs(ResultKind.SUCCESS)
+    assertThat(tableEnv.listTables()).containsExactly("tbl1")
 
     // fail the case
-    assertThatThrownBy(
-      () => tableEnv.executeSql("DROP TEMPORARY TABLE invalid_catalog.invalid_database.tbl1"))
-      .isInstanceOf[ValidationException]
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(
+        () => tableEnv.executeSql("DROP TEMPORARY TABLE invalid_catalog.invalid_database.tbl1"))
   }
 
   @Test
   def testExecuteSqlWithCreateAlterDropDatabase(): Unit = {
     val tableResult1 = tableEnv.executeSql("CREATE DATABASE db1 COMMENT 'db1_comment'")
-    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
-    assertTrue(tableEnv.getCatalog(tableEnv.getCurrentCatalog).get().databaseExists("db1"))
+    assertThatObject(tableResult1.getResultKind).isSameAs(ResultKind.SUCCESS)
+    assertThat(tableEnv.getCatalog(tableEnv.getCurrentCatalog).get().databaseExists("db1")).isTrue
 
     val tableResult2 = tableEnv.executeSql("ALTER DATABASE db1 SET ('k1' = 'a', 'k2' = 'b')")
-    assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
-    assertEquals(
-      Map("k1" -> "a", "k2" -> "b").asJava,
+    assertThatObject(tableResult2.getResultKind).isSameAs(ResultKind.SUCCESS)
+    assertThat(
       tableEnv.getCatalog(tableEnv.getCurrentCatalog).get().getDatabase("db1").getProperties)
+      .containsAllEntriesOf(util.Map.of("k1", "a", "k2", "b"))
 
     val tableResult3 = tableEnv.executeSql("DROP DATABASE db1")
-    assertEquals(ResultKind.SUCCESS, tableResult3.getResultKind)
-    assertFalse(tableEnv.getCatalog(tableEnv.getCurrentCatalog).get().databaseExists("db1"))
+    assertThatObject(tableResult3.getResultKind).isSameAs(ResultKind.SUCCESS)
+    assertThat(tableEnv.getCatalog(tableEnv.getCurrentCatalog).get().databaseExists("db1")).isFalse
   }
 
   @Test
@@ -1279,81 +1271,81 @@ class TableEnvironmentTest {
     val funcName2 = classOf[SimpleScalarFunction].getName
 
     val tableResult1 = tableEnv.executeSql(s"CREATE FUNCTION default_database.f1 AS '$funcName'")
-    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
-    assertTrue(
+    assertThatObject(tableResult1.getResultKind).isSameAs(ResultKind.SUCCESS)
+    assertThat(
       tableEnv
         .getCatalog(tableEnv.getCurrentCatalog)
         .get()
-        .functionExists(ObjectPath.fromString("default_database.f1")))
+        .functionExists(ObjectPath.fromString("default_database.f1"))).isTrue
 
     val tableResult2 = tableEnv.executeSql(s"ALTER FUNCTION default_database.f1 AS '$funcName2'")
-    assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
-    assertTrue(
+    assertThatObject(tableResult2.getResultKind).isSameAs(ResultKind.SUCCESS)
+    assertThat(
       tableEnv
         .getCatalog(tableEnv.getCurrentCatalog)
         .get()
-        .functionExists(ObjectPath.fromString("default_database.f1")))
+        .functionExists(ObjectPath.fromString("default_database.f1"))).isTrue
 
     val tableResult3 = tableEnv.executeSql("DROP FUNCTION default_database.f1")
-    assertEquals(ResultKind.SUCCESS, tableResult3.getResultKind)
-    assertFalse(
+    assertThatObject(tableResult3.getResultKind).isSameAs(ResultKind.SUCCESS)
+    assertThat(
       tableEnv
         .getCatalog(tableEnv.getCurrentCatalog)
         .get()
-        .functionExists(ObjectPath.fromString("default_database.f1")))
+        .functionExists(ObjectPath.fromString("default_database.f1"))).isFalse
 
     val tableResult4 = tableEnv.executeSql(s"CREATE TEMPORARY SYSTEM FUNCTION f2 AS '$funcName'")
-    assertEquals(ResultKind.SUCCESS, tableResult4.getResultKind)
-    assertTrue(tableEnv.listUserDefinedFunctions().contains("f2"))
+    assertThatObject(tableResult4.getResultKind).isSameAs(ResultKind.SUCCESS)
+    assertThat(tableEnv.listUserDefinedFunctions()).containsExactly("f2")
 
     val tableResult5 = tableEnv.executeSql("DROP TEMPORARY SYSTEM FUNCTION f2")
-    assertEquals(ResultKind.SUCCESS, tableResult5.getResultKind)
-    assertFalse(tableEnv.listUserDefinedFunctions().contains("f2"))
+    assertThatObject(tableResult5.getResultKind).isSameAs(ResultKind.SUCCESS)
+    assertThat(tableEnv.listUserDefinedFunctions()).doesNotContain("f2")
   }
 
   @Test
   def testExecuteSqlWithCreateUseDropCatalog(): Unit = {
     val tableResult1 =
       tableEnv.executeSql("CREATE CATALOG my_catalog WITH('type'='generic_in_memory')")
-    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
-    assertTrue(tableEnv.getCatalog("my_catalog").isPresent)
+    assertThatObject(tableResult1.getResultKind).isSameAs(ResultKind.SUCCESS)
+    assertThat(tableEnv.getCatalog("my_catalog")).isPresent
 
-    assertEquals("default_catalog", tableEnv.getCurrentCatalog)
+    assertThat(tableEnv.getCurrentCatalog).hasToString("default_catalog")
     val tableResult2 = tableEnv.executeSql("USE CATALOG my_catalog")
-    assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
-    assertEquals("my_catalog", tableEnv.getCurrentCatalog)
+    assertThatObject(tableResult2.getResultKind).isSameAs(ResultKind.SUCCESS)
+    assertThat(tableEnv.getCurrentCatalog).hasToString("my_catalog")
 
-    assertThatThrownBy(() => tableEnv.executeSql("DROP CATALOG my_catalog"))
-      .isInstanceOf(classOf[ValidationException])
-      .hasRootCauseMessage("Cannot drop a catalog which is currently in use.")
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => tableEnv.executeSql("DROP CATALOG my_catalog"))
+      .havingRootCause()
+      .withMessageContaining("Cannot drop a catalog which is currently in use.")
 
     tableEnv.executeSql("USE CATALOG default_catalog")
 
     val tableResult3 = tableEnv.executeSql("DROP CATALOG my_catalog")
-    assertEquals(ResultKind.SUCCESS, tableResult3.getResultKind)
-    assertFalse(tableEnv.getCatalog("my_catalog").isPresent)
+    assertThatObject(tableResult3.getResultKind).isSameAs(ResultKind.SUCCESS)
+    assertThat(tableEnv.getCatalog("my_catalog")).isNotPresent
   }
 
   @Test
   def testExecuteSqlWithUseDatabase(): Unit = {
     val tableResult1 = tableEnv.executeSql("CREATE DATABASE db1 COMMENT 'db1_comment'")
-    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
-    assertTrue(tableEnv.getCatalog(tableEnv.getCurrentCatalog).get().databaseExists("db1"))
+    assertThatObject(tableResult1.getResultKind).isSameAs(ResultKind.SUCCESS)
+    assertThat(tableEnv.getCatalog(tableEnv.getCurrentCatalog).get().databaseExists("db1")).isTrue
 
-    assertEquals("default_database", tableEnv.getCurrentDatabase)
+    assertThat(tableEnv.getCurrentDatabase).hasToString("default_database")
     val tableResult2 = tableEnv.executeSql("USE db1")
-    assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
-    assertEquals("db1", tableEnv.getCurrentDatabase)
+    assertThatObject(tableResult2.getResultKind).isSameAs(ResultKind.SUCCESS)
+    assertThat(tableEnv.getCurrentDatabase).hasToString("db1")
   }
 
   @Test
   def testExecuteSqlWithShowCatalogs(): Unit = {
     tableEnv.registerCatalog("my_catalog", new GenericInMemoryCatalog("my_catalog"))
     val tableResult = tableEnv.executeSql("SHOW CATALOGS")
-    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult.getResultKind)
-    assertEquals(
-      ResolvedSchema.of(Column.physical("catalog name", DataTypes.STRING())),
-      tableResult.getResolvedSchema)
+    assertThatObject(tableResult.getResultKind).isSameAs(ResultKind.SUCCESS_WITH_CONTENT)
+    assertThat(ResolvedSchema.of(Column.physical("catalog name", DataTypes.STRING())))
+      .isEqualTo(tableResult.getResolvedSchema)
     checkData(
       util.Arrays.asList(Row.of("default_catalog"), Row.of("my_catalog")).iterator(),
       tableResult.collect())
@@ -1362,12 +1354,11 @@ class TableEnvironmentTest {
   @Test
   def testExecuteSqlWithShowDatabases(): Unit = {
     val tableResult1 = tableEnv.executeSql("CREATE DATABASE db1 COMMENT 'db1_comment'")
-    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+    assertThatObject(tableResult1.getResultKind).isSameAs(ResultKind.SUCCESS)
     val tableResult2 = tableEnv.executeSql("SHOW DATABASES")
-    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult2.getResultKind)
-    assertEquals(
-      ResolvedSchema.of(Column.physical("database name", DataTypes.STRING())),
-      tableResult2.getResolvedSchema)
+    assertThatObject(tableResult2.getResultKind).isSameAs(ResultKind.SUCCESS_WITH_CONTENT)
+    assertThat(ResolvedSchema.of(Column.physical("database name", DataTypes.STRING())))
+      .isEqualTo(tableResult2.getResolvedSchema)
     checkData(
       util.Arrays.asList(Row.of("db1"), Row.of("default_database")).iterator(),
       tableResult2.collect())
@@ -1378,10 +1369,9 @@ class TableEnvironmentTest {
     createTableForTests()
 
     val tableResult2 = tableEnv.executeSql("SHOW TABLES")
-    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult2.getResultKind)
-    assertEquals(
-      ResolvedSchema.of(Column.physical("table name", DataTypes.STRING())),
-      tableResult2.getResolvedSchema)
+    assertThatObject(tableResult2.getResultKind).isSameAs(ResultKind.SUCCESS_WITH_CONTENT)
+    assertThat(ResolvedSchema.of(Column.physical("table name", DataTypes.STRING())))
+      .isEqualTo(tableResult2.getResolvedSchema)
     checkData(util.Arrays.asList(Row.of("T1")).iterator(), tableResult2.collect())
   }
 
@@ -1389,10 +1379,10 @@ class TableEnvironmentTest {
   def testExecuteSqlWithEnhancedShowTables(): Unit = {
     val createCatalogResult =
       tableEnv.executeSql("CREATE CATALOG catalog1 WITH('type'='generic_in_memory')")
-    assertEquals(ResultKind.SUCCESS, createCatalogResult.getResultKind)
+    assertThatObject(createCatalogResult.getResultKind).isSameAs(ResultKind.SUCCESS)
 
     val createDbResult = tableEnv.executeSql("CREATE database catalog1.db1")
-    assertEquals(ResultKind.SUCCESS, createDbResult.getResultKind)
+    assertThatObject(createDbResult.getResultKind).isSameAs(ResultKind.SUCCESS)
 
     val createTableStmt =
       """
@@ -1406,7 +1396,7 @@ class TableEnvironmentTest {
         |)
       """.stripMargin
     val tableResult1 = tableEnv.executeSql(createTableStmt)
-    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+    assertThatObject(tableResult1.getResultKind).isSameAs(ResultKind.SUCCESS)
 
     val createTableStmt2 =
       """
@@ -1418,13 +1408,12 @@ class TableEnvironmentTest {
         |)
       """.stripMargin
     val tableResult2 = tableEnv.executeSql(createTableStmt2)
-    assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
+    assertThatObject(tableResult2.getResultKind).isSameAs(ResultKind.SUCCESS)
 
     val tableResult3 = tableEnv.executeSql("SHOW TABLES FROM catalog1.db1 like 'p_r%'")
-    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult3.getResultKind)
-    assertEquals(
-      ResolvedSchema.of(Column.physical("table name", DataTypes.STRING())),
-      tableResult3.getResolvedSchema)
+    assertThatObject(tableResult3.getResultKind).isSameAs(ResultKind.SUCCESS_WITH_CONTENT)
+    assertThat(ResolvedSchema.of(Column.physical("table name", DataTypes.STRING())))
+      .isEqualTo(tableResult3.getResolvedSchema)
     checkData(util.Arrays.asList(Row.of("person")).iterator(), tableResult3.collect())
   }
 
@@ -1432,59 +1421,56 @@ class TableEnvironmentTest {
   def testExecuteSqlWithEnhancedShowViews(): Unit = {
     val createCatalogResult =
       tableEnv.executeSql("CREATE CATALOG catalog1 WITH('type'='generic_in_memory')")
-    assertEquals(ResultKind.SUCCESS, createCatalogResult.getResultKind)
+    assertThatObject(createCatalogResult.getResultKind).isSameAs(ResultKind.SUCCESS)
 
     val createDbResult = tableEnv.executeSql("CREATE database catalog1.db1")
-    assertEquals(ResultKind.SUCCESS, createDbResult.getResultKind)
+    assertThatObject(createDbResult.getResultKind).isSameAs(ResultKind.SUCCESS)
 
     val createTableStmt =
       """
         |CREATE VIEW catalog1.db1.view1 AS SELECT 1, 'abc'
       """.stripMargin
     val tableResult1 = tableEnv.executeSql(createTableStmt)
-    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+    assertThatObject(tableResult1.getResultKind).isSameAs(ResultKind.SUCCESS)
 
     val createTableStmt2 =
       """
         |CREATE VIEW catalog1.db1.view2 AS SELECT 123
       """.stripMargin
     val tableResult2 = tableEnv.executeSql(createTableStmt2)
-    assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
+    assertThatObject(tableResult2.getResultKind).isSameAs(ResultKind.SUCCESS)
 
     val tableResult3 = tableEnv.executeSql("SHOW VIEWS FROM catalog1.db1 like '%w1'")
-    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult3.getResultKind)
-    assertEquals(
-      ResolvedSchema.of(Column.physical("view name", DataTypes.STRING())),
-      tableResult3.getResolvedSchema)
+    assertThatObject(tableResult3.getResultKind).isSameAs(ResultKind.SUCCESS_WITH_CONTENT)
+    assertThat(ResolvedSchema.of(Column.physical("view name", DataTypes.STRING())))
+      .isEqualTo(tableResult3.getResolvedSchema)
     checkData(util.Arrays.asList(Row.of("view1")).iterator(), tableResult3.collect())
   }
 
   @Test
   def testExecuteSqlWithShowFunctions(): Unit = {
     val tableResult = tableEnv.executeSql("SHOW FUNCTIONS")
-    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult.getResultKind)
-    assertEquals(
-      ResolvedSchema.of(Column.physical("function name", DataTypes.STRING())),
-      tableResult.getResolvedSchema)
+    assertThatObject(tableResult.getResultKind).isSameAs(ResultKind.SUCCESS_WITH_CONTENT)
+    assertThat(ResolvedSchema.of(Column.physical("function name", DataTypes.STRING())))
+      .isEqualTo(tableResult.getResolvedSchema)
     checkData(
       tableEnv.listFunctions().map(Row.of(_)).toList.asJava.iterator(),
       tableResult.collect())
 
     val funcName = classOf[TestUDF].getName
     val tableResult1 = tableEnv.executeSql(s"CREATE FUNCTION default_database.f1 AS '$funcName'")
-    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+    assertThatObject(tableResult1.getResultKind).isSameAs(ResultKind.SUCCESS)
     val tableResult2 = tableEnv.executeSql("SHOW USER FUNCTIONS")
-    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult2.getResultKind)
-    assertEquals(
-      ResolvedSchema.of(Column.physical("function name", DataTypes.STRING())),
-      tableResult2.getResolvedSchema)
+    assertThatObject(tableResult2.getResultKind).isSameAs(ResultKind.SUCCESS_WITH_CONTENT)
+    assertThat(ResolvedSchema.of(Column.physical("function name", DataTypes.STRING())))
+      .isEqualTo(tableResult2.getResolvedSchema)
     checkData(util.Arrays.asList(Row.of("f1")).iterator(), tableResult2.collect())
   }
 
   @Test
   def testExecuteSqlWithLoadModule(): Unit = {
     val result = tableEnv.executeSql("LOAD MODULE dummy")
-    assertEquals(ResultKind.SUCCESS, result.getResultKind)
+    assertThatObject(result.getResultKind).isSameAs(ResultKind.SUCCESS)
     checkListModules("core", "dummy")
     checkListFullModules(("core", true), ("dummy", true))
 
@@ -1495,10 +1481,10 @@ class TableEnvironmentTest {
         |)
       """.stripMargin
 
-    assertThatThrownBy(() => tableEnv.executeSql(statement))
-      .hasMessageContaining(
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => tableEnv.executeSql(statement))
+      .withMessageContaining(
         "Option 'type' = 'dummy' is not supported since module name is used to find module")
-      .isInstanceOf[ValidationException]
   }
 
   @Test
@@ -1510,7 +1496,7 @@ class TableEnvironmentTest {
         |)
       """.stripMargin
     val result = tableEnv.executeSql(statement1)
-    assertEquals(ResultKind.SUCCESS, result.getResultKind)
+    assertThatObject(result.getResultKind).isSameAs(ResultKind.SUCCESS)
     checkListModules("core", "dummy")
     checkListFullModules(("core", true), ("dummy", true))
 
@@ -1520,10 +1506,10 @@ class TableEnvironmentTest {
         |  'dummy-version' = '2'
         |)
       """.stripMargin
-    assertThatThrownBy(() => tableEnv.executeSql(statement2))
-      .hasMessageContaining("Could not execute LOAD MODULE `dummy` WITH ('dummy-version' = '2')." +
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => tableEnv.executeSql(statement2))
+      .withMessageContaining("Could not execute LOAD MODULE `dummy` WITH ('dummy-version' = '2')." +
         " A module with name 'dummy' already exists")
-      .isInstanceOf[ValidationException]
   }
 
   @Test
@@ -1535,10 +1521,10 @@ class TableEnvironmentTest {
         |)
       """.stripMargin
 
-    assertThatThrownBy(() => tableEnv.executeSql(statement1))
-      .hasMessageContaining(
-        "Could not execute LOAD MODULE `Dummy` WITH ('dummy-version' = '1')."
-          + " Unable to create module 'Dummy'.")
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => tableEnv.executeSql(statement1))
+      .withMessageContaining("Could not execute LOAD MODULE `Dummy` WITH ('dummy-version' = '1')."
+        + " Unable to create module 'Dummy'.")
 
     val statement2 =
       """
@@ -1547,7 +1533,7 @@ class TableEnvironmentTest {
         |)
       """.stripMargin
     val result = tableEnv.executeSql(statement2)
-    assertEquals(ResultKind.SUCCESS, result.getResultKind)
+    assertThatObject(result.getResultKind).isSameAs(ResultKind.SUCCESS)
     checkListModules("core", "dummy")
     checkListFullModules(("core", true), ("dummy", true))
   }
@@ -1559,14 +1545,14 @@ class TableEnvironmentTest {
     checkListFullModules(("core", true), ("dummy", true))
 
     val result = tableEnv.executeSql("UNLOAD MODULE dummy")
-    assertEquals(ResultKind.SUCCESS, result.getResultKind)
+    assertThatObject(result.getResultKind).isSameAs(ResultKind.SUCCESS)
     checkListModules("core")
     checkListFullModules(("core", true))
 
-    assertThatThrownBy(() => tableEnv.executeSql("UNLOAD MODULE dummy"))
-      .hasMessageContaining("Could not execute UNLOAD MODULE dummy." +
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => tableEnv.executeSql("UNLOAD MODULE dummy"))
+      .withMessageContaining("Could not execute UNLOAD MODULE dummy." +
         " No module with name 'dummy' exists")
-      .isInstanceOf[ValidationException]
   }
 
   @Test
@@ -1576,40 +1562,40 @@ class TableEnvironmentTest {
     checkListFullModules(("core", true), ("dummy", true))
 
     val result1 = tableEnv.executeSql("USE MODULES dummy")
-    assertEquals(ResultKind.SUCCESS, result1.getResultKind)
+    assertThatObject(result1.getResultKind).isSameAs(ResultKind.SUCCESS)
     checkListModules("dummy")
     checkListFullModules(("dummy", true), ("core", false))
 
     val result2 = tableEnv.executeSql("USE MODULES dummy, core")
-    assertEquals(ResultKind.SUCCESS, result2.getResultKind)
+    assertThatObject(result2.getResultKind).isSameAs(ResultKind.SUCCESS)
     checkListModules("dummy", "core")
     checkListFullModules(("dummy", true), ("core", true))
 
     val result3 = tableEnv.executeSql("USE MODULES core, dummy")
-    assertEquals(ResultKind.SUCCESS, result3.getResultKind)
+    assertThatObject(result3.getResultKind).isSameAs(ResultKind.SUCCESS)
     checkListModules("core", "dummy")
     checkListFullModules(("core", true), ("dummy", true))
 
     val result4 = tableEnv.executeSql("USE MODULES core")
-    assertEquals(ResultKind.SUCCESS, result4.getResultKind)
+    assertThatObject(result4.getResultKind).isSameAs(ResultKind.SUCCESS)
     checkListModules("core")
     checkListFullModules(("core", true), ("dummy", false))
   }
 
   @Test
   def testExecuteSqlWithUseUnloadedModules(): Unit = {
-    assertThatThrownBy(() => tableEnv.executeSql("USE MODULES core, dummy"))
-      .hasMessageContaining("Could not execute USE MODULES: [core, dummy]. " +
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => tableEnv.executeSql("USE MODULES core, dummy"))
+      .withMessageContaining("Could not execute USE MODULES: [core, dummy]. " +
         "No module with name 'dummy' exists")
-      .isInstanceOf[ValidationException]
   }
 
   @Test
   def testExecuteSqlWithUseDuplicateModuleNames(): Unit = {
-    assertThatThrownBy(() => tableEnv.executeSql("USE MODULES core, core"))
-      .hasMessageContaining("Could not execute USE MODULES: [core, core]. " +
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => tableEnv.executeSql("USE MODULES core, core"))
+      .withMessageContaining("Could not execute USE MODULES: [core, core]. " +
         "Module 'core' appears more than once")
-      .isInstanceOf[ValidationException]
   }
 
   @Test
@@ -1635,20 +1621,20 @@ class TableEnvironmentTest {
     createTableForTests()
 
     val viewResult1 = tableEnv.executeSql("CREATE VIEW IF NOT EXISTS v1 AS SELECT * FROM T1")
-    assertEquals(ResultKind.SUCCESS, viewResult1.getResultKind)
-    assertTrue(
+    assertThatObject(viewResult1.getResultKind).isSameAs(ResultKind.SUCCESS)
+    assertThat(
       tableEnv
         .getCatalog(tableEnv.getCurrentCatalog)
         .get()
-        .tableExists(ObjectPath.fromString(s"${tableEnv.getCurrentDatabase}.v1")))
+        .tableExists(ObjectPath.fromString(s"${tableEnv.getCurrentDatabase}.v1"))).isTrue
 
     val viewResult2 = tableEnv.executeSql("DROP VIEW IF EXISTS v1")
-    assertEquals(ResultKind.SUCCESS, viewResult2.getResultKind)
-    assertFalse(
+    assertThatObject(viewResult2.getResultKind).isSameAs(ResultKind.SUCCESS)
+    assertThat(
       tableEnv
         .getCatalog(tableEnv.getCurrentCatalog)
         .get()
-        .tableExists(ObjectPath.fromString(s"${tableEnv.getCurrentDatabase}.v1")))
+        .tableExists(ObjectPath.fromString(s"${tableEnv.getCurrentDatabase}.v1"))).isFalse
   }
 
   @Test
@@ -1657,12 +1643,12 @@ class TableEnvironmentTest {
 
     val viewResult1 =
       tableEnv.executeSql("CREATE TEMPORARY VIEW IF NOT EXISTS v1 AS SELECT * FROM T1")
-    assertEquals(ResultKind.SUCCESS, viewResult1.getResultKind)
-    assert(tableEnv.listTables().sameElements(Array[String]("T1", "v1")))
+    assertThatObject(viewResult1.getResultKind).isSameAs(ResultKind.SUCCESS)
+    assertThat(tableEnv.listTables()).containsExactly("T1", "v1")
 
     val viewResult2 = tableEnv.executeSql("DROP TEMPORARY VIEW IF EXISTS v1")
-    assertEquals(ResultKind.SUCCESS, viewResult2.getResultKind)
-    assert(tableEnv.listTables().sameElements(Array[String]("T1")))
+    assertThatObject(viewResult2.getResultKind).isSameAs(ResultKind.SUCCESS)
+    assertThat(tableEnv.listTables()).containsExactly("T1")
   }
 
   @Test
@@ -1694,17 +1680,17 @@ class TableEnvironmentTest {
         |CREATE VIEW IF NOT EXISTS T3(d) AS SELECT * FROM T1
       """.stripMargin
 
-    assertThatThrownBy(
-      () => {
-        tableEnv.executeSql(sourceDDL)
-        tableEnv.executeSql(sinkDDL)
-        tableEnv.executeSql(viewDDL)
-      })
-      .hasMessageContaining(
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(
+        () => {
+          tableEnv.executeSql(sourceDDL)
+          tableEnv.executeSql(sinkDDL)
+          tableEnv.executeSql(viewDDL)
+        })
+      .withMessageContaining(
         "VIEW definition and input fields not match:\n" +
           "\tDef fields: [d].\n" +
           "\tInput fields: [a, b, c].")
-      .isInstanceOf[ValidationException]
   }
 
   @Test
@@ -1745,10 +1731,10 @@ class TableEnvironmentTest {
     tableEnv.executeSql(sinkDDL)
     tableEnv.executeSql(viewWith3ColumnDDL)
 
-    assertThatThrownBy(() => tableEnv.executeSql(viewWith2ColumnDDL))
-      .hasMessageContaining(
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => tableEnv.executeSql(viewWith2ColumnDDL))
+      .withMessageContaining(
         "Could not execute CreateTable in path `default_catalog`.`default_database`.`T3`")
-      .isInstanceOf[ValidationException]
   }
 
   @ParameterizedTest
@@ -1756,13 +1742,13 @@ class TableEnvironmentTest {
   def testDropViewWithFullPath(isSql: Boolean): Unit = {
     createViewsForDropTests()
 
-    assert(tableEnv.listTables().sameElements(Array[String]("T1", "T2", "T3")))
+    assertThat(tableEnv.listTables()).containsExactly("T1", "T2", "T3")
 
     dropView("default_catalog.default_database.T2", isSql)
-    assert(tableEnv.listTables().sameElements(Array[String]("T1", "T3")))
+    assertThat(tableEnv.listTables()).containsExactly("T1", "T3")
 
     dropView("default_catalog.default_database.T3", isSql)
-    assert(tableEnv.listTables().sameElements(Array[String]("T1")))
+    assertThat(tableEnv.listTables()).containsExactly("T1")
   }
 
   @ParameterizedTest
@@ -1770,96 +1756,96 @@ class TableEnvironmentTest {
   def testDropViewWithPartialPath(isSql: Boolean): Unit = {
     createViewsForDropTests()
 
-    assert(tableEnv.listTables().sameElements(Array[String]("T1", "T2", "T3")))
+    assertThat(tableEnv.listTables()).containsExactly("T1", "T2", "T3")
 
     dropView("T2", isSql)
-    assert(tableEnv.listTables().sameElements(Array[String]("T1", "T3")))
+    assertThat(tableEnv.listTables()).containsExactly("T1", "T3")
 
     dropView("T3", isSql)
-    assert(tableEnv.listTables().sameElements(Array[String]("T1")))
+    assertThat(tableEnv.listTables()).containsExactly("T1")
   }
 
   @Test
   def testDropViewIfExistsTwice(): Unit = {
     createViewsForDropTests()
 
-    assert(tableEnv.listTables().sameElements(Array[String]("T1", "T2", "T3")))
+    assertThat(tableEnv.listTables()).containsExactly("T1", "T2", "T3")
 
     tableEnv.executeSql("DROP VIEW IF EXISTS default_catalog.default_database.T2")
-    assert(tableEnv.listTables().sameElements(Array[String]("T1", "T3")))
+    assertThat(tableEnv.listTables()).containsExactly("T1", "T3")
 
     tableEnv.executeSql("DROP VIEW IF EXISTS default_catalog.default_database.T2")
-    assert(tableEnv.listTables().sameElements(Array[String]("T1", "T3")))
+    assertThat(tableEnv.listTables()).containsExactly("T1", "T3")
   }
 
   @Test
   def testDropViewTwice(): Unit = {
     createViewsForDropTests()
 
-    assert(tableEnv.listTables().sameElements(Array[String]("T1", "T2", "T3")))
+    assertThat(tableEnv.listTables()).containsExactly("T1", "T2", "T3")
 
     tableEnv.executeSql("DROP VIEW default_catalog.default_database.T2")
-    assert(tableEnv.listTables().sameElements(Array[String]("T1", "T3")))
+    assertThat(tableEnv.listTables()).containsExactly("T1", "T3")
 
-    assertThatThrownBy(() => tableEnv.executeSql("DROP VIEW default_catalog.default_database.T2"))
-      .isInstanceOf(classOf[ValidationException])
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => tableEnv.executeSql("DROP VIEW default_catalog.default_database.T2"))
   }
 
   @Test
   def testDropView(): Unit = {
     createViewsForDropTests()
 
-    assert(tableEnv.listTables().sameElements(Array[String]("T1", "T2", "T3")))
+    assertThat(tableEnv.listTables()).containsExactly("T1", "T2", "T3")
 
-    assertTrue(tableEnv.dropView("default_catalog.default_database.T2"))
-    assert(tableEnv.listTables().sameElements(Array[String]("T1", "T3")))
+    assertThat(tableEnv.dropView("default_catalog.default_database.T2")).isTrue
+    assertThat(tableEnv.listTables()).containsExactly("T1", "T3")
 
-    assertFalse(tableEnv.dropView("default_catalog.default_database.T2"))
-    assertFalse(tableEnv.dropView("invalid.default_database.T2"))
-    assertFalse(tableEnv.dropView("default_catalog.invalid.T2"))
-    assertFalse(tableEnv.dropView("default_catalog.default_database.invalid"))
-    assert(tableEnv.listTables().sameElements(Array[String]("T1", "T3")))
+    assertThat(tableEnv.dropView("default_catalog.default_database.T2")).isFalse
+    assertThat(tableEnv.dropView("invalid.default_database.T2")).isFalse
+    assertThat(tableEnv.dropView("default_catalog.invalid.T2")).isFalse
+    assertThat(tableEnv.dropView("default_catalog.default_database.invalid")).isFalse
+    assertThat(tableEnv.listTables()).containsExactly("T1", "T3")
 
     tableEnv.createTemporaryView("T3", tableEnv.sqlQuery("SELECT 123"))
 
-    assertThatThrownBy(() => tableEnv.dropView("T3"))
-      .hasMessageContaining(
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => tableEnv.dropView("T3"))
+      .withMessageContaining(
         "Temporary view with identifier '`default_catalog`.`default_database`.`T3`' exists. " +
           "Drop it first before removing the permanent view.")
-      .isInstanceOf[ValidationException]
 
     tableEnv.dropTemporaryView("T3")
 
-    assert(tableEnv.listTables().sameElements(Array[String]("T1", "T3")))
+    assertThat(tableEnv.listTables()).containsExactly("T1", "T3")
     // Now can drop permanent view
     tableEnv.dropView("T3")
-    assert(tableEnv.listTables().sameElements(Array[String]("T1")))
+    assertThat(tableEnv.listTables()).containsExactly("T1")
   }
 
   @Test
   def testDropTable(): Unit = {
     createTableForTests()
 
-    assert(tableEnv.listTables().sameElements(Array[String]("T1")))
+    assertThat(tableEnv.listTables()).containsExactly("T1")
 
-    assertFalse(tableEnv.dropTable("default_catalog.default_database.T2"))
-    assert(tableEnv.listTables().sameElements(Array[String]("T1")))
+    assertThat(tableEnv.dropTable("default_catalog.default_database.T2")).isFalse
+    assertThat(tableEnv.listTables()).containsExactly("T1")
 
-    assertThatThrownBy(() => tableEnv.dropTable("default_catalog.default_database.T2", false))
-      .hasMessageContaining(
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => tableEnv.dropTable("default_catalog.default_database.T2", false))
+      .withMessageContaining(
         "Table with identifier 'default_catalog.default_database.T2' does not exist.")
-      .isInstanceOf[ValidationException]
 
-    assertFalse(tableEnv.dropTable("default_catalog.default_database.T2"))
-    assertFalse(tableEnv.dropTable("invalid.default_database.T2"))
-    assertFalse(tableEnv.dropTable("default_catalog.invalid.T2"))
-    assertFalse(tableEnv.dropTable("default_catalog.default_database.invalid"))
-    assert(tableEnv.listTables().sameElements(Array[String]("T1")))
+    assertThat(tableEnv.dropTable("default_catalog.default_database.T2")).isFalse
+    assertThat(tableEnv.dropTable("invalid.default_database.T2")).isFalse
+    assertThat(tableEnv.dropTable("default_catalog.invalid.T2")).isFalse
+    assertThat(tableEnv.dropTable("default_catalog.default_database.invalid")).isFalse
+    assertThat(tableEnv.listTables()).containsExactly("T1")
 
-    assertTrue(tableEnv.dropTable("default_catalog.default_database.T1"))
-    assert(tableEnv.listTables().sameElements(Array[String]()))
+    assertThat(tableEnv.dropTable("default_catalog.default_database.T1")).isTrue
+    assertThat(tableEnv.listTables()).isEmpty()
     createTableForTests()
-    assert(tableEnv.listTables().sameElements(Array[String]("T1")))
+    assertThat(tableEnv.listTables()).containsExactly("T1")
 
     tableEnv.createTemporaryTable(
       "T1",
@@ -1867,35 +1853,35 @@ class TableEnvironmentTest {
         .forConnector("values")
         .schema(Schema.newBuilder().column("col1", DataTypes.INT()).build())
         .build())
-    assertThatThrownBy(() => tableEnv.dropTable("T1"))
-      .hasMessageContaining(
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => tableEnv.dropTable("T1"))
+      .withMessageContaining(
         "Temporary table with identifier '`default_catalog`.`default_database`.`T1`' exists. " +
           "Drop it first before removing the permanent table.")
-      .isInstanceOf[ValidationException]
 
     tableEnv.dropTemporaryTable("T1")
 
-    assert(tableEnv.listTables().sameElements(Array[String]("T1")))
+    assertThat(tableEnv.listTables()).containsExactly("T1")
     // Now can drop permanent table
     tableEnv.dropTable("T1")
-    assert(tableEnv.listTables().sameElements(Array[String]()))
+    assertThat(tableEnv.listTables()).isEmpty()
   }
 
   @Test
   def testDropViewWithInvalidPath(): Unit = {
     createViewsForDropTests()
-    assert(tableEnv.listTables().sameElements(Array[String]("T1", "T2", "T3")))
+    assertThat(tableEnv.listTables()).containsExactly("T1", "T2", "T3")
     // failed since 'default_catalog1.default_database1.T2' is invalid path
-    assertThatThrownBy(() => tableEnv.executeSql("DROP VIEW default_catalog1.default_database1.T2"))
-      .isInstanceOf[ValidationException]
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => tableEnv.executeSql("DROP VIEW default_catalog1.default_database1.T2"))
   }
 
   @Test
   def testDropViewWithInvalidPathIfExists(): Unit = {
     createViewsForDropTests()
-    assert(tableEnv.listTables().sameElements(Array[String]("T1", "T2", "T3")))
+    assertThat(tableEnv.listTables()).containsExactly("T1", "T2", "T3")
     tableEnv.executeSql("DROP VIEW IF EXISTS default_catalog1.default_database1.T2")
-    assert(tableEnv.listTables().sameElements(Array[String]("T1", "T2", "T3")))
+    assertThat(tableEnv.listTables()).containsExactly("T1", "T2", "T3")
   }
 
   @Test
@@ -1919,13 +1905,13 @@ class TableEnvironmentTest {
     tableEnv.executeSql(sourceDDL)
     tableEnv.executeSql(viewDDL)
 
-    assert(tableEnv.listTemporaryViews().sameElements(Array[String]("T2")))
+    assertThat(tableEnv.listTemporaryViews()).containsExactly("T2")
 
     tableEnv.executeSql("DROP TEMPORARY VIEW IF EXISTS default_catalog.default_database.T2")
-    assert(tableEnv.listTemporaryViews().sameElements(Array[String]()))
+    assertThat(tableEnv.listTemporaryViews()).isEmpty()
 
     tableEnv.executeSql("DROP TEMPORARY VIEW IF EXISTS default_catalog.default_database.T2")
-    assert(tableEnv.listTemporaryViews().sameElements(Array[String]()))
+    assertThat(tableEnv.listTemporaryViews()).isEmpty()
   }
 
   @Test
@@ -1951,15 +1937,15 @@ class TableEnvironmentTest {
     tableEnv.executeSql(sourceDDL)
     tableEnv.executeSql(viewDDL)
 
-    assert(tableEnv.listTemporaryViews().sameElements(Array[String]("T2")))
+    assertThat(tableEnv.listTemporaryViews()).containsExactly("T2")
 
     tableEnv.executeSql("DROP TEMPORARY VIEW default_catalog.default_database.T2")
-    assert(tableEnv.listTemporaryViews().sameElements(Array[String]()))
+    assertThat(tableEnv.listTemporaryViews()).isEmpty()
 
     // throws ValidationException since default_catalog.default_database.T2 is not exists
-    assertThatThrownBy(
-      () => tableEnv.executeSql("DROP TEMPORARY VIEW default_catalog.default_database.T2"))
-      .isInstanceOf[ValidationException]
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(
+        () => tableEnv.executeSql("DROP TEMPORARY VIEW default_catalog.default_database.T2"))
   }
 
   @Test
@@ -1967,21 +1953,20 @@ class TableEnvironmentTest {
     createTableForTests()
 
     val tableResult2 = tableEnv.executeSql("CREATE VIEW view1 AS SELECT * FROM T1")
-    assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
+    assertThatObject(tableResult2.getResultKind).isSameAs(ResultKind.SUCCESS)
 
     val tableResult3 = tableEnv.executeSql("SHOW VIEWS")
-    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult3.getResultKind)
-    assertEquals(
-      ResolvedSchema.of(Column.physical("view name", DataTypes.STRING())),
-      tableResult3.getResolvedSchema)
+    assertThatObject(tableResult3.getResultKind).isSameAs(ResultKind.SUCCESS_WITH_CONTENT)
+    assertThat(ResolvedSchema.of(Column.physical("view name", DataTypes.STRING())))
+      .isEqualTo(tableResult3.getResolvedSchema)
     checkData(util.Arrays.asList(Row.of("view1")).iterator(), tableResult3.collect())
 
     val tableResult4 = tableEnv.executeSql("CREATE TEMPORARY VIEW view2 AS SELECT * FROM T1")
-    assertEquals(ResultKind.SUCCESS, tableResult4.getResultKind)
+    assertThatObject(tableResult4.getResultKind).isSameAs(ResultKind.SUCCESS)
 
     // SHOW VIEWS also shows temporary views
     val tableResult5 = tableEnv.executeSql("SHOW VIEWS")
-    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult5.getResultKind)
+    assertThatObject(tableResult5.getResultKind).isSameAs(ResultKind.SUCCESS_WITH_CONTENT)
     checkData(
       util.Arrays.asList(Row.of("view1"), Row.of("view2")).iterator(),
       tableResult5.collect())
@@ -2001,7 +1986,7 @@ class TableEnvironmentTest {
         |)
       """.stripMargin
     val tableResult1 = tableEnv.executeSql(createTableStmt)
-    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+    assertThatObject(tableResult1.getResultKind).isSameAs(ResultKind.SUCCESS)
 
     checkExplain(
       "explain plan for select * from MyTable where a > 10",
@@ -2022,7 +2007,7 @@ class TableEnvironmentTest {
         |)
       """.stripMargin
     val tableResult1 = tableEnv.executeSql(createTableStmt1)
-    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+    assertThatObject(tableResult1.getResultKind).isSameAs(ResultKind.SUCCESS)
 
     val createTableStmt2 =
       """
@@ -2035,7 +2020,7 @@ class TableEnvironmentTest {
         |)
       """.stripMargin
     val tableResult2 = tableEnv.executeSql(createTableStmt2)
-    assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
+    assertThatObject(tableResult2.getResultKind).isSameAs(ResultKind.SUCCESS)
 
     checkExplain(
       "explain plan for insert into MySink select a, b from MyTable where a > 10",
@@ -2060,7 +2045,7 @@ class TableEnvironmentTest {
         |)
       """.stripMargin
     val tableResult1 = batchTableEnv.executeSql(createTableStmt1)
-    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+    assertThatObject(tableResult1.getResultKind).isSameAs(ResultKind.SUCCESS)
 
     val createTableStmt2 =
       """
@@ -2076,7 +2061,7 @@ class TableEnvironmentTest {
         |)
       """.stripMargin
     val tableResult2 = batchTableEnv.executeSql(createTableStmt2)
-    assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
+    assertThatObject(tableResult2.getResultKind).isSameAs(ResultKind.SUCCESS)
 
     checkExplain(
       "EXPLAIN PLAN FOR INSERT INTO MySink PARTITION (f2 = '123') SELECT f0, f1 FROM MyTable",
@@ -2105,7 +2090,7 @@ class TableEnvironmentTest {
         |)
       """.stripMargin
     val tableResult1 = tableEnv.executeSql(createTableStmt)
-    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+    assertThatObject(tableResult1.getResultKind).isSameAs(ResultKind.SUCCESS)
 
     // TODO we can support them later
     testUnsupportedExplain("explain plan excluding attributes for select * from MyTable")
@@ -2117,7 +2102,8 @@ class TableEnvironmentTest {
   }
 
   private def testUnsupportedExplain(explain: String): Unit = {
-    assertThatThrownBy(() => tableEnv.executeSql(explain))
+    assertThatExceptionOfType(classOf[SqlParserException])
+      .isThrownBy(() => tableEnv.executeSql(explain))
       .satisfiesAnyOf(
         anyCauseMatches(classOf[TableException], "Only default behavior is supported now"),
         anyCauseMatches(classOf[SqlParserException])
@@ -2138,12 +2124,12 @@ class TableEnvironmentTest {
         |)
       """.stripMargin
     val tableResult1 = tableEnv.executeSql(createTableStmt)
-    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+    assertThatObject(tableResult1.getResultKind).isSameAs(ResultKind.SUCCESS)
 
     val actual =
       tableEnv.explainSql("select * from MyTable where a > 10", ExplainDetail.CHANGELOG_MODE)
     val expected = TableTestUtil.readFromResource("/explain/testExplainSqlWithSelect.out")
-    assertEquals(replaceStageId(expected), replaceStageId(actual))
+    assertThat(replaceStageId(actual)).isEqualTo(replaceStageId(expected))
   }
 
   @Test
@@ -2160,13 +2146,13 @@ class TableEnvironmentTest {
         |)
       """.stripMargin
     val tableResult1 = tableEnv.executeSql(createTableStmt)
-    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+    assertThatObject(tableResult1.getResultKind).isSameAs(ResultKind.SUCCESS)
 
     val actual = tableEnv.explainSql(
       "execute select * from MyTable where a > 10",
       ExplainDetail.CHANGELOG_MODE)
     val expected = TableTestUtil.readFromResource("/explain/testExplainSqlWithSelect.out")
-    assertEquals(replaceStageId(expected), replaceStageId(actual))
+    assertThat(replaceStageId(actual)).isEqualTo(replaceStageId(expected))
   }
 
   @Test
@@ -2183,7 +2169,7 @@ class TableEnvironmentTest {
         |)
       """.stripMargin
     val tableResult1 = tableEnv.executeSql(createTableStmt1)
-    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+    assertThatObject(tableResult1.getResultKind).isSameAs(ResultKind.SUCCESS)
 
     val createTableStmt2 =
       """
@@ -2196,11 +2182,11 @@ class TableEnvironmentTest {
         |)
       """.stripMargin
     val tableResult2 = tableEnv.executeSql(createTableStmt2)
-    assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
+    assertThatObject(tableResult2.getResultKind).isSameAs(ResultKind.SUCCESS)
 
     val actual = tableEnv.explainSql("insert into MySink select a, b from MyTable where a > 10")
     val expected = TableTestUtil.readFromResource("/explain/testExplainSqlWithInsert.out")
-    assertEquals(replaceStageId(expected), replaceStageId(actual))
+    assertThat(replaceStageId(actual)).isEqualTo(replaceStageId(expected))
   }
 
   @Test
@@ -2217,7 +2203,7 @@ class TableEnvironmentTest {
         |)
       """.stripMargin
     val tableResult1 = tableEnv.executeSql(createTableStmt)
-    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+    assertThatObject(tableResult1.getResultKind).isSameAs(ResultKind.SUCCESS)
 
     checkExplain(
       "explain changelog_mode, estimated_cost, json_execution_plan " +
@@ -2240,7 +2226,7 @@ class TableEnvironmentTest {
         |)
       """.stripMargin
     val tableResult1 = tableEnv.executeSql(createTableStmt)
-    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+    assertThatObject(tableResult1.getResultKind).isSameAs(ResultKind.SUCCESS)
 
     val createTableStmt2 =
       """
@@ -2254,7 +2240,7 @@ class TableEnvironmentTest {
         |)
       """.stripMargin
     val tableResult3 = tableEnv.executeSql(createTableStmt2)
-    assertEquals(ResultKind.SUCCESS, tableResult3.getResultKind)
+    assertThatObject(tableResult3.getResultKind).isSameAs(ResultKind.SUCCESS)
 
     checkExplain(
       "explain changelog_mode, estimated_cost, json_execution_plan " +
@@ -2277,7 +2263,7 @@ class TableEnvironmentTest {
         |)
       """.stripMargin
     val tableResult1 = tableEnv.executeSql(createTableStmt1)
-    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+    assertThatObject(tableResult1.getResultKind).isSameAs(ResultKind.SUCCESS)
 
     val createTableStmt2 =
       """
@@ -2290,7 +2276,7 @@ class TableEnvironmentTest {
         |)
       """.stripMargin
     val tableResult2 = tableEnv.executeSql(createTableStmt2)
-    assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
+    assertThatObject(tableResult2.getResultKind).isSameAs(ResultKind.SUCCESS)
 
     checkExplain(
       "explain changelog_mode, estimated_cost, json_execution_plan " +
@@ -2405,10 +2391,10 @@ class TableEnvironmentTest {
         "`ts` - INTERVAL '1' SECOND")
     )
     val tableResult1 = tableEnv.executeSql("describe T1")
-    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult1.getResultKind)
+    assertThatObject(tableResult1.getResultKind).isSameAs(ResultKind.SUCCESS_WITH_CONTENT)
     checkData(expectedResult1.iterator(), tableResult1.collect())
     val tableResult2 = tableEnv.executeSql("desc T1")
-    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult2.getResultKind)
+    assertThatObject(tableResult2.getResultKind).isSameAs(ResultKind.SUCCESS_WITH_CONTENT)
     checkData(expectedResult1.iterator(), tableResult2.collect())
 
     val expectedResult2 = util.Arrays.asList(
@@ -2417,10 +2403,10 @@ class TableEnvironmentTest {
       Row.of("f", "ROW<`f0` INT NOT NULL, `f1` INT>", Boolean.box(false), null, null, null)
     )
     val tableResult3 = tableEnv.executeSql("describe T2")
-    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult3.getResultKind)
+    assertThatObject(tableResult3.getResultKind).isSameAs(ResultKind.SUCCESS_WITH_CONTENT)
     checkData(expectedResult2.iterator(), tableResult3.collect())
     val tableResult4 = tableEnv.executeSql("desc T2")
-    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult4.getResultKind)
+    assertThatObject(tableResult4.getResultKind).isSameAs(ResultKind.SUCCESS_WITH_CONTENT)
     checkData(expectedResult2.iterator(), tableResult4.collect())
 
     // temporary view T2(x, y) masks permanent view T2(d, e, f)
@@ -2434,10 +2420,10 @@ class TableEnvironmentTest {
       Row.of("x", "INT", Boolean.box(false), null, null, null),
       Row.of("y", "STRING", Boolean.box(false), null, null, null));
     val tableResult5 = tableEnv.executeSql("describe T2")
-    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult5.getResultKind)
+    assertThatObject(tableResult5.getResultKind).isSameAs(ResultKind.SUCCESS_WITH_CONTENT)
     checkData(expectedResult3.iterator(), tableResult5.collect())
     val tableResult6 = tableEnv.executeSql("desc T2")
-    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult6.getResultKind)
+    assertThatObject(tableResult6.getResultKind).isSameAs(ResultKind.SUCCESS_WITH_CONTENT)
     checkData(expectedResult3.iterator(), tableResult6.collect())
   }
 
@@ -2559,10 +2545,10 @@ class TableEnvironmentTest {
         "notice: watermark")
     )
     val tableResult1 = tableEnv.executeSql("describe T1")
-    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult1.getResultKind)
+    assertThatObject(tableResult1.getResultKind).isSameAs(ResultKind.SUCCESS_WITH_CONTENT)
     checkData(expectedResult1.iterator(), tableResult1.collect())
     val tableResult2 = tableEnv.executeSql("desc T1")
-    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult2.getResultKind)
+    assertThatObject(tableResult2.getResultKind).isSameAs(ResultKind.SUCCESS_WITH_CONTENT)
     checkData(expectedResult1.iterator(), tableResult2.collect())
   }
 
@@ -2591,17 +2577,15 @@ class TableEnvironmentTest {
         |""".stripMargin
     tableEnv.executeSql(alterDDL)
 
-    assertEquals(
-      Map(
-        "provider" -> "openai",
-        "task" -> "embedding",
-        "openai.endpoint" -> "openai-endpoint").asJava,
+    assertThat(
       tableEnv
         .getCatalog(tableEnv.getCurrentCatalog)
         .get()
         .getModel(ObjectPath.fromString(s"${tableEnv.getCurrentDatabase}.M1"))
-        .getOptions
-    )
+        .getOptions)
+      .containsExactlyInAnyOrderEntriesOf(
+        util.Map.of("provider", "openai", "task", "embedding", "openai.endpoint", "openai-endpoint")
+      )
   }
 
   @Test
@@ -2619,12 +2603,12 @@ class TableEnvironmentTest {
       """.stripMargin
     tableEnv.executeSql(sourceDDL)
 
-    assertThatThrownBy(
-      () =>
-        tableEnv
-          .executeSql("ALTER MODEL M1 SET ()"))
-      .isInstanceOf(classOf[ValidationException])
-      .hasMessageContaining("ALTER MODEL SET does not support empty option.");
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(
+        () =>
+          tableEnv
+            .executeSql("ALTER MODEL M1 SET ()"))
+      .withMessageContaining("ALTER MODEL SET does not support empty option.");
   }
 
   @Test
@@ -2638,9 +2622,9 @@ class TableEnvironmentTest {
         |  'task' = 'clustering'
         |)
         |""".stripMargin
-    assertThatThrownBy(() => tableEnv.executeSql(alterDDL))
-      .isInstanceOf(classOf[ValidationException])
-      .hasMessageContaining("Model `default_catalog`.`default_database`.`M1` doesn't exist.")
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => tableEnv.executeSql(alterDDL))
+      .withMessageContaining("Model `default_catalog`.`default_database`.`M1` doesn't exist.")
   }
 
   @Test
@@ -2685,9 +2669,9 @@ class TableEnvironmentTest {
       """
         |ALTER MODEL M1 RENAME TO M2
         |""".stripMargin
-    assertThatThrownBy(() => tableEnv.executeSql(alterDDL))
-      .isInstanceOf(classOf[ValidationException])
-      .hasMessageContaining("Model `default_catalog`.`default_database`.`M1` doesn't exist.")
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => tableEnv.executeSql(alterDDL))
+      .withMessageContaining("Model `default_catalog`.`default_database`.`M1` doesn't exist.")
   }
 
   @Test
@@ -2709,10 +2693,11 @@ class TableEnvironmentTest {
       """
         |ALTER MODEL `default_catalog`.`default_database`.`M1` RENAME TO `other_catalog`.`default_database`.`M2`
         |""".stripMargin
-    assertThatThrownBy(() => tableEnv.executeSql(alterDDL))
-      .isInstanceOf(classOf[ValidationException])
-      .hasMessageContaining(
-        "The catalog name of the new model name 'other_catalog.default_database.M2' must be the same as the old model name 'default_catalog.default_database.M1'.")
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => tableEnv.executeSql(alterDDL))
+      .withMessageContaining(
+        "The catalog name of the new model name 'other_catalog.default_database.M2' " +
+          "must be the same as the old model name 'default_catalog.default_database.M1'.")
   }
 
   @Test
@@ -2741,21 +2726,21 @@ class TableEnvironmentTest {
 
     tableEnv.executeSql("ALTER MODEL M1 RESET ('task')");
 
-    assertEquals(
-      Map("provider" -> "openai", "openai.endpoint" -> "some-endpoint").asJava,
+    assertThat(
       tableEnv
         .getCatalog(tableEnv.getCurrentCatalog)
         .get()
         .getModel(ObjectPath.fromString(s"${tableEnv.getCurrentDatabase}.M1"))
         .getOptions
-    )
+    ).containsExactlyInAnyOrderEntriesOf(
+      util.Map.of("provider", "openai", "openai.endpoint", "some-endpoint"))
   }
 
   @Test
   def testAlterModelResetNonExist(): Unit = {
-    assertThatThrownBy(() => tableEnv.executeSql("ALTER MODEL M1 RESET ('task')"))
-      .isInstanceOf(classOf[ValidationException])
-      .hasMessageContaining("Model `default_catalog`.`default_database`.`M1` doesn't exist.")
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => tableEnv.executeSql("ALTER MODEL M1 RESET ('task')"))
+      .withMessageContaining("Model `default_catalog`.`default_database`.`M1` doesn't exist.")
   }
 
   @Test
@@ -2778,12 +2763,12 @@ class TableEnvironmentTest {
       """.stripMargin
     tableEnv.executeSql(sourceDDL)
 
-    assertThatThrownBy(
-      () =>
-        tableEnv
-          .executeSql("ALTER MODEL M1 RESET ()"))
-      .isInstanceOf(classOf[ValidationException])
-      .hasMessageContaining("ALTER MODEL RESET does not support empty key.");
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(
+        () =>
+          tableEnv
+            .executeSql("ALTER MODEL M1 RESET ()"))
+      .withMessageContaining("ALTER MODEL RESET does not support empty key.");
   }
 
   @Test
@@ -2798,10 +2783,10 @@ class TableEnvironmentTest {
         |  'openai.endpoint' = 'some-endpoint'
         |)
       """.stripMargin
-    assertThatThrownBy(() => tableEnv.executeSql(sourceDDL))
-      .isInstanceOf(classOf[SqlValidateException])
-      .hasMessageContaining("Input column list can not be empty with non-empty output column list.")
-
+    assertThatExceptionOfType(classOf[SqlValidateException])
+      .isThrownBy(() => tableEnv.executeSql(sourceDDL))
+      .withMessageContaining(
+        "Input column list can not be empty with non-empty output column list.")
   }
 
   @Test
@@ -2817,10 +2802,9 @@ class TableEnvironmentTest {
         |  'openai.endpoint' = 'some-endpoint'
         |)
       """.stripMargin
-    assertThatThrownBy(() => tableEnv.executeSql(sourceDDL))
-      .isInstanceOf(classOf[ValidationException])
-      .hasMessageContaining("Duplicate input column name: 'f1'.")
-
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => tableEnv.executeSql(sourceDDL))
+      .withMessageContaining("Duplicate input column name: 'f1'.")
   }
 
   @Test
@@ -2836,10 +2820,9 @@ class TableEnvironmentTest {
         |  'openai.endpoint' = 'some-endpoint'
         |)
       """.stripMargin
-    assertThatThrownBy(() => tableEnv.executeSql(sourceDDL))
-      .isInstanceOf(classOf[ValidationException])
-      .hasMessageContaining("Duplicate output column name: 'f2'.")
-
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => tableEnv.executeSql(sourceDDL))
+      .withMessageContaining("Duplicate output column name: 'f2'.")
   }
 
   @Test
@@ -2854,9 +2837,9 @@ class TableEnvironmentTest {
         |  'openai.endpoint' = 'some-endpoint'
         |)
       """.stripMargin
-    assertThatThrownBy(() => tableEnv.executeSql(sourceDDL))
-      .isInstanceOf(classOf[SqlValidateException])
-      .hasMessageContaining("")
+    assertThatExceptionOfType(classOf[SqlValidateException])
+      .isThrownBy(() => tableEnv.executeSql(sourceDDL))
+      .withMessageContaining("")
   }
 
   @Test
@@ -2868,9 +2851,9 @@ class TableEnvironmentTest {
         |  OUTPUT(f2 string)
         |with ()
       """.stripMargin
-    assertThatThrownBy(() => tableEnv.executeSql(sourceDDL))
-      .isInstanceOf(classOf[SqlValidateException])
-      .hasMessageContaining("Model property list can not be empty.")
+    assertThatExceptionOfType(classOf[SqlValidateException])
+      .isThrownBy(() => tableEnv.executeSql(sourceDDL))
+      .withMessageContaining("Model property list can not be empty.")
   }
 
   @Test
@@ -2895,10 +2878,10 @@ class TableEnvironmentTest {
       Row.of("f2", "STRING", Boolean.box(true), Boolean.box(false), null)
     )
     val modelResult1 = tableEnv.executeSql("describe MODEL M1")
-    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, modelResult1.getResultKind)
+    
assertThatObject(modelResult1.getResultKind).isSameAs(ResultKind.SUCCESS_WITH_CONTENT)
     checkData(expectedResult1.iterator(), modelResult1.collect())
     val modelResult2 = tableEnv.executeSql("desc model M1")
-    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, modelResult2.getResultKind)
+    
assertThatObject(modelResult2.getResultKind).isSameAs(ResultKind.SUCCESS_WITH_CONTENT)
     checkData(expectedResult1.iterator(), modelResult2.collect())
   }
 
@@ -2924,10 +2907,10 @@ class TableEnvironmentTest {
       Row.of("f2", "STRING", Boolean.box(true), Boolean.box(false))
     )
     val modelResult1 = tableEnv.executeSql("describe MODEL M1")
-    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, modelResult1.getResultKind)
+    
assertThatObject(modelResult1.getResultKind).isSameAs(ResultKind.SUCCESS_WITH_CONTENT)
     checkData(expectedResult1.iterator(), modelResult1.collect())
     val modelResult2 = tableEnv.executeSql("desc model M1")
-    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, modelResult2.getResultKind)
+    
assertThatObject(modelResult2.getResultKind).isSameAs(ResultKind.SUCCESS_WITH_CONTENT)
     checkData(expectedResult1.iterator(), modelResult2.collect())
   }
 
@@ -2948,10 +2931,10 @@ class TableEnvironmentTest {
 
     val expectedResult1 = new util.ArrayList[Row]()
     val modelResult1 = tableEnv.executeSql("describe model M1")
-    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, modelResult1.getResultKind)
+    
assertThatObject(modelResult1.getResultKind).isSameAs(ResultKind.SUCCESS_WITH_CONTENT)
     checkData(expectedResult1.iterator(), modelResult1.collect())
     val modelResult2 = tableEnv.executeSql("desc model M1")
-    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, modelResult2.getResultKind)
+    
assertThatObject(modelResult2.getResultKind).isSameAs(ResultKind.SUCCESS_WITH_CONTENT)
     checkData(expectedResult1.iterator(), modelResult2.collect())
   }
 
@@ -2984,7 +2967,7 @@ class TableEnvironmentTest {
          |)
          |""".stripMargin
     val row = tableEnv.executeSql("SHOW CREATE MODEL M1").collect().next()
-    assertEquals(expectedDDL, row.getField(0))
+    assertThat(row.getField(0)).isEqualTo(expectedDDL)
   }
 
   @Test
@@ -3025,7 +3008,7 @@ class TableEnvironmentTest {
          |)
          |""".stripMargin
     val row = tableEnv.executeSql("SHOW CREATE MODEL M1").collect().next()
-    assertEquals(expectedDDL, row.getField(0))
+    assertThat(row.getField(0)).isEqualTo(expectedDDL)
   }
 
   @Test
@@ -3057,14 +3040,14 @@ class TableEnvironmentTest {
          |)
          |""".stripMargin
     val row = tableEnv.executeSql("SHOW CREATE MODEL M1").collect().next()
-    assertEquals(expectedDDL, row.getField(0))
+    assertThat(row.getField(0)).isEqualTo(expectedDDL)
   }
 
   @Test
   def testShowCreateNonExistModel(): Unit = {
-    assertThatThrownBy(() => tableEnv.executeSql("SHOW CREATE MODEL M1"))
-      .isInstanceOf(classOf[ValidationException])
-      .hasMessage(
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => tableEnv.executeSql("SHOW CREATE MODEL M1"))
+      .withMessage(
         "Could not execute SHOW CREATE MODEL. Model with identifier 
`default_catalog`.`default_database`.`M1` does not exist.")
   }
 
@@ -3093,7 +3076,7 @@ class TableEnvironmentTest {
          |)
          |""".stripMargin
     val row = tableEnv.executeSql("SHOW CREATE MODEL M1").collect().next()
-    assertEquals(expectedDDL, row.getField(0))
+    assertThat(row.getField(0)).isEqualTo(expectedDDL)
   }
 
   @Test
@@ -3123,7 +3106,7 @@ class TableEnvironmentTest {
          |)
          |""".stripMargin
     val row = tableEnv.executeSql("SHOW CREATE MODEL M1").collect().next()
-    assertEquals(expectedDDL, row.getField(0))
+    assertThat(row.getField(0)).isEqualTo(expectedDDL)
   }
 
   @Test
@@ -3152,9 +3135,9 @@ class TableEnvironmentTest {
       """
         |ALTER MODEL M1 RENAME TO M2
         |""".stripMargin
-    assertThatThrownBy(() => tableEnv.executeSql(alterDDL))
-      .isInstanceOf(classOf[ValidationException])
-      .hasMessageContaining("Model `default_catalog`.`default_database`.`M1` 
doesn't exist.")
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => tableEnv.executeSql(alterDDL))
+      .withMessageContaining("Model `default_catalog`.`default_database`.`M1` 
doesn't exist.")
   }
 
   @Test
@@ -3163,9 +3146,9 @@ class TableEnvironmentTest {
       """
         |DROP MODEL M1
         |""".stripMargin
-    assertThatThrownBy(() => tableEnv.executeSql(dropDDL))
-      .isInstanceOf(classOf[ValidationException])
-      .hasMessageContaining(
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => tableEnv.executeSql(dropDDL))
+      .withMessageContaining(
         "Model with identifier 'default_catalog.default_database.M1' does not 
exist.")
   }
 
@@ -3193,23 +3176,22 @@ class TableEnvironmentTest {
         |""".stripMargin
     }
     val tableResult1 = tableEnv.executeSql(createModelStmt)
-    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+    assertThatObject(tableResult1.getResultKind).isSameAs(ResultKind.SUCCESS)
 
     val tableResult2 = tableEnv.executeSql("SHOW MODELS")
-    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult2.getResultKind)
-    assertEquals(
-      ResolvedSchema.of(Column.physical("model name", DataTypes.STRING())),
-      tableResult2.getResolvedSchema)
+    
assertThatObject(tableResult2.getResultKind).isSameAs(ResultKind.SUCCESS_WITH_CONTENT)
+    assertThat(tableResult2.getResolvedSchema)
+      .isEqualTo(ResolvedSchema.of(Column.physical("model name", DataTypes.STRING())))
     checkData(util.Arrays.asList(Row.of("M1")).iterator(), 
tableResult2.collect())
   }
 
   def testExecuteSqlWithEnhancedShowModels(): Unit = {
     val createCatalogResult =
       tableEnv.executeSql("CREATE CATALOG catalog1 
WITH('type'='generic_in_memory')")
-    assertEquals(ResultKind.SUCCESS, createCatalogResult.getResultKind)
+    
assertThatObject(createCatalogResult.getResultKind).isSameAs(ResultKind.SUCCESS)
 
     val createDbResult = tableEnv.executeSql("CREATE database catalog1.db1")
-    assertEquals(ResultKind.SUCCESS, createDbResult.getResultKind)
+    assertThatObject(createDbResult.getResultKind).isSameAs(ResultKind.SUCCESS)
 
     val createModelStmt =
       """
@@ -3223,7 +3205,7 @@ class TableEnvironmentTest {
         |)
         |""".stripMargin
     val tableResult1 = tableEnv.executeSql(createModelStmt)
-    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+    assertThatObject(tableResult1.getResultKind).isSameAs(ResultKind.SUCCESS)
 
     val createTableStmt2 =
       """
@@ -3238,21 +3220,20 @@ class TableEnvironmentTest {
         |""".stripMargin
 
     val tableResult2 = tableEnv.executeSql(createTableStmt2)
-    assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
+    assertThatObject(tableResult2.getResultKind).isSameAs(ResultKind.SUCCESS)
 
     val tableResult3 = tableEnv.executeSql("SHOW MODELS FROM catalog1.db1 like 
'you_mo%'")
-    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult3.getResultKind)
-    assertEquals(
-      ResolvedSchema.of(Column.physical("model name", DataTypes.STRING())),
-      tableResult3.getResolvedSchema)
+    
assertThatObject(tableResult3.getResultKind).isSameAs(ResultKind.SUCCESS_WITH_CONTENT)
+    assertThat(tableResult3.getResolvedSchema)
+      .isEqualTo(ResolvedSchema.of(Column.physical("model name", DataTypes.STRING())))
     checkData(util.Arrays.asList(Row.of("your_model")).iterator(), 
tableResult3.collect())
   }
 
   @Test
   def testGetNonExistModel(): Unit = {
-    assertThatThrownBy(() => tableEnv.fromModel("MyModel"))
-      .hasMessageContaining("Model `MyModel` was not found")
-      .isInstanceOf[ValidationException]
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => tableEnv.fromModel("MyModel"))
+      .withMessageContaining("Model `MyModel` was not found")
   }
 
   @Test
@@ -3277,69 +3258,68 @@ class TableEnvironmentTest {
     tableEnv.registerCatalog(listener.getName, listener)
     // test temporary table
     tableEnv.executeSql("create temporary table tbl1 (x int)")
-    assertEquals(0, listener.numTempTable)
+    assertThat(listener.numTempTable).isZero
     tableEnv.executeSql(s"create temporary table 
${listener.getName}.`default`.tbl1 (x int)")
-    assertEquals(1, listener.numTempTable)
+    assertThat(listener.numTempTable).isOne
     val tableResult = tableEnv
       .asInstanceOf[TableEnvironmentInternal]
       .getCatalogManager
       .getTable(ObjectIdentifier.of(listener.getName, "default", "tbl1"))
-    assertTrue(tableResult.isPresent)
-    assertEquals(listener.tableComment, 
tableResult.get().getTable[CatalogBaseTable].getComment)
+    assertThat(tableResult).isPresent
+    assertThat(tableResult.get().getTable[CatalogBaseTable].getComment)
+      .isEqualTo(listener.tableComment)
     tableEnv.executeSql("drop temporary table tbl1")
-    assertEquals(1, listener.numTempTable)
+    assertThat(listener.numTempTable).isOne
     tableEnv.executeSql(s"drop temporary table 
${listener.getName}.`default`.tbl1")
-    assertEquals(0, listener.numTempTable)
+    assertThat(listener.numTempTable).isZero
     tableEnv.useCatalog(listener.getName)
     tableEnv.executeSql("create temporary table tbl1 (x int)")
-    assertEquals(1, listener.numTempTable)
+    assertThat(listener.numTempTable).isOne
     tableEnv.executeSql("drop temporary table tbl1")
-    assertEquals(0, listener.numTempTable)
+    assertThat(listener.numTempTable).isZero
     tableEnv.useCatalog(currentCat)
 
     // test temporary view
     tableEnv.executeSql("create temporary view v1 as select 1")
-    assertEquals(0, listener.numTempTable)
+    assertThat(listener.numTempTable).isZero
     tableEnv.executeSql(s"create temporary view 
${listener.getName}.`default`.v1 as select 1")
-    assertEquals(1, listener.numTempTable)
+    assertThat(listener.numTempTable).isOne
     val viewResult = tableEnv
       .asInstanceOf[TableEnvironmentInternal]
       .getCatalogManager
       .getTable(ObjectIdentifier.of(listener.getName, "default", "v1"))
-    assertTrue(viewResult.isPresent)
-    assertEquals(listener.tableComment, 
viewResult.get().getTable[CatalogBaseTable].getComment)
+    assertThat(viewResult).isPresent
+    assertThat(viewResult.get().getTable[CatalogBaseTable].getComment)
+      .isEqualTo(listener.tableComment)
     tableEnv.executeSql("drop temporary view v1")
-    assertEquals(1, listener.numTempTable)
+    assertThat(listener.numTempTable).isOne
     tableEnv.executeSql(s"drop temporary view 
${listener.getName}.`default`.v1")
-    assertEquals(0, listener.numTempTable)
+    assertThat(listener.numTempTable).isZero
     tableEnv.useCatalog(listener.getName)
     tableEnv.executeSql("create temporary view v1 as select 1")
-    assertEquals(1, listener.numTempTable)
+    assertThat(listener.numTempTable).isOne
     tableEnv.executeSql("drop temporary view  v1")
-    assertEquals(0, listener.numTempTable)
+    assertThat(listener.numTempTable).isZero
     tableEnv.useCatalog(currentCat)
 
     // test temporary function
     val clzName = "foo.class.name"
-    try {
-      tableEnv.executeSql(s"create temporary function func1 as '$clzName'")
-      fail("Creating a temporary function with invalid class should fail")
-    } catch {
-      case _: Exception => // expected
-    }
-    assertEquals(0, listener.numTempFunc)
+    assertThatExceptionOfType(classOf[Exception])
+      .isThrownBy(() => tableEnv.executeSql(s"create temporary function func1 
as '$clzName'"))
+
+    assertThat(listener.numTempFunc).isZero
     tableEnv.executeSql(
       s"create temporary function ${listener.getName}.`default`.func1 as 
'$clzName'")
-    assertEquals(1, listener.numTempFunc)
+    assertThat(listener.numTempFunc).isOne
     tableEnv.executeSql("drop temporary function if exists func1")
-    assertEquals(1, listener.numTempFunc)
+    assertThat(listener.numTempFunc).isOne
     tableEnv.executeSql(s"drop temporary function 
${listener.getName}.`default`.func1")
-    assertEquals(0, listener.numTempFunc)
+    assertThat(listener.numTempFunc).isZero
     tableEnv.useCatalog(listener.getName)
     tableEnv.executeSql(s"create temporary function func1 as '$clzName'")
-    assertEquals(1, listener.numTempFunc)
+    assertThat(listener.numTempFunc).isOne
     tableEnv.executeSql("drop temporary function func1")
-    assertEquals(0, listener.numTempFunc)
+    assertThat(listener.numTempFunc).isZero
     tableEnv.useCatalog(currentCat)
 
     listener.close()
@@ -3359,9 +3339,9 @@ class TableEnvironmentTest {
 
     tableEnv.getConfig.set(ExecutionOptions.RUNTIME_MODE, 
RuntimeExecutionMode.BATCH)
 
-    assertThatThrownBy(() => tableEnv.explainSql("select * from MyTable"))
-      .isInstanceOf(classOf[IllegalArgumentException])
-      .hasMessageContaining(
+    assertThatExceptionOfType(classOf[IllegalArgumentException])
+      .isThrownBy(() => tableEnv.explainSql("select * from MyTable"))
+      .withMessageContaining(
         "Mismatch between configured runtime mode and actual runtime mode. " +
           "Currently, the 'execution.runtime-mode' can only be set when 
instantiating the " +
           "table environment. Subsequent changes are not supported. " +
@@ -3387,7 +3367,7 @@ class TableEnvironmentTest {
     var tableResult = tableEnv.executeSql(
       "alter table tbl add partition " +
         "(b=1000,c='2020-05-01') partition (b=2000,c='2020-01-01') with 
('k'='v')")
-    assertEquals(ResultKind.SUCCESS, tableResult.getResultKind)
+    assertThatObject(tableResult.getResultKind).isSameAs(ResultKind.SUCCESS)
 
     val spec1 = new CatalogPartitionSpec(Map("b" -> "1000", "c" -> 
"2020-05-01").asJava)
     val spec2 = new CatalogPartitionSpec(Map("b" -> "2000", "c" -> 
"2020-01-01").asJava)
@@ -3397,24 +3377,24 @@ class TableEnvironmentTest {
     val tablePath = new ObjectPath("default_database", "tbl")
     val actual = catalog.listPartitions(tablePath)
     // assert partition spec
-    assertEquals(List(spec1, spec2).asJava, actual)
+    assertThatList(actual).containsExactly(spec1, spec2)
 
     val part1 = catalog.getPartition(tablePath, spec1)
     val part2 = catalog.getPartition(tablePath, spec2)
     // assert partition properties
-    assertEquals(Collections.emptyMap(), part1.getProperties)
-    assertEquals(Collections.singletonMap("k", "v"), part2.getProperties)
+    assertThat(part1.getProperties).isEmpty()
+    assertThat(part2.getProperties).isEqualTo(util.Map.of("k", "v"))
 
     // add existed partition with if not exists
     tableResult =
       tableEnv.executeSql("alter table tbl add if not exists partition 
(b=1000,c='2020-05-01')")
-    assertEquals(ResultKind.SUCCESS, tableResult.getResultKind)
+    assertThatObject(tableResult.getResultKind).isSameAs(ResultKind.SUCCESS)
 
     // add existed partition without if not exists
-    assertThatThrownBy(
-      () => tableEnv.executeSql("alter table tbl add partition 
(b=1000,c='2020-05-01')"))
-      .isInstanceOf(classOf[TableException])
-      .hasMessageContaining("Could not execute ALTER TABLE 
default_catalog.default_database.tbl" +
+    assertThatExceptionOfType(classOf[TableException])
+      .isThrownBy(
+        () => tableEnv.executeSql("alter table tbl add partition 
(b=1000,c='2020-05-01')"))
+      .withMessageContaining("Could not execute ALTER TABLE 
default_catalog.default_database.tbl" +
         " ADD PARTITION (b=1000, c=2020-05-01)")
 
   }
@@ -3521,7 +3501,7 @@ class TableEnvironmentTest {
 
     for (viewSql <- viewDdls) {
       val view = tableEnv.executeSql(viewSql)
-      assertEquals(ResultKind.SUCCESS, view.getResultKind)
+      assertThatObject(view.getResultKind).isSameAs(ResultKind.SUCCESS)
     }
   }
 
@@ -3538,30 +3518,28 @@ class TableEnvironmentTest {
         |)
       """.stripMargin
     val tableResult = tableEnv.executeSql(createTableStmt)
-    assertEquals(ResultKind.SUCCESS, tableResult.getResultKind)
+    assertThatObject(tableResult.getResultKind).isSameAs(ResultKind.SUCCESS)
   }
 
   private def checkData(expected: util.Iterator[Row], actual: 
util.Iterator[Row]): Unit = {
     while (expected.hasNext && actual.hasNext) {
-      assertEquals(expected.next(), actual.next())
+      assertThat(actual.next()).isEqualTo(expected.next())
     }
-    assertEquals(expected.hasNext, actual.hasNext)
+    assertThat(actual.hasNext).isEqualTo(expected.hasNext)
   }
 
   private def validateShowModules(expectedEntries: (String, 
java.lang.Boolean)*): Unit = {
     val showModules = tableEnv.executeSql("SHOW MODULES")
-    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, showModules.getResultKind)
-    assertEquals(
-      ResolvedSchema.of(Column.physical("module name", DataTypes.STRING())),
-      showModules.getResolvedSchema)
+    
assertThatObject(showModules.getResultKind).isSameAs(ResultKind.SUCCESS_WITH_CONTENT)
+    assertThat(showModules.getResolvedSchema)
+      .isEqualTo(ResolvedSchema.of(Column.physical("module name", DataTypes.STRING())))
 
     val showFullModules = tableEnv.executeSql("SHOW FULL MODULES")
-    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, 
showFullModules.getResultKind)
-    assertEquals(
+    
assertThatObject(showFullModules.getResultKind).isSameAs(ResultKind.SUCCESS_WITH_CONTENT)
+    assertThat(showFullModules.getResolvedSchema).isEqualTo(
       ResolvedSchema.physical(
         Array[String]("module name", "used"),
-        Array[DataType](DataTypes.STRING(), DataTypes.BOOLEAN())),
-      showFullModules.getResolvedSchema
+        Array[DataType](DataTypes.STRING(), DataTypes.BOOLEAN()))
     )
 
     // show modules only list used modules
@@ -3578,16 +3556,15 @@ class TableEnvironmentTest {
   private def checkListModules(expected: String*): Unit = {
     val actual = tableEnv.listModules()
     for ((module, i) <- expected.zipWithIndex) {
-      assertEquals(module, actual.apply(i))
+      assertThat(actual.apply(i)).isEqualTo(module)
     }
   }
 
   private def checkListFullModules(expected: (String, java.lang.Boolean)*): 
Unit = {
     val actual = tableEnv.listFullModules()
     for ((elem, i) <- expected.zipWithIndex) {
-      assertEquals(
-        new ModuleEntry(elem._1, elem._2).asInstanceOf[Object],
-        actual.apply(i).asInstanceOf[Object])
+      assertThat(actual.apply(i).asInstanceOf[Object])
+        .isEqualTo(new ModuleEntry(elem._1, elem._2).asInstanceOf[Object])
     }
   }
 
@@ -3603,8 +3580,8 @@ class TableEnvironmentTest {
         new Configuration(),
         false)
     val source = TableFactoryUtil.findAndCreateTableSource(context)
-    assertTrue(source.isInstanceOf[CollectionTableSource])
-    assertEquals(expectToBeBounded, 
source.asInstanceOf[CollectionTableSource].isBounded)
+    assertThat(source).isInstanceOf(classOf[CollectionTableSource])
+    
assertThat(source.asInstanceOf[CollectionTableSource].isBounded).isEqualTo(expectToBeBounded)
   }
 
   private def checkExplain(sql: String, resultPath: String, streaming: Boolean 
= true): Unit = {
@@ -3613,16 +3590,16 @@ class TableEnvironmentTest {
     } else {
       batchTableEnv.executeSql(sql)
     }
-    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult2.getResultKind)
+    
assertThatObject(tableResult2.getResultKind).isSameAs(ResultKind.SUCCESS_WITH_CONTENT)
     val it = tableResult2.collect()
-    assertTrue(it.hasNext)
+    assertThat(it).hasNext
     val row = it.next()
-    assertEquals(1, row.getArity)
+    assertThat(row.getArity).isOne
     val actual = 
replaceNodeIdInOperator(replaceStreamNodeId(row.getField(0).toString.trim))
     val expected = replaceNodeIdInOperator(
       replaceStreamNodeId(TableTestUtil.readFromResource(resultPath).trim))
-    assertEquals(replaceStageId(expected), replaceStageId(actual))
-    assertFalse(it.hasNext)
+    assertThat(replaceStageId(actual)).isEqualTo(replaceStageId(expected))
+    assertThat(it.hasNext).isFalse
   }
 
   class ListenerCatalog(name: String)

Reply via email to