This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new a975a95552 Flink: Migrate HadoopCatalog related tests (#10358)
a975a95552 is described below

commit a975a955523239ca6423adf2ec26915ca8675f4f
Author: Tom Tanaka <[email protected]>
AuthorDate: Mon Jul 1 22:24:02 2024 +0900

    Flink: Migrate HadoopCatalog related tests (#10358)
---
 .../apache/iceberg/flink/HadoopTableExtension.java |  59 ++++++++
 .../iceberg/flink/sink/TestFlinkIcebergSink.java   | 148 ++++++++++-----------
 .../flink/sink/TestFlinkIcebergSinkBranch.java     |  66 ++++-----
 .../iceberg/flink/sink/TestFlinkIcebergSinkV2.java | 111 ++++++----------
 .../flink/sink/TestFlinkIcebergSinkV2Base.java     |  42 ++++--
 .../flink/sink/TestFlinkIcebergSinkV2Branch.java   |  69 +++++-----
 .../iceberg/flink/source/TestFlinkSourceSql.java   |   4 +-
 .../iceberg/flink/source/TestIcebergSourceSql.java |   4 +-
 .../apache/iceberg/flink/source/TestSqlBase.java   |  46 ++++---
 .../iceberg/flink/source/reader/ReaderUtil.java    |  22 +++
 .../reader/TestColumnStatsWatermarkExtractor.java  |  90 ++++++-------
 11 files changed, 356 insertions(+), 305 deletions(-)

diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/HadoopTableExtension.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/HadoopTableExtension.java
new file mode 100644
index 0000000000..dc6ef400a4
--- /dev/null
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/HadoopTableExtension.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+public class HadoopTableExtension extends HadoopCatalogExtension {
+  private final Schema schema;
+  private final PartitionSpec partitionSpec;
+
+  private Table table;
+
+  public HadoopTableExtension(String database, String tableName, Schema schema) {
+    this(database, tableName, schema, null);
+  }
+
+  public HadoopTableExtension(
+      String database, String tableName, Schema schema, PartitionSpec partitionSpec) {
+    super(database, tableName);
+    this.schema = schema;
+    this.partitionSpec = partitionSpec;
+  }
+
+  @Override
+  public void beforeEach(ExtensionContext context) throws Exception {
+    super.beforeEach(context);
+    if (partitionSpec == null) {
+      this.table = catalog.createTable(TableIdentifier.of(database, tableName), schema);
+    } else {
+      this.table =
+          catalog.createTable(TableIdentifier.of(database, tableName), schema, partitionSpec);
+    }
+    tableLoader.open();
+  }
+
+  public Table table() {
+    return table;
+  }
+}
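
For reference, a sketch of how the new extension can be wired into a JUnit5 test. The test class, database name, and schema below are illustrative, not part of this commit; the extension creates the table before each test via beforeEach(), as shown above.

    import static org.apache.iceberg.types.Types.NestedField.required;
    import static org.assertj.core.api.Assertions.assertThat;

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.flink.HadoopTableExtension;
    import org.apache.iceberg.types.Types;
    import org.junit.jupiter.api.Test;
    import org.junit.jupiter.api.extension.RegisterExtension;

    class ExampleHadoopTableTest {
      private static final Schema SCHEMA =
          new Schema(required(1, "id", Types.IntegerType.get()));

      // Registered statically so all test methods share the extension;
      // the table itself is (re)created in beforeEach().
      @RegisterExtension
      static final HadoopTableExtension tableExtension =
          new HadoopTableExtension("db", "example_table", SCHEMA);

      @Test
      void tableIsCreated() {
        assertThat(tableExtension.table()).isNotNull();
      }
    }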
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
index 9ff79419b0..527525e9f1 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iceberg.flink.sink;
 
+import static org.apache.iceberg.flink.TestFixtures.DATABASE;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.io.IOException;
@@ -29,17 +31,21 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.types.Row;
 import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.flink.FlinkWriteOptions;
-import org.apache.iceberg.flink.HadoopCatalogResource;
+import org.apache.iceberg.flink.HadoopCatalogExtension;
 import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.MiniFlinkClusterExtension;
 import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.flink.TestFixtures;
@@ -47,59 +53,52 @@ import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
-  @ClassRule
-  public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
-      MiniClusterResource.createWithClassloaderCheckDisabled();
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
 
-  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+  @RegisterExtension
+  public static MiniClusterExtension miniClusterResource =
+      MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();
 
-  @Rule
-  public final HadoopCatalogResource catalogResource =
-      new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);
+  @RegisterExtension
+  private static final HadoopCatalogExtension catalogResource =
+      new HadoopCatalogExtension(DATABASE, TestFixtures.TABLE);
 
   private TableLoader tableLoader;
 
-  private final FileFormat format;
-  private final int parallelism;
-  private final boolean partitioned;
+  @Parameter(index = 0)
+  private FileFormat format;
+
+  @Parameter(index = 1)
+  private int parallelism;
 
-  @Parameterized.Parameters(name = "format={0}, parallelism = {1}, partitioned = {2}")
+  @Parameter(index = 2)
+  private boolean partitioned;
+
+  @Parameters(name = "format={0}, parallelism = {1}, partitioned = {2}")
   public static Object[][] parameters() {
     return new Object[][] {
-      {"avro", 1, true},
-      {"avro", 1, false},
-      {"avro", 2, true},
-      {"avro", 2, false},
-      {"orc", 1, true},
-      {"orc", 1, false},
-      {"orc", 2, true},
-      {"orc", 2, false},
-      {"parquet", 1, true},
-      {"parquet", 1, false},
-      {"parquet", 2, true},
-      {"parquet", 2, false}
+      {FileFormat.AVRO, 1, true},
+      {FileFormat.AVRO, 1, false},
+      {FileFormat.AVRO, 2, true},
+      {FileFormat.AVRO, 2, false},
+      {FileFormat.ORC, 1, true},
+      {FileFormat.ORC, 1, false},
+      {FileFormat.ORC, 2, true},
+      {FileFormat.ORC, 2, false},
+      {FileFormat.PARQUET, 1, true},
+      {FileFormat.PARQUET, 1, false},
+      {FileFormat.PARQUET, 2, true},
+      {FileFormat.PARQUET, 2, false}
     };
   }
 
-  public TestFlinkIcebergSink(String format, int parallelism, boolean partitioned) {
-    this.format = FileFormat.fromString(format);
-    this.parallelism = parallelism;
-    this.partitioned = partitioned;
-  }
-
-  @Before
+  @BeforeEach
   public void before() throws IOException {
     table =
         catalogResource
@@ -122,7 +121,7 @@ public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
     tableLoader = catalogResource.tableLoader();
   }
 
-  @Test
+  @TestTemplate
   public void testWriteRowData() throws Exception {
     List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, "world"), Row.of(3, "foo"));
     DataStream<RowData> dataStream =
@@ -165,17 +164,17 @@ public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
     return SimpleDataUtil.partitionDataFiles(table, ImmutableMap.of("data", partition)).size();
   }
 
-  @Test
+  @TestTemplate
   public void testWriteRow() throws Exception {
     testWriteRow(null, DistributionMode.NONE);
   }
 
-  @Test
+  @TestTemplate
   public void testWriteRowWithTableSchema() throws Exception {
     testWriteRow(SimpleDataUtil.FLINK_SCHEMA, DistributionMode.NONE);
   }
 
-  @Test
+  @TestTemplate
   public void testJobNoneDistributeMode() throws Exception {
     table
         .updateProperties()
@@ -187,12 +186,12 @@ public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
     if (parallelism > 1) {
       if (partitioned) {
         int files = partitionFiles("aaa") + partitionFiles("bbb") + partitionFiles("ccc");
-        Assert.assertTrue("Should have more than 3 files in iceberg table.", files > 3);
+        assertThat(files).isGreaterThan(3);
       }
     }
   }
 
-  @Test
+  @TestTemplate
   public void testJobHashDistributionMode() {
     table
         .updateProperties()
@@ -204,7 +203,7 @@ public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
         .hasMessage("Flink does not support 'range' write distribution mode now.");
   }
 
-  @Test
+  @TestTemplate
   public void testJobNullDistributionMode() throws Exception {
     table
         .updateProperties()
@@ -214,42 +213,33 @@ public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
     testWriteRow(null, null);
 
     if (partitioned) {
-      Assert.assertEquals(
-          "There should be only 1 data file in partition 'aaa'", 1, partitionFiles("aaa"));
-      Assert.assertEquals(
-          "There should be only 1 data file in partition 'bbb'", 1, partitionFiles("bbb"));
-      Assert.assertEquals(
-          "There should be only 1 data file in partition 'ccc'", 1, partitionFiles("ccc"));
+      assertThat(partitionFiles("aaa")).isEqualTo(1);
+      assertThat(partitionFiles("bbb")).isEqualTo(1);
+      assertThat(partitionFiles("ccc")).isEqualTo(1);
     }
   }
 
-  @Test
+  @TestTemplate
   public void testPartitionWriteMode() throws Exception {
     testWriteRow(null, DistributionMode.HASH);
     if (partitioned) {
-      Assert.assertEquals(
-          "There should be only 1 data file in partition 'aaa'", 1, partitionFiles("aaa"));
-      Assert.assertEquals(
-          "There should be only 1 data file in partition 'bbb'", 1, partitionFiles("bbb"));
-      Assert.assertEquals(
-          "There should be only 1 data file in partition 'ccc'", 1, partitionFiles("ccc"));
+      assertThat(partitionFiles("aaa")).isEqualTo(1);
+      assertThat(partitionFiles("bbb")).isEqualTo(1);
+      assertThat(partitionFiles("ccc")).isEqualTo(1);
     }
   }
 
-  @Test
+  @TestTemplate
   public void testShuffleByPartitionWithSchema() throws Exception {
     testWriteRow(SimpleDataUtil.FLINK_SCHEMA, DistributionMode.HASH);
     if (partitioned) {
-      Assert.assertEquals(
-          "There should be only 1 data file in partition 'aaa'", 1, partitionFiles("aaa"));
-      Assert.assertEquals(
-          "There should be only 1 data file in partition 'bbb'", 1, partitionFiles("bbb"));
-      Assert.assertEquals(
-          "There should be only 1 data file in partition 'ccc'", 1, partitionFiles("ccc"));
+      assertThat(partitionFiles("aaa")).isEqualTo(1);
+      assertThat(partitionFiles("bbb")).isEqualTo(1);
+      assertThat(partitionFiles("ccc")).isEqualTo(1);
     }
   }
 
-  @Test
+  @TestTemplate
   public void testTwoSinksInDisjointedDAG() throws Exception {
     Map<String, String> props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name());
 
@@ -323,16 +313,14 @@ public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
     SimpleDataUtil.assertTableRows(rightTable, convertToRowData(rightRows));
 
     leftTable.refresh();
-    Assert.assertNull(leftTable.currentSnapshot().summary().get("flink.test"));
-    Assert.assertNull(leftTable.currentSnapshot().summary().get("direction"));
+    assertThat(leftTable.currentSnapshot().summary()).doesNotContainKeys("flink.test", "direction");
     rightTable.refresh();
-    Assert.assertEquals(
-        TestFlinkIcebergSink.class.getName(),
-        rightTable.currentSnapshot().summary().get("flink.test"));
-    Assert.assertEquals("rightTable", 
rightTable.currentSnapshot().summary().get("direction"));
+    assertThat(rightTable.currentSnapshot().summary())
+        .containsEntry("flink.test", TestFlinkIcebergSink.class.getName())
+        .containsEntry("direction", "rightTable");
   }
 
-  @Test
+  @TestTemplate
   public void testOverrideWriteConfigWithUnknownDistributionMode() {
     Map<String, String> newProps = Maps.newHashMap();
     newProps.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(), "UNRECOGNIZED");
@@ -352,7 +340,7 @@ public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
         .hasMessage("Invalid distribution mode: UNRECOGNIZED");
   }
 
-  @Test
+  @TestTemplate
   public void testOverrideWriteConfigWithUnknownFileFormat() {
     Map<String, String> newProps = Maps.newHashMap();
     newProps.put(FlinkWriteOptions.WRITE_FORMAT.key(), "UNRECOGNIZED");
@@ -372,7 +360,7 @@ public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
         .hasMessage("Invalid file format: UNRECOGNIZED");
   }
 
-  @Test
+  @TestTemplate
   public void testWriteRowWithTableRefreshInterval() throws Exception {
     List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, "world"), Row.of(3, "foo"));
     DataStream<RowData> dataStream =
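
The changes above follow one mechanical pattern: the JUnit4 Parameterized runner and constructor injection are replaced by Iceberg's ParameterizedTestExtension with @Parameter field injection, and @Test becomes @TestTemplate. A minimal sketch of the pattern (illustrative class, not part of this commit):

    import static org.assertj.core.api.Assertions.assertThat;

    import org.apache.iceberg.FileFormat;
    import org.apache.iceberg.Parameter;
    import org.apache.iceberg.ParameterizedTestExtension;
    import org.apache.iceberg.Parameters;
    import org.junit.jupiter.api.TestTemplate;
    import org.junit.jupiter.api.extension.ExtendWith;

    @ExtendWith(ParameterizedTestExtension.class)
    public class ExampleParameterizedTest {
      // Injected per parameter set; replaces the JUnit4 constructor arguments.
      @Parameter(index = 0)
      private FileFormat format;

      @Parameters(name = "format = {0}")
      public static Object[][] parameters() {
        return new Object[][] {{FileFormat.AVRO}, {FileFormat.PARQUET}};
      }

      // Runs once per parameter set, like @Test under the Parameterized runner.
      @TestTemplate
      public void testFormatIsSet() {
        assertThat(format).isNotNull();
      }
    }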
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBranch.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBranch.java
index 16b4542b00..547b4937c5 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBranch.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBranch.java
@@ -18,60 +18,60 @@
  */
 package org.apache.iceberg.flink.sink;
 
+import static org.apache.iceberg.flink.TestFixtures.DATABASE;
+import static org.assertj.core.api.Assertions.assertThat;
+
 import java.io.IOException;
 import java.util.List;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.types.Row;
 import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.flink.HadoopCatalogResource;
+import org.apache.iceberg.flink.HadoopCatalogExtension;
 import org.apache.iceberg.flink.MiniClusterResource;
 import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.flink.TestFixtures;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class TestFlinkIcebergSinkBranch extends TestFlinkIcebergSinkBase {
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
-  @ClassRule
-  public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
-      MiniClusterResource.createWithClassloaderCheckDisabled();
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestFlinkIcebergSinkBranch extends TestFlinkIcebergSinkBase {
+  @RegisterExtension
+  public static final HadoopCatalogExtension catalogResource =
+      new HadoopCatalogExtension(DATABASE, TestFixtures.TABLE);
 
-  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+  @Parameter(index = 0)
+  private String formatVersion;
 
-  @Rule
-  public final HadoopCatalogResource catalogResource =
-      new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);
+  @Parameter(index = 1)
+  private String branch;
 
-  private final String branch;
   private TableLoader tableLoader;
 
-  @Parameterized.Parameters(name = "formatVersion = {0}, branch = {1}")
-  public static Object[] parameters() {
-    return new Object[] {"main", "testBranch"};
-  }
-
-  public TestFlinkIcebergSinkBranch(String branch) {
-    this.branch = branch;
+  @Parameters(name = "formatVersion = {0}, branch = {1}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+      {"1", "main"},
+      {"1", "testBranch"},
+      {"2", "main"},
+      {"2", "testBranch"}
+    };
   }
 
-  @Before
+  @BeforeEach
   public void before() throws IOException {
     table =
         catalogResource
@@ -84,7 +84,7 @@ public class TestFlinkIcebergSinkBranch extends TestFlinkIcebergSinkBase {
                     TableProperties.DEFAULT_FILE_FORMAT,
                     FileFormat.AVRO.name(),
                     TableProperties.FORMAT_VERSION,
-                    "1"));
+                    formatVersion));
 
     env =
         StreamExecutionEnvironment.getExecutionEnvironment(
@@ -94,7 +94,7 @@ public class TestFlinkIcebergSinkBranch extends TestFlinkIcebergSinkBase {
     tableLoader = catalogResource.tableLoader();
   }
 
-  @Test
+  @TestTemplate
   public void testWriteRowWithTableSchema() throws Exception {
     testWriteRow(SimpleDataUtil.FLINK_SCHEMA, DistributionMode.NONE);
     verifyOtherBranchUnmodified();
@@ -129,9 +129,9 @@ public class TestFlinkIcebergSinkBranch extends TestFlinkIcebergSinkBase {
     String otherBranch =
         branch.equals(SnapshotRef.MAIN_BRANCH) ? "test-branch" : SnapshotRef.MAIN_BRANCH;
     if (otherBranch.equals(SnapshotRef.MAIN_BRANCH)) {
-      Assert.assertNull(table.currentSnapshot());
+      assertThat(table.currentSnapshot()).isNull();
     }
 
-    Assert.assertTrue(table.snapshot(otherBranch) == null);
+    assertThat(table.snapshot(otherBranch)).isNull();
   }
 }
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
index 7712481d33..9cbb9f091e 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
@@ -18,85 +18,53 @@
  */
 package org.apache.iceberg.flink.sink;
 
+import static org.apache.iceberg.flink.TestFixtures.DATABASE;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.util.List;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.types.Row;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.flink.HadoopCatalogResource;
+import org.apache.iceberg.flink.HadoopCatalogExtension;
 import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.MiniFlinkClusterExtension;
 import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.flink.TestFixtures;
 import org.apache.iceberg.flink.source.BoundedTestSource;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Types;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.rules.Timeout;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+@ExtendWith(ParameterizedTestExtension.class)
+@Timeout(value = 60)
 public class TestFlinkIcebergSinkV2 extends TestFlinkIcebergSinkV2Base {
+  @RegisterExtension
+  public static MiniClusterExtension miniClusterResource =
+      MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();
 
-  @ClassRule
-  public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
-      MiniClusterResource.createWithClassloaderCheckDisabled();
-
-  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
-
-  @Rule
-  public final HadoopCatalogResource catalogResource =
-      new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);
-
-  @Rule public final Timeout globalTimeout = Timeout.seconds(60);
-
-  @Parameterized.Parameters(
-      name = "FileFormat = {0}, Parallelism = {1}, Partitioned={2}, 
WriteDistributionMode ={3}")
-  public static Object[][] parameters() {
-    return new Object[][] {
-      new Object[] {"avro", 1, true, 
TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
-      new Object[] {"avro", 1, false, 
TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
-      new Object[] {"avro", 4, true, 
TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
-      new Object[] {"avro", 4, false, 
TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
-      new Object[] {"orc", 1, true, 
TableProperties.WRITE_DISTRIBUTION_MODE_HASH},
-      new Object[] {"orc", 1, false, 
TableProperties.WRITE_DISTRIBUTION_MODE_HASH},
-      new Object[] {"orc", 4, true, 
TableProperties.WRITE_DISTRIBUTION_MODE_HASH},
-      new Object[] {"orc", 4, false, 
TableProperties.WRITE_DISTRIBUTION_MODE_HASH},
-      new Object[] {"parquet", 1, true, 
TableProperties.WRITE_DISTRIBUTION_MODE_RANGE},
-      new Object[] {"parquet", 1, false, 
TableProperties.WRITE_DISTRIBUTION_MODE_RANGE},
-      new Object[] {"parquet", 4, true, 
TableProperties.WRITE_DISTRIBUTION_MODE_RANGE},
-      new Object[] {"parquet", 4, false, 
TableProperties.WRITE_DISTRIBUTION_MODE_RANGE}
-    };
-  }
-
-  public TestFlinkIcebergSinkV2(
-      String format, int parallelism, boolean partitioned, String writeDistributionMode) {
-    this.format = FileFormat.fromString(format);
-    this.parallelism = parallelism;
-    this.partitioned = partitioned;
-    this.writeDistributionMode = writeDistributionMode;
-  }
+  @RegisterExtension
+  private static final HadoopCatalogExtension catalogResource =
+      new HadoopCatalogExtension(DATABASE, TestFixtures.TABLE);
 
-  @Before
+  @BeforeEach
   public void setupTable() {
     table =
         catalogResource
@@ -129,7 +97,7 @@ public class TestFlinkIcebergSinkV2 extends TestFlinkIcebergSinkV2Base {
     tableLoader = catalogResource.tableLoader();
   }
 
-  @Test
+  @TestTemplate
   public void testCheckAndGetEqualityFieldIds() {
     table
         .updateSchema()
@@ -144,28 +112,25 @@ public class TestFlinkIcebergSinkV2 extends TestFlinkIcebergSinkV2Base {
         FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA).table(table);
 
     // Use schema identifier field IDs as equality field id list by default
-    Assert.assertEquals(
-        table.schema().identifierFieldIds(),
-        Sets.newHashSet(builder.checkAndGetEqualityFieldIds()));
+    assertThat(builder.checkAndGetEqualityFieldIds())
+        .containsExactlyInAnyOrderElementsOf(table.schema().identifierFieldIds());
 
     // Use user-provided equality field column as equality field id list
     builder.equalityFieldColumns(Lists.newArrayList("id"));
-    Assert.assertEquals(
-        Sets.newHashSet(table.schema().findField("id").fieldId()),
-        Sets.newHashSet(builder.checkAndGetEqualityFieldIds()));
+    assertThat(builder.checkAndGetEqualityFieldIds())
+        .containsExactlyInAnyOrder(table.schema().findField("id").fieldId());
 
     builder.equalityFieldColumns(Lists.newArrayList("type"));
-    Assert.assertEquals(
-        Sets.newHashSet(table.schema().findField("type").fieldId()),
-        Sets.newHashSet(builder.checkAndGetEqualityFieldIds()));
+    assertThat(builder.checkAndGetEqualityFieldIds())
+        .containsExactlyInAnyOrder(table.schema().findField("type").fieldId());
   }
 
-  @Test
+  @TestTemplate
   public void testChangeLogOnIdKey() throws Exception {
     testChangeLogOnIdKey(SnapshotRef.MAIN_BRANCH);
   }
 
-  @Test
+  @TestTemplate
   public void testUpsertOnlyDeletesOnDataKey() throws Exception {
     List<List<Row>> elementsPerCheckpoint =
         ImmutableList.of(
@@ -184,22 +149,22 @@ public class TestFlinkIcebergSinkV2 extends TestFlinkIcebergSinkV2Base {
         SnapshotRef.MAIN_BRANCH);
   }
 
-  @Test
+  @TestTemplate
   public void testChangeLogOnDataKey() throws Exception {
     testChangeLogOnDataKey(SnapshotRef.MAIN_BRANCH);
   }
 
-  @Test
+  @TestTemplate
   public void testChangeLogOnIdDataKey() throws Exception {
     testChangeLogOnIdDataKey(SnapshotRef.MAIN_BRANCH);
   }
 
-  @Test
+  @TestTemplate
   public void testChangeLogOnSameKey() throws Exception {
     testChangeLogOnSameKey(SnapshotRef.MAIN_BRANCH);
   }
 
-  @Test
+  @TestTemplate
   public void testUpsertModeCheck() throws Exception {
     DataStream<Row> dataStream =
         env.addSource(new BoundedTestSource<>(ImmutableList.of()), ROW_TYPE_INFO);
@@ -227,22 +192,22 @@ public class TestFlinkIcebergSinkV2 extends TestFlinkIcebergSinkV2Base {
             "Equality field columns shouldn't be empty when configuring to use 
UPSERT data stream.");
   }
 
-  @Test
+  @TestTemplate
   public void testUpsertOnIdKey() throws Exception {
     testUpsertOnIdKey(SnapshotRef.MAIN_BRANCH);
   }
 
-  @Test
+  @TestTemplate
   public void testUpsertOnDataKey() throws Exception {
     testUpsertOnDataKey(SnapshotRef.MAIN_BRANCH);
   }
 
-  @Test
+  @TestTemplate
   public void testUpsertOnIdDataKey() throws Exception {
     testUpsertOnIdDataKey(SnapshotRef.MAIN_BRANCH);
   }
 
-  @Test
+  @TestTemplate
   public void testDeleteStats() throws Exception {
     assumeThat(format).isNotEqualTo(FileFormat.AVRO);
 
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Base.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Base.java
index 9cdf7743c4..fc33c2fea5 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Base.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Base.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.flink.sink;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.io.IOException;
@@ -31,6 +32,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.Parameters;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
@@ -44,7 +47,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.util.StructLikeSet;
-import org.junit.Assert;
 
 public class TestFlinkIcebergSinkV2Base {
 
@@ -55,14 +57,40 @@ public class TestFlinkIcebergSinkV2Base {
   protected static final int ROW_ID_POS = 0;
   protected static final int ROW_DATA_POS = 1;
 
-  protected int parallelism = 1;
   protected TableLoader tableLoader;
   protected Table table;
   protected StreamExecutionEnvironment env;
+
+  @Parameter(index = 0)
   protected FileFormat format;
+
+  @Parameter(index = 1)
+  protected int parallelism = 1;
+
+  @Parameter(index = 2)
   protected boolean partitioned;
+
+  @Parameter(index = 3)
   protected String writeDistributionMode;
 
+  @Parameters(name = "FileFormat={0}, Parallelism={1}, Partitioned={2}, 
WriteDistributionMode={3}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+      new Object[] {FileFormat.AVRO, 1, true, TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
+      new Object[] {FileFormat.AVRO, 1, false, TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
+      new Object[] {FileFormat.AVRO, 4, true, TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
+      new Object[] {FileFormat.AVRO, 4, false, TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
+      new Object[] {FileFormat.ORC, 1, true, TableProperties.WRITE_DISTRIBUTION_MODE_HASH},
+      new Object[] {FileFormat.ORC, 1, false, TableProperties.WRITE_DISTRIBUTION_MODE_HASH},
+      new Object[] {FileFormat.ORC, 4, true, TableProperties.WRITE_DISTRIBUTION_MODE_HASH},
+      new Object[] {FileFormat.ORC, 4, false, TableProperties.WRITE_DISTRIBUTION_MODE_HASH},
+      new Object[] {FileFormat.PARQUET, 1, true, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE},
+      new Object[] {FileFormat.PARQUET, 1, false, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE},
+      new Object[] {FileFormat.PARQUET, 4, true, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE},
+      new Object[] {FileFormat.PARQUET, 4, false, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE}
+    };
+  }
+
   protected static final Map<String, RowKind> ROW_KIND_MAP =
       ImmutableMap.of(
           "+I", RowKind.INSERT,
@@ -319,16 +347,14 @@ public class TestFlinkIcebergSinkV2Base {
     table.refresh();
     List<Snapshot> snapshots = findValidSnapshots();
     int expectedSnapshotNum = expectedRecordsPerCheckpoint.size();
-    Assert.assertEquals(
-        "Should have the expected snapshot number", expectedSnapshotNum, 
snapshots.size());
+    assertThat(snapshots).hasSize(expectedSnapshotNum);
 
     for (int i = 0; i < expectedSnapshotNum; i++) {
       long snapshotId = snapshots.get(i).snapshotId();
       List<Record> expectedRecords = expectedRecordsPerCheckpoint.get(i);
-      Assert.assertEquals(
-          "Should have the expected records for the checkpoint#" + i,
-          expectedRowSet(expectedRecords.toArray(new Record[0])),
-          actualRowSet(snapshotId, "*"));
+      assertThat(actualRowSet(snapshotId, "*"))
+          .as("Should have the expected records for the checkpoint#" + i)
+          .isEqualTo(expectedRowSet(expectedRecords.toArray(new Record[0])));
     }
   }
 
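
The assertion rewrites in this file are a mechanical JUnit4-to-AssertJ translation; the correspondence, sketched on the snapshot-count assertion above:

    // JUnit4: message first, then expected, then actual.
    // Assert.assertEquals("Should have the expected snapshot number", expectedSnapshotNum, snapshots.size());
    // AssertJ: actual first, expectation chained; a message can be attached with .as(...).
    assertThat(snapshots).hasSize(expectedSnapshotNum);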
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Branch.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Branch.java
index fed3338482..1c5c97b58d 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Branch.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Branch.java
@@ -18,52 +18,43 @@
  */
 package org.apache.iceberg.flink.sink;
 
+import static org.apache.iceberg.flink.TestFixtures.DATABASE;
+import static org.assertj.core.api.Assertions.assertThat;
+
 import java.io.IOException;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.flink.HadoopCatalogResource;
+import org.apache.iceberg.flink.HadoopCatalogExtension;
 import org.apache.iceberg.flink.MiniClusterResource;
 import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.flink.TestFixtures;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class TestFlinkIcebergSinkV2Branch extends TestFlinkIcebergSinkV2Base {
-
-  @ClassRule
-  public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
-      MiniClusterResource.createWithClassloaderCheckDisabled();
-
-  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
-  @Rule
-  public final HadoopCatalogResource catalogResource =
-      new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);
-
-  private final String branch;
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestFlinkIcebergSinkV2Branch extends TestFlinkIcebergSinkV2Base {
+  @RegisterExtension
+  private static final HadoopCatalogExtension catalogResource =
+      new HadoopCatalogExtension(DATABASE, TestFixtures.TABLE);
 
-  @Parameterized.Parameters(name = "branch = {0}")
-  public static Object[] parameters() {
-    return new Object[] {"main", "testBranch"};
-  }
+  @Parameter(index = 0)
+  private String branch;
 
-  public TestFlinkIcebergSinkV2Branch(String branch) {
-    this.branch = branch;
+  @Parameters(name = "branch = {0}")
+  public static Object[][] parameters() {
+    return new Object[][] {new Object[] {"main"}, new Object[] {"testBranch"}};
   }
 
-  @Before
+  @BeforeEach
   public void before() throws IOException {
     table =
         catalogResource
@@ -86,37 +77,37 @@ public class TestFlinkIcebergSinkV2Branch extends TestFlinkIcebergSinkV2Base {
     tableLoader = catalogResource.tableLoader();
   }
 
-  @Test
+  @TestTemplate
   public void testChangeLogOnIdKey() throws Exception {
     testChangeLogOnIdKey(branch);
     verifyOtherBranchUnmodified();
   }
 
-  @Test
+  @TestTemplate
   public void testChangeLogOnDataKey() throws Exception {
     testChangeLogOnDataKey(branch);
     verifyOtherBranchUnmodified();
   }
 
-  @Test
+  @TestTemplate
   public void testChangeLogOnIdDataKey() throws Exception {
     testChangeLogOnIdDataKey(branch);
     verifyOtherBranchUnmodified();
   }
 
-  @Test
+  @TestTemplate
   public void testUpsertOnIdKey() throws Exception {
     testUpsertOnIdKey(branch);
     verifyOtherBranchUnmodified();
   }
 
-  @Test
+  @TestTemplate
   public void testUpsertOnDataKey() throws Exception {
     testUpsertOnDataKey(branch);
     verifyOtherBranchUnmodified();
   }
 
-  @Test
+  @TestTemplate
   public void testUpsertOnIdDataKey() throws Exception {
     testUpsertOnIdDataKey(branch);
     verifyOtherBranchUnmodified();
@@ -126,9 +117,9 @@ public class TestFlinkIcebergSinkV2Branch extends TestFlinkIcebergSinkV2Base {
     String otherBranch =
         branch.equals(SnapshotRef.MAIN_BRANCH) ? "test-branch" : SnapshotRef.MAIN_BRANCH;
     if (otherBranch.equals(SnapshotRef.MAIN_BRANCH)) {
-      Assert.assertNull(table.currentSnapshot());
+      assertThat(table.currentSnapshot()).isNull();
     }
 
-    Assert.assertTrue(table.snapshot(otherBranch) == null);
+    assertThat(table.snapshot(otherBranch)).isNull();
   }
 }
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSourceSql.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSourceSql.java
index affd90c347..6857e0a7a3 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSourceSql.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSourceSql.java
@@ -33,7 +33,7 @@ import org.apache.iceberg.data.Record;
 import org.apache.iceberg.flink.TestFixtures;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 /** Use the FlinkSource */
 public class TestFlinkSourceSql extends TestSqlBase {
@@ -61,7 +61,7 @@ public class TestFlinkSourceSql extends TestSqlBase {
             .createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA, null);
 
     GenericAppenderHelper helper =
-        new GenericAppenderHelper(table, FileFormat.PARQUET, TEMPORARY_FOLDER);
+        new GenericAppenderHelper(table, FileFormat.PARQUET, temporaryFolder);
     List<Record> expectedRecords = Lists.newArrayList();
     long maxFileLen = 0;
     for (int i = 0; i < 5; i++) {
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
index 4250460d27..645af7cfa3 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
@@ -40,7 +40,7 @@ import org.apache.iceberg.flink.TestHelpers;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 /** Use the IcebergSource (FLIP-27) */
 public class TestIcebergSourceSql extends TestSqlBase {
@@ -78,7 +78,7 @@ public class TestIcebergSourceSql extends TestSqlBase {
     long baseTime = 1702382109000L;
 
     GenericAppenderHelper helper =
-        new GenericAppenderHelper(table, FileFormat.PARQUET, TEMPORARY_FOLDER);
+        new GenericAppenderHelper(table, FileFormat.PARQUET, temporaryFolder);
 
     Record file1Record1 =
         generateRecord(Instant.ofEpochMilli(baseTime), baseTime + (1000 * 60 * 60 * 24 * 30L));
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java
index dda4603314..94962e02bb 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java
@@ -18,13 +18,17 @@
  */
 package org.apache.iceberg.flink.source;
 
+import static org.apache.iceberg.flink.TestFixtures.DATABASE;
+import static org.assertj.core.api.Assertions.assertThat;
+
 import java.io.IOException;
+import java.nio.file.Path;
 import java.util.List;
 import java.util.Map;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.types.Row;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
@@ -34,30 +38,28 @@ import org.apache.iceberg.data.GenericAppenderHelper;
 import org.apache.iceberg.data.RandomGenericData;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.flink.FlinkConfigOptions;
-import org.apache.iceberg.flink.HadoopCatalogResource;
-import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.HadoopCatalogExtension;
+import org.apache.iceberg.flink.MiniFlinkClusterExtension;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.flink.TestFixtures;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
 
 /** Test other more advanced usage of SQL. They don't need to run for every file format. */
 public abstract class TestSqlBase {
-  @ClassRule
-  public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
-      MiniClusterResource.createWithClassloaderCheckDisabled();
+  @RegisterExtension
+  public static MiniClusterExtension miniClusterResource =
+      MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();
 
-  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+  @RegisterExtension
+  public static final HadoopCatalogExtension catalogResource =
+      new HadoopCatalogExtension(DATABASE, TestFixtures.TABLE);
 
-  @Rule
-  public final HadoopCatalogResource catalogResource =
-      new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);
+  @TempDir protected Path temporaryFolder;
 
   private volatile TableEnvironment tEnv;
 
@@ -73,7 +75,7 @@ public abstract class TestSqlBase {
     return tEnv;
   }
 
-  @Before
+  @BeforeEach
   public abstract void before() throws IOException;
 
   @Test
@@ -90,7 +92,7 @@ public abstract class TestSqlBase {
     writeRecords.get(1).set(2, "2020-03-20");
 
     GenericAppenderHelper helper =
-        new GenericAppenderHelper(table, FileFormat.PARQUET, TEMPORARY_FOLDER);
+        new GenericAppenderHelper(table, FileFormat.PARQUET, temporaryFolder);
 
     List<Record> expectedRecords = Lists.newArrayList();
     expectedRecords.add(writeRecords.get(0));
@@ -120,7 +122,7 @@ public abstract class TestSqlBase {
     expectedRecords.forEach(expectedRecord -> expectedRecord.set(2, "2020-03-20"));
 
     GenericAppenderHelper helper =
-        new GenericAppenderHelper(table, FileFormat.PARQUET, TEMPORARY_FOLDER);
+        new GenericAppenderHelper(table, FileFormat.PARQUET, temporaryFolder);
     DataFile dataFile = helper.writeFile(TestHelpers.Row.of("2020-03-20", 0), expectedRecords);
     helper.appendToTable(dataFile);
 
@@ -140,9 +142,9 @@ public abstract class TestSqlBase {
 
     // When running with CI or local, `localityEnabled` will be false even if this configuration is
     // enabled
-    Assert.assertFalse(
-        "Expose split locality info should be false.",
-        SourceUtil.isLocalityEnabled(table, tableConf, true));
+    assertThat(SourceUtil.isLocalityEnabled(table, tableConf, true))
+        .as("Expose split locality info should be false.")
+        .isFalse();
 
     results = run(Maps.newHashMap(), "where dt='2020-03-20'", "*");
     org.apache.iceberg.flink.TestHelpers.assertRecords(
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java
index f06d9b83bc..e3e341ca2c 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.flink.source.reader;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Path;
 import java.util.Collections;
 import java.util.List;
 import org.apache.flink.table.data.RowData;
@@ -106,6 +107,7 @@ public class ReaderUtil {
     return Lists.partition(records, batchCount);
   }
 
+  // Only for JUnit4 tests. Keep this method for test migration from JUnit4 to JUnit5
   public static CombinedScanTask createCombinedScanTask(
       List<List<Record>> recordBatchList,
       TemporaryFolder temporaryFolder,
@@ -122,4 +124,24 @@ public class ReaderUtil {
 
     return new BaseCombinedScanTask(fileTasks);
   }
+
+  public static CombinedScanTask createCombinedScanTask(
+      List<List<Record>> recordBatchList,
+      Path temporaryFolder,
+      FileFormat fileFormat,
+      GenericAppenderFactory appenderFactory)
+      throws IOException {
+    List<FileScanTask> fileTasks = Lists.newArrayListWithCapacity(recordBatchList.size());
+    for (List<Record> recordBatch : recordBatchList) {
+      FileScanTask fileTask =
+          ReaderUtil.createFileTask(
+              recordBatch,
+              File.createTempFile("junit", null, temporaryFolder.toFile()),
+              fileFormat,
+              appenderFactory);
+      fileTasks.add(fileTask);
+    }
+
+    return new BaseCombinedScanTask(fileTasks);
+  }
 }
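
A sketch of calling the new Path-based overload from a JUnit5 test; the field and variable names here are illustrative, not part of this commit:

    // In a JUnit5 test class:
    // @TempDir private java.nio.file.Path temporaryFolder;
    CombinedScanTask task =
        ReaderUtil.createCombinedScanTask(
            recordBatchList, temporaryFolder, FileFormat.PARQUET, appenderFactory);
    // The result can be wrapped with IcebergSourceSplit.fromCombinedScanTask(task),
    // as TestColumnStatsWatermarkExtractor does below.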
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestColumnStatsWatermarkExtractor.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestColumnStatsWatermarkExtractor.java
index 604bc09619..7033fd30e8 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestColumnStatsWatermarkExtractor.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestColumnStatsWatermarkExtractor.java
@@ -18,10 +18,14 @@
  */
 package org.apache.iceberg.flink.source.reader;
 
+import static org.apache.iceberg.flink.TestFixtures.DATABASE;
 import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.io.IOException;
+import java.nio.file.Path;
 import java.time.LocalDateTime;
 import java.time.OffsetDateTime;
 import java.time.ZoneOffset;
@@ -30,27 +34,26 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.data.GenericAppenderFactory;
 import org.apache.iceberg.data.RandomGenericData;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.HadoopTableExtension;
 import org.apache.iceberg.flink.TestFixtures;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
-import org.junit.Assert;
-import org.junit.Assume;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+@ExtendWith(ParameterizedTestExtension.class)
 public class TestColumnStatsWatermarkExtractor {
   public static final Schema SCHEMA =
       new Schema(
@@ -68,15 +71,16 @@ public class TestColumnStatsWatermarkExtractor {
   private static final List<Map<String, Long>> MIN_VALUES =
       ImmutableList.of(Maps.newHashMapWithExpectedSize(3), Maps.newHashMapWithExpectedSize(3));
 
-  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+  @TempDir protected Path temporaryFolder;
 
-  @Rule
-  public final HadoopTableResource sourceTableResource =
-      new HadoopTableResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, SCHEMA);
+  @RegisterExtension
+  private static final HadoopTableExtension sourceTableResource =
+      new HadoopTableExtension(DATABASE, TestFixtures.TABLE, SCHEMA);
 
-  private final String columnName;
+  @Parameter(index = 0)
+  private String columnName;
 
-  @BeforeClass
+  @BeforeAll
   public static void updateMinValue() {
     for (int i = 0; i < TEST_RECORDS.size(); ++i) {
       for (Record r : TEST_RECORDS.get(i)) {
@@ -94,7 +98,7 @@ public class TestColumnStatsWatermarkExtractor {
     }
   }
 
-  @Parameterized.Parameters(name = "{0}")
+  @Parameters(name = "columnName = {0}")
   public static Collection<Object[]> data() {
     return ImmutableList.of(
         new Object[] {"timestamp_column"},
@@ -102,62 +106,56 @@ public class TestColumnStatsWatermarkExtractor {
         new Object[] {"long_column"});
   }
 
-  public TestColumnStatsWatermarkExtractor(String columnName) {
-    this.columnName = columnName;
-  }
-
-  @Test
+  @TestTemplate
   public void testSingle() throws IOException {
     ColumnStatsWatermarkExtractor extractor =
         new ColumnStatsWatermarkExtractor(SCHEMA, columnName, TimeUnit.MILLISECONDS);
 
-    Assert.assertEquals(
-        MIN_VALUES.get(0).get(columnName).longValue(), extractor.extractWatermark(split(0)));
+    assertThat(extractor.extractWatermark(split(0)))
+        .isEqualTo(MIN_VALUES.get(0).get(columnName).longValue());
   }
 
-  @Test
+  @TestTemplate
   public void testTimeUnit() throws IOException {
-    Assume.assumeTrue("Run only for long column", 
columnName.equals("long_column"));
+    assumeThat(columnName).isEqualTo("long_column");
     ColumnStatsWatermarkExtractor extractor =
         new ColumnStatsWatermarkExtractor(SCHEMA, columnName, TimeUnit.MICROSECONDS);
 
-    Assert.assertEquals(
-        MIN_VALUES.get(0).get(columnName).longValue() / 1000L,
-        extractor.extractWatermark(split(0)));
+    assertThat(extractor.extractWatermark(split(0)))
+        .isEqualTo(MIN_VALUES.get(0).get(columnName).longValue() / 1000L);
   }
 
-  @Test
+  @TestTemplate
   public void testMultipleFiles() throws IOException {
-    Assume.assumeTrue("Run only for the timestamp column", 
columnName.equals("timestamp_column"));
+    assumeThat(columnName).isEqualTo("timestamp_column");
     IcebergSourceSplit combinedSplit =
         IcebergSourceSplit.fromCombinedScanTask(
             ReaderUtil.createCombinedScanTask(
-                TEST_RECORDS, TEMPORARY_FOLDER, FileFormat.PARQUET, APPENDER_FACTORY));
+                TEST_RECORDS, temporaryFolder, FileFormat.PARQUET, APPENDER_FACTORY));
 
     ColumnStatsWatermarkExtractor extractor =
         new ColumnStatsWatermarkExtractor(SCHEMA, columnName, null);
 
-    Assert.assertEquals(
-        MIN_VALUES.get(0).get(columnName).longValue(), extractor.extractWatermark(split(0)));
-    Assert.assertEquals(
-        MIN_VALUES.get(1).get(columnName).longValue(), extractor.extractWatermark(split(1)));
-    Assert.assertEquals(
-        Math.min(MIN_VALUES.get(0).get(columnName), MIN_VALUES.get(1).get(columnName)),
-        extractor.extractWatermark(combinedSplit));
+    assertThat(extractor.extractWatermark(split(0)))
+        .isEqualTo(MIN_VALUES.get(0).get(columnName).longValue());
+    assertThat(extractor.extractWatermark(split(1)))
+        .isEqualTo(MIN_VALUES.get(1).get(columnName).longValue());
+    assertThat(extractor.extractWatermark(combinedSplit))
+        .isEqualTo(Math.min(MIN_VALUES.get(0).get(columnName), MIN_VALUES.get(1).get(columnName)));
   }
 
-  @Test
+  @TestTemplate
   public void testWrongColumn() {
-    Assume.assumeTrue("Run only for string column", 
columnName.equals("string_column"));
+    assumeThat(columnName).isEqualTo("string_column");
     assertThatThrownBy(() -> new ColumnStatsWatermarkExtractor(SCHEMA, columnName, null))
         .isInstanceOf(IllegalArgumentException.class)
         .hasMessageContaining(
             "Found STRING, expected a LONG or TIMESTAMP column for watermark 
generation.");
   }
 
-  @Test
+  @TestTemplate
   public void testEmptyStatistics() throws IOException {
-    Assume.assumeTrue("Run only for timestamp column", 
columnName.equals("timestamp_column"));
+    assumeThat(columnName).isEqualTo("timestamp_column");
 
     // Create an extractor for a column we do not have statistics
     ColumnStatsWatermarkExtractor extractor =
@@ -171,7 +169,7 @@ public class TestColumnStatsWatermarkExtractor {
     return IcebergSourceSplit.fromCombinedScanTask(
         ReaderUtil.createCombinedScanTask(
             ImmutableList.of(TEST_RECORDS.get(id)),
-            TEMPORARY_FOLDER,
+            temporaryFolder,
             FileFormat.PARQUET,
             APPENDER_FACTORY));
   }
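
The assumption rewrites in this file follow the same translation: JUnit4's Assume.assumeTrue(message, condition) becomes AssertJ's assumeThat, which likewise skips (rather than fails) the test when the assumption does not hold:

    // JUnit4: Assume.assumeTrue("Run only for long column", columnName.equals("long_column"));
    // AssertJ form used above:
    assumeThat(columnName).isEqualTo("long_column");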
