This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 581e03713c Flink 1.18: Create JUnit5 version of TestFlinkScan (#9480)
581e03713c is described below

commit 581e03713c2b397c2416899fdf1e433540aca3d3
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Tue Jan 16 11:47:12 2024 +0100

    Flink 1.18: Create JUnit5 version of TestFlinkScan (#9480)
---
 .../java/org/apache/iceberg/flink/TestHelpers.java | 385 ++++++++++-----------
 .../iceberg/flink/source/TestFlinkInputFormat.java |  30 +-
 .../apache/iceberg/flink/source/TestFlinkScan.java | 152 ++++----
 .../iceberg/flink/source/TestFlinkScanSql.java     |  11 +-
 .../iceberg/flink/source/TestFlinkSource.java      |   6 +-
 .../flink/source/TestIcebergSourceBounded.java     |   9 +-
 .../flink/source/TestIcebergSourceBoundedSql.java  |  10 +-
 7 files changed, 279 insertions(+), 324 deletions(-)

diff --git 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
index 38b0eb0b40..80e5ddd24f 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iceberg.flink;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
@@ -67,7 +69,6 @@ import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.DateTimeUtil;
 import org.assertj.core.api.Assertions;
-import org.junit.Assert;
 
 public class TestHelpers {
   private TestHelpers() {}
@@ -154,7 +155,7 @@ public class TestHelpers {
   }
 
   public static void assertRows(List<Row> results, List<Row> expected) {
-    
Assertions.assertThat(results).containsExactlyInAnyOrderElementsOf(expected);
+    assertThat(results).containsExactlyInAnyOrderElementsOf(expected);
   }
 
   public static void assertRowsWithOrder(List<Row> results, List<Row> 
expected) {
@@ -174,9 +175,8 @@ public class TestHelpers {
       return;
     }
 
-    Assert.assertTrue(
-        "expected Record and actual RowData should be both null or not null",
-        expectedRecord != null && actualRowData != null);
+    assertThat(expectedRecord).isNotNull();
+    assertThat(actualRowData).isNotNull();
 
     List<Type> types = Lists.newArrayList();
     for (Types.NestedField field : structType.fields()) {
@@ -208,109 +208,106 @@ public class TestHelpers {
       return;
     }
 
-    Assert.assertTrue(
-        "expected and actual should be both null or not null", expected != 
null && actual != null);
+    assertThat(expected).isNotNull();
+    assertThat(actual).isNotNull();
 
     switch (type.typeId()) {
       case BOOLEAN:
-        Assert.assertEquals("boolean value should be equal", expected, actual);
+        assertThat(actual).as("boolean value should be 
equal").isEqualTo(expected);
         break;
       case INTEGER:
-        Assert.assertEquals("int value should be equal", expected, actual);
+        assertThat(actual).as("int value should be equal").isEqualTo(expected);
         break;
       case LONG:
-        Assert.assertEquals("long value should be equal", expected, actual);
+        assertThat(actual).as("long value should be 
equal").isEqualTo(expected);
         break;
       case FLOAT:
-        Assert.assertEquals("float value should be equal", expected, actual);
+        assertThat(actual).as("float value should be 
equal").isEqualTo(expected);
         break;
       case DOUBLE:
-        Assert.assertEquals("double value should be equal", expected, actual);
+        assertThat(actual).as("double value should be 
equal").isEqualTo(expected);
         break;
       case STRING:
-        Assertions.assertThat(expected)
-            .as("Should expect a CharSequence")
-            .isInstanceOf(CharSequence.class);
-        Assert.assertEquals("string should be equal", 
String.valueOf(expected), actual.toString());
+        assertThat(expected).as("Should expect a 
CharSequence").isInstanceOf(CharSequence.class);
+        assertThat(actual.toString())
+            .as("string should be equal")
+            .isEqualTo(String.valueOf(expected));
         break;
       case DATE:
-        Assertions.assertThat(expected).as("Should expect a 
Date").isInstanceOf(LocalDate.class);
+        assertThat(expected).as("Should expect a 
Date").isInstanceOf(LocalDate.class);
         LocalDate date = DateTimeUtil.dateFromDays((int) actual);
-        Assert.assertEquals("date should be equal", expected, date);
+        assertThat(date).as("date should be equal").isEqualTo(expected);
         break;
       case TIME:
-        Assertions.assertThat(expected)
-            .as("Should expect a LocalTime")
-            .isInstanceOf(LocalTime.class);
+        assertThat(expected).as("Should expect a 
LocalTime").isInstanceOf(LocalTime.class);
         int milliseconds = (int) (((LocalTime) expected).toNanoOfDay() / 
1000_000);
-        Assert.assertEquals("time millis should be equal", milliseconds, 
actual);
+        assertThat(actual).as("time millis should be 
equal").isEqualTo(milliseconds);
         break;
       case TIMESTAMP:
         if (((Types.TimestampType) type).shouldAdjustToUTC()) {
-          Assertions.assertThat(expected)
+          assertThat(expected)
               .as("Should expect a OffsetDataTime")
               .isInstanceOf(OffsetDateTime.class);
           OffsetDateTime ts = (OffsetDateTime) expected;
-          Assert.assertEquals(
-              "OffsetDataTime should be equal",
-              ts.toLocalDateTime(),
-              ((TimestampData) actual).toLocalDateTime());
+          assertThat(((TimestampData) actual).toLocalDateTime())
+              .as("OffsetDataTime should be equal")
+              .isEqualTo(ts.toLocalDateTime());
         } else {
-          Assertions.assertThat(expected)
+          assertThat(expected)
               .as("Should expect a LocalDataTime")
               .isInstanceOf(LocalDateTime.class);
           LocalDateTime ts = (LocalDateTime) expected;
-          Assert.assertEquals(
-              "LocalDataTime should be equal", ts, ((TimestampData) 
actual).toLocalDateTime());
+          assertThat(((TimestampData) actual).toLocalDateTime())
+              .as("LocalDataTime should be equal")
+              .isEqualTo(ts);
         }
         break;
       case BINARY:
-        Assertions.assertThat(expected)
+        assertThat(ByteBuffer.wrap((byte[]) actual))
             .as("Should expect a ByteBuffer")
-            .isInstanceOf(ByteBuffer.class);
-        Assert.assertEquals("binary should be equal", expected, 
ByteBuffer.wrap((byte[]) actual));
+            .isInstanceOf(ByteBuffer.class)
+            .isEqualTo(expected);
         break;
       case DECIMAL:
-        Assertions.assertThat(expected)
-            .as("Should expect a BigDecimal")
-            .isInstanceOf(BigDecimal.class);
+        assertThat(expected).as("Should expect a 
BigDecimal").isInstanceOf(BigDecimal.class);
         BigDecimal bd = (BigDecimal) expected;
-        Assert.assertEquals(
-            "decimal value should be equal", bd, ((DecimalData) 
actual).toBigDecimal());
+        assertThat(((DecimalData) actual).toBigDecimal())
+            .as("decimal value should be equal")
+            .isEqualTo(bd);
         break;
       case LIST:
-        Assertions.assertThat(expected)
-            .as("Should expect a Collection")
-            .isInstanceOf(Collection.class);
+        assertThat(expected).as("Should expect a 
Collection").isInstanceOf(Collection.class);
         Collection<?> expectedArrayData = (Collection<?>) expected;
         ArrayData actualArrayData = (ArrayData) actual;
         LogicalType elementType = ((ArrayType) logicalType).getElementType();
-        Assert.assertEquals(
-            "array length should be equal", expectedArrayData.size(), 
actualArrayData.size());
+        assertThat(actualArrayData.size())
+            .as("array length should be equal")
+            .isEqualTo(expectedArrayData.size());
         assertArrayValues(
             type.asListType().elementType(), elementType, expectedArrayData, 
actualArrayData);
         break;
       case MAP:
-        Assertions.assertThat(expected).as("Should expect a 
Map").isInstanceOf(Map.class);
+        assertThat(expected).as("Should expect a Map").isInstanceOf(Map.class);
         assertMapValues(type.asMapType(), logicalType, (Map<?, ?>) expected, 
(MapData) actual);
         break;
       case STRUCT:
-        Assertions.assertThat(expected).as("Should expect a 
Record").isInstanceOf(StructLike.class);
+        assertThat(expected).as("Should expect a 
Record").isInstanceOf(StructLike.class);
         assertRowData(type.asStructType(), logicalType, (StructLike) expected, 
(RowData) actual);
         break;
       case UUID:
-        Assertions.assertThat(expected).as("Should expect a 
UUID").isInstanceOf(UUID.class);
+        assertThat(expected).as("Should expect a 
UUID").isInstanceOf(UUID.class);
         ByteBuffer bb = ByteBuffer.wrap((byte[]) actual);
         long firstLong = bb.getLong();
         long secondLong = bb.getLong();
-        Assert.assertEquals(
-            "UUID should be equal",
-            expected.toString(),
-            new UUID(firstLong, secondLong).toString());
+        assertThat(new UUID(firstLong, secondLong).toString())
+            .as("UUID should be equal")
+            .isEqualTo(expected.toString());
         break;
       case FIXED:
-        Assertions.assertThat(expected).as("Should expect 
byte[]").isInstanceOf(byte[].class);
-        Assert.assertArrayEquals("binary should be equal", (byte[]) expected, 
(byte[]) actual);
+        assertThat(actual)
+            .as("Should expect byte[]")
+            .isInstanceOf(byte[].class)
+            .isEqualTo(expected);
         break;
       default:
         throw new IllegalArgumentException("Not a supported type: " + type);
@@ -324,8 +321,9 @@ public class TestHelpers {
 
   public static void assertEquals(Schema schema, GenericData.Record record, 
Row row) {
     List<Types.NestedField> fields = schema.asStruct().fields();
-    Assert.assertEquals(fields.size(), record.getSchema().getFields().size());
-    Assert.assertEquals(fields.size(), row.getArity());
+    assertThat(fields).hasSameSizeAs(record.getSchema().getFields());
+    assertThat(fields).hasSize(row.getArity());
+
     RowType rowType = FlinkSchemaUtil.convert(schema);
     for (int i = 0; i < fields.size(); ++i) {
       Type fieldType = fields.get(i).type();
@@ -352,9 +350,8 @@ public class TestHelpers {
     if (expected == null && actual == null) {
       return;
     }
-
-    Assert.assertTrue(
-        "expected and actual should be both null or not null", expected != 
null && actual != null);
+    assertThat(expected).isNotNull();
+    assertThat(actual).isNotNull();
 
     switch (type.typeId()) {
       case BOOLEAN:
@@ -362,87 +359,79 @@ public class TestHelpers {
       case LONG:
       case FLOAT:
       case DOUBLE:
-        Assertions.assertThat(expected)
+        assertThat(expected)
             .as("Should expect a " + type.typeId().javaClass())
             .isInstanceOf(type.typeId().javaClass());
-        Assertions.assertThat(actual)
+        assertThat(actual)
             .as("Should expect a " + type.typeId().javaClass())
             .isInstanceOf(type.typeId().javaClass());
-        Assert.assertEquals(type.typeId() + " value should be equal", 
expected, actual);
+        assertThat(actual).as(type.typeId() + " value should be 
equal").isEqualTo(expected);
         break;
       case STRING:
-        Assertions.assertThat(expected)
-            .as("Should expect a CharSequence")
-            .isInstanceOf(CharSequence.class);
-        Assertions.assertThat(actual)
-            .as("Should expect a CharSequence")
-            .isInstanceOf(CharSequence.class);
-        Assert.assertEquals("string should be equal", expected.toString(), 
actual.toString());
+        assertThat(expected).as("Should expect a 
CharSequence").isInstanceOf(CharSequence.class);
+        assertThat(actual).as("Should expect a 
CharSequence").isInstanceOf(CharSequence.class);
+        assertThat(actual.toString()).as("string should be 
equal").isEqualTo(expected.toString());
         break;
       case DATE:
-        Assertions.assertThat(expected).as("Should expect a 
Date").isInstanceOf(LocalDate.class);
+        assertThat(expected).as("Should expect a 
Date").isInstanceOf(LocalDate.class);
         LocalDate date = DateTimeUtil.dateFromDays((int) actual);
-        Assert.assertEquals("date should be equal", expected, date);
+        assertThat(date).as("date should be equal").isEqualTo(expected);
         break;
       case TIME:
-        Assertions.assertThat(expected)
-            .as("Should expect a LocalTime")
-            .isInstanceOf(LocalTime.class);
+        assertThat(expected).as("Should expect a 
LocalTime").isInstanceOf(LocalTime.class);
         int milliseconds = (int) (((LocalTime) expected).toNanoOfDay() / 
1000_000);
-        Assert.assertEquals("time millis should be equal", milliseconds, 
actual);
+        assertThat(actual).as("time millis should be 
equal").isEqualTo(milliseconds);
         break;
       case TIMESTAMP:
         if (((Types.TimestampType) type).shouldAdjustToUTC()) {
-          Assertions.assertThat(expected)
+          assertThat(expected)
               .as("Should expect a OffsetDataTime")
               .isInstanceOf(OffsetDateTime.class);
           OffsetDateTime ts = (OffsetDateTime) expected;
-          Assert.assertEquals(
-              "OffsetDataTime should be equal",
-              ts.toLocalDateTime(),
-              ((TimestampData) actual).toLocalDateTime());
+          assertThat(((TimestampData) actual).toLocalDateTime())
+              .as("OffsetDataTime should be equal")
+              .isEqualTo(ts.toLocalDateTime());
         } else {
-          Assertions.assertThat(expected)
+          assertThat(expected)
               .as("Should expect a LocalDataTime")
               .isInstanceOf(LocalDateTime.class);
           LocalDateTime ts = (LocalDateTime) expected;
-          Assert.assertEquals(
-              "LocalDataTime should be equal", ts, ((TimestampData) 
actual).toLocalDateTime());
+          assertThat(((TimestampData) actual).toLocalDateTime())
+              .as("LocalDataTime should be equal")
+              .isEqualTo(ts);
         }
         break;
       case BINARY:
-        Assertions.assertThat(expected)
+        assertThat(ByteBuffer.wrap((byte[]) actual))
             .as("Should expect a ByteBuffer")
-            .isInstanceOf(ByteBuffer.class);
-        Assert.assertEquals("binary should be equal", expected, 
ByteBuffer.wrap((byte[]) actual));
+            .isInstanceOf(ByteBuffer.class)
+            .isEqualTo(expected);
         break;
       case DECIMAL:
-        Assertions.assertThat(expected)
-            .as("Should expect a BigDecimal")
-            .isInstanceOf(BigDecimal.class);
+        assertThat(expected).as("Should expect a 
BigDecimal").isInstanceOf(BigDecimal.class);
         BigDecimal bd = (BigDecimal) expected;
-        Assert.assertEquals(
-            "decimal value should be equal", bd, ((DecimalData) 
actual).toBigDecimal());
+        assertThat(((DecimalData) actual).toBigDecimal())
+            .as("decimal value should be equal")
+            .isEqualTo(bd);
         break;
       case LIST:
-        Assertions.assertThat(expected)
-            .as("Should expect a Collection")
-            .isInstanceOf(Collection.class);
+        assertThat(expected).as("Should expect a 
Collection").isInstanceOf(Collection.class);
         Collection<?> expectedArrayData = (Collection<?>) expected;
         ArrayData actualArrayData;
         try {
-          actualArrayData = convertToArray(actual);
+          actualArrayData = (ArrayData) actual;
         } catch (ClassCastException e) {
           actualArrayData = new GenericArrayData((Object[]) actual);
         }
         LogicalType elementType = ((ArrayType) logicalType).getElementType();
-        Assert.assertEquals(
-            "array length should be equal", expectedArrayData.size(), 
actualArrayData.size());
+        assertThat(actualArrayData.size())
+            .as("array length should be equal")
+            .isEqualTo(expectedArrayData.size());
         assertArrayValues(
             type.asListType().elementType(), elementType, expectedArrayData, 
actualArrayData);
         break;
       case MAP:
-        Assertions.assertThat(expected).as("Should expect a 
Map").isInstanceOf(Map.class);
+        assertThat(expected).as("Should expect a Map").isInstanceOf(Map.class);
         MapData actualMap;
         try {
           actualMap = (MapData) actual;
@@ -452,60 +441,36 @@ public class TestHelpers {
         assertMapValues(type.asMapType(), logicalType, (Map<?, ?>) expected, 
actualMap);
         break;
       case STRUCT:
-        Assertions.assertThat(expected)
-            .as("Should expect a Record")
-            .isInstanceOf(GenericData.Record.class);
+        assertThat(expected).as("Should expect a 
Record").isInstanceOf(GenericData.Record.class);
         assertEquals(
             type.asNestedType().asStructType(), (GenericData.Record) expected, 
(Row) actual);
         break;
       case UUID:
-        Assertions.assertThat(expected).as("Should expect a 
UUID").isInstanceOf(UUID.class);
+        assertThat(expected).as("Should expect a 
UUID").isInstanceOf(UUID.class);
         ByteBuffer bb = ByteBuffer.wrap((byte[]) actual);
         long firstLong = bb.getLong();
         long secondLong = bb.getLong();
-        Assert.assertEquals(
-            "UUID should be equal",
-            expected.toString(),
-            new UUID(firstLong, secondLong).toString());
+        assertThat(new UUID(firstLong, secondLong).toString())
+            .as("UUID should be equal")
+            .isEqualTo(expected.toString());
         break;
       case FIXED:
-        Assertions.assertThat(expected).as("Should expect 
byte[]").isInstanceOf(byte[].class);
-        Assert.assertArrayEquals("binary should be equal", (byte[]) expected, 
(byte[]) actual);
+        assertThat(actual)
+            .as("Should expect byte[]")
+            .isInstanceOf(byte[].class)
+            .isEqualTo(expected);
         break;
       default:
         throw new IllegalArgumentException("Not a supported type: " + type);
     }
   }
 
-  private static GenericArrayData convertToArray(Object actual) {
-    Class<?> arrayClass = actual.getClass();
-    if (Object[].class.equals(arrayClass)) {
-      return new GenericArrayData((Object[]) actual);
-    } else if (int[].class.equals(arrayClass)) {
-      return new GenericArrayData((int[]) actual);
-    } else if (long[].class.equals(arrayClass)) {
-      return new GenericArrayData((long[]) actual);
-    } else if (float[].class.equals(arrayClass)) {
-      return new GenericArrayData((float[]) actual);
-    } else if (double[].class.equals(arrayClass)) {
-      return new GenericArrayData((double[]) actual);
-    } else if (short[].class.equals(arrayClass)) {
-      return new GenericArrayData((short[]) actual);
-    } else if (byte[].class.equals(arrayClass)) {
-      return new GenericArrayData((byte[]) actual);
-    } else if (boolean[].class.equals(arrayClass)) {
-      return new GenericArrayData((boolean[]) actual);
-    } else {
-      throw new IllegalArgumentException("Unsupported type " + arrayClass);
-    }
-  }
-
   private static void assertArrayValues(
       Type type, LogicalType logicalType, Collection<?> expectedArray, 
ArrayData actualArray) {
     List<?> expectedElements = Lists.newArrayList(expectedArray);
     for (int i = 0; i < expectedArray.size(); i += 1) {
       if (expectedElements.get(i) == null) {
-        Assert.assertTrue(actualArray.isNullAt(i));
+        assertThat(actualArray.isNullAt(i)).isTrue();
         continue;
       }
 
@@ -521,7 +486,7 @@ public class TestHelpers {
 
   private static void assertMapValues(
       Types.MapType mapType, LogicalType type, Map<?, ?> expected, MapData 
actual) {
-    Assert.assertEquals("map size should be equal", expected.size(), 
actual.size());
+    assertThat(actual.size()).as("map size should be 
equal").isEqualTo(expected.size());
 
     ArrayData actualKeyArrayData = actual.keyArray();
     ArrayData actualValueArrayData = actual.valueArray();
@@ -547,7 +512,7 @@ public class TestHelpers {
           // not found
         }
       }
-      Assert.assertNotNull("Should have a matching key", matchedActualKey);
+      assertThat(matchedActualKey).as("Should have a matching 
key").isNotNull();
       final int valueIndex = matchedKeyIndex;
       assertEquals(
           valueType,
@@ -561,60 +526,67 @@ public class TestHelpers {
     if (expected == actual) {
       return;
     }
-    Assert.assertTrue("Should not be null.", expected != null && actual != 
null);
-    Assert.assertEquals("Path must match", expected.path(), actual.path());
-    Assert.assertEquals("Length must match", expected.length(), 
actual.length());
-    Assert.assertEquals("Spec id must match", expected.partitionSpecId(), 
actual.partitionSpecId());
-    Assert.assertEquals("ManifestContent must match", expected.content(), 
actual.content());
-    Assert.assertEquals(
-        "SequenceNumber must match", expected.sequenceNumber(), 
actual.sequenceNumber());
-    Assert.assertEquals(
-        "MinSequenceNumber must match", expected.minSequenceNumber(), 
actual.minSequenceNumber());
-    Assert.assertEquals("Snapshot id must match", expected.snapshotId(), 
actual.snapshotId());
-    Assert.assertEquals(
-        "Added files flag must match", expected.hasAddedFiles(), 
actual.hasAddedFiles());
-    Assert.assertEquals(
-        "Added files count must match", expected.addedFilesCount(), 
actual.addedFilesCount());
-    Assert.assertEquals(
-        "Added rows count must match", expected.addedRowsCount(), 
actual.addedRowsCount());
-    Assert.assertEquals(
-        "Existing files flag must match", expected.hasExistingFiles(), 
actual.hasExistingFiles());
-    Assert.assertEquals(
-        "Existing files count must match",
-        expected.existingFilesCount(),
-        actual.existingFilesCount());
-    Assert.assertEquals(
-        "Existing rows count must match", expected.existingRowsCount(), 
actual.existingRowsCount());
-    Assert.assertEquals(
-        "Deleted files flag must match", expected.hasDeletedFiles(), 
actual.hasDeletedFiles());
-    Assert.assertEquals(
-        "Deleted files count must match", expected.deletedFilesCount(), 
actual.deletedFilesCount());
-    Assert.assertEquals(
-        "Deleted rows count must match", expected.deletedRowsCount(), 
actual.deletedRowsCount());
+    assertThat(expected).isNotNull();
+    assertThat(actual).isNotNull();
+    assertThat(actual.path()).as("Path must match").isEqualTo(expected.path());
+    assertThat(actual.length()).as("Length must 
match").isEqualTo(expected.length());
+    assertThat(actual.partitionSpecId())
+        .as("Spec id must match")
+        .isEqualTo(expected.partitionSpecId());
+    assertThat(actual.content()).as("ManifestContent must 
match").isEqualTo(expected.content());
+    assertThat(actual.sequenceNumber())
+        .as("SequenceNumber must match")
+        .isEqualTo(expected.sequenceNumber());
+    assertThat(actual.minSequenceNumber())
+        .as("MinSequenceNumber must match")
+        .isEqualTo(expected.minSequenceNumber());
+    assertThat(actual.snapshotId()).as("Snapshot id must 
match").isEqualTo(expected.snapshotId());
+    assertThat(actual.hasAddedFiles())
+        .as("Added files flag must match")
+        .isEqualTo(expected.hasAddedFiles());
+    assertThat(actual.addedFilesCount())
+        .as("Added files count must match")
+        .isEqualTo(expected.addedFilesCount());
+    assertThat(actual.addedRowsCount())
+        .as("Added rows count must match")
+        .isEqualTo(expected.addedRowsCount());
+    assertThat(actual.hasExistingFiles())
+        .as("Existing files flag must match")
+        .isEqualTo(expected.hasExistingFiles());
+    assertThat(actual.existingFilesCount())
+        .as("Existing files count must match")
+        .isEqualTo(expected.existingFilesCount());
+    assertThat(actual.existingRowsCount())
+        .as("Existing rows count must match")
+        .isEqualTo(expected.existingRowsCount());
+    assertThat(actual.hasDeletedFiles())
+        .as("Deleted files flag must match")
+        .isEqualTo(expected.hasDeletedFiles());
+    assertThat(actual.deletedFilesCount())
+        .as("Deleted files count must match")
+        .isEqualTo(expected.deletedFilesCount());
+    assertThat(actual.deletedRowsCount())
+        .as("Deleted rows count must match")
+        .isEqualTo(expected.deletedRowsCount());
 
     List<ManifestFile.PartitionFieldSummary> expectedSummaries = 
expected.partitions();
     List<ManifestFile.PartitionFieldSummary> actualSummaries = 
actual.partitions();
-    Assert.assertEquals(
-        "PartitionFieldSummary size does not match",
-        expectedSummaries.size(),
-        actualSummaries.size());
+    assertThat(actualSummaries)
+        .as("PartitionFieldSummary size does not match")
+        .hasSameSizeAs(expectedSummaries);
     for (int i = 0; i < expectedSummaries.size(); i++) {
-      Assert.assertEquals(
-          "Null flag in partition must match",
-          expectedSummaries.get(i).containsNull(),
-          actualSummaries.get(i).containsNull());
-      Assert.assertEquals(
-          "NaN flag in partition must match",
-          expectedSummaries.get(i).containsNaN(),
-          actualSummaries.get(i).containsNaN());
-      Assert.assertEquals(
-          "Lower bounds in partition must match",
-          expectedSummaries.get(i).lowerBound(),
-          actualSummaries.get(i).lowerBound());
-      Assert.assertEquals(
-          "Upper bounds in partition must match",
-          expectedSummaries.get(i).upperBound(),
-          actualSummaries.get(i).upperBound());
+      assertThat(actualSummaries.get(i).containsNull())
+          .as("Null flag in partition must match")
+          .isEqualTo(expectedSummaries.get(i).containsNull());
+      assertThat(actualSummaries.get(i).containsNaN())
+          .as("NaN flag in partition must match")
+          .isEqualTo(expectedSummaries.get(i).containsNaN());
+      assertThat(actualSummaries.get(i).lowerBound())
+          .as("Lower bounds in partition must match")
+          .isEqualTo(expectedSummaries.get(i).lowerBound());
+      assertThat(actualSummaries.get(i).upperBound())
+          .as("Upper bounds in partition must match")
+          .isEqualTo(expectedSummaries.get(i).upperBound());
     }
   }
 
@@ -622,28 +594,35 @@ public class TestHelpers {
     if (expected == actual) {
       return;
     }
-    Assert.assertTrue("Shouldn't be null.", expected != null && actual != 
null);
-    Assert.assertEquals("SpecId", expected.specId(), actual.specId());
-    Assert.assertEquals("Content", expected.content(), actual.content());
-    Assert.assertEquals("Path", expected.path(), actual.path());
-    Assert.assertEquals("Format", expected.format(), actual.format());
-    Assert.assertEquals("Partition size", expected.partition().size(), 
actual.partition().size());
+    assertThat(expected).isNotNull();
+    assertThat(actual).isNotNull();
+    assertThat(actual.specId()).as("SpecId").isEqualTo(expected.specId());
+    assertThat(actual.content()).as("Content").isEqualTo(expected.content());
+    assertThat(actual.path()).as("Path").isEqualTo(expected.path());
+    assertThat(actual.format()).as("Format").isEqualTo(expected.format());
+    assertThat(actual.partition().size())
+        .as("Partition size")
+        .isEqualTo(expected.partition().size());
     for (int i = 0; i < expected.partition().size(); i++) {
-      Assert.assertEquals(
-          "Partition data at index " + i,
-          expected.partition().get(i, Object.class),
-          actual.partition().get(i, Object.class));
+      assertThat(actual.partition().get(i, Object.class))
+          .as("Partition data at index " + i)
+          .isEqualTo(expected.partition().get(i, Object.class));
     }
-    Assert.assertEquals("Record count", expected.recordCount(), 
actual.recordCount());
-    Assert.assertEquals("File size in bytes", expected.fileSizeInBytes(), 
actual.fileSizeInBytes());
-    Assert.assertEquals("Column sizes", expected.columnSizes(), 
actual.columnSizes());
-    Assert.assertEquals("Value counts", expected.valueCounts(), 
actual.valueCounts());
-    Assert.assertEquals("Null value counts", expected.nullValueCounts(), 
actual.nullValueCounts());
-    Assert.assertEquals("Lower bounds", expected.lowerBounds(), 
actual.lowerBounds());
-    Assert.assertEquals("Upper bounds", expected.upperBounds(), 
actual.upperBounds());
-    Assert.assertEquals("Key metadata", expected.keyMetadata(), 
actual.keyMetadata());
-    Assert.assertEquals("Split offsets", expected.splitOffsets(), 
actual.splitOffsets());
-    Assert.assertEquals(
-        "Equality field id list", actual.equalityFieldIds(), 
expected.equalityFieldIds());
+    assertThat(actual.recordCount()).as("Record 
count").isEqualTo(expected.recordCount());
+    assertThat(actual.fileSizeInBytes())
+        .as("File size in bytes")
+        .isEqualTo(expected.fileSizeInBytes());
+    assertThat(actual.columnSizes()).as("Column 
sizes").isEqualTo(expected.columnSizes());
+    assertThat(actual.valueCounts()).as("Value 
counts").isEqualTo(expected.valueCounts());
+    assertThat(actual.nullValueCounts())
+        .as("Null value counts")
+        .isEqualTo(expected.nullValueCounts());
+    assertThat(actual.lowerBounds()).as("Lower 
bounds").isEqualTo(expected.lowerBounds());
+    assertThat(actual.upperBounds()).as("Upper 
bounds").isEqualTo(expected.upperBounds());
+    assertThat(actual.keyMetadata()).as("Key 
metadata").isEqualTo(expected.keyMetadata());
+    assertThat(actual.splitOffsets()).as("Split 
offsets").isEqualTo(expected.splitOffsets());
+    assertThat(actual.equalityFieldIds())
+        .as("Equality field id list")
+        .isEqualTo(expected.equalityFieldIds());
   }
 }
diff --git 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java
 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java
index 73d03710d3..ed3f54bec6 100644
--- 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java
+++ 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.flink.source;
 
 import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -41,16 +42,11 @@ import org.apache.iceberg.flink.TestFixtures;
 import org.apache.iceberg.flink.TestHelpers;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
-import org.junit.Assume;
-import org.junit.Test;
+import org.junit.jupiter.api.TestTemplate;
 
 /** Test {@link FlinkInputFormat}. */
 public class TestFlinkInputFormat extends TestFlinkSource {
 
-  public TestFlinkInputFormat(String fileFormat) {
-    super(fileFormat);
-  }
-
   @Override
   protected List<Row> run(
       FlinkSource.Builder formatBuilder,
@@ -61,7 +57,7 @@ public class TestFlinkInputFormat extends TestFlinkSource {
     return runFormat(formatBuilder.tableLoader(tableLoader()).buildFormat());
   }
 
-  @Test
+  @TestTemplate
   public void testNestedProjection() throws Exception {
     Schema schema =
         new Schema(
@@ -75,10 +71,11 @@ public class TestFlinkInputFormat extends TestFlinkSource {
                     Types.NestedField.required(5, "f3", 
Types.LongType.get()))),
             required(6, "id", Types.LongType.get()));
 
-    Table table = 
catalogResource.catalog().createTable(TableIdentifier.of("default", "t"), 
schema);
+    Table table =
+        catalogExtension.catalog().createTable(TableIdentifier.of("default", 
"t"), schema);
 
     List<Record> writeRecords = RandomGenericData.generate(schema, 2, 0L);
-    new GenericAppenderHelper(table, fileFormat, 
TEMPORARY_FOLDER).appendToTable(writeRecords);
+    new GenericAppenderHelper(table, fileFormat, 
temporaryDirectory).appendToTable(writeRecords);
 
     // Schema: [data, nested[f1, f2, f3], id]
     // Projection: [nested.f2, data]
@@ -106,7 +103,7 @@ public class TestFlinkInputFormat extends TestFlinkSource {
     TestHelpers.assertRows(result, expected);
   }
 
-  @Test
+  @TestTemplate
   public void testBasicProjection() throws IOException {
     Schema writeSchema =
         new Schema(
@@ -115,10 +112,10 @@ public class TestFlinkInputFormat extends TestFlinkSource 
{
             Types.NestedField.optional(2, "time", 
Types.TimestampType.withZone()));
 
     Table table =
-        catalogResource.catalog().createTable(TableIdentifier.of("default", 
"t"), writeSchema);
+        catalogExtension.catalog().createTable(TableIdentifier.of("default", 
"t"), writeSchema);
 
     List<Record> writeRecords = RandomGenericData.generate(writeSchema, 2, 0L);
-    new GenericAppenderHelper(table, fileFormat, 
TEMPORARY_FOLDER).appendToTable(writeRecords);
+    new GenericAppenderHelper(table, fileFormat, 
temporaryDirectory).appendToTable(writeRecords);
 
     TableSchema projectedSchema =
         TableSchema.builder()
@@ -140,9 +137,9 @@ public class TestFlinkInputFormat extends TestFlinkSource {
     TestHelpers.assertRows(result, expected);
   }
 
-  @Test
+  @TestTemplate
   public void testReadPartitionColumn() throws Exception {
-    Assume.assumeTrue("Temporary skip ORC", FileFormat.ORC != fileFormat);
+    assumeThat(fileFormat).as("Temporary skip 
ORC").isNotEqualTo(FileFormat.ORC);
 
     Schema nestedSchema =
         new Schema(
@@ -157,9 +154,10 @@ public class TestFlinkInputFormat extends TestFlinkSource {
         
PartitionSpec.builderFor(nestedSchema).identity("struct.innerName").build();
 
     Table table =
-        catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, 
nestedSchema, spec);
+        catalogExtension.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, 
nestedSchema, spec);
     List<Record> records = RandomGenericData.generate(nestedSchema, 10, 0L);
-    GenericAppenderHelper appender = new GenericAppenderHelper(table, 
fileFormat, TEMPORARY_FOLDER);
+    GenericAppenderHelper appender =
+        new GenericAppenderHelper(table, fileFormat, temporaryDirectory);
     for (Record record : records) {
       org.apache.iceberg.TestHelpers.Row partition =
           org.apache.iceberg.TestHelpers.Row.of(record.get(1, 
Record.class).get(1));
diff --git 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
index b537efa727..428da49f1d 100644
--- 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
+++ 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
@@ -18,18 +18,26 @@
  */
 package org.apache.iceberg.flink.source;
 
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.nio.file.Path;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.types.Row;
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
@@ -38,8 +46,8 @@ import org.apache.iceberg.data.RandomGenericData;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.flink.HadoopCatalogResource;
-import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.HadoopCatalogExtension;
+import org.apache.iceberg.flink.MiniFlinkClusterExtension;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.flink.TestFixtures;
 import org.apache.iceberg.flink.TestHelpers;
@@ -48,42 +56,32 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.DateTimeUtil;
-import org.assertj.core.api.Assertions;
-import org.junit.Assert;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public abstract class TestFlinkScan {
-
-  @ClassRule
-  public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
-      MiniClusterResource.createWithClassloaderCheckDisabled();
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
 
-  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+@ExtendWith(ParameterizedTestExtension.class)
+public abstract class TestFlinkScan {
+  @RegisterExtension
+  protected static MiniClusterExtension miniClusterResource =
+      MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();
 
-  @Rule
-  public final HadoopCatalogResource catalogResource =
-      new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, 
TestFixtures.TABLE);
+  @TempDir protected Path temporaryDirectory;
 
-  // parametrized variables
-  protected final FileFormat fileFormat;
+  @RegisterExtension
+  protected static final HadoopCatalogExtension catalogExtension =
+      new HadoopCatalogExtension(TestFixtures.DATABASE, TestFixtures.TABLE);
 
-  @Parameterized.Parameters(name = "format={0}")
-  public static Object[] parameters() {
-    return new Object[] {"avro", "parquet", "orc"};
-  }
+  @Parameter protected FileFormat fileFormat;
 
-  TestFlinkScan(String fileFormat) {
-    this.fileFormat = FileFormat.fromString(fileFormat);
+  @Parameters(name = "format={0}")
+  public static Collection<FileFormat> fileFormat() {
+    return Arrays.asList(FileFormat.AVRO, FileFormat.PARQUET, FileFormat.ORC);
   }
 
   protected TableLoader tableLoader() {
-    return catalogResource.tableLoader();
+    return catalogExtension.tableLoader();
   }
 
   protected abstract List<Row> runWithProjection(String... projected) throws 
Exception;
@@ -99,41 +97,41 @@ public abstract class TestFlinkScan {
 
   protected abstract List<Row> run() throws Exception;
 
-  @Test
+  @TestTemplate
   public void testUnpartitionedTable() throws Exception {
     Table table =
-        catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, 
TestFixtures.SCHEMA);
+        catalogExtension.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, 
TestFixtures.SCHEMA);
     List<Record> expectedRecords = 
RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L);
-    new GenericAppenderHelper(table, fileFormat, 
TEMPORARY_FOLDER).appendToTable(expectedRecords);
+    new GenericAppenderHelper(table, fileFormat, 
temporaryDirectory).appendToTable(expectedRecords);
     TestHelpers.assertRecords(run(), expectedRecords, TestFixtures.SCHEMA);
   }
 
-  @Test
+  @TestTemplate
   public void testPartitionedTable() throws Exception {
     Table table =
-        catalogResource
+        catalogExtension
             .catalog()
             .createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA, 
TestFixtures.SPEC);
     List<Record> expectedRecords = 
RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L);
     expectedRecords.get(0).set(2, "2020-03-20");
-    new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER)
+    new GenericAppenderHelper(table, fileFormat, temporaryDirectory)
         .appendToTable(org.apache.iceberg.TestHelpers.Row.of("2020-03-20", 0), 
expectedRecords);
     TestHelpers.assertRecords(run(), expectedRecords, TestFixtures.SCHEMA);
   }
 
-  @Test
+  @TestTemplate
   public void testProjection() throws Exception {
     Table table =
-        catalogResource
+        catalogExtension
             .catalog()
             .createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA, 
TestFixtures.SPEC);
     List<Record> inputRecords = 
RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L);
-    new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER)
+    new GenericAppenderHelper(table, fileFormat, temporaryDirectory)
         .appendToTable(org.apache.iceberg.TestHelpers.Row.of("2020-03-20", 0), 
inputRecords);
     assertRows(runWithProjection("data"), Row.of(inputRecords.get(0).get(0)));
   }
 
-  @Test
+  @TestTemplate
   public void testIdentityPartitionProjections() throws Exception {
     Schema logSchema =
         new Schema(
@@ -145,7 +143,7 @@ public abstract class TestFlinkScan {
         
PartitionSpec.builderFor(logSchema).identity("dt").identity("level").build();
 
     Table table =
-        catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, 
logSchema, spec);
+        catalogExtension.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, 
logSchema, spec);
     List<Record> inputRecords = RandomGenericData.generate(logSchema, 10, 0L);
 
     int idx = 0;
@@ -154,7 +152,7 @@ public abstract class TestFlinkScan {
       record.set(1, "2020-03-2" + idx);
       record.set(2, Integer.toString(idx));
       append.appendFile(
-          new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER)
+          new GenericAppenderHelper(table, fileFormat, temporaryDirectory)
               .writeFile(
                   org.apache.iceberg.TestHelpers.Row.of("2020-03-2" + idx, 
Integer.toString(idx)),
                   ImmutableList.of(record)));
@@ -200,20 +198,19 @@ public abstract class TestFlinkScan {
 
       for (int i = 0; i < projectedFields.size(); i++) {
         String name = projectedFields.get(i);
-        Assert.assertEquals(
-            "Projected field " + name + " should match",
-            inputRecord.getField(name),
-            actualRecord.getField(i));
+        assertThat(inputRecord.getField(name))
+            .as("Projected field " + name + " should match")
+            .isEqualTo(actualRecord.getField(i));
       }
     }
   }
 
-  @Test
+  @TestTemplate
   public void testSnapshotReads() throws Exception {
     Table table =
-        catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, 
TestFixtures.SCHEMA);
+        catalogExtension.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, 
TestFixtures.SCHEMA);
 
-    GenericAppenderHelper helper = new GenericAppenderHelper(table, 
fileFormat, TEMPORARY_FOLDER);
+    GenericAppenderHelper helper = new GenericAppenderHelper(table, 
fileFormat, temporaryDirectory);
 
     List<Record> expectedRecords = 
RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L);
     helper.appendToTable(expectedRecords);
@@ -235,12 +232,12 @@ public abstract class TestFlinkScan {
         TestFixtures.SCHEMA);
   }
 
-  @Test
+  @TestTemplate
   public void testTagReads() throws Exception {
     Table table =
-        catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, 
TestFixtures.SCHEMA);
+        catalogExtension.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, 
TestFixtures.SCHEMA);
 
-    GenericAppenderHelper helper = new GenericAppenderHelper(table, 
fileFormat, TEMPORARY_FOLDER);
+    GenericAppenderHelper helper = new GenericAppenderHelper(table, 
fileFormat, temporaryDirectory);
 
     List<Record> expectedRecords1 = 
RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L);
     helper.appendToTable(expectedRecords1);
@@ -264,12 +261,12 @@ public abstract class TestFlinkScan {
         runWithOptions(ImmutableMap.of("tag", "t1")), expectedRecords, 
TestFixtures.SCHEMA);
   }
 
-  @Test
+  @TestTemplate
   public void testBranchReads() throws Exception {
     Table table =
-        catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, 
TestFixtures.SCHEMA);
+        catalogExtension.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, 
TestFixtures.SCHEMA);
 
-    GenericAppenderHelper helper = new GenericAppenderHelper(table, 
fileFormat, TEMPORARY_FOLDER);
+    GenericAppenderHelper helper = new GenericAppenderHelper(table, 
fileFormat, temporaryDirectory);
 
     List<Record> expectedRecordsBase = 
RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L);
     helper.appendToTable(expectedRecordsBase);
@@ -300,12 +297,12 @@ public abstract class TestFlinkScan {
     TestHelpers.assertRecords(run(), mainExpectedRecords, TestFixtures.SCHEMA);
   }
 
-  @Test
+  @TestTemplate
   public void testIncrementalReadViaTag() throws Exception {
     Table table =
-        catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, 
TestFixtures.SCHEMA);
+        catalogExtension.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, 
TestFixtures.SCHEMA);
 
-    GenericAppenderHelper helper = new GenericAppenderHelper(table, 
fileFormat, TEMPORARY_FOLDER);
+    GenericAppenderHelper helper = new GenericAppenderHelper(table, 
fileFormat, temporaryDirectory);
 
     List<Record> records1 = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 
0L);
     helper.appendToTable(records1);
@@ -355,7 +352,7 @@ public abstract class TestFlinkScan {
         expected,
         TestFixtures.SCHEMA);
 
-    Assertions.assertThatThrownBy(
+    assertThatThrownBy(
             () ->
                 runWithOptions(
                     ImmutableMap.<String, String>builder()
@@ -366,7 +363,7 @@ public abstract class TestFlinkScan {
         .isInstanceOf(Exception.class)
         .hasMessage("START_SNAPSHOT_ID and START_TAG cannot both be set.");
 
-    Assertions.assertThatThrownBy(
+    assertThatThrownBy(
             () ->
                 runWithOptions(
                     ImmutableMap.<String, String>builder()
@@ -378,12 +375,12 @@ public abstract class TestFlinkScan {
         .hasMessage("END_SNAPSHOT_ID and END_TAG cannot both be set.");
   }
 
-  @Test
+  @TestTemplate
   public void testIncrementalRead() throws Exception {
     Table table =
-        catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, 
TestFixtures.SCHEMA);
+        catalogExtension.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, 
TestFixtures.SCHEMA);
 
-    GenericAppenderHelper helper = new GenericAppenderHelper(table, 
fileFormat, TEMPORARY_FOLDER);
+    GenericAppenderHelper helper = new GenericAppenderHelper(table, 
fileFormat, temporaryDirectory);
 
     List<Record> records1 = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 
0L);
     helper.appendToTable(records1);
@@ -413,10 +410,10 @@ public abstract class TestFlinkScan {
         TestFixtures.SCHEMA);
   }
 
-  @Test
+  @TestTemplate
   public void testFilterExpPartition() throws Exception {
     Table table =
-        catalogResource
+        catalogExtension
             .catalog()
             .createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA, 
TestFixtures.SPEC);
 
@@ -424,7 +421,7 @@ public abstract class TestFlinkScan {
     expectedRecords.get(0).set(2, "2020-03-20");
     expectedRecords.get(1).set(2, "2020-03-20");
 
-    GenericAppenderHelper helper = new GenericAppenderHelper(table, 
fileFormat, TEMPORARY_FOLDER);
+    GenericAppenderHelper helper = new GenericAppenderHelper(table, 
fileFormat, temporaryDirectory);
     DataFile dataFile1 =
         helper.writeFile(org.apache.iceberg.TestHelpers.Row.of("2020-03-20", 
0), expectedRecords);
     DataFile dataFile2 =
@@ -441,14 +438,14 @@ public abstract class TestFlinkScan {
   private void testFilterExp(Expression filter, String sqlFilter, boolean 
caseSensitive)
       throws Exception {
     Table table =
-        catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, 
TestFixtures.SCHEMA);
+        catalogExtension.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, 
TestFixtures.SCHEMA);
 
     List<Record> expectedRecords = 
RandomGenericData.generate(TestFixtures.SCHEMA, 3, 0L);
     expectedRecords.get(0).set(0, "a");
     expectedRecords.get(1).set(0, "b");
     expectedRecords.get(2).set(0, "c");
 
-    GenericAppenderHelper helper = new GenericAppenderHelper(table, 
fileFormat, TEMPORARY_FOLDER);
+    GenericAppenderHelper helper = new GenericAppenderHelper(table, 
fileFormat, temporaryDirectory);
     DataFile dataFile = helper.writeFile(expectedRecords);
     helper.appendToTable(dataFile);
 
@@ -458,19 +455,19 @@ public abstract class TestFlinkScan {
     TestHelpers.assertRecords(actual, expectedRecords.subList(1, 3), 
TestFixtures.SCHEMA);
   }
 
-  @Test
+  @TestTemplate
   public void testFilterExp() throws Exception {
     testFilterExp(Expressions.greaterThanOrEqual("data", "b"), "where 
data>='b'", true);
   }
 
-  @Test
+  @TestTemplate
   public void testFilterExpCaseInsensitive() throws Exception {
     // sqlFilter does not support case-insensitive filtering:
     // https://issues.apache.org/jira/browse/FLINK-16175
     testFilterExp(Expressions.greaterThanOrEqual("DATA", "b"), "where 
data>='b'", false);
   }
 
-  @Test
+  @TestTemplate
   public void testPartitionTypes() throws Exception {
     Schema typesSchema =
         new Schema(
@@ -492,9 +489,10 @@ public abstract class TestFlinkScan {
             .build();
 
     Table table =
-        catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, 
typesSchema, spec);
+        catalogExtension.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, 
typesSchema, spec);
     List<Record> records = RandomGenericData.generate(typesSchema, 10, 0L);
-    GenericAppenderHelper appender = new GenericAppenderHelper(table, 
fileFormat, TEMPORARY_FOLDER);
+    GenericAppenderHelper appender =
+        new GenericAppenderHelper(table, fileFormat, temporaryDirectory);
     for (Record record : records) {
       org.apache.iceberg.TestHelpers.Row partition =
           org.apache.iceberg.TestHelpers.Row.of(
@@ -512,7 +510,7 @@ public abstract class TestFlinkScan {
     TestHelpers.assertRecords(run(), records, typesSchema);
   }
 
-  @Test
+  @TestTemplate
   public void testCustomizedFlinkDataTypes() throws Exception {
     Schema schema =
         new Schema(
@@ -522,9 +520,9 @@ public abstract class TestFlinkScan {
                 Types.MapType.ofRequired(2, 3, Types.StringType.get(), 
Types.StringType.get())),
             Types.NestedField.required(
                 4, "arr", Types.ListType.ofRequired(5, 
Types.StringType.get())));
-    Table table = 
catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, schema);
+    Table table = 
catalogExtension.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, schema);
     List<Record> records = RandomGenericData.generate(schema, 10, 0L);
-    GenericAppenderHelper helper = new GenericAppenderHelper(table, 
fileFormat, TEMPORARY_FOLDER);
+    GenericAppenderHelper helper = new GenericAppenderHelper(table, 
fileFormat, temporaryDirectory);
     helper.appendToTable(records);
     TestHelpers.assertRecords(run(), records, schema);
   }
diff --git 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java
 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java
index 023166801b..b5bddc7767 100644
--- 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java
+++ 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java
@@ -25,23 +25,18 @@ import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.types.Row;
-import org.junit.Before;
+import org.junit.jupiter.api.BeforeEach;
 
 /** Test Flink SELECT SQLs. */
 public class TestFlinkScanSql extends TestFlinkSource {
-
   private volatile TableEnvironment tEnv;
 
-  public TestFlinkScanSql(String fileFormat) {
-    super(fileFormat);
-  }
-
-  @Before
+  @BeforeEach
   public void before() throws IOException {
     SqlHelpers.sql(
         getTableEnv(),
         "create catalog iceberg_catalog with ('type'='iceberg', 
'catalog-type'='hadoop', 'warehouse'='%s')",
-        catalogResource.warehouse());
+        catalogExtension.warehouse());
     SqlHelpers.sql(getTableEnv(), "use catalog iceberg_catalog");
     getTableEnv()
         .getConfig()
diff --git 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSource.java
 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSource.java
index 2b55bee6e5..86c7e8991d 100644
--- 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSource.java
+++ 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSource.java
@@ -32,17 +32,13 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.Maps;
 
 public abstract class TestFlinkSource extends TestFlinkScan {
 
-  TestFlinkSource(String fileFormat) {
-    super(fileFormat);
-  }
-
   @Override
   protected List<Row> runWithProjection(String... projected) throws Exception {
     TableSchema.Builder builder = TableSchema.builder();
     TableSchema schema =
         FlinkSchemaUtil.toSchema(
             FlinkSchemaUtil.convert(
-                
catalogResource.catalog().loadTable(TestFixtures.TABLE_IDENTIFIER).schema()));
+                
catalogExtension.catalog().loadTable(TestFixtures.TABLE_IDENTIFIER).schema()));
     for (String field : projected) {
       TableColumn column = schema.getTableColumn(field).get();
       builder.field(column.getName(), column.getType());
diff --git 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java
 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java
index a80f87d648..3c0c38e111 100644
--- 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java
+++ 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java
@@ -42,20 +42,13 @@ import org.apache.iceberg.flink.data.RowDataToRowMapper;
 import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 
-@RunWith(Parameterized.class)
 public class TestIcebergSourceBounded extends TestFlinkScan {
 
-  public TestIcebergSourceBounded(String fileFormat) {
-    super(fileFormat);
-  }
-
   @Override
   protected List<Row> runWithProjection(String... projected) throws Exception {
     Schema icebergTableSchema =
-        
catalogResource.catalog().loadTable(TestFixtures.TABLE_IDENTIFIER).schema();
+        
catalogExtension.catalog().loadTable(TestFixtures.TABLE_IDENTIFIER).schema();
     TableSchema.Builder builder = TableSchema.builder();
     TableSchema schema = 
FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(icebergTableSchema));
     for (String field : projected) {
diff --git 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java
 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java
index 3652e0bb56..ff3348bbc3 100644
--- 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java
+++ 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java
@@ -29,23 +29,19 @@ import org.apache.flink.types.Row;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.flink.FlinkConfigOptions;
-import org.junit.Before;
+import org.junit.jupiter.api.BeforeEach;
 
 public class TestIcebergSourceBoundedSql extends TestIcebergSourceBounded {
   private volatile TableEnvironment tEnv;
 
-  public TestIcebergSourceBoundedSql(String fileFormat) {
-    super(fileFormat);
-  }
-
-  @Before
+  @BeforeEach
   public void before() throws IOException {
     Configuration tableConf = getTableEnv().getConfig().getConfiguration();
     
tableConf.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE.key(),
 true);
     SqlHelpers.sql(
         getTableEnv(),
         "create catalog iceberg_catalog with ('type'='iceberg', 
'catalog-type'='hadoop', 'warehouse'='%s')",
-        catalogResource.warehouse());
+        catalogExtension.warehouse());
     SqlHelpers.sql(getTableEnv(), "use catalog iceberg_catalog");
     getTableEnv()
         .getConfig()

Reply via email to