This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 581e03713c Flink 1.18: Create JUnit5 version of TestFlinkScan (#9480)
581e03713c is described below
commit 581e03713c2b397c2416899fdf1e433540aca3d3
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Tue Jan 16 11:47:12 2024 +0100
Flink 1.18: Create JUnit5 version of TestFlinkScan (#9480)
---
.../java/org/apache/iceberg/flink/TestHelpers.java | 385 ++++++++++-----------
.../iceberg/flink/source/TestFlinkInputFormat.java | 30 +-
.../apache/iceberg/flink/source/TestFlinkScan.java | 152 ++++----
.../iceberg/flink/source/TestFlinkScanSql.java | 11 +-
.../iceberg/flink/source/TestFlinkSource.java | 6 +-
.../flink/source/TestIcebergSourceBounded.java | 9 +-
.../flink/source/TestIcebergSourceBoundedSql.java | 10 +-
7 files changed, 279 insertions(+), 324 deletions(-)
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
index 38b0eb0b40..80e5ddd24f 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
@@ -18,6 +18,8 @@
*/
package org.apache.iceberg.flink;
+import static org.assertj.core.api.Assertions.assertThat;
+
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
@@ -67,7 +69,6 @@ import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.DateTimeUtil;
import org.assertj.core.api.Assertions;
-import org.junit.Assert;
public class TestHelpers {
private TestHelpers() {}
@@ -154,7 +155,7 @@ public class TestHelpers {
}
public static void assertRows(List<Row> results, List<Row> expected) {
- Assertions.assertThat(results).containsExactlyInAnyOrderElementsOf(expected);
+ assertThat(results).containsExactlyInAnyOrderElementsOf(expected);
}
public static void assertRowsWithOrder(List<Row> results, List<Row> expected) {
@@ -174,9 +175,8 @@ public class TestHelpers {
return;
}
- Assert.assertTrue(
- "expected Record and actual RowData should be both null or not null",
- expectedRecord != null && actualRowData != null);
+ assertThat(expectedRecord).isNotNull();
+ assertThat(actualRowData).isNotNull();
List<Type> types = Lists.newArrayList();
for (Types.NestedField field : structType.fields()) {
@@ -208,109 +208,106 @@ public class TestHelpers {
return;
}
- Assert.assertTrue(
- "expected and actual should be both null or not null", expected !=
null && actual != null);
+ assertThat(expected).isNotNull();
+ assertThat(actual).isNotNull();
switch (type.typeId()) {
case BOOLEAN:
- Assert.assertEquals("boolean value should be equal", expected, actual);
+ assertThat(actual).as("boolean value should be equal").isEqualTo(expected);
break;
case INTEGER:
- Assert.assertEquals("int value should be equal", expected, actual);
+ assertThat(actual).as("int value should be equal").isEqualTo(expected);
break;
case LONG:
- Assert.assertEquals("long value should be equal", expected, actual);
+ assertThat(actual).as("long value should be equal").isEqualTo(expected);
break;
case FLOAT:
- Assert.assertEquals("float value should be equal", expected, actual);
+ assertThat(actual).as("float value should be equal").isEqualTo(expected);
break;
case DOUBLE:
- Assert.assertEquals("double value should be equal", expected, actual);
+ assertThat(actual).as("double value should be equal").isEqualTo(expected);
break;
case STRING:
- Assertions.assertThat(expected)
- .as("Should expect a CharSequence")
- .isInstanceOf(CharSequence.class);
- Assert.assertEquals("string should be equal",
String.valueOf(expected), actual.toString());
+ assertThat(expected).as("Should expect a CharSequence").isInstanceOf(CharSequence.class);
+ assertThat(actual.toString())
+ .as("string should be equal")
+ .isEqualTo(String.valueOf(expected));
break;
case DATE:
- Assertions.assertThat(expected).as("Should expect a
Date").isInstanceOf(LocalDate.class);
+ assertThat(expected).as("Should expect a
Date").isInstanceOf(LocalDate.class);
LocalDate date = DateTimeUtil.dateFromDays((int) actual);
- Assert.assertEquals("date should be equal", expected, date);
+ assertThat(date).as("date should be equal").isEqualTo(expected);
break;
case TIME:
- Assertions.assertThat(expected)
- .as("Should expect a LocalTime")
- .isInstanceOf(LocalTime.class);
+ assertThat(expected).as("Should expect a
LocalTime").isInstanceOf(LocalTime.class);
int milliseconds = (int) (((LocalTime) expected).toNanoOfDay() /
1000_000);
- Assert.assertEquals("time millis should be equal", milliseconds,
actual);
+ assertThat(actual).as("time millis should be
equal").isEqualTo(milliseconds);
break;
case TIMESTAMP:
if (((Types.TimestampType) type).shouldAdjustToUTC()) {
- Assertions.assertThat(expected)
+ assertThat(expected)
.as("Should expect a OffsetDataTime")
.isInstanceOf(OffsetDateTime.class);
OffsetDateTime ts = (OffsetDateTime) expected;
- Assert.assertEquals(
- "OffsetDataTime should be equal",
- ts.toLocalDateTime(),
- ((TimestampData) actual).toLocalDateTime());
+ assertThat(((TimestampData) actual).toLocalDateTime())
+ .as("OffsetDataTime should be equal")
+ .isEqualTo(ts.toLocalDateTime());
} else {
- Assertions.assertThat(expected)
+ assertThat(expected)
.as("Should expect a LocalDataTime")
.isInstanceOf(LocalDateTime.class);
LocalDateTime ts = (LocalDateTime) expected;
- Assert.assertEquals(
- "LocalDataTime should be equal", ts, ((TimestampData)
actual).toLocalDateTime());
+ assertThat(((TimestampData) actual).toLocalDateTime())
+ .as("LocalDataTime should be equal")
+ .isEqualTo(ts);
}
break;
case BINARY:
- Assertions.assertThat(expected)
+ assertThat(ByteBuffer.wrap((byte[]) actual))
.as("Should expect a ByteBuffer")
- .isInstanceOf(ByteBuffer.class);
- Assert.assertEquals("binary should be equal", expected,
ByteBuffer.wrap((byte[]) actual));
+ .isInstanceOf(ByteBuffer.class)
+ .isEqualTo(expected);
break;
case DECIMAL:
- Assertions.assertThat(expected)
- .as("Should expect a BigDecimal")
- .isInstanceOf(BigDecimal.class);
+ assertThat(expected).as("Should expect a
BigDecimal").isInstanceOf(BigDecimal.class);
BigDecimal bd = (BigDecimal) expected;
- Assert.assertEquals(
- "decimal value should be equal", bd, ((DecimalData)
actual).toBigDecimal());
+ assertThat(((DecimalData) actual).toBigDecimal())
+ .as("decimal value should be equal")
+ .isEqualTo(bd);
break;
case LIST:
- Assertions.assertThat(expected)
- .as("Should expect a Collection")
- .isInstanceOf(Collection.class);
+ assertThat(expected).as("Should expect a
Collection").isInstanceOf(Collection.class);
Collection<?> expectedArrayData = (Collection<?>) expected;
ArrayData actualArrayData = (ArrayData) actual;
LogicalType elementType = ((ArrayType) logicalType).getElementType();
- Assert.assertEquals(
- "array length should be equal", expectedArrayData.size(),
actualArrayData.size());
+ assertThat(actualArrayData.size())
+ .as("array length should be equal")
+ .isEqualTo(expectedArrayData.size());
assertArrayValues(
type.asListType().elementType(), elementType, expectedArrayData,
actualArrayData);
break;
case MAP:
- Assertions.assertThat(expected).as("Should expect a
Map").isInstanceOf(Map.class);
+ assertThat(expected).as("Should expect a Map").isInstanceOf(Map.class);
assertMapValues(type.asMapType(), logicalType, (Map<?, ?>) expected,
(MapData) actual);
break;
case STRUCT:
- Assertions.assertThat(expected).as("Should expect a
Record").isInstanceOf(StructLike.class);
+ assertThat(expected).as("Should expect a
Record").isInstanceOf(StructLike.class);
assertRowData(type.asStructType(), logicalType, (StructLike) expected,
(RowData) actual);
break;
case UUID:
- Assertions.assertThat(expected).as("Should expect a
UUID").isInstanceOf(UUID.class);
+ assertThat(expected).as("Should expect a
UUID").isInstanceOf(UUID.class);
ByteBuffer bb = ByteBuffer.wrap((byte[]) actual);
long firstLong = bb.getLong();
long secondLong = bb.getLong();
- Assert.assertEquals(
- "UUID should be equal",
- expected.toString(),
- new UUID(firstLong, secondLong).toString());
+ assertThat(new UUID(firstLong, secondLong).toString())
+ .as("UUID should be equal")
+ .isEqualTo(expected.toString());
break;
case FIXED:
- Assertions.assertThat(expected).as("Should expect
byte[]").isInstanceOf(byte[].class);
- Assert.assertArrayEquals("binary should be equal", (byte[]) expected,
(byte[]) actual);
+ assertThat(actual)
+ .as("Should expect byte[]")
+ .isInstanceOf(byte[].class)
+ .isEqualTo(expected);
break;
default:
throw new IllegalArgumentException("Not a supported type: " + type);
@@ -324,8 +321,9 @@ public class TestHelpers {
public static void assertEquals(Schema schema, GenericData.Record record,
Row row) {
List<Types.NestedField> fields = schema.asStruct().fields();
- Assert.assertEquals(fields.size(), record.getSchema().getFields().size());
- Assert.assertEquals(fields.size(), row.getArity());
+ assertThat(fields).hasSameSizeAs(record.getSchema().getFields());
+ assertThat(fields).hasSize(row.getArity());
+
RowType rowType = FlinkSchemaUtil.convert(schema);
for (int i = 0; i < fields.size(); ++i) {
Type fieldType = fields.get(i).type();
@@ -352,9 +350,8 @@ public class TestHelpers {
if (expected == null && actual == null) {
return;
}
-
- Assert.assertTrue(
- "expected and actual should be both null or not null", expected !=
null && actual != null);
+ assertThat(expected).isNotNull();
+ assertThat(actual).isNotNull();
switch (type.typeId()) {
case BOOLEAN:
@@ -362,87 +359,79 @@ public class TestHelpers {
case LONG:
case FLOAT:
case DOUBLE:
- Assertions.assertThat(expected)
+ assertThat(expected)
.as("Should expect a " + type.typeId().javaClass())
.isInstanceOf(type.typeId().javaClass());
- Assertions.assertThat(actual)
+ assertThat(actual)
.as("Should expect a " + type.typeId().javaClass())
.isInstanceOf(type.typeId().javaClass());
- Assert.assertEquals(type.typeId() + " value should be equal",
expected, actual);
+ assertThat(actual).as(type.typeId() + " value should be
equal").isEqualTo(expected);
break;
case STRING:
- Assertions.assertThat(expected)
- .as("Should expect a CharSequence")
- .isInstanceOf(CharSequence.class);
- Assertions.assertThat(actual)
- .as("Should expect a CharSequence")
- .isInstanceOf(CharSequence.class);
- Assert.assertEquals("string should be equal", expected.toString(),
actual.toString());
+ assertThat(expected).as("Should expect a
CharSequence").isInstanceOf(CharSequence.class);
+ assertThat(actual).as("Should expect a
CharSequence").isInstanceOf(CharSequence.class);
+ assertThat(actual.toString()).as("string should be
equal").isEqualTo(expected.toString());
break;
case DATE:
- Assertions.assertThat(expected).as("Should expect a
Date").isInstanceOf(LocalDate.class);
+ assertThat(expected).as("Should expect a
Date").isInstanceOf(LocalDate.class);
LocalDate date = DateTimeUtil.dateFromDays((int) actual);
- Assert.assertEquals("date should be equal", expected, date);
+ assertThat(date).as("date should be equal").isEqualTo(expected);
break;
case TIME:
- Assertions.assertThat(expected)
- .as("Should expect a LocalTime")
- .isInstanceOf(LocalTime.class);
+ assertThat(expected).as("Should expect a
LocalTime").isInstanceOf(LocalTime.class);
int milliseconds = (int) (((LocalTime) expected).toNanoOfDay() /
1000_000);
- Assert.assertEquals("time millis should be equal", milliseconds,
actual);
+ assertThat(actual).as("time millis should be
equal").isEqualTo(milliseconds);
break;
case TIMESTAMP:
if (((Types.TimestampType) type).shouldAdjustToUTC()) {
- Assertions.assertThat(expected)
+ assertThat(expected)
.as("Should expect a OffsetDataTime")
.isInstanceOf(OffsetDateTime.class);
OffsetDateTime ts = (OffsetDateTime) expected;
- Assert.assertEquals(
- "OffsetDataTime should be equal",
- ts.toLocalDateTime(),
- ((TimestampData) actual).toLocalDateTime());
+ assertThat(((TimestampData) actual).toLocalDateTime())
+ .as("OffsetDataTime should be equal")
+ .isEqualTo(ts.toLocalDateTime());
} else {
- Assertions.assertThat(expected)
+ assertThat(expected)
.as("Should expect a LocalDataTime")
.isInstanceOf(LocalDateTime.class);
LocalDateTime ts = (LocalDateTime) expected;
- Assert.assertEquals(
- "LocalDataTime should be equal", ts, ((TimestampData)
actual).toLocalDateTime());
+ assertThat(((TimestampData) actual).toLocalDateTime())
+ .as("LocalDataTime should be equal")
+ .isEqualTo(ts);
}
break;
case BINARY:
- Assertions.assertThat(expected)
+ assertThat(ByteBuffer.wrap((byte[]) actual))
.as("Should expect a ByteBuffer")
- .isInstanceOf(ByteBuffer.class);
- Assert.assertEquals("binary should be equal", expected,
ByteBuffer.wrap((byte[]) actual));
+ .isInstanceOf(ByteBuffer.class)
+ .isEqualTo(expected);
break;
case DECIMAL:
- Assertions.assertThat(expected)
- .as("Should expect a BigDecimal")
- .isInstanceOf(BigDecimal.class);
+ assertThat(expected).as("Should expect a
BigDecimal").isInstanceOf(BigDecimal.class);
BigDecimal bd = (BigDecimal) expected;
- Assert.assertEquals(
- "decimal value should be equal", bd, ((DecimalData)
actual).toBigDecimal());
+ assertThat(((DecimalData) actual).toBigDecimal())
+ .as("decimal value should be equal")
+ .isEqualTo(bd);
break;
case LIST:
- Assertions.assertThat(expected)
- .as("Should expect a Collection")
- .isInstanceOf(Collection.class);
+ assertThat(expected).as("Should expect a
Collection").isInstanceOf(Collection.class);
Collection<?> expectedArrayData = (Collection<?>) expected;
ArrayData actualArrayData;
try {
- actualArrayData = convertToArray(actual);
+ actualArrayData = (ArrayData) actual;
} catch (ClassCastException e) {
actualArrayData = new GenericArrayData((Object[]) actual);
}
LogicalType elementType = ((ArrayType) logicalType).getElementType();
- Assert.assertEquals(
- "array length should be equal", expectedArrayData.size(),
actualArrayData.size());
+ assertThat(actualArrayData.size())
+ .as("array length should be equal")
+ .isEqualTo(expectedArrayData.size());
assertArrayValues(
type.asListType().elementType(), elementType, expectedArrayData,
actualArrayData);
break;
case MAP:
- Assertions.assertThat(expected).as("Should expect a
Map").isInstanceOf(Map.class);
+ assertThat(expected).as("Should expect a Map").isInstanceOf(Map.class);
MapData actualMap;
try {
actualMap = (MapData) actual;
@@ -452,60 +441,36 @@ public class TestHelpers {
assertMapValues(type.asMapType(), logicalType, (Map<?, ?>) expected,
actualMap);
break;
case STRUCT:
- Assertions.assertThat(expected)
- .as("Should expect a Record")
- .isInstanceOf(GenericData.Record.class);
+ assertThat(expected).as("Should expect a
Record").isInstanceOf(GenericData.Record.class);
assertEquals(
type.asNestedType().asStructType(), (GenericData.Record) expected,
(Row) actual);
break;
case UUID:
- Assertions.assertThat(expected).as("Should expect a
UUID").isInstanceOf(UUID.class);
+ assertThat(expected).as("Should expect a
UUID").isInstanceOf(UUID.class);
ByteBuffer bb = ByteBuffer.wrap((byte[]) actual);
long firstLong = bb.getLong();
long secondLong = bb.getLong();
- Assert.assertEquals(
- "UUID should be equal",
- expected.toString(),
- new UUID(firstLong, secondLong).toString());
+ assertThat(new UUID(firstLong, secondLong).toString())
+ .as("UUID should be equal")
+ .isEqualTo(expected.toString());
break;
case FIXED:
- Assertions.assertThat(expected).as("Should expect
byte[]").isInstanceOf(byte[].class);
- Assert.assertArrayEquals("binary should be equal", (byte[]) expected,
(byte[]) actual);
+ assertThat(actual)
+ .as("Should expect byte[]")
+ .isInstanceOf(byte[].class)
+ .isEqualTo(expected);
break;
default:
throw new IllegalArgumentException("Not a supported type: " + type);
}
}
- private static GenericArrayData convertToArray(Object actual) {
- Class<?> arrayClass = actual.getClass();
- if (Object[].class.equals(arrayClass)) {
- return new GenericArrayData((Object[]) actual);
- } else if (int[].class.equals(arrayClass)) {
- return new GenericArrayData((int[]) actual);
- } else if (long[].class.equals(arrayClass)) {
- return new GenericArrayData((long[]) actual);
- } else if (float[].class.equals(arrayClass)) {
- return new GenericArrayData((float[]) actual);
- } else if (double[].class.equals(arrayClass)) {
- return new GenericArrayData((double[]) actual);
- } else if (short[].class.equals(arrayClass)) {
- return new GenericArrayData((short[]) actual);
- } else if (byte[].class.equals(arrayClass)) {
- return new GenericArrayData((byte[]) actual);
- } else if (boolean[].class.equals(arrayClass)) {
- return new GenericArrayData((boolean[]) actual);
- } else {
- throw new IllegalArgumentException("Unsupported type " + arrayClass);
- }
- }
-
private static void assertArrayValues(
Type type, LogicalType logicalType, Collection<?> expectedArray,
ArrayData actualArray) {
List<?> expectedElements = Lists.newArrayList(expectedArray);
for (int i = 0; i < expectedArray.size(); i += 1) {
if (expectedElements.get(i) == null) {
- Assert.assertTrue(actualArray.isNullAt(i));
+ assertThat(actualArray.isNullAt(i)).isTrue();
continue;
}
@@ -521,7 +486,7 @@ public class TestHelpers {
private static void assertMapValues(
Types.MapType mapType, LogicalType type, Map<?, ?> expected, MapData
actual) {
- Assert.assertEquals("map size should be equal", expected.size(),
actual.size());
+ assertThat(actual.size()).as("map size should be
equal").isEqualTo(expected.size());
ArrayData actualKeyArrayData = actual.keyArray();
ArrayData actualValueArrayData = actual.valueArray();
@@ -547,7 +512,7 @@ public class TestHelpers {
// not found
}
}
- Assert.assertNotNull("Should have a matching key", matchedActualKey);
+ assertThat(matchedActualKey).as("Should have a matching
key").isNotNull();
final int valueIndex = matchedKeyIndex;
assertEquals(
valueType,
@@ -561,60 +526,67 @@ public class TestHelpers {
if (expected == actual) {
return;
}
- Assert.assertTrue("Should not be null.", expected != null && actual !=
null);
- Assert.assertEquals("Path must match", expected.path(), actual.path());
- Assert.assertEquals("Length must match", expected.length(),
actual.length());
- Assert.assertEquals("Spec id must match", expected.partitionSpecId(),
actual.partitionSpecId());
- Assert.assertEquals("ManifestContent must match", expected.content(),
actual.content());
- Assert.assertEquals(
- "SequenceNumber must match", expected.sequenceNumber(),
actual.sequenceNumber());
- Assert.assertEquals(
- "MinSequenceNumber must match", expected.minSequenceNumber(),
actual.minSequenceNumber());
- Assert.assertEquals("Snapshot id must match", expected.snapshotId(),
actual.snapshotId());
- Assert.assertEquals(
- "Added files flag must match", expected.hasAddedFiles(),
actual.hasAddedFiles());
- Assert.assertEquals(
- "Added files count must match", expected.addedFilesCount(),
actual.addedFilesCount());
- Assert.assertEquals(
- "Added rows count must match", expected.addedRowsCount(),
actual.addedRowsCount());
- Assert.assertEquals(
- "Existing files flag must match", expected.hasExistingFiles(),
actual.hasExistingFiles());
- Assert.assertEquals(
- "Existing files count must match",
- expected.existingFilesCount(),
- actual.existingFilesCount());
- Assert.assertEquals(
- "Existing rows count must match", expected.existingRowsCount(),
actual.existingRowsCount());
- Assert.assertEquals(
- "Deleted files flag must match", expected.hasDeletedFiles(),
actual.hasDeletedFiles());
- Assert.assertEquals(
- "Deleted files count must match", expected.deletedFilesCount(),
actual.deletedFilesCount());
- Assert.assertEquals(
- "Deleted rows count must match", expected.deletedRowsCount(),
actual.deletedRowsCount());
+ assertThat(expected).isNotNull();
+ assertThat(actual).isNotNull();
+ assertThat(actual.path()).as("Path must match").isEqualTo(expected.path());
+ assertThat(actual.length()).as("Length must
match").isEqualTo(expected.length());
+ assertThat(actual.partitionSpecId())
+ .as("Spec id must match")
+ .isEqualTo(expected.partitionSpecId());
+ assertThat(actual.content()).as("ManifestContent must
match").isEqualTo(expected.content());
+ assertThat(actual.sequenceNumber())
+ .as("SequenceNumber must match")
+ .isEqualTo(expected.sequenceNumber());
+ assertThat(actual.minSequenceNumber())
+ .as("MinSequenceNumber must match")
+ .isEqualTo(expected.minSequenceNumber());
+ assertThat(actual.snapshotId()).as("Snapshot id must
match").isEqualTo(expected.snapshotId());
+ assertThat(actual.hasAddedFiles())
+ .as("Added files flag must match")
+ .isEqualTo(expected.hasAddedFiles());
+ assertThat(actual.addedFilesCount())
+ .as("Added files count must match")
+ .isEqualTo(expected.addedFilesCount());
+ assertThat(actual.addedRowsCount())
+ .as("Added rows count must match")
+ .isEqualTo(expected.addedRowsCount());
+ assertThat(actual.hasExistingFiles())
+ .as("Existing files flag must match")
+ .isEqualTo(expected.hasExistingFiles());
+ assertThat(actual.existingFilesCount())
+ .as("Existing files count must match")
+ .isEqualTo(expected.existingFilesCount());
+ assertThat(actual.existingRowsCount())
+ .as("Existing rows count must match")
+ .isEqualTo(expected.existingRowsCount());
+ assertThat(actual.hasDeletedFiles())
+ .as("Deleted files flag must match")
+ .isEqualTo(expected.hasDeletedFiles());
+ assertThat(actual.deletedFilesCount())
+ .as("Deleted files count must match")
+ .isEqualTo(expected.deletedFilesCount());
+ assertThat(actual.deletedRowsCount())
+ .as("Deleted rows count must match")
+ .isEqualTo(expected.deletedRowsCount());
List<ManifestFile.PartitionFieldSummary> expectedSummaries =
expected.partitions();
List<ManifestFile.PartitionFieldSummary> actualSummaries =
actual.partitions();
- Assert.assertEquals(
- "PartitionFieldSummary size does not match",
- expectedSummaries.size(),
- actualSummaries.size());
+ assertThat(actualSummaries)
+ .as("PartitionFieldSummary size does not match")
+ .hasSameSizeAs(expectedSummaries);
for (int i = 0; i < expectedSummaries.size(); i++) {
- Assert.assertEquals(
- "Null flag in partition must match",
- expectedSummaries.get(i).containsNull(),
- actualSummaries.get(i).containsNull());
- Assert.assertEquals(
- "NaN flag in partition must match",
- expectedSummaries.get(i).containsNaN(),
- actualSummaries.get(i).containsNaN());
- Assert.assertEquals(
- "Lower bounds in partition must match",
- expectedSummaries.get(i).lowerBound(),
- actualSummaries.get(i).lowerBound());
- Assert.assertEquals(
- "Upper bounds in partition must match",
- expectedSummaries.get(i).upperBound(),
- actualSummaries.get(i).upperBound());
+ assertThat(actualSummaries.get(i).containsNull())
+ .as("Null flag in partition must match")
+ .isEqualTo(expectedSummaries.get(i).containsNull());
+ assertThat(actualSummaries.get(i).containsNaN())
+ .as("NaN flag in partition must match")
+ .isEqualTo(expectedSummaries.get(i).containsNaN());
+ assertThat(actualSummaries.get(i).lowerBound())
+ .as("Lower bounds in partition must match")
+ .isEqualTo(expectedSummaries.get(i).lowerBound());
+ assertThat(actualSummaries.get(i).upperBound())
+ .as("Upper bounds in partition must match")
+ .isEqualTo(expectedSummaries.get(i).upperBound());
}
}
@@ -622,28 +594,35 @@ public class TestHelpers {
if (expected == actual) {
return;
}
- Assert.assertTrue("Shouldn't be null.", expected != null && actual !=
null);
- Assert.assertEquals("SpecId", expected.specId(), actual.specId());
- Assert.assertEquals("Content", expected.content(), actual.content());
- Assert.assertEquals("Path", expected.path(), actual.path());
- Assert.assertEquals("Format", expected.format(), actual.format());
- Assert.assertEquals("Partition size", expected.partition().size(),
actual.partition().size());
+ assertThat(expected).isNotNull();
+ assertThat(actual).isNotNull();
+ assertThat(actual.specId()).as("SpecId").isEqualTo(expected.specId());
+ assertThat(actual.content()).as("Content").isEqualTo(expected.content());
+ assertThat(actual.path()).as("Path").isEqualTo(expected.path());
+ assertThat(actual.format()).as("Format").isEqualTo(expected.format());
+ assertThat(actual.partition().size())
+ .as("Partition size")
+ .isEqualTo(expected.partition().size());
for (int i = 0; i < expected.partition().size(); i++) {
- Assert.assertEquals(
- "Partition data at index " + i,
- expected.partition().get(i, Object.class),
- actual.partition().get(i, Object.class));
+ assertThat(actual.partition().get(i, Object.class))
+ .as("Partition data at index " + i)
+ .isEqualTo(expected.partition().get(i, Object.class));
}
- Assert.assertEquals("Record count", expected.recordCount(),
actual.recordCount());
- Assert.assertEquals("File size in bytes", expected.fileSizeInBytes(),
actual.fileSizeInBytes());
- Assert.assertEquals("Column sizes", expected.columnSizes(),
actual.columnSizes());
- Assert.assertEquals("Value counts", expected.valueCounts(),
actual.valueCounts());
- Assert.assertEquals("Null value counts", expected.nullValueCounts(),
actual.nullValueCounts());
- Assert.assertEquals("Lower bounds", expected.lowerBounds(),
actual.lowerBounds());
- Assert.assertEquals("Upper bounds", expected.upperBounds(),
actual.upperBounds());
- Assert.assertEquals("Key metadata", expected.keyMetadata(),
actual.keyMetadata());
- Assert.assertEquals("Split offsets", expected.splitOffsets(),
actual.splitOffsets());
- Assert.assertEquals(
- "Equality field id list", actual.equalityFieldIds(),
expected.equalityFieldIds());
+ assertThat(actual.recordCount()).as("Record
count").isEqualTo(expected.recordCount());
+ assertThat(actual.fileSizeInBytes())
+ .as("File size in bytes")
+ .isEqualTo(expected.fileSizeInBytes());
+ assertThat(actual.columnSizes()).as("Column
sizes").isEqualTo(expected.columnSizes());
+ assertThat(actual.valueCounts()).as("Value
counts").isEqualTo(expected.valueCounts());
+ assertThat(actual.nullValueCounts())
+ .as("Null value counts")
+ .isEqualTo(expected.nullValueCounts());
+ assertThat(actual.lowerBounds()).as("Lower
bounds").isEqualTo(expected.lowerBounds());
+ assertThat(actual.upperBounds()).as("Upper
bounds").isEqualTo(expected.upperBounds());
+ assertThat(actual.keyMetadata()).as("Key
metadata").isEqualTo(expected.keyMetadata());
+ assertThat(actual.splitOffsets()).as("Split
offsets").isEqualTo(expected.splitOffsets());
+ assertThat(actual.equalityFieldIds())
+ .as("Equality field id list")
+ .isEqualTo(expected.equalityFieldIds());
}
}
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java
index 73d03710d3..ed3f54bec6 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.flink.source;
import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assumptions.assumeThat;
import java.io.IOException;
import java.util.Collections;
@@ -41,16 +42,11 @@ import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
-import org.junit.Assume;
-import org.junit.Test;
+import org.junit.jupiter.api.TestTemplate;
/** Test {@link FlinkInputFormat}. */
public class TestFlinkInputFormat extends TestFlinkSource {
- public TestFlinkInputFormat(String fileFormat) {
- super(fileFormat);
- }
-
@Override
protected List<Row> run(
FlinkSource.Builder formatBuilder,
@@ -61,7 +57,7 @@ public class TestFlinkInputFormat extends TestFlinkSource {
return runFormat(formatBuilder.tableLoader(tableLoader()).buildFormat());
}
- @Test
+ @TestTemplate
public void testNestedProjection() throws Exception {
Schema schema =
new Schema(
@@ -75,10 +71,11 @@ public class TestFlinkInputFormat extends TestFlinkSource {
Types.NestedField.required(5, "f3",
Types.LongType.get()))),
required(6, "id", Types.LongType.get()));
- Table table =
catalogResource.catalog().createTable(TableIdentifier.of("default", "t"),
schema);
+ Table table =
+ catalogExtension.catalog().createTable(TableIdentifier.of("default",
"t"), schema);
List<Record> writeRecords = RandomGenericData.generate(schema, 2, 0L);
- new GenericAppenderHelper(table, fileFormat,
TEMPORARY_FOLDER).appendToTable(writeRecords);
+ new GenericAppenderHelper(table, fileFormat,
temporaryDirectory).appendToTable(writeRecords);
// Schema: [data, nested[f1, f2, f3], id]
// Projection: [nested.f2, data]
@@ -106,7 +103,7 @@ public class TestFlinkInputFormat extends TestFlinkSource {
TestHelpers.assertRows(result, expected);
}
- @Test
+ @TestTemplate
public void testBasicProjection() throws IOException {
Schema writeSchema =
new Schema(
@@ -115,10 +112,10 @@ public class TestFlinkInputFormat extends TestFlinkSource {
Types.NestedField.optional(2, "time",
Types.TimestampType.withZone()));
Table table =
- catalogResource.catalog().createTable(TableIdentifier.of("default",
"t"), writeSchema);
+ catalogExtension.catalog().createTable(TableIdentifier.of("default",
"t"), writeSchema);
List<Record> writeRecords = RandomGenericData.generate(writeSchema, 2, 0L);
- new GenericAppenderHelper(table, fileFormat,
TEMPORARY_FOLDER).appendToTable(writeRecords);
+ new GenericAppenderHelper(table, fileFormat,
temporaryDirectory).appendToTable(writeRecords);
TableSchema projectedSchema =
TableSchema.builder()
@@ -140,9 +137,9 @@ public class TestFlinkInputFormat extends TestFlinkSource {
TestHelpers.assertRows(result, expected);
}
- @Test
+ @TestTemplate
public void testReadPartitionColumn() throws Exception {
- Assume.assumeTrue("Temporary skip ORC", FileFormat.ORC != fileFormat);
+ assumeThat(fileFormat).as("Temporary skip
ORC").isNotEqualTo(FileFormat.ORC);
Schema nestedSchema =
new Schema(
@@ -157,9 +154,10 @@ public class TestFlinkInputFormat extends TestFlinkSource {
PartitionSpec.builderFor(nestedSchema).identity("struct.innerName").build();
Table table =
- catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER,
nestedSchema, spec);
+ catalogExtension.catalog().createTable(TestFixtures.TABLE_IDENTIFIER,
nestedSchema, spec);
List<Record> records = RandomGenericData.generate(nestedSchema, 10, 0L);
- GenericAppenderHelper appender = new GenericAppenderHelper(table,
fileFormat, TEMPORARY_FOLDER);
+ GenericAppenderHelper appender =
+ new GenericAppenderHelper(table, fileFormat, temporaryDirectory);
for (Record record : records) {
org.apache.iceberg.TestHelpers.Row partition =
org.apache.iceberg.TestHelpers.Row.of(record.get(1,
Record.class).get(1));
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
index b537efa727..428da49f1d 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
@@ -18,18 +18,26 @@
*/
package org.apache.iceberg.flink.source;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.nio.file.Path;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.types.Row;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
@@ -38,8 +46,8 @@ import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.flink.HadoopCatalogResource;
-import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.HadoopCatalogExtension;
+import org.apache.iceberg.flink.MiniFlinkClusterExtension;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.TestHelpers;
@@ -48,42 +56,32 @@ import
org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.DateTimeUtil;
-import org.assertj.core.api.Assertions;
-import org.junit.Assert;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public abstract class TestFlinkScan {
-
- @ClassRule
- public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
- MiniClusterResource.createWithClassloaderCheckDisabled();
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
- @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new
TemporaryFolder();
+@ExtendWith(ParameterizedTestExtension.class)
+public abstract class TestFlinkScan {
+ @RegisterExtension
+ protected static MiniClusterExtension miniClusterResource =
+ MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();
- @Rule
- public final HadoopCatalogResource catalogResource =
- new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE,
TestFixtures.TABLE);
+ @TempDir protected Path temporaryDirectory;
- // parametrized variables
- protected final FileFormat fileFormat;
+ @RegisterExtension
+ protected static final HadoopCatalogExtension catalogExtension =
+ new HadoopCatalogExtension(TestFixtures.DATABASE, TestFixtures.TABLE);
- @Parameterized.Parameters(name = "format={0}")
- public static Object[] parameters() {
- return new Object[] {"avro", "parquet", "orc"};
- }
+ @Parameter protected FileFormat fileFormat;
- TestFlinkScan(String fileFormat) {
- this.fileFormat = FileFormat.fromString(fileFormat);
+ @Parameters(name = "format={0}")
+ public static Collection<FileFormat> fileFormat() {
+ return Arrays.asList(FileFormat.AVRO, FileFormat.PARQUET, FileFormat.ORC);
}
protected TableLoader tableLoader() {
- return catalogResource.tableLoader();
+ return catalogExtension.tableLoader();
}
protected abstract List<Row> runWithProjection(String... projected) throws
Exception;
@@ -99,41 +97,41 @@ public abstract class TestFlinkScan {
protected abstract List<Row> run() throws Exception;
- @Test
+ @TestTemplate
public void testUnpartitionedTable() throws Exception {
Table table =
- catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER,
TestFixtures.SCHEMA);
+ catalogExtension.catalog().createTable(TestFixtures.TABLE_IDENTIFIER,
TestFixtures.SCHEMA);
List<Record> expectedRecords =
RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L);
- new GenericAppenderHelper(table, fileFormat,
TEMPORARY_FOLDER).appendToTable(expectedRecords);
+ new GenericAppenderHelper(table, fileFormat,
temporaryDirectory).appendToTable(expectedRecords);
TestHelpers.assertRecords(run(), expectedRecords, TestFixtures.SCHEMA);
}
- @Test
+ @TestTemplate
public void testPartitionedTable() throws Exception {
Table table =
- catalogResource
+ catalogExtension
.catalog()
.createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA,
TestFixtures.SPEC);
List<Record> expectedRecords =
RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L);
expectedRecords.get(0).set(2, "2020-03-20");
- new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER)
+ new GenericAppenderHelper(table, fileFormat, temporaryDirectory)
.appendToTable(org.apache.iceberg.TestHelpers.Row.of("2020-03-20", 0),
expectedRecords);
TestHelpers.assertRecords(run(), expectedRecords, TestFixtures.SCHEMA);
}
- @Test
+ @TestTemplate
public void testProjection() throws Exception {
Table table =
- catalogResource
+ catalogExtension
.catalog()
.createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA,
TestFixtures.SPEC);
List<Record> inputRecords =
RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L);
- new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER)
+ new GenericAppenderHelper(table, fileFormat, temporaryDirectory)
.appendToTable(org.apache.iceberg.TestHelpers.Row.of("2020-03-20", 0),
inputRecords);
assertRows(runWithProjection("data"), Row.of(inputRecords.get(0).get(0)));
}
- @Test
+ @TestTemplate
public void testIdentityPartitionProjections() throws Exception {
Schema logSchema =
new Schema(
@@ -145,7 +143,7 @@ public abstract class TestFlinkScan {
PartitionSpec.builderFor(logSchema).identity("dt").identity("level").build();
Table table =
- catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER,
logSchema, spec);
+ catalogExtension.catalog().createTable(TestFixtures.TABLE_IDENTIFIER,
logSchema, spec);
List<Record> inputRecords = RandomGenericData.generate(logSchema, 10, 0L);
int idx = 0;
@@ -154,7 +152,7 @@ public abstract class TestFlinkScan {
record.set(1, "2020-03-2" + idx);
record.set(2, Integer.toString(idx));
append.appendFile(
- new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER)
+ new GenericAppenderHelper(table, fileFormat, temporaryDirectory)
.writeFile(
org.apache.iceberg.TestHelpers.Row.of("2020-03-2" + idx,
Integer.toString(idx)),
ImmutableList.of(record)));
@@ -200,20 +198,19 @@ public abstract class TestFlinkScan {
for (int i = 0; i < projectedFields.size(); i++) {
String name = projectedFields.get(i);
- Assert.assertEquals(
- "Projected field " + name + " should match",
- inputRecord.getField(name),
- actualRecord.getField(i));
+ assertThat(inputRecord.getField(name))
+ .as("Projected field " + name + " should match")
+ .isEqualTo(actualRecord.getField(i));
}
}
}
- @Test
+ @TestTemplate
public void testSnapshotReads() throws Exception {
Table table =
- catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER,
TestFixtures.SCHEMA);
+ catalogExtension.catalog().createTable(TestFixtures.TABLE_IDENTIFIER,
TestFixtures.SCHEMA);
- GenericAppenderHelper helper = new GenericAppenderHelper(table,
fileFormat, TEMPORARY_FOLDER);
+ GenericAppenderHelper helper = new GenericAppenderHelper(table,
fileFormat, temporaryDirectory);
List<Record> expectedRecords =
RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L);
helper.appendToTable(expectedRecords);
@@ -235,12 +232,12 @@ public abstract class TestFlinkScan {
TestFixtures.SCHEMA);
}
- @Test
+ @TestTemplate
public void testTagReads() throws Exception {
Table table =
- catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER,
TestFixtures.SCHEMA);
+ catalogExtension.catalog().createTable(TestFixtures.TABLE_IDENTIFIER,
TestFixtures.SCHEMA);
- GenericAppenderHelper helper = new GenericAppenderHelper(table,
fileFormat, TEMPORARY_FOLDER);
+ GenericAppenderHelper helper = new GenericAppenderHelper(table,
fileFormat, temporaryDirectory);
List<Record> expectedRecords1 =
RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L);
helper.appendToTable(expectedRecords1);
@@ -264,12 +261,12 @@ public abstract class TestFlinkScan {
runWithOptions(ImmutableMap.of("tag", "t1")), expectedRecords,
TestFixtures.SCHEMA);
}
- @Test
+ @TestTemplate
public void testBranchReads() throws Exception {
Table table =
- catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER,
TestFixtures.SCHEMA);
+ catalogExtension.catalog().createTable(TestFixtures.TABLE_IDENTIFIER,
TestFixtures.SCHEMA);
- GenericAppenderHelper helper = new GenericAppenderHelper(table,
fileFormat, TEMPORARY_FOLDER);
+ GenericAppenderHelper helper = new GenericAppenderHelper(table,
fileFormat, temporaryDirectory);
List<Record> expectedRecordsBase =
RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L);
helper.appendToTable(expectedRecordsBase);
@@ -300,12 +297,12 @@ public abstract class TestFlinkScan {
TestHelpers.assertRecords(run(), mainExpectedRecords, TestFixtures.SCHEMA);
}
- @Test
+ @TestTemplate
public void testIncrementalReadViaTag() throws Exception {
Table table =
- catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER,
TestFixtures.SCHEMA);
+ catalogExtension.catalog().createTable(TestFixtures.TABLE_IDENTIFIER,
TestFixtures.SCHEMA);
- GenericAppenderHelper helper = new GenericAppenderHelper(table,
fileFormat, TEMPORARY_FOLDER);
+ GenericAppenderHelper helper = new GenericAppenderHelper(table,
fileFormat, temporaryDirectory);
List<Record> records1 = RandomGenericData.generate(TestFixtures.SCHEMA, 1,
0L);
helper.appendToTable(records1);
@@ -355,7 +352,7 @@ public abstract class TestFlinkScan {
expected,
TestFixtures.SCHEMA);
- Assertions.assertThatThrownBy(
+ assertThatThrownBy(
() ->
runWithOptions(
ImmutableMap.<String, String>builder()
@@ -366,7 +363,7 @@ public abstract class TestFlinkScan {
.isInstanceOf(Exception.class)
.hasMessage("START_SNAPSHOT_ID and START_TAG cannot both be set.");
- Assertions.assertThatThrownBy(
+ assertThatThrownBy(
() ->
runWithOptions(
ImmutableMap.<String, String>builder()
@@ -378,12 +375,12 @@ public abstract class TestFlinkScan {
.hasMessage("END_SNAPSHOT_ID and END_TAG cannot both be set.");
}
- @Test
+ @TestTemplate
public void testIncrementalRead() throws Exception {
Table table =
- catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER,
TestFixtures.SCHEMA);
+ catalogExtension.catalog().createTable(TestFixtures.TABLE_IDENTIFIER,
TestFixtures.SCHEMA);
- GenericAppenderHelper helper = new GenericAppenderHelper(table,
fileFormat, TEMPORARY_FOLDER);
+ GenericAppenderHelper helper = new GenericAppenderHelper(table,
fileFormat, temporaryDirectory);
List<Record> records1 = RandomGenericData.generate(TestFixtures.SCHEMA, 1,
0L);
helper.appendToTable(records1);
@@ -413,10 +410,10 @@ public abstract class TestFlinkScan {
TestFixtures.SCHEMA);
}
- @Test
+ @TestTemplate
public void testFilterExpPartition() throws Exception {
Table table =
- catalogResource
+ catalogExtension
.catalog()
.createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA,
TestFixtures.SPEC);
@@ -424,7 +421,7 @@ public abstract class TestFlinkScan {
expectedRecords.get(0).set(2, "2020-03-20");
expectedRecords.get(1).set(2, "2020-03-20");
- GenericAppenderHelper helper = new GenericAppenderHelper(table,
fileFormat, TEMPORARY_FOLDER);
+ GenericAppenderHelper helper = new GenericAppenderHelper(table,
fileFormat, temporaryDirectory);
DataFile dataFile1 =
helper.writeFile(org.apache.iceberg.TestHelpers.Row.of("2020-03-20",
0), expectedRecords);
DataFile dataFile2 =
@@ -441,14 +438,14 @@ public abstract class TestFlinkScan {
private void testFilterExp(Expression filter, String sqlFilter, boolean
caseSensitive)
throws Exception {
Table table =
- catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER,
TestFixtures.SCHEMA);
+ catalogExtension.catalog().createTable(TestFixtures.TABLE_IDENTIFIER,
TestFixtures.SCHEMA);
List<Record> expectedRecords =
RandomGenericData.generate(TestFixtures.SCHEMA, 3, 0L);
expectedRecords.get(0).set(0, "a");
expectedRecords.get(1).set(0, "b");
expectedRecords.get(2).set(0, "c");
- GenericAppenderHelper helper = new GenericAppenderHelper(table,
fileFormat, TEMPORARY_FOLDER);
+ GenericAppenderHelper helper = new GenericAppenderHelper(table,
fileFormat, temporaryDirectory);
DataFile dataFile = helper.writeFile(expectedRecords);
helper.appendToTable(dataFile);
@@ -458,19 +455,19 @@ public abstract class TestFlinkScan {
TestHelpers.assertRecords(actual, expectedRecords.subList(1, 3),
TestFixtures.SCHEMA);
}
- @Test
+ @TestTemplate
public void testFilterExp() throws Exception {
testFilterExp(Expressions.greaterThanOrEqual("data", "b"), "where
data>='b'", true);
}
- @Test
+ @TestTemplate
public void testFilterExpCaseInsensitive() throws Exception {
// sqlFilter does not support case-insensitive filtering:
// https://issues.apache.org/jira/browse/FLINK-16175
testFilterExp(Expressions.greaterThanOrEqual("DATA", "b"), "where
data>='b'", false);
}
- @Test
+ @TestTemplate
public void testPartitionTypes() throws Exception {
Schema typesSchema =
new Schema(
@@ -492,9 +489,10 @@ public abstract class TestFlinkScan {
.build();
Table table =
- catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER,
typesSchema, spec);
+ catalogExtension.catalog().createTable(TestFixtures.TABLE_IDENTIFIER,
typesSchema, spec);
List<Record> records = RandomGenericData.generate(typesSchema, 10, 0L);
- GenericAppenderHelper appender = new GenericAppenderHelper(table,
fileFormat, TEMPORARY_FOLDER);
+ GenericAppenderHelper appender =
+ new GenericAppenderHelper(table, fileFormat, temporaryDirectory);
for (Record record : records) {
org.apache.iceberg.TestHelpers.Row partition =
org.apache.iceberg.TestHelpers.Row.of(
@@ -512,7 +510,7 @@ public abstract class TestFlinkScan {
TestHelpers.assertRecords(run(), records, typesSchema);
}
- @Test
+ @TestTemplate
public void testCustomizedFlinkDataTypes() throws Exception {
Schema schema =
new Schema(
@@ -522,9 +520,9 @@ public abstract class TestFlinkScan {
Types.MapType.ofRequired(2, 3, Types.StringType.get(),
Types.StringType.get())),
Types.NestedField.required(
4, "arr", Types.ListType.ofRequired(5,
Types.StringType.get())));
- Table table =
catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, schema);
+ Table table =
catalogExtension.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, schema);
List<Record> records = RandomGenericData.generate(schema, 10, 0L);
- GenericAppenderHelper helper = new GenericAppenderHelper(table,
fileFormat, TEMPORARY_FOLDER);
+ GenericAppenderHelper helper = new GenericAppenderHelper(table,
fileFormat, temporaryDirectory);
helper.appendToTable(records);
TestHelpers.assertRecords(run(), records, schema);
}
diff --git
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java
index 023166801b..b5bddc7767 100644
---
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java
+++
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java
@@ -25,23 +25,18 @@ import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.types.Row;
-import org.junit.Before;
+import org.junit.jupiter.api.BeforeEach;
/** Test Flink SELECT SQLs. */
public class TestFlinkScanSql extends TestFlinkSource {
-
private volatile TableEnvironment tEnv;
- public TestFlinkScanSql(String fileFormat) {
- super(fileFormat);
- }
-
- @Before
+ @BeforeEach
public void before() throws IOException {
SqlHelpers.sql(
getTableEnv(),
"create catalog iceberg_catalog with ('type'='iceberg',
'catalog-type'='hadoop', 'warehouse'='%s')",
- catalogResource.warehouse());
+ catalogExtension.warehouse());
SqlHelpers.sql(getTableEnv(), "use catalog iceberg_catalog");
getTableEnv()
.getConfig()
diff --git
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSource.java
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSource.java
index 2b55bee6e5..86c7e8991d 100644
---
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSource.java
+++
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSource.java
@@ -32,17 +32,13 @@ import
org.apache.iceberg.relocated.com.google.common.collect.Maps;
public abstract class TestFlinkSource extends TestFlinkScan {
- TestFlinkSource(String fileFormat) {
- super(fileFormat);
- }
-
@Override
protected List<Row> runWithProjection(String... projected) throws Exception {
TableSchema.Builder builder = TableSchema.builder();
TableSchema schema =
FlinkSchemaUtil.toSchema(
FlinkSchemaUtil.convert(
-
catalogResource.catalog().loadTable(TestFixtures.TABLE_IDENTIFIER).schema()));
+
catalogExtension.catalog().loadTable(TestFixtures.TABLE_IDENTIFIER).schema()));
for (String field : projected) {
TableColumn column = schema.getTableColumn(field).get();
builder.field(column.getName(), column.getType());
diff --git
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java
index a80f87d648..3c0c38e111 100644
---
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java
+++
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java
@@ -42,20 +42,13 @@ import org.apache.iceberg.flink.data.RowDataToRowMapper;
import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-@RunWith(Parameterized.class)
public class TestIcebergSourceBounded extends TestFlinkScan {
- public TestIcebergSourceBounded(String fileFormat) {
- super(fileFormat);
- }
-
@Override
protected List<Row> runWithProjection(String... projected) throws Exception {
Schema icebergTableSchema =
-
catalogResource.catalog().loadTable(TestFixtures.TABLE_IDENTIFIER).schema();
+
catalogExtension.catalog().loadTable(TestFixtures.TABLE_IDENTIFIER).schema();
TableSchema.Builder builder = TableSchema.builder();
TableSchema schema =
FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(icebergTableSchema));
for (String field : projected) {
diff --git
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java
index 3652e0bb56..ff3348bbc3 100644
---
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java
+++
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java
@@ -29,23 +29,19 @@ import org.apache.flink.types.Row;
import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.FlinkConfigOptions;
-import org.junit.Before;
+import org.junit.jupiter.api.BeforeEach;
public class TestIcebergSourceBoundedSql extends TestIcebergSourceBounded {
private volatile TableEnvironment tEnv;
- public TestIcebergSourceBoundedSql(String fileFormat) {
- super(fileFormat);
- }
-
- @Before
+ @BeforeEach
public void before() throws IOException {
Configuration tableConf = getTableEnv().getConfig().getConfiguration();
tableConf.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE.key(),
true);
SqlHelpers.sql(
getTableEnv(),
"create catalog iceberg_catalog with ('type'='iceberg',
'catalog-type'='hadoop', 'warehouse'='%s')",
- catalogResource.warehouse());
+ catalogExtension.warehouse());
SqlHelpers.sql(getTableEnv(), "use catalog iceberg_catalog");
getTableEnv()
.getConfig()