This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 850cd5c3bc Flink: Migrate subclasses of FlinkCatalogTestBase to JUnit5 (#9381)
850cd5c3bc is described below

commit 850cd5c3bcaac86b392d8ab9fa586f355f6e2d76
Author: vinitpatni <[email protected]>
AuthorDate: Sat Jan 13 19:34:45 2024 +0530

    Flink: Migrate subclasses of FlinkCatalogTestBase to JUnit5 (#9381)
---
 .../apache/iceberg/flink/FlinkCatalogTestBase.java | 155 -------
 .../java/org/apache/iceberg/flink/TestBase.java    |   2 +-
 .../iceberg/flink/TestFlinkCatalogTable.java       | 492 ++++++++++-----------
 .../flink/TestFlinkCatalogTablePartitions.java     |   2 +-
 .../apache/iceberg/flink/TestFlinkHiveCatalog.java |   8 +-
 .../apache/iceberg/flink/TestFlinkTableSink.java   | 132 +++---
 .../org/apache/iceberg/flink/TestFlinkUpsert.java  |  56 +--
 .../apache/iceberg/flink/TestIcebergConnector.java |   6 +-
 .../flink/actions/TestRewriteDataFilesAction.java  | 146 +++---
 .../flink/source/TestFlinkMetaDataTable.java       | 454 +++++++++----------
 .../iceberg/flink/source/TestStreamScanSql.java    |  50 +--
 11 files changed, 618 insertions(+), 885 deletions(-)
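
The diffs below apply one mechanical pattern in every file: JUnit4's Parameterized runner, constructor injection, and Assert/Assume calls give way to Iceberg's JUnit5 parameter injection plus AssertJ. As orientation before reading them, here is a minimal sketch of that pattern; the class MyMigratedTest and its single test are illustrative only, while @Parameter, @Parameters, @TestTemplate, and CatalogTestBase.parameters() are taken from the changes in this commit:

import static org.assertj.core.api.Assertions.assertThat;

import java.util.List;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Parameter;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.jupiter.api.TestTemplate;

public class MyMigratedTest extends CatalogTestBase {

  // JUnit4 constructor arguments become injected fields; indexes 0 and 1
  // (catalogName, baseNamespace) are declared by the base class.
  @Parameter(index = 2)
  private FileFormat format;

  @Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}")
  public static List<Object[]> parameters() {
    List<Object[]> parameters = Lists.newArrayList();
    for (Object[] catalogParams : CatalogTestBase.parameters()) {
      // Extend the base catalog parameters with a format, as the sink tests below do.
      parameters.add(new Object[] {catalogParams[0], catalogParams[1], FileFormat.PARQUET});
    }
    return parameters;
  }

  // @Test becomes @TestTemplate so the test runs once per parameter row, and
  // Assert.assertEquals(message, expected, actual) becomes AssertJ's fluent form.
  @TestTemplate
  public void testFormatInjected() {
    assertThat(format).as("Format should be injected per parameter row").isNotNull();
  }
}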

diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
deleted file mode 100644
index 74c5d343e9..0000000000
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg.flink;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import org.apache.flink.util.ArrayUtils;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.iceberg.CatalogProperties;
-import org.apache.iceberg.catalog.Catalog;
-import org.apache.iceberg.catalog.Namespace;
-import org.apache.iceberg.catalog.SupportsNamespaces;
-import org.apache.iceberg.hadoop.HadoopCatalog;
-import org.apache.iceberg.relocated.com.google.common.base.Joiner;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public abstract class FlinkCatalogTestBase extends FlinkTestBase {
-
-  protected static final String DATABASE = "db";
-  private static TemporaryFolder hiveWarehouse = new TemporaryFolder();
-  private static TemporaryFolder hadoopWarehouse = new TemporaryFolder();
-
-  @BeforeClass
-  public static void createWarehouse() throws IOException {
-    hiveWarehouse.create();
-    hadoopWarehouse.create();
-  }
-
-  @AfterClass
-  public static void dropWarehouse() {
-    hiveWarehouse.delete();
-    hadoopWarehouse.delete();
-  }
-
-  @Before
-  public void before() {
-    sql("CREATE CATALOG %s WITH %s", catalogName, toWithClause(config));
-  }
-
-  @After
-  public void clean() {
-    dropCatalog(catalogName, true);
-  }
-
-  @Parameterized.Parameters(name = "catalogName = {0} baseNamespace = {1}")
-  public static Iterable<Object[]> parameters() {
-    return Lists.newArrayList(
-        new Object[] {"testhive", Namespace.empty()},
-        new Object[] {"testhadoop", Namespace.empty()},
-        new Object[] {"testhadoop_basenamespace", Namespace.of("l0", "l1")});
-  }
-
-  protected final String catalogName;
-  protected final Namespace baseNamespace;
-  protected final Catalog validationCatalog;
-  protected final SupportsNamespaces validationNamespaceCatalog;
-  protected final Map<String, String> config = Maps.newHashMap();
-
-  protected final String flinkDatabase;
-  protected final Namespace icebergNamespace;
-  protected final boolean isHadoopCatalog;
-
-  public FlinkCatalogTestBase(String catalogName, Namespace baseNamespace) {
-    this.catalogName = catalogName;
-    this.baseNamespace = baseNamespace;
-    this.isHadoopCatalog = catalogName.startsWith("testhadoop");
-    this.validationCatalog =
-        isHadoopCatalog
-            ? new HadoopCatalog(hiveConf, "file:" + hadoopWarehouse.getRoot())
-            : catalog;
-    this.validationNamespaceCatalog = (SupportsNamespaces) validationCatalog;
-
-    config.put("type", "iceberg");
-    if (!baseNamespace.isEmpty()) {
-      config.put(FlinkCatalogFactory.BASE_NAMESPACE, baseNamespace.toString());
-    }
-    if (isHadoopCatalog) {
-      config.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hadoop");
-    } else {
-      config.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hive");
-      config.put(CatalogProperties.URI, getURI(hiveConf));
-    }
-    config.put(CatalogProperties.WAREHOUSE_LOCATION, String.format("file://%s", warehouseRoot()));
-
-    this.flinkDatabase = catalogName + "." + DATABASE;
-    this.icebergNamespace =
-        Namespace.of(ArrayUtils.concat(baseNamespace.levels(), new String[] {DATABASE}));
-  }
-
-  protected String warehouseRoot() {
-    if (isHadoopCatalog) {
-      return hadoopWarehouse.getRoot().getAbsolutePath();
-    } else {
-      return hiveWarehouse.getRoot().getAbsolutePath();
-    }
-  }
-
-  protected String getFullQualifiedTableName(String tableName) {
-    final List<String> levels = Lists.newArrayList(icebergNamespace.levels());
-    levels.add(tableName);
-    return Joiner.on('.').join(levels);
-  }
-
-  static String getURI(HiveConf conf) {
-    return conf.get(HiveConf.ConfVars.METASTOREURIS.varname);
-  }
-
-  static String toWithClause(Map<String, String> props) {
-    StringBuilder builder = new StringBuilder();
-    builder.append("(");
-    int propCount = 0;
-    for (Map.Entry<String, String> entry : props.entrySet()) {
-      if (propCount > 0) {
-        builder.append(",");
-      }
-      builder
-          .append("'")
-          .append(entry.getKey())
-          .append("'")
-          .append("=")
-          .append("'")
-          .append(entry.getValue())
-          .append("'");
-      propCount++;
-    }
-    builder.append(")");
-    return builder.toString();
-  }
-}
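
The deleted JUnit4 base class above is replaced by CatalogTestBase, which does not appear in this diff. For orientation only, a hedged sketch of what its JUnit5 shape plausibly looks like, reconstructed from how the subclasses below use it (parameter indexes 0 and 1, super.before()/super.clean(), and the static getURI/toWithClause helpers); the real class may differ in detail:

package org.apache.iceberg.flink;

import java.util.List;
import org.apache.iceberg.Parameter;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;

// Sketch only: TestBase (shown in the next diff) supplies the mini cluster
// extension and the @TempDir directory that replace the old @ClassRule
// TemporaryFolder fields.
public abstract class CatalogTestBase extends TestBase {

  protected static final String DATABASE = "db";

  // The JUnit4 constructor parameters become injected fields.
  @Parameter(index = 0)
  protected String catalogName;

  @Parameter(index = 1)
  protected Namespace baseNamespace;

  @Parameters(name = "catalogName = {0} baseNamespace = {1}")
  protected static List<Object[]> parameters() {
    return Lists.newArrayList(
        new Object[] {"testhive", Namespace.empty()},
        new Object[] {"testhadoop", Namespace.empty()},
        new Object[] {"testhadoop_basenamespace", Namespace.of("l0", "l1")});
  }

  // @Before/@After map to @BeforeEach/@AfterEach; subclasses call
  // super.before() and super.clean(), as the diffs below show.
  @BeforeEach
  public void before() {
    // catalog creation as in the deleted class, e.g.
    // sql("CREATE CATALOG %s WITH %s", catalogName, toWithClause(config));
  }

  @AfterEach
  public void clean() {
    // dropCatalog(catalogName, true);
  }
}
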
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestBase.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestBase.java
index 4fc0207f26..3986f1a796 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestBase.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestBase.java
@@ -45,7 +45,7 @@ public abstract class TestBase extends TestBaseUtils {
   public static MiniClusterExtension miniClusterResource =
       MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();
 
-  @TempDir Path temporaryDirectory;
+  @TempDir protected Path temporaryDirectory;
 
   private static TestHiveMetastore metastore = null;
   protected static HiveConf hiveConf = null;
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
index 8f5ddde918..ef0802d869 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
@@ -19,6 +19,8 @@
 package org.apache.iceberg.flink;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -46,30 +48,21 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableOperations;
-import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.NoSuchTableException;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Types;
 import org.assertj.core.api.Assertions;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
 
-public class TestFlinkCatalogTable extends FlinkCatalogTestBase {
-
-  public TestFlinkCatalogTable(String catalogName, Namespace baseNamespace) {
-    super(catalogName, baseNamespace);
-  }
+public class TestFlinkCatalogTable extends CatalogTestBase {
 
   @Override
-  @Before
+  @BeforeEach
   public void before() {
     super.before();
     sql("CREATE DATABASE %s", flinkDatabase);
@@ -77,7 +70,7 @@ public class TestFlinkCatalogTable extends FlinkCatalogTestBase {
     sql("USE %s", DATABASE);
   }
 
-  @After
+  @AfterEach
   public void cleanNamespaces() {
     sql("DROP TABLE IF EXISTS %s.tl", flinkDatabase);
     sql("DROP TABLE IF EXISTS %s.tl2", flinkDatabase);
@@ -85,7 +78,7 @@ public class TestFlinkCatalogTable extends FlinkCatalogTestBase {
     super.clean();
   }
 
-  @Test
+  @TestTemplate
   public void testGetTable() {
     sql("CREATE TABLE tl(id BIGINT, strV STRING)");
 
@@ -94,84 +87,73 @@ public class TestFlinkCatalogTable extends FlinkCatalogTestBase {
         new Schema(
             Types.NestedField.optional(1, "id", Types.LongType.get()),
             Types.NestedField.optional(2, "strV", Types.StringType.get()));
-    Assert.assertEquals(
-        "Should load the expected iceberg schema", iSchema.toString(), 
table.schema().toString());
+    assertThat(table.schema().toString())
+        .as("Should load the expected iceberg schema")
+        .isEqualTo(iSchema.toString());
   }
 
-  @Test
+  @TestTemplate
   public void testRenameTable() {
-    Assume.assumeFalse("HadoopCatalog does not support rename table", 
isHadoopCatalog);
-
+    assumeThat(isHadoopCatalog).as("HadoopCatalog does not support rename 
table").isFalse();
     final Schema tableSchema =
         new Schema(Types.NestedField.optional(0, "id", Types.LongType.get()));
     validationCatalog.createTable(TableIdentifier.of(icebergNamespace, "tl"), tableSchema);
     sql("ALTER TABLE tl RENAME TO tl2");
 
-    Assertions.assertThatThrownBy(() -> getTableEnv().from("tl"))
+    assertThatThrownBy(() -> getTableEnv().from("tl"))
         .isInstanceOf(ValidationException.class)
         .hasMessage("Table `tl` was not found.");
 
     Schema actualSchema = FlinkSchemaUtil.convert(getTableEnv().from("tl2").getSchema());
-    Assert.assertEquals(tableSchema.asStruct(), actualSchema.asStruct());
+    assertThat(tableSchema.asStruct()).isEqualTo(actualSchema.asStruct());
   }
 
-  @Test
+  @TestTemplate
   public void testCreateTable() throws TableNotExistException {
     sql("CREATE TABLE tl(id BIGINT)");
 
     Table table = table("tl");
-    Assert.assertEquals(
-        new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(),
-        table.schema().asStruct());
-
+    assertThat(table.schema().asStruct())
+        .isEqualTo(
+            new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct());
     CatalogTable catalogTable = catalogTable("tl");
-    Assert.assertEquals(
-        TableSchema.builder().field("id", DataTypes.BIGINT()).build(), catalogTable.getSchema());
+    assertThat(catalogTable.getSchema())
+        .isEqualTo(TableSchema.builder().field("id", DataTypes.BIGINT()).build());
   }
 
-  @Test
+  @TestTemplate
   public void testCreateTableWithPrimaryKey() throws Exception {
     sql("CREATE TABLE tl(id BIGINT, data STRING, key STRING PRIMARY KEY NOT 
ENFORCED)");
 
     Table table = table("tl");
-    Assert.assertEquals(
-        "Should have the expected row key.",
-        Sets.newHashSet(table.schema().findField("key").fieldId()),
-        table.schema().identifierFieldIds());
-
+    assertThat(table.schema().identifierFieldIds())
+        .as("Should have the expected row key.")
+        .isEqualTo(Sets.newHashSet(table.schema().findField("key").fieldId()));
     CatalogTable catalogTable = catalogTable("tl");
     Optional<UniqueConstraint> uniqueConstraintOptional = catalogTable.getSchema().getPrimaryKey();
-    Assert.assertTrue(
-        "Should have the expected unique constraint", 
uniqueConstraintOptional.isPresent());
-    Assert.assertEquals(
-        "Should have the expected columns",
-        ImmutableList.of("key"),
-        uniqueConstraintOptional.get().getColumns());
+    assertThat(uniqueConstraintOptional).isPresent();
+    assertThat(uniqueConstraintOptional.get().getColumns()).containsExactly("key");
   }
 
-  @Test
+  @TestTemplate
   public void testCreateTableWithMultiColumnsInPrimaryKey() throws Exception {
     sql(
         "CREATE TABLE tl(id BIGINT, data STRING, CONSTRAINT pk_constraint 
PRIMARY KEY(data, id) NOT ENFORCED)");
 
     Table table = table("tl");
-    Assert.assertEquals(
-        "Should have the expected RowKey",
-        Sets.newHashSet(
-            table.schema().findField("id").fieldId(), 
table.schema().findField("data").fieldId()),
-        table.schema().identifierFieldIds());
-
+    assertThat(table.schema().identifierFieldIds())
+        .as("Should have the expected RowKey")
+        .isEqualTo(
+            Sets.newHashSet(
+                table.schema().findField("id").fieldId(),
+                table.schema().findField("data").fieldId()));
     CatalogTable catalogTable = catalogTable("tl");
     Optional<UniqueConstraint> uniqueConstraintOptional = catalogTable.getSchema().getPrimaryKey();
-    Assert.assertTrue(
-        "Should have the expected unique constraint", 
uniqueConstraintOptional.isPresent());
-    Assert.assertEquals(
-        "Should have the expected columns",
-        ImmutableSet.of("data", "id"),
-        ImmutableSet.copyOf(uniqueConstraintOptional.get().getColumns()));
+    assertThat(uniqueConstraintOptional).isPresent();
+    assertThat(uniqueConstraintOptional.get().getColumns()).containsExactly("id", "data");
   }
 
-  @Test
+  @TestTemplate
   public void testCreateTableIfNotExists() {
     sql("CREATE TABLE tl(id BIGINT)");
 
@@ -193,97 +175,96 @@ public class TestFlinkCatalogTable extends FlinkCatalogTestBase {
     assertThat(table("tl").properties()).containsEntry("key", "value");
   }
 
-  @Test
+  @TestTemplate
   public void testCreateTableLike() throws TableNotExistException {
     sql("CREATE TABLE tl(id BIGINT)");
     sql("CREATE TABLE tl2 LIKE tl");
 
     Table table = table("tl2");
-    Assert.assertEquals(
-        new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(),
-        table.schema().asStruct());
-
+    assertThat(table.schema().asStruct())
+        .isEqualTo(
+            new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct());
     CatalogTable catalogTable = catalogTable("tl2");
-    Assert.assertEquals(
-        TableSchema.builder().field("id", DataTypes.BIGINT()).build(), catalogTable.getSchema());
+    assertThat(catalogTable.getSchema())
+        .isEqualTo(TableSchema.builder().field("id", DataTypes.BIGINT()).build());
   }
 
-  @Test
+  @TestTemplate
   public void testCreateTableLocation() {
-    Assume.assumeFalse(
-        "HadoopCatalog does not support creating table with location", 
isHadoopCatalog);
-
+    assumeThat(isHadoopCatalog)
+        .as("HadoopCatalog does not support creating table with location")
+        .isFalse();
     sql("CREATE TABLE tl(id BIGINT) WITH ('location'='file:///tmp/location')");
 
     Table table = table("tl");
-    Assert.assertEquals(
-        new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(),
-        table.schema().asStruct());
-    Assert.assertEquals("file:///tmp/location", table.location());
+    assertThat(table.schema().asStruct())
+        .isEqualTo(
+            new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct());
+    assertThat(table.location()).isEqualTo("file:///tmp/location");
   }
 
-  @Test
+  @TestTemplate
   public void testCreatePartitionTable() throws TableNotExistException {
     sql("CREATE TABLE tl(id BIGINT, dt STRING) PARTITIONED BY(dt)");
 
     Table table = table("tl");
-    Assert.assertEquals(
-        new Schema(
-                Types.NestedField.optional(1, "id", Types.LongType.get()),
-                Types.NestedField.optional(2, "dt", Types.StringType.get()))
-            .asStruct(),
-        table.schema().asStruct());
-    Assert.assertEquals(
-        PartitionSpec.builderFor(table.schema()).identity("dt").build(), table.spec());
-
+    assertThat(table.schema().asStruct())
+        .isEqualTo(
+            new Schema(
+                    Types.NestedField.optional(1, "id", Types.LongType.get()),
+                    Types.NestedField.optional(2, "dt", 
Types.StringType.get()))
+                .asStruct());
+    assertThat(table.spec())
+        .isEqualTo(PartitionSpec.builderFor(table.schema()).identity("dt").build());
     CatalogTable catalogTable = catalogTable("tl");
-    Assert.assertEquals(
-        TableSchema.builder()
-            .field("id", DataTypes.BIGINT())
-            .field("dt", DataTypes.STRING())
-            .build(),
-        catalogTable.getSchema());
-    Assert.assertEquals(Collections.singletonList("dt"), catalogTable.getPartitionKeys());
+    assertThat(catalogTable.getSchema())
+        .isEqualTo(
+            TableSchema.builder()
+                .field("id", DataTypes.BIGINT())
+                .field("dt", DataTypes.STRING())
+                .build());
+    assertThat(catalogTable.getPartitionKeys()).isEqualTo(Collections.singletonList("dt"));
   }
 
-  @Test
+  @TestTemplate
   public void testCreateTableWithFormatV2ThroughTableProperty() throws Exception {
     sql("CREATE TABLE tl(id BIGINT) WITH ('format-version'='2')");
 
     Table table = table("tl");
-    Assert.assertEquals(
-        "should create table using format v2",
-        2,
-        ((BaseTable) table).operations().current().formatVersion());
+    assertThat(((BaseTable) table).operations().current().formatVersion()).isEqualTo(2);
   }
 
-  @Test
+  @TestTemplate
   public void testUpgradeTableWithFormatV2ThroughTableProperty() throws Exception {
     sql("CREATE TABLE tl(id BIGINT) WITH ('format-version'='1')");
 
     Table table = table("tl");
     TableOperations ops = ((BaseTable) table).operations();
-    Assert.assertEquals("should create table using format v1", 1, 
ops.refresh().formatVersion());
-
+    assertThat(ops.refresh().formatVersion())
+        .as("should create table using format v1")
+        .isEqualTo(1);
     sql("ALTER TABLE tl SET('format-version'='2')");
-    Assert.assertEquals("should update table to use format v2", 2, 
ops.refresh().formatVersion());
+    assertThat(ops.refresh().formatVersion())
+        .as("should update table to use format v2")
+        .isEqualTo(2);
   }
 
-  @Test
+  @TestTemplate
   public void testDowngradeTableToFormatV1ThroughTablePropertyFails() throws Exception {
     sql("CREATE TABLE tl(id BIGINT) WITH ('format-version'='2')");
 
     Table table = table("tl");
     TableOperations ops = ((BaseTable) table).operations();
-    Assert.assertEquals("should create table using format v2", 2, 
ops.refresh().formatVersion());
-
+    assertThat(ops.refresh().formatVersion())
+        .as("should create table using format v2")
+        .isEqualTo(2);
     Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl 
SET('format-version'='1')"))
         .rootCause()
         .isInstanceOf(IllegalArgumentException.class)
         .hasMessage("Cannot downgrade v2 table to v1");
   }
 
-  @Test
+  @TestTemplate
   public void testLoadTransformPartitionTable() throws TableNotExistException {
     Schema schema = new Schema(Types.NestedField.optional(0, "id", Types.LongType.get()));
     validationCatalog.createTable(
@@ -292,12 +273,12 @@ public class TestFlinkCatalogTable extends FlinkCatalogTestBase {
         PartitionSpec.builderFor(schema).bucket("id", 100).build());
 
     CatalogTable catalogTable = catalogTable("tl");
-    Assert.assertEquals(
-        TableSchema.builder().field("id", DataTypes.BIGINT()).build(), catalogTable.getSchema());
-    Assert.assertEquals(Collections.emptyList(), catalogTable.getPartitionKeys());
+    assertThat(catalogTable.getSchema())
+        .isEqualTo(TableSchema.builder().field("id", DataTypes.BIGINT()).build());
+    assertThat(catalogTable.getPartitionKeys()).isEmpty();
   }
 
-  @Test
+  @TestTemplate
   public void testAlterTableProperties() throws TableNotExistException {
     sql("CREATE TABLE tl(id BIGINT) WITH ('oldK'='oldV')");
     Map<String, String> properties = Maps.newHashMap();
@@ -319,35 +300,32 @@ public class TestFlinkCatalogTable extends FlinkCatalogTestBase {
     assertThat(table("tl").properties()).containsAllEntriesOf(properties);
   }
 
-  @Test
+  @TestTemplate
   public void testAlterTableAddColumn() {
     sql("CREATE TABLE tl(id BIGINT)");
     Schema schemaBefore = table("tl").schema();
-    Assert.assertEquals(
-        new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(),
-        schemaBefore.asStruct());
-
+    assertThat(schemaBefore.asStruct())
+        .isEqualTo(
+            new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct());
     sql("ALTER TABLE tl ADD (dt STRING)");
     Schema schemaAfter1 = table("tl").schema();
-    Assert.assertEquals(
-        new Schema(
-                Types.NestedField.optional(1, "id", Types.LongType.get()),
-                Types.NestedField.optional(2, "dt", Types.StringType.get()))
-            .asStruct(),
-        schemaAfter1.asStruct());
-
+    assertThat(schemaAfter1.asStruct())
+        .isEqualTo(
+            new Schema(
+                    Types.NestedField.optional(1, "id", Types.LongType.get()),
+                    Types.NestedField.optional(2, "dt", 
Types.StringType.get()))
+                .asStruct());
     // Add multiple columns
     sql("ALTER TABLE tl ADD (col1 STRING, col2 BIGINT)");
     Schema schemaAfter2 = table("tl").schema();
-    Assert.assertEquals(
-        new Schema(
-                Types.NestedField.optional(1, "id", Types.LongType.get()),
-                Types.NestedField.optional(2, "dt", Types.StringType.get()),
-                Types.NestedField.optional(3, "col1", Types.StringType.get()),
-                Types.NestedField.optional(4, "col2", Types.LongType.get()))
-            .asStruct(),
-        schemaAfter2.asStruct());
-
+    assertThat(schemaAfter2.asStruct())
+        .isEqualTo(
+            new Schema(
+                    Types.NestedField.optional(1, "id", Types.LongType.get()),
+                    Types.NestedField.optional(2, "dt", 
Types.StringType.get()),
+                    Types.NestedField.optional(3, "col1", 
Types.StringType.get()),
+                    Types.NestedField.optional(4, "col2", 
Types.LongType.get()))
+                .asStruct());
     // Adding a required field should fail because Iceberg's SchemaUpdate does not allow
     // incompatible changes.
     Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl ADD (pk STRING NOT NULL)"))
@@ -360,36 +338,33 @@ public class TestFlinkCatalogTable extends FlinkCatalogTestBase {
         .hasMessageContaining("Try to add a column `id` which already exists in the table.");
   }
 
-  @Test
+  @TestTemplate
   public void testAlterTableDropColumn() {
     sql("CREATE TABLE tl(id BIGINT, dt STRING, col1 STRING, col2 BIGINT)");
     Schema schemaBefore = table("tl").schema();
-    Assert.assertEquals(
-        new Schema(
-                Types.NestedField.optional(1, "id", Types.LongType.get()),
-                Types.NestedField.optional(2, "dt", Types.StringType.get()),
-                Types.NestedField.optional(3, "col1", Types.StringType.get()),
-                Types.NestedField.optional(4, "col2", Types.LongType.get()))
-            .asStruct(),
-        schemaBefore.asStruct());
-
+    assertThat(schemaBefore.asStruct())
+        .isEqualTo(
+            new Schema(
+                    Types.NestedField.optional(1, "id", Types.LongType.get()),
+                    Types.NestedField.optional(2, "dt", 
Types.StringType.get()),
+                    Types.NestedField.optional(3, "col1", 
Types.StringType.get()),
+                    Types.NestedField.optional(4, "col2", 
Types.LongType.get()))
+                .asStruct());
     sql("ALTER TABLE tl DROP (dt)");
     Schema schemaAfter1 = table("tl").schema();
-    Assert.assertEquals(
-        new Schema(
-                Types.NestedField.optional(1, "id", Types.LongType.get()),
-                Types.NestedField.optional(3, "col1", Types.StringType.get()),
-                Types.NestedField.optional(4, "col2", Types.LongType.get()))
-            .asStruct(),
-        schemaAfter1.asStruct());
-
+    assertThat(schemaAfter1.asStruct())
+        .isEqualTo(
+            new Schema(
+                    Types.NestedField.optional(1, "id", Types.LongType.get()),
+                    Types.NestedField.optional(3, "col1", 
Types.StringType.get()),
+                    Types.NestedField.optional(4, "col2", 
Types.LongType.get()))
+                .asStruct());
     // Drop multiple columns
     sql("ALTER TABLE tl DROP (col1, col2)");
     Schema schemaAfter2 = table("tl").schema();
-    Assert.assertEquals(
-        new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(),
-        schemaAfter2.asStruct());
-
+    assertThat(schemaAfter2.asStruct())
+        .isEqualTo(
+            new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct());
     // Dropping an non-existing field should fail due to Flink's internal validation.
     Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl DROP (foo)"))
         .isInstanceOf(ValidationException.class)
@@ -401,48 +376,45 @@ public class TestFlinkCatalogTable extends FlinkCatalogTestBase {
         .hasMessageContaining("The column `dt` does not exist in the base table.");
   }
 
-  @Test
+  @TestTemplate
   public void testAlterTableModifyColumnName() {
     sql("CREATE TABLE tl(id BIGINT, dt STRING)");
     Schema schemaBefore = table("tl").schema();
-    Assert.assertEquals(
-        new Schema(
-                Types.NestedField.optional(1, "id", Types.LongType.get()),
-                Types.NestedField.optional(2, "dt", Types.StringType.get()))
-            .asStruct(),
-        schemaBefore.asStruct());
-
+    assertThat(schemaBefore.asStruct())
+        .isEqualTo(
+            new Schema(
+                    Types.NestedField.optional(1, "id", Types.LongType.get()),
+                    Types.NestedField.optional(2, "dt", 
Types.StringType.get()))
+                .asStruct());
     sql("ALTER TABLE tl RENAME dt TO data");
     Schema schemaAfter = table("tl").schema();
-    Assert.assertEquals(
-        new Schema(
-                Types.NestedField.optional(1, "id", Types.LongType.get()),
-                Types.NestedField.optional(2, "data", Types.StringType.get()))
-            .asStruct(),
-        schemaAfter.asStruct());
+    assertThat(schemaAfter.asStruct())
+        .isEqualTo(
+            new Schema(
+                    Types.NestedField.optional(1, "id", Types.LongType.get()),
+                    Types.NestedField.optional(2, "data", 
Types.StringType.get()))
+                .asStruct());
   }
 
-  @Test
+  @TestTemplate
   public void testAlterTableModifyColumnType() {
     sql("CREATE TABLE tl(id INTEGER, dt STRING)");
     Schema schemaBefore = table("tl").schema();
-    Assert.assertEquals(
-        new Schema(
-                Types.NestedField.optional(1, "id", Types.IntegerType.get()),
-                Types.NestedField.optional(2, "dt", Types.StringType.get()))
-            .asStruct(),
-        schemaBefore.asStruct());
-
+    assertThat(schemaBefore.asStruct())
+        .isEqualTo(
+            new Schema(
+                    Types.NestedField.optional(1, "id", 
Types.IntegerType.get()),
+                    Types.NestedField.optional(2, "dt", 
Types.StringType.get()))
+                .asStruct());
     // Promote type from Integer to Long
     sql("ALTER TABLE tl MODIFY (id BIGINT)");
     Schema schemaAfter = table("tl").schema();
-    Assert.assertEquals(
-        new Schema(
-                Types.NestedField.optional(1, "id", Types.LongType.get()),
-                Types.NestedField.optional(2, "dt", Types.StringType.get()))
-            .asStruct(),
-        schemaAfter.asStruct());
-
+    assertThat(schemaAfter.asStruct())
+        .isEqualTo(
+            new Schema(
+                    Types.NestedField.optional(1, "id", Types.LongType.get()),
+                    Types.NestedField.optional(2, "dt", 
Types.StringType.get()))
+                .asStruct());
     // Type change that doesn't follow the type-promotion rule should fail due to Iceberg's
     // validation.
     Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl MODIFY (dt INTEGER)"))
@@ -451,17 +423,16 @@ public class TestFlinkCatalogTable extends FlinkCatalogTestBase {
         .hasRootCauseMessage("Cannot change column type: dt: string -> int");
   }
 
-  @Test
+  @TestTemplate
   public void testAlterTableModifyColumnNullability() {
     sql("CREATE TABLE tl(id INTEGER NOT NULL, dt STRING)");
     Schema schemaBefore = table("tl").schema();
-    Assert.assertEquals(
-        new Schema(
-                Types.NestedField.required(1, "id", Types.IntegerType.get()),
-                Types.NestedField.optional(2, "dt", Types.StringType.get()))
-            .asStruct(),
-        schemaBefore.asStruct());
-
+    assertThat(schemaBefore.asStruct())
+        .isEqualTo(
+            new Schema(
+                    Types.NestedField.required(1, "id", 
Types.IntegerType.get()),
+                    Types.NestedField.optional(2, "dt", 
Types.StringType.get()))
+                .asStruct());
     // Changing nullability from optional to required should fail
     // because Iceberg's SchemaUpdate does not allow incompatible changes.
     Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl MODIFY (dt STRING 
NOT NULL)"))
@@ -472,43 +443,42 @@ public class TestFlinkCatalogTable extends 
FlinkCatalogTestBase {
     // Set nullability from required to optional
     sql("ALTER TABLE tl MODIFY (id INTEGER)");
     Schema schemaAfter = table("tl").schema();
-    Assert.assertEquals(
-        new Schema(
-                Types.NestedField.optional(1, "id", Types.IntegerType.get()),
-                Types.NestedField.optional(2, "dt", Types.StringType.get()))
-            .asStruct(),
-        schemaAfter.asStruct());
+    assertThat(schemaAfter.asStruct())
+        .isEqualTo(
+            new Schema(
+                    Types.NestedField.optional(1, "id", 
Types.IntegerType.get()),
+                    Types.NestedField.optional(2, "dt", 
Types.StringType.get()))
+                .asStruct());
   }
 
-  @Test
+  @TestTemplate
   public void testAlterTableModifyColumnPosition() {
     sql("CREATE TABLE tl(id BIGINT, dt STRING)");
     Schema schemaBefore = table("tl").schema();
-    Assert.assertEquals(
-        new Schema(
-                Types.NestedField.optional(1, "id", Types.LongType.get()),
-                Types.NestedField.optional(2, "dt", Types.StringType.get()))
-            .asStruct(),
-        schemaBefore.asStruct());
+    assertThat(schemaBefore.asStruct())
+        .isEqualTo(
+            new Schema(
+                    Types.NestedField.optional(1, "id", Types.LongType.get()),
+                    Types.NestedField.optional(2, "dt", 
Types.StringType.get()))
+                .asStruct());
 
     sql("ALTER TABLE tl MODIFY (dt STRING FIRST)");
     Schema schemaAfter = table("tl").schema();
-    Assert.assertEquals(
-        new Schema(
-                Types.NestedField.optional(2, "dt", Types.StringType.get()),
-                Types.NestedField.optional(1, "id", Types.LongType.get()))
-            .asStruct(),
-        schemaAfter.asStruct());
+    assertThat(schemaAfter.asStruct())
+        .isEqualTo(
+            new Schema(
+                    Types.NestedField.optional(2, "dt", 
Types.StringType.get()),
+                    Types.NestedField.optional(1, "id", Types.LongType.get()))
+                .asStruct());
 
     sql("ALTER TABLE tl MODIFY (dt STRING AFTER id)");
     Schema schemaAfterAfter = table("tl").schema();
-    Assert.assertEquals(
-        new Schema(
-                Types.NestedField.optional(1, "id", Types.LongType.get()),
-                Types.NestedField.optional(2, "dt", Types.StringType.get()))
-            .asStruct(),
-        schemaAfterAfter.asStruct());
-
+    assertThat(schemaAfterAfter.asStruct())
+        .isEqualTo(
+            new Schema(
+                    Types.NestedField.optional(1, "id", Types.LongType.get()),
+                    Types.NestedField.optional(2, "dt", 
Types.StringType.get()))
+                .asStruct());
     // Modifying the position of a non-existing column should fail due to Flink's internal
     // validation.
     Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl MODIFY (non_existing STRING FIRST)"))
@@ -523,67 +493,64 @@ public class TestFlinkCatalogTable extends FlinkCatalogTestBase {
             "Referenced column `non_existing` by 'AFTER' does not exist in the table.");
   }
 
-  @Test
+  @TestTemplate
   public void testAlterTableModifyColumnComment() {
     sql("CREATE TABLE tl(id BIGINT, dt STRING)");
     Schema schemaBefore = table("tl").schema();
-    Assert.assertEquals(
-        new Schema(
-                Types.NestedField.optional(1, "id", Types.LongType.get()),
-                Types.NestedField.optional(2, "dt", Types.StringType.get()))
-            .asStruct(),
-        schemaBefore.asStruct());
+    assertThat(schemaBefore.asStruct())
+        .isEqualTo(
+            new Schema(
+                    Types.NestedField.optional(1, "id", Types.LongType.get()),
+                    Types.NestedField.optional(2, "dt", 
Types.StringType.get()))
+                .asStruct());
 
     sql("ALTER TABLE tl MODIFY (dt STRING COMMENT 'comment for dt field')");
     Schema schemaAfter = table("tl").schema();
-    Assert.assertEquals(
-        new Schema(
-                Types.NestedField.optional(1, "id", Types.LongType.get()),
-                Types.NestedField.optional(2, "dt", Types.StringType.get(), 
"comment for dt field"))
-            .asStruct(),
-        schemaAfter.asStruct());
+    assertThat(schemaAfter.asStruct())
+        .isEqualTo(
+            new Schema(
+                    Types.NestedField.optional(1, "id", Types.LongType.get()),
+                    Types.NestedField.optional(
+                        2, "dt", Types.StringType.get(), "comment for dt 
field"))
+                .asStruct());
   }
 
-  @Test
+  @TestTemplate
   public void testAlterTableConstraint() {
     sql("CREATE TABLE tl(id BIGINT NOT NULL, dt STRING NOT NULL, col1 
STRING)");
     Schema schemaBefore = table("tl").schema();
-    Assert.assertEquals(
-        new Schema(
-                Types.NestedField.required(1, "id", Types.LongType.get()),
-                Types.NestedField.required(2, "dt", Types.StringType.get()),
-                Types.NestedField.optional(3, "col1", Types.StringType.get()))
-            .asStruct(),
-        schemaBefore.asStruct());
-    Assert.assertEquals(ImmutableSet.of(), schemaBefore.identifierFieldNames());
-
+    assertThat(schemaBefore.asStruct())
+        .isEqualTo(
+            new Schema(
+                    Types.NestedField.required(1, "id", Types.LongType.get()),
+                    Types.NestedField.required(2, "dt", 
Types.StringType.get()),
+                    Types.NestedField.optional(3, "col1", 
Types.StringType.get()))
+                .asStruct());
+    assertThat(schemaBefore.identifierFieldNames()).isEmpty();
     sql("ALTER TABLE tl ADD (PRIMARY KEY (id) NOT ENFORCED)");
     Schema schemaAfterAdd = table("tl").schema();
-    Assert.assertEquals(ImmutableSet.of("id"), 
schemaAfterAdd.identifierFieldNames());
-
+    assertThat(schemaAfterAdd.identifierFieldNames()).containsExactly("id");
     sql("ALTER TABLE tl MODIFY (PRIMARY KEY (dt) NOT ENFORCED)");
     Schema schemaAfterModify = table("tl").schema();
-    Assert.assertEquals(
-        new Schema(
-                Types.NestedField.required(1, "id", Types.LongType.get()),
-                Types.NestedField.required(2, "dt", Types.StringType.get()),
-                Types.NestedField.optional(3, "col1", Types.StringType.get()))
-            .asStruct(),
-        schemaAfterModify.asStruct());
-    Assert.assertEquals(ImmutableSet.of("dt"), 
schemaAfterModify.identifierFieldNames());
-
+    assertThat(schemaAfterModify.asStruct())
+        .isEqualTo(
+            new Schema(
+                    Types.NestedField.required(1, "id", Types.LongType.get()),
+                    Types.NestedField.required(2, "dt", 
Types.StringType.get()),
+                    Types.NestedField.optional(3, "col1", 
Types.StringType.get()))
+                .asStruct());
+    assertThat(schemaAfterModify.identifierFieldNames()).containsExactly("dt");
     // Composite primary key
     sql("ALTER TABLE tl MODIFY (PRIMARY KEY (id, dt) NOT ENFORCED)");
     Schema schemaAfterComposite = table("tl").schema();
-    Assert.assertEquals(
-        new Schema(
-                Types.NestedField.required(1, "id", Types.LongType.get()),
-                Types.NestedField.required(2, "dt", Types.StringType.get()),
-                Types.NestedField.optional(3, "col1", Types.StringType.get()))
-            .asStruct(),
-        schemaAfterComposite.asStruct());
-    Assert.assertEquals(ImmutableSet.of("id", "dt"), 
schemaAfterComposite.identifierFieldNames());
-
+    assertThat(schemaAfterComposite.asStruct())
+        .isEqualTo(
+            new Schema(
+                    Types.NestedField.required(1, "id", Types.LongType.get()),
+                    Types.NestedField.required(2, "dt", 
Types.StringType.get()),
+                    Types.NestedField.optional(3, "col1", 
Types.StringType.get()))
+                .asStruct());
+    assertThat(schemaAfterComposite.identifierFieldNames()).containsExactlyInAnyOrder("id", "dt");
     // Setting an optional field as primary key should fail
     // because Iceberg's SchemaUpdate does not allow incompatible changes.
     Assertions.assertThatThrownBy(
@@ -607,16 +574,15 @@ public class TestFlinkCatalogTable extends FlinkCatalogTestBase {
         .hasRootCauseMessage("Unsupported table change: DropConstraint.");
   }
 
-  @Test
+  @TestTemplate
   public void testRelocateTable() {
-    Assume.assumeFalse("HadoopCatalog does not support relocate table", 
isHadoopCatalog);
-
+    assumeThat(isHadoopCatalog).as("HadoopCatalog does not support relocate 
table").isFalse();
     sql("CREATE TABLE tl(id BIGINT)");
     sql("ALTER TABLE tl SET('location'='file:///tmp/location')");
-    Assert.assertEquals("file:///tmp/location", table("tl").location());
+    assertThat(table("tl").location()).isEqualTo("file:///tmp/location");
   }
 
-  @Test
+  @TestTemplate
   public void testSetCurrentAndCherryPickSnapshotId() {
     sql("CREATE TABLE tl(c1 INT, c2 STRING, c3 STRING) PARTITIONED BY (c1)");
 
@@ -651,9 +617,9 @@ public class TestFlinkCatalogTable extends FlinkCatalogTestBase {
     table.newReplacePartitions().addFile(replacementFile).stageOnly().commit();
 
     Snapshot staged = Iterables.getLast(table.snapshots());
-    Assert.assertEquals(
-        "Should find the staged overwrite snapshot", DataOperations.OVERWRITE, 
staged.operation());
-
+    assertThat(staged.operation())
+        .as("Should find the staged overwrite snapshot")
+        .isEqualTo(DataOperations.OVERWRITE);
     // add another append so that the original commit can't be fast-forwarded
     table.newAppend().appendFile(fileB).commit();
 
@@ -675,7 +641,7 @@ public class TestFlinkCatalogTable extends FlinkCatalogTestBase {
             .map(FileScanTask::file)
             .map(ContentFile::path)
             .collect(Collectors.toSet());
-    Assert.assertEquals("Files should match", expectedFilePaths, 
actualFilePaths);
+    assertThat(actualFilePaths).as("Files should 
match").isEqualTo(expectedFilePaths);
   }
 
   private Table table(String name) {
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
index 05fd1bad1d..b32be379ca 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
@@ -53,7 +53,7 @@ public class TestFlinkCatalogTablePartitions extends CatalogTestBase {
     for (FileFormat format :
         new FileFormat[] {FileFormat.ORC, FileFormat.AVRO, FileFormat.PARQUET}) {
       for (Boolean cacheEnabled : new Boolean[] {true, false}) {
-        for (Object[] catalogParams : FlinkCatalogTestBase.parameters()) {
+        for (Object[] catalogParams : CatalogTestBase.parameters()) {
           String catalogName = (String) catalogParams[0];
           Namespace baseNamespace = (Namespace) catalogParams[1];
           parameters.add(new Object[] {catalogName, baseNamespace, format, cacheEnabled});
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkHiveCatalog.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkHiveCatalog.java
index 8f238587d3..47ee2afceb 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkHiveCatalog.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkHiveCatalog.java
@@ -42,7 +42,7 @@ public class TestFlinkHiveCatalog extends FlinkTestBase {
     Map<String, String> props = Maps.newHashMap();
     props.put("type", "iceberg");
     props.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hive");
-    props.put(CatalogProperties.URI, FlinkCatalogTestBase.getURI(hiveConf));
+    props.put(CatalogProperties.URI, CatalogTestBase.getURI(hiveConf));
 
     File warehouseDir = tempFolder.newFolder();
     props.put(CatalogProperties.WAREHOUSE_LOCATION, "file://" + warehouseDir.getAbsolutePath());
@@ -69,7 +69,7 @@ public class TestFlinkHiveCatalog extends FlinkTestBase {
     Map<String, String> props = Maps.newHashMap();
     props.put("type", "iceberg");
     props.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hive");
-    props.put(CatalogProperties.URI, FlinkCatalogTestBase.getURI(hiveConf));
+    props.put(CatalogProperties.URI, CatalogTestBase.getURI(hiveConf));
     // Set the 'hive-conf-dir' instead of 'warehouse'
     props.put(FlinkCatalogFactory.HIVE_CONF_DIR, hiveConfDir.getAbsolutePath());
 
@@ -78,9 +78,7 @@ public class TestFlinkHiveCatalog extends FlinkTestBase {
 
   private void checkSQLQuery(Map<String, String> catalogProperties, File warehouseDir)
       throws IOException {
-    sql(
-        "CREATE CATALOG test_catalog WITH %s",
-        FlinkCatalogTestBase.toWithClause(catalogProperties));
+    sql("CREATE CATALOG test_catalog WITH %s", 
CatalogTestBase.toWithClause(catalogProperties));
     sql("USE CATALOG test_catalog");
     sql("CREATE DATABASE test_db");
     sql("USE test_db");
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
index 7540627989..b7fce104f4 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
@@ -18,6 +18,9 @@
  */
 package org.apache.iceberg.flink;
 
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
+
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -32,11 +35,12 @@ import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.planner.delegation.PlannerBase;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.types.Row;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.Parameters;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.Namespace;
@@ -45,42 +49,30 @@ import org.apache.iceberg.flink.source.BoundedTableFactory;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class TestFlinkTableSink extends FlinkCatalogTestBase {
-
-  @ClassRule
-  public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
-      MiniClusterResource.createWithClassloaderCheckDisabled();
-
-  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+
+public class TestFlinkTableSink extends CatalogTestBase {
 
   private static final String SOURCE_TABLE = "default_catalog.default_database.bounded_source";
   private static final String TABLE_NAME = "test_table";
   private TableEnvironment tEnv;
   private Table icebergTable;
 
-  private final FileFormat format;
-  private final boolean isStreamingJob;
+  @Parameter(index = 2)
+  private FileFormat format;
 
-  @Parameterized.Parameters(
-      name = "catalogName={0}, baseNamespace={1}, format={2}, isStreaming={3}")
-  public static Iterable<Object[]> parameters() {
+  @Parameter(index = 3)
+  private boolean isStreamingJob;
+
+  @Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}, isStreaming={3}")
+  public static List<Object[]> parameters() {
     List<Object[]> parameters = Lists.newArrayList();
     for (FileFormat format :
         new FileFormat[] {FileFormat.ORC, FileFormat.AVRO, FileFormat.PARQUET}) {
       for (Boolean isStreaming : new Boolean[] {true, false}) {
-        for (Object[] catalogParams : FlinkCatalogTestBase.parameters()) {
+        for (Object[] catalogParams : CatalogTestBase.parameters()) {
           String catalogName = (String) catalogParams[0];
           Namespace baseNamespace = (Namespace) catalogParams[1];
           parameters.add(new Object[] {catalogName, baseNamespace, format, isStreaming});
@@ -90,13 +82,6 @@ public class TestFlinkTableSink extends FlinkCatalogTestBase {
     return parameters;
   }
 
-  public TestFlinkTableSink(
-      String catalogName, Namespace baseNamespace, FileFormat format, Boolean isStreamingJob) {
-    super(catalogName, baseNamespace);
-    this.format = format;
-    this.isStreamingJob = isStreamingJob;
-  }
-
   @Override
   protected TableEnvironment getTableEnv() {
     if (tEnv == null) {
@@ -121,7 +106,7 @@ public class TestFlinkTableSink extends FlinkCatalogTestBase {
   }
 
   @Override
-  @Before
+  @BeforeEach
   public void before() {
     super.before();
     sql("CREATE DATABASE %s", flinkDatabase);
@@ -134,7 +119,7 @@ public class TestFlinkTableSink extends FlinkCatalogTestBase {
   }
 
   @Override
-  @After
+  @AfterEach
   public void clean() {
     sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME);
     sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
@@ -142,7 +127,7 @@ public class TestFlinkTableSink extends FlinkCatalogTestBase {
     super.clean();
   }
 
-  @Test
+  @TestTemplate
   public void testInsertFromSourceTable() throws Exception {
     // Register the rows into a temporary table.
     getTableEnv()
@@ -169,10 +154,11 @@ public class TestFlinkTableSink extends FlinkCatalogTestBase {
             SimpleDataUtil.createRecord(null, "bar")));
   }
 
-  @Test
+  @TestTemplate
   public void testOverwriteTable() throws Exception {
-    Assume.assumeFalse(
-        "Flink unbounded streaming does not support overwrite operation", 
isStreamingJob);
+    assumeThat(isStreamingJob)
+        .as("Flink unbounded streaming does not support overwrite operation")
+        .isFalse();
 
     sql("INSERT INTO %s SELECT 1, 'a'", TABLE_NAME);
     SimpleDataUtil.assertTableRecords(
@@ -183,7 +169,7 @@ public class TestFlinkTableSink extends FlinkCatalogTestBase {
         icebergTable, Lists.newArrayList(SimpleDataUtil.createRecord(2, "b")));
   }
 
-  @Test
+  @TestTemplate
   public void testWriteParallelism() throws Exception {
     List<Row> dataSet =
         IntStream.range(1, 1000)
@@ -206,22 +192,21 @@ public class TestFlinkTableSink extends FlinkCatalogTestBase {
     Transformation<?> committer = dummySink.getInputs().get(0);
     Transformation<?> writer = committer.getInputs().get(0);
 
-    Assert.assertEquals("Should have the expected 1 parallelism.", 1, 
writer.getParallelism());
-
+    assertThat(writer.getParallelism()).as("Should have the expected 1 parallelism.").isEqualTo(1);
     writer
         .getInputs()
         .forEach(
             input ->
-                Assert.assertEquals(
-                    "Should have the expected parallelism.",
-                    isStreamingJob ? 2 : 4,
-                    input.getParallelism()));
+                assertThat(input.getParallelism())
+                    .as("Should have the expected parallelism.")
+                    .isEqualTo(isStreamingJob ? 2 : 4));
   }
 
-  @Test
+  @TestTemplate
   public void testReplacePartitions() throws Exception {
-    Assume.assumeFalse(
-        "Flink unbounded streaming does not support overwrite operation", 
isStreamingJob);
+    assumeThat(isStreamingJob)
+        .as("Flink unbounded streaming does not support overwrite operation")
+        .isFalse();
     String tableName = "test_partition";
     sql(
         "CREATE TABLE %s(id INT, data VARCHAR) PARTITIONED BY (data) WITH 
('write.format.default'='%s')",
@@ -265,7 +250,7 @@ public class TestFlinkTableSink extends FlinkCatalogTestBase {
     }
   }
 
-  @Test
+  @TestTemplate
   public void testInsertIntoPartition() throws Exception {
     String tableName = "test_insert_into_partition";
     sql(
@@ -305,7 +290,7 @@ public class TestFlinkTableSink extends FlinkCatalogTestBase {
     }
   }
 
-  @Test
+  @TestTemplate
   public void testHashDistributeMode() throws Exception {
     String tableName = "test_hash_distribution_mode";
     Map<String, String> tableProps =
@@ -326,10 +311,10 @@ public class TestFlinkTableSink extends FlinkCatalogTestBase {
         "CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL)"
             + " WITH ('connector'='BoundedSource', 'data-id'='%s')",
         SOURCE_TABLE, dataId);
-    Assert.assertEquals(
-        "Should have the expected rows in source table.",
-        Sets.newHashSet(dataSet),
-        Sets.newHashSet(sql("SELECT * FROM %s", SOURCE_TABLE)));
+
+    assertThat(sql("SELECT * FROM %s", SOURCE_TABLE))
+        .as("Should have the expected rows in source table.")
+        .containsExactlyInAnyOrderElementsOf(dataSet);
 
     sql(
         "CREATE TABLE %s(id INT, data VARCHAR) PARTITIONED BY (data) WITH %s",
@@ -339,10 +324,9 @@ public class TestFlinkTableSink extends FlinkCatalogTestBase {
       // Insert data set.
       sql("INSERT INTO %s SELECT * FROM %s", tableName, SOURCE_TABLE);
 
-      Assert.assertEquals(
-          "Should have the expected rows in sink table.",
-          Sets.newHashSet(dataSet),
-          Sets.newHashSet(sql("SELECT * FROM %s", tableName)));
+      assertThat(sql("SELECT * FROM %s", tableName))
+          .as("Should have the expected rows in sink table.")
+          .containsExactlyInAnyOrderElementsOf(dataSet);
 
       // Sometimes we will have more than one checkpoint if we pass the auto checkpoint interval,
       // thus producing multiple snapshots.  Here we assert that each snapshot has only 1 file per
@@ -354,24 +338,18 @@ public class TestFlinkTableSink extends FlinkCatalogTestBase {
           continue;
         }
 
-        Assert.assertEquals(
-            "There should be 1 data file in partition 'aaa'",
-            1,
-            SimpleDataUtil.matchingPartitions(
-                    dataFiles, table.spec(), ImmutableMap.of("data", "aaa"))
-                .size());
-        Assert.assertEquals(
-            "There should be 1 data file in partition 'bbb'",
-            1,
-            SimpleDataUtil.matchingPartitions(
-                    dataFiles, table.spec(), ImmutableMap.of("data", "bbb"))
-                .size());
-        Assert.assertEquals(
-            "There should be 1 data file in partition 'ccc'",
-            1,
-            SimpleDataUtil.matchingPartitions(
-                    dataFiles, table.spec(), ImmutableMap.of("data", "ccc"))
-                .size());
+        assertThat(
+                SimpleDataUtil.matchingPartitions(
+                    dataFiles, table.spec(), ImmutableMap.of("data", "aaa")))
+            .hasSize(1);
+        assertThat(
+                SimpleDataUtil.matchingPartitions(
+                    dataFiles, table.spec(), ImmutableMap.of("data", "bbb")))
+            .hasSize(1);
+        assertThat(
+                SimpleDataUtil.matchingPartitions(
+                    dataFiles, table.spec(), ImmutableMap.of("data", "ccc")))
+            .hasSize(1);
       }
     } finally {
       sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
index a25ebab6c4..5674c83e40 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
@@ -25,47 +25,32 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.types.Row;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.Parameters;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
 
-@RunWith(Parameterized.class)
-public class TestFlinkUpsert extends FlinkCatalogTestBase {
+public class TestFlinkUpsert extends CatalogTestBase {
 
-  @ClassRule
-  public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
-      MiniClusterResource.createWithClassloaderCheckDisabled();
+  @Parameter(index = 2)
+  private FileFormat format;
 
-  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+  @Parameter(index = 3)
+  private boolean isStreamingJob;
 
-  private final boolean isStreamingJob;
   private final Map<String, String> tableUpsertProps = Maps.newHashMap();
   private TableEnvironment tEnv;
 
-  public TestFlinkUpsert(
-      String catalogName, Namespace baseNamespace, FileFormat format, Boolean isStreamingJob) {
-    super(catalogName, baseNamespace);
-    this.isStreamingJob = isStreamingJob;
-    tableUpsertProps.put(TableProperties.FORMAT_VERSION, "2");
-    tableUpsertProps.put(TableProperties.UPSERT_ENABLED, "true");
-    tableUpsertProps.put(TableProperties.DEFAULT_FILE_FORMAT, format.name());
-  }
-
-  @Parameterized.Parameters(
-      name = "catalogName={0}, baseNamespace={1}, format={2}, isStreaming={3}")
-  public static Iterable<Object[]> parameters() {
+  @Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}, isStreaming={3}")
+  public static List<Object[]> parameters() {
     List<Object[]> parameters = Lists.newArrayList();
     for (FileFormat format :
         new FileFormat[] {FileFormat.PARQUET, FileFormat.AVRO, FileFormat.ORC}) {
@@ -105,22 +90,25 @@ public class TestFlinkUpsert extends FlinkCatalogTestBase {
   }
 
   @Override
-  @Before
+  @BeforeEach
   public void before() {
     super.before();
     sql("CREATE DATABASE IF NOT EXISTS %s", flinkDatabase);
     sql("USE CATALOG %s", catalogName);
     sql("USE %s", DATABASE);
+    tableUpsertProps.put(TableProperties.FORMAT_VERSION, "2");
+    tableUpsertProps.put(TableProperties.UPSERT_ENABLED, "true");
+    tableUpsertProps.put(TableProperties.DEFAULT_FILE_FORMAT, format.name());
   }
 
   @Override
-  @After
+  @AfterEach
   public void clean() {
     sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
     super.clean();
   }
 
-  @Test
+  @TestTemplate
   public void testUpsertAndQuery() {
     String tableName = "test_upsert_query";
     LocalDate dt20220301 = LocalDate.of(2022, 3, 1);
@@ -164,7 +152,7 @@ public class TestFlinkUpsert extends FlinkCatalogTestBase {
     }
   }
 
-  @Test
+  @TestTemplate
   public void testUpsertOptions() {
     String tableName = "test_upsert_options";
     LocalDate dt20220301 = LocalDate.of(2022, 3, 1);
@@ -210,7 +198,7 @@ public class TestFlinkUpsert extends FlinkCatalogTestBase {
     }
   }
 
-  @Test
+  @TestTemplate
   public void testPrimaryKeyEqualToPartitionKey() {
     // This is an SQL based reproduction of TestFlinkIcebergSinkV2#testUpsertOnDataKey
     String tableName = "upsert_on_id_key";
@@ -243,7 +231,7 @@ public class TestFlinkUpsert extends FlinkCatalogTestBase {
     }
   }
 
-  @Test
+  @TestTemplate
   public void testPrimaryKeyFieldsAtBeginningOfSchema() {
     String tableName = "upsert_on_pk_at_schema_start";
     LocalDate dt = LocalDate.of(2022, 3, 1);
@@ -292,7 +280,7 @@ public class TestFlinkUpsert extends FlinkCatalogTestBase {
     }
   }
 
-  @Test
+  @TestTemplate
   public void testPrimaryKeyFieldsAtEndOfTableSchema() {
     // This is the same test case as testPrimaryKeyFieldsAtBeginningOfSchema, but the primary key
     // fields
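
The parameter wiring above recurs in every migrated file: JUnit4 constructor injection
under @RunWith(Parameterized.class) becomes field injection with Iceberg's @Parameter
and @Parameters annotations, and @Test becomes @TestTemplate. A minimal sketch under
those assumptions (the class name, format list, and test body are illustrative only):

    import java.util.List;
    import org.apache.iceberg.FileFormat;
    import org.apache.iceberg.Parameter;
    import org.apache.iceberg.Parameters;
    import org.apache.iceberg.relocated.com.google.common.collect.Lists;
    import org.junit.jupiter.api.TestTemplate;

    public class MyParameterizedTest extends CatalogTestBase {

      // Indexes 0 and 1 belong to the base class (catalogName, baseNamespace),
      // so subclass parameters start at index 2.
      @Parameter(index = 2)
      private FileFormat format;

      @Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}")
      protected static List<Object[]> parameters() {
        List<Object[]> parameters = Lists.newArrayList();
        for (FileFormat format : new FileFormat[] {FileFormat.PARQUET, FileFormat.AVRO}) {
          for (Object[] catalogParams : CatalogTestBase.parameters()) {
            parameters.add(new Object[] {catalogParams[0], catalogParams[1], format});
          }
        }
        return parameters;
      }

      // @TestTemplate replaces @Test so the method runs once per parameter set.
      @TestTemplate
      public void testSomething() {
        // assertions against `format` would go here
      }
    }
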
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
index 4f71b5fe8d..cb409b7843 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
@@ -280,7 +280,7 @@ public class TestIcebergConnector extends FlinkTestBase {
     catalogProps.put("type", "iceberg");
     if (isHiveCatalog()) {
       catalogProps.put("catalog-type", "hive");
-      catalogProps.put(CatalogProperties.URI, FlinkCatalogTestBase.getURI(hiveConf));
+      catalogProps.put(CatalogProperties.URI, CatalogTestBase.getURI(hiveConf));
     } else {
       catalogProps.put("catalog-type", "hadoop");
     }
@@ -315,7 +315,7 @@ public class TestIcebergConnector extends FlinkTestBase {
     tableProps.put("catalog-name", catalogName);
     tableProps.put(CatalogProperties.WAREHOUSE_LOCATION, createWarehouse());
     if (isHiveCatalog()) {
-      tableProps.put(CatalogProperties.URI, FlinkCatalogTestBase.getURI(hiveConf));
+      tableProps.put(CatalogProperties.URI, CatalogTestBase.getURI(hiveConf));
     }
     return tableProps;
   }
@@ -337,7 +337,7 @@ public class TestIcebergConnector extends FlinkTestBase {
   }
 
   private String toWithClause(Map<String, String> props) {
-    return FlinkCatalogTestBase.toWithClause(props);
+    return CatalogTestBase.toWithClause(props);
   }
 
   private static String createWarehouse() {
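
TestIcebergConnector only needs call-site updates, since the renamed base class keeps
the same static helpers. A sketch of the change (catalogProps, tableProps, and hiveConf
as in the methods above):

    // Only the owning class name changes at each call site:
    catalogProps.put(CatalogProperties.URI, CatalogTestBase.getURI(hiveConf)); // was FlinkCatalogTestBase.getURI
    String withClause = CatalogTestBase.toWithClause(tableProps);              // was FlinkCatalogTestBase.toWithClause
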
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
index 07e5ca051d..4220775f41 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
@@ -19,9 +19,11 @@
 package org.apache.iceberg.flink.actions;
 
 import static org.apache.iceberg.flink.SimpleDataUtil.RECORD;
+import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Path;
 import java.util.Collection;
 import java.util.List;
 import java.util.Set;
@@ -39,6 +41,8 @@ import org.apache.iceberg.FileContent;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Files;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.Parameters;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.actions.RewriteDataFilesActionResult;
@@ -49,7 +53,7 @@ import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.flink.FlinkCatalogTestBase;
+import org.apache.iceberg.flink.CatalogTestBase;
 import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
@@ -59,44 +63,36 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.Pair;
 import org.assertj.core.api.Assertions;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class TestRewriteDataFilesAction extends FlinkCatalogTestBase {
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.io.TempDir;
+
+public class TestRewriteDataFilesAction extends CatalogTestBase {
 
   private static final String TABLE_NAME_UNPARTITIONED = "test_table_unpartitioned";
   private static final String TABLE_NAME_PARTITIONED = "test_table_partitioned";
   private static final String TABLE_NAME_WITH_PK = "test_table_with_pk";
-  private final FileFormat format;
+
+  @Parameter(index = 2)
+  private FileFormat format;
+
   private Table icebergTableUnPartitioned;
   private Table icebergTablePartitioned;
   private Table icebergTableWithPk;
 
-  public TestRewriteDataFilesAction(
-      String catalogName, Namespace baseNamespace, FileFormat format) {
-    super(catalogName, baseNamespace);
-    this.format = format;
-  }
-
   @Override
   protected TableEnvironment getTableEnv() {
     super.getTableEnv().getConfig().getConfiguration().set(CoreOptions.DEFAULT_PARALLELISM, 1);
     return super.getTableEnv();
   }
 
-  @Parameterized.Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}")
-  public static Iterable<Object[]> parameters() {
+  @Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}")
+  public static List<Object[]> parameters() {
     List<Object[]> parameters = Lists.newArrayList();
     for (FileFormat format :
         new FileFormat[] {FileFormat.AVRO, FileFormat.ORC, FileFormat.PARQUET}) {
-      for (Object[] catalogParams : FlinkCatalogTestBase.parameters()) {
+      for (Object[] catalogParams : CatalogTestBase.parameters()) {
         String catalogName = (String) catalogParams[0];
         Namespace baseNamespace = (Namespace) catalogParams[1];
         parameters.add(new Object[] {catalogName, baseNamespace, format});
@@ -105,10 +101,10 @@ public class TestRewriteDataFilesAction extends FlinkCatalogTestBase {
     return parameters;
   }
 
-  @Rule public TemporaryFolder temp = new TemporaryFolder();
+  private @TempDir Path temp;
 
   @Override
-  @Before
+  @BeforeEach
   public void before() {
     super.before();
     sql("CREATE DATABASE %s", flinkDatabase);
@@ -135,7 +131,7 @@ public class TestRewriteDataFilesAction extends FlinkCatalogTestBase {
   }
 
   @Override
-  @After
+  @AfterEach
   public void clean() {
     sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME_UNPARTITIONED);
     sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME_PARTITIONED);
@@ -144,14 +140,14 @@ public class TestRewriteDataFilesAction extends FlinkCatalogTestBase {
     super.clean();
   }
 
-  @Test
+  @TestTemplate
   public void testRewriteDataFilesEmptyTable() throws Exception {
-    Assert.assertNull("Table must be empty", 
icebergTableUnPartitioned.currentSnapshot());
+    assertThat(icebergTableUnPartitioned.currentSnapshot()).isNull();
     Actions.forTable(icebergTableUnPartitioned).rewriteDataFiles().execute();
-    Assert.assertNull("Table must stay empty", 
icebergTableUnPartitioned.currentSnapshot());
+    assertThat(icebergTableUnPartitioned.currentSnapshot()).isNull();
   }
 
-  @Test
+  @TestTemplate
   public void testRewriteDataFilesUnpartitionedTable() throws Exception {
     sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_UNPARTITIONED);
     sql("INSERT INTO %s SELECT 2, 'world'", TABLE_NAME_UNPARTITIONED);
@@ -161,21 +157,19 @@ public class TestRewriteDataFilesAction extends FlinkCatalogTestBase {
     CloseableIterable<FileScanTask> tasks = icebergTableUnPartitioned.newScan().planFiles();
     List<DataFile> dataFiles =
         Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
-    Assert.assertEquals("Should have 2 data files before rewrite", 2, dataFiles.size());
-
+    assertThat(dataFiles).hasSize(2);
     RewriteDataFilesActionResult result =
         Actions.forTable(icebergTableUnPartitioned).rewriteDataFiles().execute();
 
-    Assert.assertEquals("Action should rewrite 2 data files", 2, result.deletedDataFiles().size());
-    Assert.assertEquals("Action should add 1 data file", 1, result.addedDataFiles().size());
+    assertThat(result.deletedDataFiles()).hasSize(2);
+    assertThat(result.addedDataFiles()).hasSize(1);
 
     icebergTableUnPartitioned.refresh();
 
     CloseableIterable<FileScanTask> tasks1 = icebergTableUnPartitioned.newScan().planFiles();
     List<DataFile> dataFiles1 =
         Lists.newArrayList(CloseableIterable.transform(tasks1, FileScanTask::file));
-    Assert.assertEquals("Should have 1 data files after rewrite", 1, dataFiles1.size());
-
+    assertThat(dataFiles1).hasSize(1);
     // Assert the table records as expected.
     SimpleDataUtil.assertTableRecords(
         icebergTableUnPartitioned,
@@ -183,7 +177,7 @@ public class TestRewriteDataFilesAction extends FlinkCatalogTestBase {
             SimpleDataUtil.createRecord(1, "hello"), SimpleDataUtil.createRecord(2, "world")));
   }
 
-  @Test
+  @TestTemplate
   public void testRewriteDataFilesPartitionedTable() throws Exception {
     sql("INSERT INTO %s SELECT 1, 'hello' ,'a'", TABLE_NAME_PARTITIONED);
     sql("INSERT INTO %s SELECT 2, 'hello' ,'a'", TABLE_NAME_PARTITIONED);
@@ -195,21 +189,19 @@ public class TestRewriteDataFilesAction extends FlinkCatalogTestBase {
     CloseableIterable<FileScanTask> tasks = icebergTablePartitioned.newScan().planFiles();
     List<DataFile> dataFiles =
         Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
-    Assert.assertEquals("Should have 4 data files before rewrite", 4, dataFiles.size());
-
+    assertThat(dataFiles).hasSize(4);
     RewriteDataFilesActionResult result =
         Actions.forTable(icebergTablePartitioned).rewriteDataFiles().execute();
 
-    Assert.assertEquals("Action should rewrite 4 data files", 4, 
result.deletedDataFiles().size());
-    Assert.assertEquals("Action should add 2 data file", 2, 
result.addedDataFiles().size());
+    assertThat(result.deletedDataFiles()).hasSize(4);
+    assertThat(result.addedDataFiles()).hasSize(2);
 
     icebergTablePartitioned.refresh();
 
     CloseableIterable<FileScanTask> tasks1 = icebergTablePartitioned.newScan().planFiles();
     List<DataFile> dataFiles1 =
         Lists.newArrayList(CloseableIterable.transform(tasks1, FileScanTask::file));
-    Assert.assertEquals("Should have 2 data files after rewrite", 2, dataFiles1.size());
-
+    assertThat(dataFiles1).hasSize(2);
     // Assert the table records as expected.
     Schema schema =
         new Schema(
@@ -227,7 +219,7 @@ public class TestRewriteDataFilesAction extends FlinkCatalogTestBase {
             record.copy("id", 4, "data", "world", "spec", "b")));
   }
 
-  @Test
+  @TestTemplate
   public void testRewriteDataFilesWithFilter() throws Exception {
     sql("INSERT INTO %s SELECT 1, 'hello' ,'a'", TABLE_NAME_PARTITIONED);
     sql("INSERT INTO %s SELECT 2, 'hello' ,'a'", TABLE_NAME_PARTITIONED);
@@ -240,25 +232,22 @@ public class TestRewriteDataFilesAction extends FlinkCatalogTestBase {
     CloseableIterable<FileScanTask> tasks = icebergTablePartitioned.newScan().planFiles();
     List<DataFile> dataFiles =
         Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
-    Assert.assertEquals("Should have 5 data files before rewrite", 5, dataFiles.size());
-
+    assertThat(dataFiles).hasSize(5);
     RewriteDataFilesActionResult result =
         Actions.forTable(icebergTablePartitioned)
             .rewriteDataFiles()
             .filter(Expressions.equal("spec", "a"))
             .filter(Expressions.startsWith("data", "he"))
             .execute();
-
-    Assert.assertEquals("Action should rewrite 2 data files", 2, 
result.deletedDataFiles().size());
-    Assert.assertEquals("Action should add 1 data file", 1, 
result.addedDataFiles().size());
+    assertThat(result.deletedDataFiles()).hasSize(2);
+    assertThat(result.addedDataFiles()).hasSize(1);
 
     icebergTablePartitioned.refresh();
 
     CloseableIterable<FileScanTask> tasks1 = icebergTablePartitioned.newScan().planFiles();
     List<DataFile> dataFiles1 =
         Lists.newArrayList(CloseableIterable.transform(tasks1, FileScanTask::file));
-    Assert.assertEquals("Should have 4 data files after rewrite", 4, dataFiles1.size());
-
+    assertThat(dataFiles1).hasSize(4);
     // Assert the table records as expected.
     Schema schema =
         new Schema(
@@ -277,7 +266,7 @@ public class TestRewriteDataFilesAction extends FlinkCatalogTestBase {
             record.copy("id", 5, "data", "world", "spec", "b")));
   }
 
-  @Test
+  @TestTemplate
   public void testRewriteLargeTableHasResiduals() throws IOException {
     // all records belong to the same partition
     List<String> records1 = Lists.newArrayList();
@@ -309,19 +298,19 @@ public class TestRewriteDataFilesAction extends FlinkCatalogTestBase {
             .filter(Expressions.equal("data", "0"))
             .planFiles();
     for (FileScanTask task : tasks) {
-      Assert.assertEquals("Residuals must be ignored", 
Expressions.alwaysTrue(), task.residual());
+      assertThat(task.residual())
+          .as("Residuals must be ignored")
+          .isEqualTo(Expressions.alwaysTrue());
     }
     List<DataFile> dataFiles =
         Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
-    Assert.assertEquals("Should have 2 data files before rewrite", 2, dataFiles.size());
-
+    assertThat(dataFiles).hasSize(2);
     Actions actions = Actions.forTable(icebergTableUnPartitioned);
 
     RewriteDataFilesActionResult result =
         actions.rewriteDataFiles().filter(Expressions.equal("data", "0")).execute();
-    Assert.assertEquals("Action should rewrite 2 data files", 2, result.deletedDataFiles().size());
-    Assert.assertEquals("Action should add 1 data file", 1, result.addedDataFiles().size());
-
+    assertThat(result.deletedDataFiles()).hasSize(2);
+    assertThat(result.addedDataFiles()).hasSize(1);
     // Assert the table records as expected.
     SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
   }
@@ -339,12 +328,12 @@ public class TestRewriteDataFilesAction extends FlinkCatalogTestBase {
    *
    * @throws IOException IOException
    */
-  @Test
+  @TestTemplate
   public void testRewriteAvoidRepeateCompress() throws IOException {
     List<Record> expected = Lists.newArrayList();
     Schema schema = icebergTableUnPartitioned.schema();
     GenericAppenderFactory genericAppenderFactory = new GenericAppenderFactory(schema);
-    File file = temp.newFile();
+    File file = File.createTempFile("junit", null, temp.toFile());
     int count = 0;
     try (FileAppender<Record> fileAppender =
         genericAppenderFactory.newAppender(Files.localOutput(file), format)) {
@@ -374,8 +363,7 @@ public class TestRewriteDataFilesAction extends FlinkCatalogTestBase {
     CloseableIterable<FileScanTask> tasks = icebergTableUnPartitioned.newScan().planFiles();
     List<DataFile> dataFiles =
         Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
-    Assert.assertEquals("Should have 3 data files before rewrite", 3, dataFiles.size());
-
+    assertThat(dataFiles).hasSize(3);
     Actions actions = Actions.forTable(icebergTableUnPartitioned);
 
     long targetSizeInBytes = file.length() + 10;
@@ -385,20 +373,18 @@ public class TestRewriteDataFilesAction extends FlinkCatalogTestBase {
             .targetSizeInBytes(targetSizeInBytes)
             .splitOpenFileCost(1)
             .execute();
-    Assert.assertEquals("Action should rewrite 2 data files", 2, 
result.deletedDataFiles().size());
-    Assert.assertEquals("Action should add 1 data file", 1, 
result.addedDataFiles().size());
-
+    assertThat(result.deletedDataFiles()).hasSize(2);
+    assertThat(result.addedDataFiles()).hasSize(1);
     icebergTableUnPartitioned.refresh();
 
     CloseableIterable<FileScanTask> tasks1 = icebergTableUnPartitioned.newScan().planFiles();
     List<DataFile> dataFilesRewrote =
         Lists.newArrayList(CloseableIterable.transform(tasks1, FileScanTask::file));
-    Assert.assertEquals("Should have 2 data files after rewrite", 2, dataFilesRewrote.size());
-
+    assertThat(dataFilesRewrote).hasSize(2);
     // the biggest file should not be rewritten
     List rewroteDataFileNames =
         dataFilesRewrote.stream().map(ContentFile::path).collect(Collectors.toList());
-    Assert.assertTrue(rewroteDataFileNames.contains(file.getAbsolutePath()));
+    assertThat(rewroteDataFileNames).contains(file.getAbsolutePath());
 
     // Assert the table records as expected.
     expected.add(SimpleDataUtil.createRecord(1, "a"));
@@ -406,7 +392,7 @@ public class TestRewriteDataFilesAction extends FlinkCatalogTestBase {
     SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
   }
 
-  @Test
+  @TestTemplate
   public void testRewriteNoConflictWithEqualityDeletes() throws IOException {
     // Add 2 data files
     sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_WITH_PK);
@@ -423,11 +409,9 @@ public class TestRewriteDataFilesAction extends FlinkCatalogTestBase {
     sql("INSERT INTO %s /*+ OPTIONS('upsert-enabled'='true')*/ SELECT 1, 'hi'", TABLE_NAME_WITH_PK);
 
     icebergTableWithPk.refresh();
-    Assert.assertEquals(
-        "The latest sequence number should be greater than that of the stale 
snapshot",
-        stale1.currentSnapshot().sequenceNumber() + 1,
-        icebergTableWithPk.currentSnapshot().sequenceNumber());
-
+    assertThat(icebergTableWithPk.currentSnapshot().sequenceNumber())
+        .as("The latest sequence number should be greater than that of the 
stale snapshot")
+        .isEqualTo(stale1.currentSnapshot().sequenceNumber() + 1);
     CloseableIterable<FileScanTask> tasks = icebergTableWithPk.newScan().planFiles();
     List<DataFile> dataFiles =
         Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
@@ -435,12 +419,10 @@ public class TestRewriteDataFilesAction extends FlinkCatalogTestBase {
         Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::deletes)).stream()
             .flatMap(Collection::stream)
             .collect(Collectors.toSet());
-    Assert.assertEquals("Should have 3 data files before rewrite", 3, dataFiles.size());
-    Assert.assertEquals("Should have 1 delete file before rewrite", 1, deleteFiles.size());
-    Assert.assertSame(
-        "The 1 delete file should be an equality-delete file",
-        Iterables.getOnlyElement(deleteFiles).content(),
-        FileContent.EQUALITY_DELETES);
+    assertThat(dataFiles).hasSize(3);
+    assertThat(deleteFiles).hasSize(1);
+    assertThat(Iterables.getOnlyElement(deleteFiles).content())
+        .isEqualTo(FileContent.EQUALITY_DELETES);
     shouldHaveDataAndFileSequenceNumbers(
         TABLE_NAME_WITH_PK,
         ImmutableList.of(Pair.of(1L, 1L), Pair.of(2L, 2L), Pair.of(3L, 3L), Pair.of(3L, 3L)));
@@ -459,8 +441,8 @@ public class TestRewriteDataFilesAction extends FlinkCatalogTestBase {
         Actions.forTable(stale2).rewriteDataFiles().useStartingSequenceNumber(true).execute();
 
     // Should not rewrite files from the new commit
-    Assert.assertEquals("Action should rewrite 2 data files", 2, 
result.deletedDataFiles().size());
-    Assert.assertEquals("Action should add 1 data file", 1, 
result.addedDataFiles().size());
+    assertThat(result.deletedDataFiles()).hasSize(2);
+    assertThat(result.addedDataFiles()).hasSize(1);
     // The 2 older files with file-sequence-number <= 2 should be rewritten into a new file.
     // The new file is the one with file-sequence-number == 4.
     // The new file should use rewrite's starting-sequence-number 2 as its data-sequence-number.
@@ -494,6 +476,6 @@ public class TestRewriteDataFilesAction extends FlinkCatalogTestBase {
                     Pair.<Long, Long>of(
                         row.getFieldAs("sequence_number"), row.getFieldAs("file_sequence_number")))
             .collect(Collectors.toList());
-    Assertions.assertThat(actualSequenceNumbers).hasSameElementsAs(expectedSequenceNumbers);
+    assertThat(actualSequenceNumbers).hasSameElementsAs(expectedSequenceNumbers);
   }
 }
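
The TemporaryFolder replacement used in this file generalizes: JUnit5 injects a managed
directory into a @TempDir field, and temp.newFile() becomes File.createTempFile against
that directory. A minimal sketch (class and method names are illustrative only):

    import java.io.File;
    import java.io.IOException;
    import java.nio.file.Path;
    import org.junit.jupiter.api.Test;
    import org.junit.jupiter.api.io.TempDir;

    class TempDirSketchTest {

      // JUnit5 creates and cleans up this directory for each test; no @Rule needed.
      private @TempDir Path temp;

      @Test
      void writesIntoManagedDir() throws IOException {
        // Replaces JUnit4's TemporaryFolder.newFile()
        File file = File.createTempFile("junit", null, temp.toFile());
        // ... write to `file` exactly as the tests above do
      }
    }
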
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java
index 5ecf4f4536..f58cc87c6a 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java
@@ -18,7 +18,12 @@
  */
 package org.apache.iceberg.flink.source;
 
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
+
+import java.io.File;
 import java.io.IOException;
+import java.nio.file.Path;
 import java.time.Instant;
 import java.util.Comparator;
 import java.util.Iterator;
@@ -41,6 +46,8 @@ import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.MetadataTableUtils;
 import org.apache.iceberg.MetricsUtil;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.Parameters;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
@@ -52,7 +59,7 @@ import org.apache.iceberg.data.FileHelpers;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.flink.FlinkCatalogTestBase;
+import org.apache.iceberg.flink.CatalogTestBase;
 import org.apache.iceberg.flink.TestHelpers;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
@@ -60,29 +67,21 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.SnapshotUtil;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class TestFlinkMetaDataTable extends FlinkCatalogTestBase {
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.io.TempDir;
+
+public class TestFlinkMetaDataTable extends CatalogTestBase {
   private static final String TABLE_NAME = "test_table";
   private final FileFormat format = FileFormat.AVRO;
-  private static final TemporaryFolder TEMP = new TemporaryFolder();
-  private final boolean isPartition;
+  private @TempDir Path temp;
 
-  public TestFlinkMetaDataTable(String catalogName, Namespace baseNamespace, Boolean isPartition) {
-    super(catalogName, baseNamespace);
-    this.isPartition = isPartition;
-  }
+  @Parameter(index = 2)
+  private Boolean isPartition;
 
-  @Parameterized.Parameters(name = "catalogName={0}, baseNamespace={1}, isPartition={2}")
-  public static Iterable<Object[]> parameters() {
+  @Parameters(name = "catalogName={0}, baseNamespace={1}, isPartition={2}")
+  protected static List<Object[]> parameters() {
     List<Object[]> parameters = Lists.newArrayList();
 
     for (Boolean isPartition : new Boolean[] {true, false}) {
@@ -100,7 +99,7 @@ public class TestFlinkMetaDataTable extends FlinkCatalogTestBase {
     return super.getTableEnv();
   }
 
-  @Before
+  @BeforeEach
   public void before() {
     super.before();
     sql("USE CATALOG %s", catalogName);
@@ -124,14 +123,14 @@ public class TestFlinkMetaDataTable extends FlinkCatalogTestBase {
   }
 
   @Override
-  @After
+  @AfterEach
   public void clean() {
     sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME);
     sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
     super.clean();
   }
 
-  @Test
+  @TestTemplate
   public void testSnapshots() {
     String sql = String.format("SELECT * FROM %s$snapshots ", TABLE_NAME);
     List<Row> result = sql(sql);
@@ -140,22 +139,22 @@ public class TestFlinkMetaDataTable extends FlinkCatalogTestBase {
     Iterator<Snapshot> snapshots = table.snapshots().iterator();
     for (Row row : result) {
       Snapshot next = snapshots.next();
-      Assert.assertEquals(
-          "Should have expected timestamp",
-          ((Instant) row.getField(0)).toEpochMilli(),
-          next.timestampMillis());
-      Assert.assertEquals("Should have expected snapshot id", 
next.snapshotId(), row.getField(1));
-      Assert.assertEquals("Should have expected parent id", next.parentId(), 
row.getField(2));
-      Assert.assertEquals("Should have expected operation", next.operation(), 
row.getField(3));
-      Assert.assertEquals(
-          "Should have expected manifest list location",
-          row.getField(4),
-          next.manifestListLocation());
-      Assert.assertEquals("Should have expected summary", next.summary(), 
row.getField(5));
+      assertThat(((Instant) row.getField(0)).toEpochMilli())
+          .as("Should have expected timestamp")
+          .isEqualTo(next.timestampMillis());
+      assertThat(row.getField(1))
+          .as("Should have expected snapshot id")
+          .isEqualTo(next.snapshotId());
+      assertThat(row.getField(2)).as("Should have expected parent id").isEqualTo(next.parentId());
+      assertThat(row.getField(3)).as("Should have expected operation").isEqualTo(next.operation());
+      assertThat(row.getField(4))
+          .as("Should have expected manifest list location")
+          .isEqualTo(next.manifestListLocation());
+      assertThat(row.getField(5)).as("Should have expected summary").isEqualTo(next.summary());
     }
   }
 
-  @Test
+  @TestTemplate
   public void testHistory() {
     String sql = String.format("SELECT * FROM %s$history ", TABLE_NAME);
     List<Row> result = sql(sql);
@@ -164,21 +163,22 @@ public class TestFlinkMetaDataTable extends FlinkCatalogTestBase {
     Iterator<Snapshot> snapshots = table.snapshots().iterator();
     for (Row row : result) {
       Snapshot next = snapshots.next();
-      Assert.assertEquals(
-          "Should have expected made_current_at",
-          ((Instant) row.getField(0)).toEpochMilli(),
-          next.timestampMillis());
-      Assert.assertEquals("Should have expected snapshot id", 
next.snapshotId(), row.getField(1));
-      Assert.assertEquals("Should have expected parent id", next.parentId(), 
row.getField(2));
-
-      Assert.assertEquals(
-          "Should have expected is current ancestor",
-          SnapshotUtil.isAncestorOf(table, table.currentSnapshot().snapshotId(), next.snapshotId()),
-          row.getField(3));
+      assertThat(((Instant) row.getField(0)).toEpochMilli())
+          .as("Should have expected made_current_at")
+          .isEqualTo(next.timestampMillis());
+      assertThat(row.getField(1))
+          .as("Should have expected snapshot id")
+          .isEqualTo(next.snapshotId());
+      assertThat(row.getField(2)).as("Should have expected parent id").isEqualTo(next.parentId());
+      assertThat(row.getField(3))
+          .as("Should have expected is current ancestor")
+          .isEqualTo(
+              SnapshotUtil.isAncestorOf(
+                  table, table.currentSnapshot().snapshotId(), next.snapshotId()));
     }
   }
 
-  @Test
+  @TestTemplate
   public void testManifests() {
     String sql = String.format("SELECT * FROM %s$manifests ", TABLE_NAME);
     List<Row> result = sql(sql);
@@ -189,32 +189,32 @@ public class TestFlinkMetaDataTable extends FlinkCatalogTestBase {
     for (int i = 0; i < result.size(); i++) {
       Row row = result.get(i);
       ManifestFile manifestFile = expectedDataManifests.get(i);
-      Assert.assertEquals(
-          "Should have expected content", manifestFile.content().id(), 
row.getField(0));
-      Assert.assertEquals("Should have expected path", manifestFile.path(), 
row.getField(1));
-      Assert.assertEquals("Should have expected length", 
manifestFile.length(), row.getField(2));
-      Assert.assertEquals(
-          "Should have expected partition_spec_id",
-          manifestFile.partitionSpecId(),
-          row.getField(3));
-      Assert.assertEquals(
-          "Should have expected added_snapshot_id", manifestFile.snapshotId(), 
row.getField(4));
-      Assert.assertEquals(
-          "Should have expected added_data_files_count",
-          manifestFile.addedFilesCount(),
-          row.getField(5));
-      Assert.assertEquals(
-          "Should have expected existing_data_files_count",
-          manifestFile.existingFilesCount(),
-          row.getField(6));
-      Assert.assertEquals(
-          "Should have expected deleted_data_files_count",
-          manifestFile.deletedFilesCount(),
-          row.getField(7));
+      assertThat(row.getField(0))
+          .as("Should have expected content")
+          .isEqualTo(manifestFile.content().id());
+      assertThat(row.getField(1)).as("Should have expected path").isEqualTo(manifestFile.path());
+      assertThat(row.getField(2))
+          .as("Should have expected length")
+          .isEqualTo(manifestFile.length());
+      assertThat(row.getField(3))
+          .as("Should have expected partition_spec_id")
+          .isEqualTo(manifestFile.partitionSpecId());
+      assertThat(row.getField(4))
+          .as("Should have expected added_snapshot_id")
+          .isEqualTo(manifestFile.snapshotId());
+      assertThat(row.getField(5))
+          .as("Should have expected added_data_files_count")
+          .isEqualTo(manifestFile.addedFilesCount());
+      assertThat(row.getField(6))
+          .as("Should have expected existing_data_files_count")
+          .isEqualTo(manifestFile.existingFilesCount());
+      assertThat(row.getField(7))
+          .as("Should have expected deleted_data_files_count")
+          .isEqualTo(manifestFile.deletedFilesCount());
     }
   }
 
-  @Test
+  @TestTemplate
   public void testAllManifests() {
     Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME));
 
@@ -223,55 +223,54 @@ public class TestFlinkMetaDataTable extends FlinkCatalogTestBase {
 
     List<ManifestFile> expectedDataManifests = allDataManifests(table);
 
-    Assert.assertEquals(expectedDataManifests.size(), result.size());
+    assertThat(expectedDataManifests).hasSize(result.size());
     for (int i = 0; i < result.size(); i++) {
       Row row = result.get(i);
       ManifestFile manifestFile = expectedDataManifests.get(i);
-      Assert.assertEquals(
-          "Should have expected content", manifestFile.content().id(), 
row.getField(0));
-      Assert.assertEquals("Should have expected path", manifestFile.path(), 
row.getField(1));
-      Assert.assertEquals("Should have expected length", 
manifestFile.length(), row.getField(2));
-      Assert.assertEquals(
-          "Should have expected partition_spec_id",
-          manifestFile.partitionSpecId(),
-          row.getField(3));
-      Assert.assertEquals(
-          "Should have expected added_snapshot_id", manifestFile.snapshotId(), 
row.getField(4));
-      Assert.assertEquals(
-          "Should have expected added_data_files_count",
-          manifestFile.addedFilesCount(),
-          row.getField(5));
-      Assert.assertEquals(
-          "Should have expected existing_data_files_count",
-          manifestFile.existingFilesCount(),
-          row.getField(6));
-      Assert.assertEquals(
-          "Should have expected deleted_data_files_count",
-          manifestFile.deletedFilesCount(),
-          row.getField(7));
+      assertThat(row.getField(0))
+          .as("Should have expected content")
+          .isEqualTo(manifestFile.content().id());
+      assertThat(row.getField(1)).as("Should have expected path").isEqualTo(manifestFile.path());
+      assertThat(row.getField(2))
+          .as("Should have expected length")
+          .isEqualTo(manifestFile.length());
+      assertThat(row.getField(3))
+          .as("Should have expected partition_spec_id")
+          .isEqualTo(manifestFile.partitionSpecId());
+      assertThat(row.getField(4))
+          .as("Should have expected added_snapshot_id")
+          .isEqualTo(manifestFile.snapshotId());
+      assertThat(row.getField(5))
+          .as("Should have expected added_data_files_count")
+          .isEqualTo(manifestFile.addedFilesCount());
+      assertThat(row.getField(6))
+          .as("Should have expected existing_data_files_count")
+          .isEqualTo(manifestFile.existingFilesCount());
+      assertThat(row.getField(7))
+          .as("Should have expected deleted_data_files_count")
+          .isEqualTo(manifestFile.deletedFilesCount());
     }
   }
 
-  @Test
+  @TestTemplate
   public void testUnPartitionedTable() throws IOException {
-    Assume.assumeFalse(isPartition);
+    assumeThat(isPartition).isFalse();
     Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME));
 
     Schema deleteRowSchema = table.schema().select("id");
     Record dataDelete = GenericRecord.create(deleteRowSchema);
     List<Record> dataDeletes = Lists.newArrayList(dataDelete.copy("id", 1));
-
-    TEMP.create();
+    File testFile = File.createTempFile("junit", null, temp.toFile());
     DeleteFile eqDeletes =
         FileHelpers.writeDeleteFile(
-            table, Files.localOutput(TEMP.newFile()), dataDeletes, deleteRowSchema);
+            table, Files.localOutput(testFile), dataDeletes, deleteRowSchema);
     table.newRowDelta().addDeletes(eqDeletes).commit();
 
     List<ManifestFile> expectedDataManifests = dataManifests(table);
     List<ManifestFile> expectedDeleteManifests = deleteManifests(table);
 
-    Assert.assertEquals("Should have 2 data manifest", 2, 
expectedDataManifests.size());
-    Assert.assertEquals("Should have 1 delete manifest", 1, 
expectedDeleteManifests.size());
+    assertThat(expectedDataManifests).hasSize(2);
+    assertThat(expectedDeleteManifests).hasSize(1);
 
     Schema entriesTableSchema =
         MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.from("entries"))
@@ -294,12 +293,13 @@ public class TestFlinkMetaDataTable extends FlinkCatalogTestBase {
     deleteFilesTableSchema = deleteFilesTableSchema.select(deleteColumns);
 
     List<Row> actualDeleteFiles = sql("SELECT %s FROM %s$delete_files", deleteNames, TABLE_NAME);
-    Assert.assertEquals("Metadata table should return 1 delete file", 1, actualDeleteFiles.size());
+    assertThat(actualDeleteFiles).hasSize(1);
+    assertThat(expectedDeleteManifests).as("Should have 1 delete manifest").hasSize(1);
 
     List<GenericData.Record> expectedDeleteFiles =
         expectedEntries(
             table, FileContent.EQUALITY_DELETES, entriesTableSchema, expectedDeleteManifests, null);
-    Assert.assertEquals("Should be 1 delete file manifest entry", 1, expectedDeleteFiles.size());
+    assertThat(expectedDeleteFiles).as("Should be 1 delete file manifest entry").hasSize(1);
     TestHelpers.assertEquals(
         deleteFilesTableSchema, expectedDeleteFiles.get(0), actualDeleteFiles.get(0));
 
@@ -318,51 +318,50 @@ public class TestFlinkMetaDataTable extends FlinkCatalogTestBase {
     filesTableSchema = filesTableSchema.select(columns);
 
     List<Row> actualDataFiles = sql("SELECT %s FROM %s$data_files", names, TABLE_NAME);
-    Assert.assertEquals("Metadata table should return 2 data file", 2, actualDataFiles.size());
-
+    assertThat(actualDataFiles).as("Metadata table should return 2 data file").hasSize(2);
     List<GenericData.Record> expectedDataFiles =
         expectedEntries(table, FileContent.DATA, entriesTableSchema, expectedDataManifests, null);
-    Assert.assertEquals("Should be 2 data file manifest entry", 2, expectedDataFiles.size());
+    assertThat(expectedDataFiles).as("Should be 2 data file manifest entry").hasSize(2);
     TestHelpers.assertEquals(filesTableSchema, expectedDataFiles.get(0), actualDataFiles.get(0));
 
     // check all files table
     List<Row> actualFiles = sql("SELECT %s FROM %s$files ORDER BY content", names, TABLE_NAME);
-    Assert.assertEquals("Metadata table should return 3 files", 3, actualFiles.size());
-
+    assertThat(actualFiles).as("Metadata table should return 3 files").hasSize(3);
     List<GenericData.Record> expectedFiles =
         Stream.concat(expectedDataFiles.stream(), expectedDeleteFiles.stream())
             .collect(Collectors.toList());
-    Assert.assertEquals("Should have 3 files manifest entries", 3, expectedFiles.size());
+    assertThat(expectedFiles).as("Should have 3 files manifest entries").hasSize(3);
     TestHelpers.assertEquals(filesTableSchema, expectedFiles.get(0), actualFiles.get(0));
     TestHelpers.assertEquals(filesTableSchema, expectedFiles.get(1), actualFiles.get(1));
   }
 
-  @Test
+  @TestTemplate
   public void testPartitionedTable() throws Exception {
-    Assume.assumeFalse(!isPartition);
+    assumeThat(isPartition).isTrue();
     Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME));
 
     Schema deleteRowSchema = table.schema().select("id", "data");
     Record dataDelete = GenericRecord.create(deleteRowSchema);
-    TEMP.create();
 
     Map<String, Object> deleteRow = Maps.newHashMap();
     deleteRow.put("id", 1);
     deleteRow.put("data", "a");
+    File testFile = File.createTempFile("junit", null, temp.toFile());
     DeleteFile eqDeletes =
         FileHelpers.writeDeleteFile(
             table,
-            Files.localOutput(TEMP.newFile()),
+            Files.localOutput(testFile),
             org.apache.iceberg.TestHelpers.Row.of("a"),
             Lists.newArrayList(dataDelete.copy(deleteRow)),
             deleteRowSchema);
     table.newRowDelta().addDeletes(eqDeletes).commit();
 
     deleteRow.put("data", "b");
+    File testFile2 = File.createTempFile("junit", null, temp.toFile());
     DeleteFile eqDeletes2 =
         FileHelpers.writeDeleteFile(
             table,
-            Files.localOutput(TEMP.newFile()),
+            Files.localOutput(testFile2),
             org.apache.iceberg.TestHelpers.Row.of("b"),
             Lists.newArrayList(dataDelete.copy(deleteRow)),
             deleteRowSchema);
@@ -375,9 +374,8 @@ public class TestFlinkMetaDataTable extends FlinkCatalogTestBase {
     List<ManifestFile> expectedDataManifests = dataManifests(table);
     List<ManifestFile> expectedDeleteManifests = deleteManifests(table);
 
-    Assert.assertEquals("Should have 2 data manifests", 2, 
expectedDataManifests.size());
-    Assert.assertEquals("Should have 2 delete manifests", 2, 
expectedDeleteManifests.size());
-
+    assertThat(expectedDataManifests).hasSize(2);
+    assertThat(expectedDeleteManifests).hasSize(2);
     Table deleteFilesTable =
         MetadataTableUtils.createMetadataTableInstance(
             table, MetadataTableType.from("delete_files"));
@@ -396,75 +394,67 @@ public class TestFlinkMetaDataTable extends FlinkCatalogTestBase {
     List<GenericData.Record> expectedDeleteFiles =
         expectedEntries(
             table, FileContent.EQUALITY_DELETES, entriesTableSchema, expectedDeleteManifests, "a");
-    Assert.assertEquals(
-        "Should have one delete file manifest entry", 1, expectedDeleteFiles.size());
-
+    assertThat(expectedDeleteFiles).hasSize(1);
     List<Row> actualDeleteFiles =
         sql("SELECT %s FROM %s$delete_files WHERE `partition`.`data`='a'", 
names, TABLE_NAME);
 
-    Assert.assertEquals(
-        "Metadata table should return one delete file", 1, 
actualDeleteFiles.size());
+    assertThat(actualDeleteFiles).hasSize(1);
     TestHelpers.assertEquals(
         filesTableSchema, expectedDeleteFiles.get(0), actualDeleteFiles.get(0));
 
     // Check data files table
     List<GenericData.Record> expectedDataFiles =
         expectedEntries(table, FileContent.DATA, entriesTableSchema, expectedDataManifests, "a");
-    Assert.assertEquals("Should have one data file manifest entry", 1, expectedDataFiles.size());
-
+    assertThat(expectedDataFiles).hasSize(1);
     List<Row> actualDataFiles =
         sql("SELECT %s FROM %s$data_files  WHERE `partition`.`data`='a'", 
names, TABLE_NAME);
-    Assert.assertEquals("Metadata table should return one data file", 1, 
actualDataFiles.size());
+    assertThat(actualDataFiles).hasSize(1);
     TestHelpers.assertEquals(filesTableSchema, expectedDataFiles.get(0), actualDataFiles.get(0));
 
     List<Row> actualPartitionsWithProjection =
         sql("SELECT file_count FROM %s$partitions ", TABLE_NAME);
-    Assert.assertEquals(
-        "Metadata table should return two partitions record",
-        2,
-        actualPartitionsWithProjection.size());
+    assertThat(actualPartitionsWithProjection).hasSize(2);
     for (int i = 0; i < 2; ++i) {
-      Assert.assertEquals(1, actualPartitionsWithProjection.get(i).getField(0));
+      assertThat(actualPartitionsWithProjection.get(i).getField(0)).isEqualTo(1);
     }
 
     // Check files table
     List<GenericData.Record> expectedFiles =
         Stream.concat(expectedDataFiles.stream(), expectedDeleteFiles.stream())
             .collect(Collectors.toList());
-    Assert.assertEquals("Should have two file manifest entries", 2, 
expectedFiles.size());
-
+    assertThat(expectedFiles).hasSize(2);
     List<Row> actualFiles =
         sql(
             "SELECT %s FROM %s$files WHERE `partition`.`data`='a' ORDER BY 
content",
             names, TABLE_NAME);
-    Assert.assertEquals("Metadata table should return two files", 2, 
actualFiles.size());
+    assertThat(actualFiles).hasSize(2);
     TestHelpers.assertEquals(filesTableSchema, expectedFiles.get(0), actualFiles.get(0));
     TestHelpers.assertEquals(filesTableSchema, expectedFiles.get(1), actualFiles.get(1));
   }
 
-  @Test
+  @TestTemplate
   public void testAllFilesUnpartitioned() throws Exception {
-    Assume.assumeFalse(isPartition);
+    assumeThat(isPartition).isFalse();
     Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME));
 
     Schema deleteRowSchema = table.schema().select("id", "data");
     Record dataDelete = GenericRecord.create(deleteRowSchema);
-    TEMP.create();
 
     Map<String, Object> deleteRow = Maps.newHashMap();
     deleteRow.put("id", 1);
+    File testFile = File.createTempFile("junit", null, temp.toFile());
     DeleteFile eqDeletes =
         FileHelpers.writeDeleteFile(
             table,
-            Files.localOutput(TEMP.newFile()),
+            Files.localOutput(testFile),
             Lists.newArrayList(dataDelete.copy(deleteRow)),
             deleteRowSchema);
     table.newRowDelta().addDeletes(eqDeletes).commit();
 
     List<ManifestFile> expectedDataManifests = dataManifests(table);
-    Assert.assertEquals("Should have 2 data manifest", 2, 
expectedDataManifests.size());
+    assertThat(expectedDataManifests).hasSize(2);
     List<ManifestFile> expectedDeleteManifests = deleteManifests(table);
-    Assert.assertEquals("Should have 1 delete manifest", 1, 
expectedDeleteManifests.size());
+    assertThat(expectedDeleteManifests).hasSize(1);
 
     // Clear table to test whether 'all_files' can read past files
     table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
@@ -492,8 +482,8 @@ public class TestFlinkMetaDataTable extends FlinkCatalogTestBase {
 
     List<GenericData.Record> expectedDataFiles =
         expectedEntries(table, FileContent.DATA, entriesTableSchema, expectedDataManifests, null);
-    Assert.assertEquals("Should be 2 data file manifest entry", 2, expectedDataFiles.size());
-    Assert.assertEquals("Metadata table should return 2 data file", 2, actualDataFiles.size());
+    assertThat(expectedDataFiles).hasSize(2);
+    assertThat(actualDataFiles).hasSize(2);
     TestHelpers.assertEquals(filesTableSchema, expectedDataFiles, actualDataFiles);
 
     // Check all delete files table
@@ -501,9 +491,8 @@ public class TestFlinkMetaDataTable extends FlinkCatalogTestBase {
     List<GenericData.Record> expectedDeleteFiles =
         expectedEntries(
             table, FileContent.EQUALITY_DELETES, entriesTableSchema, expectedDeleteManifests, null);
-    Assert.assertEquals("Should be one delete file manifest entry", 1, expectedDeleteFiles.size());
-    Assert.assertEquals(
-        "Metadata table should return one delete file", 1, actualDeleteFiles.size());
+    assertThat(expectedDeleteFiles).hasSize(1);
+    assertThat(actualDeleteFiles).hasSize(1);
     TestHelpers.assertEquals(
         filesTableSchema, expectedDeleteFiles.get(0), actualDeleteFiles.get(0));
 
@@ -513,43 +502,43 @@ public class TestFlinkMetaDataTable extends FlinkCatalogTestBase {
     List<GenericData.Record> expectedFiles =
         ListUtils.union(expectedDataFiles, expectedDeleteFiles);
     expectedFiles.sort(Comparator.comparing(r -> ((Integer) r.get("content"))));
-    Assert.assertEquals("Metadata table should return 3 files", 3, actualFiles.size());
+    assertThat(actualFiles).hasSize(3);
     TestHelpers.assertEquals(filesTableSchema, expectedFiles, actualFiles);
   }
 
-  @Test
+  @TestTemplate
   public void testAllFilesPartitioned() throws Exception {
-    Assume.assumeFalse(!isPartition);
+    assumeThat(isPartition).isTrue();
     Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME));
 
     // Create delete file
     Schema deleteRowSchema = table.schema().select("id");
     Record dataDelete = GenericRecord.create(deleteRowSchema);
-    TEMP.create();
 
     Map<String, Object> deleteRow = Maps.newHashMap();
     deleteRow.put("id", 1);
+    File testFile = File.createTempFile("junit", null, temp.toFile());
     DeleteFile eqDeletes =
         FileHelpers.writeDeleteFile(
             table,
-            Files.localOutput(TEMP.newFile()),
+            Files.localOutput(testFile),
             org.apache.iceberg.TestHelpers.Row.of("a"),
             Lists.newArrayList(dataDelete.copy(deleteRow)),
             deleteRowSchema);
+    File testFile2 = File.createTempFile("junit", null, temp.toFile());
     DeleteFile eqDeletes2 =
         FileHelpers.writeDeleteFile(
             table,
-            Files.localOutput(TEMP.newFile()),
+            Files.localOutput(testFile2),
             org.apache.iceberg.TestHelpers.Row.of("b"),
             Lists.newArrayList(dataDelete.copy(deleteRow)),
             deleteRowSchema);
     table.newRowDelta().addDeletes(eqDeletes).addDeletes(eqDeletes2).commit();
 
     List<ManifestFile> expectedDataManifests = dataManifests(table);
-    Assert.assertEquals("Should have 2 data manifests", 2, 
expectedDataManifests.size());
+    assertThat(expectedDataManifests).hasSize(2);
     List<ManifestFile> expectedDeleteManifests = deleteManifests(table);
-    Assert.assertEquals("Should have 1 delete manifest", 1, 
expectedDeleteManifests.size());
-
+    assertThat(expectedDeleteManifests).hasSize(1);
     // Clear table to test whether 'all_files' can read past files
     table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
 
@@ -575,8 +564,8 @@ public class TestFlinkMetaDataTable extends FlinkCatalogTestBase {
         sql("SELECT %s FROM %s$all_data_files WHERE `partition`.`data`='a'", names, TABLE_NAME);
     List<GenericData.Record> expectedDataFiles =
         expectedEntries(table, FileContent.DATA, entriesTableSchema, expectedDataManifests, "a");
-    Assert.assertEquals("Should be one data file manifest entry", 1, expectedDataFiles.size());
-    Assert.assertEquals("Metadata table should return one data file", 1, actualDataFiles.size());
+    assertThat(expectedDataFiles).hasSize(1);
+    assertThat(actualDataFiles).hasSize(1);
     TestHelpers.assertEquals(filesTableSchema, expectedDataFiles.get(0), actualDataFiles.get(0));
 
     // Check all delete files table
@@ -585,9 +574,8 @@ public class TestFlinkMetaDataTable extends FlinkCatalogTestBase {
     List<GenericData.Record> expectedDeleteFiles =
         expectedEntries(
             table, FileContent.EQUALITY_DELETES, entriesTableSchema, expectedDeleteManifests, "a");
-    Assert.assertEquals("Should be one data file manifest entry", 1, expectedDeleteFiles.size());
-    Assert.assertEquals("Metadata table should return one data file", 1, actualDeleteFiles.size());
-
+    assertThat(expectedDeleteFiles).hasSize(1);
+    assertThat(actualDeleteFiles).hasSize(1);
     TestHelpers.assertEquals(
         filesTableSchema, expectedDeleteFiles.get(0), actualDeleteFiles.get(0));
 
@@ -599,11 +587,11 @@ public class TestFlinkMetaDataTable extends FlinkCatalogTestBase {
     List<GenericData.Record> expectedFiles =
         ListUtils.union(expectedDataFiles, expectedDeleteFiles);
     expectedFiles.sort(Comparator.comparing(r -> ((Integer) r.get("content"))));
-    Assert.assertEquals("Metadata table should return two files", 2, actualFiles.size());
+    assertThat(actualFiles).hasSize(2);
     TestHelpers.assertEquals(filesTableSchema, expectedFiles, actualFiles);
   }
 
-  @Test
+  @TestTemplate
   public void testMetadataLogEntries() {
     Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME));
 
@@ -617,55 +605,51 @@ public class TestFlinkMetaDataTable extends FlinkCatalogTestBase {
     // Check metadataLog table
     List<Row> metadataLogs = sql("SELECT * FROM %s$metadata_log_entries", TABLE_NAME);
 
-    Assert.assertEquals("metadataLogEntries table should return 3 row", 3, metadataLogs.size());
+    assertThat(metadataLogs).hasSize(3);
     Row metadataLog = metadataLogs.get(0);
-    Assert.assertEquals(
-        Instant.ofEpochMilli(metadataLogEntries.get(0).timestampMillis()),
-        metadataLog.getField("timestamp"));
-    Assert.assertEquals(metadataLogEntries.get(0).file(), metadataLog.getField("file"));
-    Assert.assertNull(metadataLog.getField("latest_snapshot_id"));
-    Assert.assertNull(metadataLog.getField("latest_schema_id"));
-    Assert.assertNull(metadataLog.getField("latest_sequence_number"));
+    assertThat(metadataLog.getField("timestamp"))
+        
.isEqualTo(Instant.ofEpochMilli(metadataLogEntries.get(0).timestampMillis()));
+    
assertThat(metadataLog.getField("file")).isEqualTo(metadataLogEntries.get(0).file());
+    assertThat(metadataLog.getField("latest_snapshot_id")).isNull();
+    assertThat(metadataLog.getField("latest_schema_id")).isNull();
+    assertThat(metadataLog.getField("latest_sequence_number")).isNull();
 
     metadataLog = metadataLogs.get(1);
-    Assert.assertEquals(
-        Instant.ofEpochMilli(metadataLogEntries.get(1).timestampMillis()),
-        metadataLog.getField("timestamp"));
-    Assert.assertEquals(metadataLogEntries.get(1).file(), metadataLog.getField("file"));
-    Assert.assertEquals(parentSnapshot.snapshotId(), metadataLog.getField("latest_snapshot_id"));
-    Assert.assertEquals(parentSnapshot.schemaId(), metadataLog.getField("latest_schema_id"));
-    Assert.assertEquals(
-        parentSnapshot.sequenceNumber(), metadataLog.getField("latest_sequence_number"));
+    assertThat(metadataLog.getField("timestamp"))
+        
.isEqualTo(Instant.ofEpochMilli(metadataLogEntries.get(1).timestampMillis()));
+    
assertThat(metadataLog.getField("file")).isEqualTo(metadataLogEntries.get(1).file());
+    
assertThat(metadataLog.getField("latest_snapshot_id")).isEqualTo(parentSnapshot.snapshotId());
+    
assertThat(metadataLog.getField("latest_schema_id")).isEqualTo(parentSnapshot.schemaId());
+    assertThat(metadataLog.getField("latest_sequence_number"))
+        .isEqualTo(parentSnapshot.sequenceNumber());
+    
assertThat(metadataLog.getField("latest_snapshot_id")).isEqualTo(parentSnapshot.snapshotId());
 
     metadataLog = metadataLogs.get(2);
-    Assert.assertEquals(
-        Instant.ofEpochMilli(currentSnapshot.timestampMillis()), metadataLog.getField("timestamp"));
-    Assert.assertEquals(tableMetadata.metadataFileLocation(), metadataLog.getField("file"));
-    Assert.assertEquals(currentSnapshot.snapshotId(), metadataLog.getField("latest_snapshot_id"));
-    Assert.assertEquals(currentSnapshot.schemaId(), metadataLog.getField("latest_schema_id"));
-    Assert.assertEquals(
-        currentSnapshot.sequenceNumber(), metadataLog.getField("latest_sequence_number"));
+    assertThat(metadataLog.getField("timestamp"))
+        .isEqualTo(Instant.ofEpochMilli(currentSnapshot.timestampMillis()));
+    assertThat(metadataLog.getField("file")).isEqualTo(tableMetadata.metadataFileLocation());
+    assertThat(metadataLog.getField("latest_snapshot_id")).isEqualTo(currentSnapshot.snapshotId());
+    assertThat(metadataLog.getField("latest_schema_id")).isEqualTo(currentSnapshot.schemaId());
+    assertThat(metadataLog.getField("latest_sequence_number"))
+        .isEqualTo(currentSnapshot.sequenceNumber());
 
     // test filtering
     List<Row> metadataLogWithFilters =
         sql(
             "SELECT * FROM %s$metadata_log_entries WHERE latest_snapshot_id = %s",
             TABLE_NAME, currentSnapshotId);
-    Assert.assertEquals(
-        "metadataLogEntries table should return 1 row", 1, metadataLogWithFilters.size());
-
+    assertThat(metadataLogWithFilters).hasSize(1);
     metadataLog = metadataLogWithFilters.get(0);
-    Assert.assertEquals(
-        Instant.ofEpochMilli(tableMetadata.currentSnapshot().timestampMillis()),
-        metadataLog.getField("timestamp"));
-    Assert.assertEquals(tableMetadata.metadataFileLocation(), metadataLog.getField("file"));
-    Assert.assertEquals(
-        tableMetadata.currentSnapshot().snapshotId(), metadataLog.getField("latest_snapshot_id"));
-    Assert.assertEquals(
-        tableMetadata.currentSnapshot().schemaId(), metadataLog.getField("latest_schema_id"));
-    Assert.assertEquals(
-        tableMetadata.currentSnapshot().sequenceNumber(),
-        metadataLog.getField("latest_sequence_number"));
+    assertThat(metadataLog.getField("timestamp"))
+        .isEqualTo(Instant.ofEpochMilli(tableMetadata.currentSnapshot().timestampMillis()));
+
+    assertThat(metadataLog.getField("file")).isEqualTo(tableMetadata.metadataFileLocation());
+    assertThat(metadataLog.getField("latest_snapshot_id"))
+        .isEqualTo(tableMetadata.currentSnapshot().snapshotId());
+    assertThat(metadataLog.getField("latest_schema_id"))
+        .isEqualTo(tableMetadata.currentSnapshot().schemaId());
+    assertThat(metadataLog.getField("latest_sequence_number"))
+        .isEqualTo(tableMetadata.currentSnapshot().sequenceNumber());
 
     // test projection
     List<String> metadataFiles =
@@ -675,14 +658,13 @@ public class TestFlinkMetaDataTable extends FlinkCatalogTestBase {
     metadataFiles.add(tableMetadata.metadataFileLocation());
     List<Row> metadataLogWithProjection =
         sql("SELECT file FROM %s$metadata_log_entries", TABLE_NAME);
-    Assert.assertEquals(
-        "metadataLogEntries table should return 3 rows", 3, metadataLogWithProjection.size());
+    assertThat(metadataLogWithProjection).hasSize(3);
     for (int i = 0; i < metadataFiles.size(); i++) {
-      Assert.assertEquals(metadataFiles.get(i), metadataLogWithProjection.get(i).getField("file"));
+      assertThat(metadataLogWithProjection.get(i).getField("file")).isEqualTo(metadataFiles.get(i));
     }
   }
 
-  @Test
+  @TestTemplate
   public void testSnapshotReferencesMetatable() {
     Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME));
 
@@ -704,62 +686,63 @@ public class TestFlinkMetaDataTable extends FlinkCatalogTestBase {
         .commit();
     // Check refs table
     List<Row> references = sql("SELECT * FROM %s$refs", TABLE_NAME);
-    Assert.assertEquals("Refs table should return 3 rows", 3, 
references.size());
     List<Row> branches = sql("SELECT * FROM %s$refs WHERE type='BRANCH'", 
TABLE_NAME);
-    Assert.assertEquals("Refs table should return 2 branches", 2, 
branches.size());
+    assertThat(references).hasSize(3);
+    assertThat(branches).hasSize(2);
     List<Row> tags = sql("SELECT * FROM %s$refs WHERE type='TAG'", TABLE_NAME);
-    Assert.assertEquals("Refs table should return 1 tag", 1, tags.size());
-
+    assertThat(tags).hasSize(1);
     // Check branch entries in refs table
     List<Row> mainBranch =
         sql("SELECT * FROM %s$refs WHERE name='main' AND type='BRANCH'", 
TABLE_NAME);
-    Assert.assertEquals("main", mainBranch.get(0).getFieldAs("name"));
-    Assert.assertEquals("BRANCH", mainBranch.get(0).getFieldAs("type"));
-    Assert.assertEquals(currentSnapshotId, 
mainBranch.get(0).getFieldAs("snapshot_id"));
-
+    assertThat((String) 
mainBranch.get(0).getFieldAs("name")).isEqualTo("main");
+    assertThat((String) 
mainBranch.get(0).getFieldAs("type")).isEqualTo("BRANCH");
+    assertThat((Long) 
mainBranch.get(0).getFieldAs("snapshot_id")).isEqualTo(currentSnapshotId);
     List<Row> testBranch =
         sql("SELECT * FROM  %s$refs WHERE name='testBranch' AND 
type='BRANCH'", TABLE_NAME);
-    Assert.assertEquals("testBranch", testBranch.get(0).getFieldAs("name"));
-    Assert.assertEquals("BRANCH", testBranch.get(0).getFieldAs("type"));
-    Assert.assertEquals(currentSnapshotId, 
testBranch.get(0).getFieldAs("snapshot_id"));
-    Assert.assertEquals(Long.valueOf(10), 
testBranch.get(0).getFieldAs("max_reference_age_in_ms"));
-    Assert.assertEquals(Integer.valueOf(20), 
testBranch.get(0).getFieldAs("min_snapshots_to_keep"));
-    Assert.assertEquals(Long.valueOf(30), 
testBranch.get(0).getFieldAs("max_snapshot_age_in_ms"));
+    assertThat((String) 
testBranch.get(0).getFieldAs("name")).isEqualTo("testBranch");
+    assertThat((String) 
testBranch.get(0).getFieldAs("type")).isEqualTo("BRANCH");
+    assertThat((Long) 
testBranch.get(0).getFieldAs("snapshot_id")).isEqualTo(currentSnapshotId);
+    assertThat((Long) testBranch.get(0).getFieldAs("max_reference_age_in_ms"))
+        .isEqualTo(Long.valueOf(10));
+    assertThat((Integer) testBranch.get(0).getFieldAs("min_snapshots_to_keep"))
+        .isEqualTo(Integer.valueOf(20));
+    assertThat((Long) testBranch.get(0).getFieldAs("max_snapshot_age_in_ms"))
+        .isEqualTo(Long.valueOf(30));
 
     // Check tag entries in refs table
     List<Row> testTag =
         sql("SELECT * FROM %s$refs WHERE name='testTag' AND type='TAG'", 
TABLE_NAME);
-    Assert.assertEquals("testTag", testTag.get(0).getFieldAs("name"));
-    Assert.assertEquals("TAG", testTag.get(0).getFieldAs("type"));
-    Assert.assertEquals(currentSnapshotId, 
testTag.get(0).getFieldAs("snapshot_id"));
-    Assert.assertEquals(Long.valueOf(50), 
testTag.get(0).getFieldAs("max_reference_age_in_ms"));
-
+    assertThat((String) 
testTag.get(0).getFieldAs("name")).isEqualTo("testTag");
+    assertThat((String) testTag.get(0).getFieldAs("type")).isEqualTo("TAG");
+    assertThat((Long) 
testTag.get(0).getFieldAs("snapshot_id")).isEqualTo(currentSnapshotId);
+    assertThat((Long) testTag.get(0).getFieldAs("max_reference_age_in_ms"))
+        .isEqualTo(Long.valueOf(50));
     // Check projection in refs table
     List<Row> testTagProjection =
         sql(
             "SELECT name,type,snapshot_id,max_reference_age_in_ms,min_snapshots_to_keep FROM %s$refs where type='TAG'",
             TABLE_NAME);
-    Assert.assertEquals("testTag", testTagProjection.get(0).getFieldAs("name"));
-    Assert.assertEquals("TAG", testTagProjection.get(0).getFieldAs("type"));
-    Assert.assertEquals(currentSnapshotId, testTagProjection.get(0).getFieldAs("snapshot_id"));
-    Assert.assertEquals(
-        Long.valueOf(50), testTagProjection.get(0).getFieldAs("max_reference_age_in_ms"));
-    Assert.assertNull(testTagProjection.get(0).getFieldAs("min_snapshots_to_keep"));
-
+    assertThat((String) testTagProjection.get(0).getFieldAs("name")).isEqualTo("testTag");
+    assertThat((String) testTagProjection.get(0).getFieldAs("type")).isEqualTo("TAG");
+    assertThat((Long) testTagProjection.get(0).getFieldAs("snapshot_id"))
+        .isEqualTo(currentSnapshotId);
+    assertThat((Long) testTagProjection.get(0).getFieldAs("max_reference_age_in_ms"))
+        .isEqualTo(Long.valueOf(50));
+    assertThat((Integer) testTagProjection.get(0).getFieldAs("min_snapshots_to_keep")).isNull();
     List<Row> mainBranchProjection =
         sql("SELECT name, type FROM %s$refs WHERE name='main' AND type = 
'BRANCH'", TABLE_NAME);
-    Assert.assertEquals("main", 
mainBranchProjection.get(0).getFieldAs("name"));
-    Assert.assertEquals("BRANCH", 
mainBranchProjection.get(0).getFieldAs("type"));
-
+    assertThat((String) 
mainBranchProjection.get(0).getFieldAs("name")).isEqualTo("main");
+    assertThat((String) 
mainBranchProjection.get(0).getFieldAs("type")).isEqualTo("BRANCH");
     List<Row> testBranchProjection =
         sql(
             "SELECT type, name, max_reference_age_in_ms, snapshot_id FROM %s$refs WHERE name='testBranch' AND type = 'BRANCH'",
             TABLE_NAME);
-    Assert.assertEquals("testBranch", testBranchProjection.get(0).getFieldAs("name"));
-    Assert.assertEquals("BRANCH", testBranchProjection.get(0).getFieldAs("type"));
-    Assert.assertEquals(currentSnapshotId, testBranchProjection.get(0).getFieldAs("snapshot_id"));
-    Assert.assertEquals(
-        Long.valueOf(10), testBranchProjection.get(0).getFieldAs("max_reference_age_in_ms"));
+    assertThat((String) testBranchProjection.get(0).getFieldAs("name")).isEqualTo("testBranch");
+    assertThat((String) testBranchProjection.get(0).getFieldAs("type")).isEqualTo("BRANCH");
+    assertThat((Long) testBranchProjection.get(0).getFieldAs("snapshot_id"))
+        .isEqualTo(currentSnapshotId);
+    assertThat((Long) testBranchProjection.get(0).getFieldAs("max_reference_age_in_ms"))
+        .isEqualTo(Long.valueOf(10));
   }
 
   /**
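
The @Test -> @TestTemplate swaps above are the heart of this migration: JUnit4's
@RunWith(Parameterized.class) runner is replaced by a JUnit5 extension that
re-invokes each @TestTemplate method once per parameter set. A minimal sketch of
the pattern, assuming Iceberg's ParameterizedTestExtension and its
@Parameters/@Parameter test annotations (the class name and parameter values
below are illustrative, not the exact CatalogTestBase source):

    import static org.assertj.core.api.Assertions.assertThat;

    import java.util.Arrays;
    import java.util.List;
    import org.apache.iceberg.Parameter;
    import org.apache.iceberg.ParameterizedTestExtension;
    import org.apache.iceberg.Parameters;
    import org.junit.jupiter.api.TestTemplate;
    import org.junit.jupiter.api.extension.ExtendWith;

    @ExtendWith(ParameterizedTestExtension.class)
    public abstract class ExampleParameterizedTest {

      // Injected by the extension before each template invocation.
      @Parameter(index = 0)
      protected String catalogName;

      @Parameters(name = "catalogName={0}")
      protected static List<Object[]> parameters() {
        return Arrays.asList(new Object[] {"testhive"}, new Object[] {"testhadoop"});
      }

      // Runs once per parameter set, like a JUnit4 @Test under Parameterized.
      @TestTemplate
      public void testSomething() {
        assertThat(catalogName).isNotNull();
      }
    }
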
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
index 633e11718b..09d5a5481a 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iceberg.flink.source;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
@@ -33,31 +35,25 @@ import org.apache.flink.util.CloseableIterator;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TestHelpers;
-import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.GenericAppenderHelper;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.flink.FlinkCatalogTestBase;
+import org.apache.iceberg.flink.CatalogTestBase;
 import org.apache.iceberg.flink.MiniClusterResource;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.assertj.core.api.Assertions;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
 
-public class TestStreamScanSql extends FlinkCatalogTestBase {
+public class TestStreamScanSql extends CatalogTestBase {
   private static final String TABLE = "test_table";
   private static final FileFormat FORMAT = FileFormat.PARQUET;
 
   private TableEnvironment tEnv;
 
-  public TestStreamScanSql(String catalogName, Namespace baseNamespace) {
-    super(catalogName, baseNamespace);
-  }
-
   @Override
   protected TableEnvironment getTableEnv() {
     if (tEnv == null) {
@@ -85,7 +81,7 @@ public class TestStreamScanSql extends FlinkCatalogTestBase {
   }
 
   @Override
-  @Before
+  @BeforeEach
   public void before() {
     super.before();
     sql("CREATE DATABASE %s", flinkDatabase);
@@ -94,7 +90,7 @@ public class TestStreamScanSql extends FlinkCatalogTestBase {
   }
 
   @Override
-  @After
+  @AfterEach
   public void clean() {
     sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE);
     sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
@@ -102,7 +98,7 @@ public class TestStreamScanSql extends FlinkCatalogTestBase {
   }
 
   private void insertRows(String partition, Table table, Row... rows) throws IOException {
-    GenericAppenderHelper appender = new GenericAppenderHelper(table, FORMAT, TEMPORARY_FOLDER);
+    GenericAppenderHelper appender = new GenericAppenderHelper(table, FORMAT, temporaryDirectory);
 
     GenericRecord gRecord = GenericRecord.create(table.schema());
     List<Record> records = Lists.newArrayList();
@@ -127,20 +123,16 @@ public class TestStreamScanSql extends FlinkCatalogTestBase {
 
   private void assertRows(List<Row> expectedRows, Iterator<Row> iterator) {
     for (Row expectedRow : expectedRows) {
-      Assert.assertTrue("Should have more records", iterator.hasNext());
-
+      assertThat(iterator).hasNext();
       Row actualRow = iterator.next();
-      Assert.assertEquals("Should have expected fields", 3, actualRow.getArity());
-      Assert.assertEquals(
-          "Should have expected id", expectedRow.getField(0), actualRow.getField(0));
-      Assert.assertEquals(
-          "Should have expected data", expectedRow.getField(1), actualRow.getField(1));
-      Assert.assertEquals(
-          "Should have expected dt", expectedRow.getField(2), actualRow.getField(2));
+      assertThat(actualRow.getArity()).isEqualTo(3);
+      assertThat(actualRow.getField(0)).isEqualTo(expectedRow.getField(0));
+      assertThat(actualRow.getField(1)).isEqualTo(expectedRow.getField(1));
+      assertThat(actualRow.getField(2)).isEqualTo(expectedRow.getField(2));
+      assertThat(actualRow.getArity()).isEqualTo(3);
+      assertThat(actualRow.getField(0)).isEqualTo(expectedRow.getField(0));
+      assertThat(actualRow.getField(1)).isEqualTo(expectedRow.getField(1));
+      assertThat(actualRow.getField(2)).isEqualTo(expectedRow.getField(2));
     }
   }
 
-  @Test
+  @TestTemplate
   public void testUnPartitionedTable() throws Exception {
     sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
     Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
@@ -160,7 +152,7 @@ public class TestStreamScanSql extends FlinkCatalogTestBase {
     result.getJobClient().ifPresent(JobClient::cancel);
   }
 
-  @Test
+  @TestTemplate
   public void testPartitionedTable() throws Exception {
     sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR) PARTITIONED BY 
(dt)", TABLE);
     Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
@@ -187,7 +179,7 @@ public class TestStreamScanSql extends FlinkCatalogTestBase {
     result.getJobClient().ifPresent(JobClient::cancel);
   }
 
-  @Test
+  @TestTemplate
   public void testConsumeFromBeginning() throws Exception {
     sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
     Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
@@ -212,7 +204,7 @@ public class TestStreamScanSql extends FlinkCatalogTestBase {
     result.getJobClient().ifPresent(JobClient::cancel);
   }
 
-  @Test
+  @TestTemplate
   public void testConsumeFilesWithBranch() throws Exception {
     sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
     Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
@@ -229,7 +221,7 @@ public class TestStreamScanSql extends FlinkCatalogTestBase {
         .hasMessage("Cannot scan table using ref b1 configured for streaming 
reader yet");
   }
 
-  @Test
+  @TestTemplate
   public void testConsumeFromStartSnapshotId() throws Exception {
     sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
     Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
@@ -267,7 +259,7 @@ public class TestStreamScanSql extends FlinkCatalogTestBase {
     result.getJobClient().ifPresent(JobClient::cancel);
   }
 
-  @Test
+  @TestTemplate
   public void testConsumeFromStartTag() throws Exception {
     sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
     Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
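
Two smaller patterns in TestStreamScanSql round out the migration. First, the
assertions in assertRows() move to AssertJ's actual-first style,
assertThat(actual).isEqualTo(expected), rather than JUnit4's
Assert.assertEquals(expected, actual). Second, insertRows() now hands
GenericAppenderHelper the base class's temporaryDirectory instead of the JUnit4
TEMPORARY_FOLDER rule. A minimal sketch of that second change, assuming a
java.nio.file.Path field as implied by the constructor call in this diff (the
class name is illustrative):

    import java.nio.file.Path;
    import org.junit.jupiter.api.io.TempDir;

    public abstract class ExampleTestBase {
      // JUnit5 creates a fresh directory per test and deletes it afterwards,
      // replacing JUnit4's TemporaryFolder rule (TEMPORARY_FOLDER).
      @TempDir
      protected Path temporaryDirectory;
    }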
