This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new a975a95552 Flink: Migrate HadoopCatalog related tests (#10358)
a975a95552 is described below
commit a975a955523239ca6423adf2ec26915ca8675f4f
Author: Tom Tanaka <[email protected]>
AuthorDate: Mon Jul 1 22:24:02 2024 +0900
Flink: Migrate HadoopCatalog related tests (#10358)
---
.../apache/iceberg/flink/HadoopTableExtension.java | 59 ++++++++
.../iceberg/flink/sink/TestFlinkIcebergSink.java | 148 ++++++++++-----------
.../flink/sink/TestFlinkIcebergSinkBranch.java | 66 ++++-----
.../iceberg/flink/sink/TestFlinkIcebergSinkV2.java | 111 ++++++----------
.../flink/sink/TestFlinkIcebergSinkV2Base.java | 42 ++++--
.../flink/sink/TestFlinkIcebergSinkV2Branch.java | 69 +++++-----
.../iceberg/flink/source/TestFlinkSourceSql.java | 4 +-
.../iceberg/flink/source/TestIcebergSourceSql.java | 4 +-
.../apache/iceberg/flink/source/TestSqlBase.java | 46 ++++---
.../iceberg/flink/source/reader/ReaderUtil.java | 22 +++
.../reader/TestColumnStatsWatermarkExtractor.java | 90 ++++++-------
11 files changed, 356 insertions(+), 305 deletions(-)
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/HadoopTableExtension.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/HadoopTableExtension.java
new file mode 100644
index 0000000000..dc6ef400a4
--- /dev/null
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/HadoopTableExtension.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+public class HadoopTableExtension extends HadoopCatalogExtension {
+  private final Schema schema;
+  private final PartitionSpec partitionSpec;
+
+  private Table table;
+
+  public HadoopTableExtension(String database, String tableName, Schema schema) {
+    this(database, tableName, schema, null);
+  }
+
+  public HadoopTableExtension(
+      String database, String tableName, Schema schema, PartitionSpec partitionSpec) {
+    super(database, tableName);
+    this.schema = schema;
+    this.partitionSpec = partitionSpec;
+  }
+
+  @Override
+  public void beforeEach(ExtensionContext context) throws Exception {
+    super.beforeEach(context);
+    if (partitionSpec == null) {
+      this.table = catalog.createTable(TableIdentifier.of(database, tableName), schema);
+    } else {
+      this.table =
+          catalog.createTable(TableIdentifier.of(database, tableName), schema, partitionSpec);
+    }
+    tableLoader.open();
+  }
+
+  public Table table() {
+    return table;
+  }
+}
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
index 9ff79419b0..527525e9f1 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
@@ -18,6 +18,8 @@
*/
package org.apache.iceberg.flink.sink;
+import static org.apache.iceberg.flink.TestFixtures.DATABASE;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.io.IOException;
@@ -29,17 +31,21 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.types.Row;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.FlinkWriteOptions;
-import org.apache.iceberg.flink.HadoopCatalogResource;
+import org.apache.iceberg.flink.HadoopCatalogExtension;
import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.MiniFlinkClusterExtension;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.TestFixtures;
@@ -47,59 +53,52 @@ import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
- @ClassRule
- public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
- MiniClusterResource.createWithClassloaderCheckDisabled();
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
- @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new
TemporaryFolder();
+ @RegisterExtension
+ public static MiniClusterExtension miniClusterResource =
+ MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();
- @Rule
- public final HadoopCatalogResource catalogResource =
- new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE,
TestFixtures.TABLE);
+ @RegisterExtension
+ private static final HadoopCatalogExtension catalogResource =
+ new HadoopCatalogExtension(DATABASE, TestFixtures.TABLE);
private TableLoader tableLoader;
- private final FileFormat format;
- private final int parallelism;
- private final boolean partitioned;
+ @Parameter(index = 0)
+ private FileFormat format;
+
+ @Parameter(index = 1)
+ private int parallelism;
- @Parameterized.Parameters(name = "format={0}, parallelism = {1}, partitioned
= {2}")
+ @Parameter(index = 2)
+ private boolean partitioned;
+
+ @Parameters(name = "format={0}, parallelism = {1}, partitioned = {2}")
public static Object[][] parameters() {
return new Object[][] {
- {"avro", 1, true},
- {"avro", 1, false},
- {"avro", 2, true},
- {"avro", 2, false},
- {"orc", 1, true},
- {"orc", 1, false},
- {"orc", 2, true},
- {"orc", 2, false},
- {"parquet", 1, true},
- {"parquet", 1, false},
- {"parquet", 2, true},
- {"parquet", 2, false}
+ {FileFormat.AVRO, 1, true},
+ {FileFormat.AVRO, 1, false},
+ {FileFormat.AVRO, 2, true},
+ {FileFormat.AVRO, 2, false},
+ {FileFormat.ORC, 1, true},
+ {FileFormat.ORC, 1, false},
+ {FileFormat.ORC, 2, true},
+ {FileFormat.ORC, 2, false},
+ {FileFormat.PARQUET, 1, true},
+ {FileFormat.PARQUET, 1, false},
+ {FileFormat.PARQUET, 2, true},
+ {FileFormat.PARQUET, 2, false}
};
}
- public TestFlinkIcebergSink(String format, int parallelism, boolean
partitioned) {
- this.format = FileFormat.fromString(format);
- this.parallelism = parallelism;
- this.partitioned = partitioned;
- }
-
- @Before
+ @BeforeEach
public void before() throws IOException {
table =
catalogResource
@@ -122,7 +121,7 @@ public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
tableLoader = catalogResource.tableLoader();
}
- @Test
+ @TestTemplate
public void testWriteRowData() throws Exception {
List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2,
"world"), Row.of(3, "foo"));
DataStream<RowData> dataStream =
@@ -165,17 +164,17 @@ public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
return SimpleDataUtil.partitionDataFiles(table, ImmutableMap.of("data",
partition)).size();
}
- @Test
+ @TestTemplate
public void testWriteRow() throws Exception {
testWriteRow(null, DistributionMode.NONE);
}
- @Test
+ @TestTemplate
public void testWriteRowWithTableSchema() throws Exception {
testWriteRow(SimpleDataUtil.FLINK_SCHEMA, DistributionMode.NONE);
}
- @Test
+ @TestTemplate
public void testJobNoneDistributeMode() throws Exception {
table
.updateProperties()
@@ -187,12 +186,12 @@ public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
if (parallelism > 1) {
if (partitioned) {
int files = partitionFiles("aaa") + partitionFiles("bbb") +
partitionFiles("ccc");
- Assert.assertTrue("Should have more than 3 files in iceberg table.",
files > 3);
+ assertThat(files).isGreaterThan(3);
}
}
}
- @Test
+ @TestTemplate
public void testJobHashDistributionMode() {
table
.updateProperties()
@@ -204,7 +203,7 @@ public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
.hasMessage("Flink does not support 'range' write distribution mode
now.");
}
- @Test
+ @TestTemplate
public void testJobNullDistributionMode() throws Exception {
table
.updateProperties()
@@ -214,42 +213,33 @@ public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
testWriteRow(null, null);
if (partitioned) {
- Assert.assertEquals(
- "There should be only 1 data file in partition 'aaa'", 1,
partitionFiles("aaa"));
- Assert.assertEquals(
- "There should be only 1 data file in partition 'bbb'", 1,
partitionFiles("bbb"));
- Assert.assertEquals(
- "There should be only 1 data file in partition 'ccc'", 1,
partitionFiles("ccc"));
+ assertThat(partitionFiles("aaa")).isEqualTo(1);
+ assertThat(partitionFiles("bbb")).isEqualTo(1);
+ assertThat(partitionFiles("ccc")).isEqualTo(1);
}
}
- @Test
+ @TestTemplate
public void testPartitionWriteMode() throws Exception {
testWriteRow(null, DistributionMode.HASH);
if (partitioned) {
- Assert.assertEquals(
- "There should be only 1 data file in partition 'aaa'", 1,
partitionFiles("aaa"));
- Assert.assertEquals(
- "There should be only 1 data file in partition 'bbb'", 1,
partitionFiles("bbb"));
- Assert.assertEquals(
- "There should be only 1 data file in partition 'ccc'", 1,
partitionFiles("ccc"));
+ assertThat(partitionFiles("aaa")).isEqualTo(1);
+ assertThat(partitionFiles("bbb")).isEqualTo(1);
+ assertThat(partitionFiles("ccc")).isEqualTo(1);
}
}
- @Test
+ @TestTemplate
public void testShuffleByPartitionWithSchema() throws Exception {
testWriteRow(SimpleDataUtil.FLINK_SCHEMA, DistributionMode.HASH);
if (partitioned) {
- Assert.assertEquals(
- "There should be only 1 data file in partition 'aaa'", 1,
partitionFiles("aaa"));
- Assert.assertEquals(
- "There should be only 1 data file in partition 'bbb'", 1,
partitionFiles("bbb"));
- Assert.assertEquals(
- "There should be only 1 data file in partition 'ccc'", 1,
partitionFiles("ccc"));
+ assertThat(partitionFiles("aaa")).isEqualTo(1);
+ assertThat(partitionFiles("bbb")).isEqualTo(1);
+ assertThat(partitionFiles("ccc")).isEqualTo(1);
}
}
- @Test
+ @TestTemplate
public void testTwoSinksInDisjointedDAG() throws Exception {
Map<String, String> props =
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name());
@@ -323,16 +313,14 @@ public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
SimpleDataUtil.assertTableRows(rightTable, convertToRowData(rightRows));
leftTable.refresh();
- Assert.assertNull(leftTable.currentSnapshot().summary().get("flink.test"));
- Assert.assertNull(leftTable.currentSnapshot().summary().get("direction"));
+
assertThat(leftTable.currentSnapshot().summary()).doesNotContainKeys("flink.test",
"direction");
rightTable.refresh();
- Assert.assertEquals(
- TestFlinkIcebergSink.class.getName(),
- rightTable.currentSnapshot().summary().get("flink.test"));
- Assert.assertEquals("rightTable",
rightTable.currentSnapshot().summary().get("direction"));
+ assertThat(rightTable.currentSnapshot().summary())
+ .containsEntry("flink.test", TestFlinkIcebergSink.class.getName())
+ .containsEntry("direction", "rightTable");
}
- @Test
+ @TestTemplate
public void testOverrideWriteConfigWithUnknownDistributionMode() {
Map<String, String> newProps = Maps.newHashMap();
newProps.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(), "UNRECOGNIZED");
@@ -352,7 +340,7 @@ public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
.hasMessage("Invalid distribution mode: UNRECOGNIZED");
}
- @Test
+ @TestTemplate
public void testOverrideWriteConfigWithUnknownFileFormat() {
Map<String, String> newProps = Maps.newHashMap();
newProps.put(FlinkWriteOptions.WRITE_FORMAT.key(), "UNRECOGNIZED");
@@ -372,7 +360,7 @@ public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
.hasMessage("Invalid file format: UNRECOGNIZED");
}
- @Test
+ @TestTemplate
public void testWriteRowWithTableRefreshInterval() throws Exception {
List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2,
"world"), Row.of(3, "foo"));
DataStream<RowData> dataStream =
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBranch.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBranch.java
index 16b4542b00..547b4937c5 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBranch.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBranch.java
@@ -18,60 +18,60 @@
*/
package org.apache.iceberg.flink.sink;
+import static org.apache.iceberg.flink.TestFixtures.DATABASE;
+import static org.assertj.core.api.Assertions.assertThat;
+
import java.io.IOException;
import java.util.List;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.flink.HadoopCatalogResource;
+import org.apache.iceberg.flink.HadoopCatalogExtension;
import org.apache.iceberg.flink.MiniClusterResource;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class TestFlinkIcebergSinkBranch extends TestFlinkIcebergSinkBase {
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
- @ClassRule
- public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
- MiniClusterResource.createWithClassloaderCheckDisabled();
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestFlinkIcebergSinkBranch extends TestFlinkIcebergSinkBase {
+ @RegisterExtension
+ public static final HadoopCatalogExtension catalogResource =
+ new HadoopCatalogExtension(DATABASE, TestFixtures.TABLE);
- @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new
TemporaryFolder();
+ @Parameter(index = 0)
+ private String formatVersion;
- @Rule
- public final HadoopCatalogResource catalogResource =
- new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE,
TestFixtures.TABLE);
+ @Parameter(index = 1)
+ private String branch;
- private final String branch;
private TableLoader tableLoader;
- @Parameterized.Parameters(name = "formatVersion = {0}, branch = {1}")
- public static Object[] parameters() {
- return new Object[] {"main", "testBranch"};
- }
-
- public TestFlinkIcebergSinkBranch(String branch) {
- this.branch = branch;
+ @Parameters(name = "formatVersion = {0}, branch = {1}")
+ public static Object[][] parameters() {
+ return new Object[][] {
+ {"1", "main"},
+ {"1", "testBranch"},
+ {"2", "main"},
+ {"2", "testBranch"}
+ };
}
- @Before
+ @BeforeEach
public void before() throws IOException {
table =
catalogResource
@@ -84,7 +84,7 @@ public class TestFlinkIcebergSinkBranch extends TestFlinkIcebergSinkBase {
TableProperties.DEFAULT_FILE_FORMAT,
FileFormat.AVRO.name(),
TableProperties.FORMAT_VERSION,
- "1"));
+ formatVersion));
env =
StreamExecutionEnvironment.getExecutionEnvironment(
@@ -94,7 +94,7 @@ public class TestFlinkIcebergSinkBranch extends TestFlinkIcebergSinkBase {
tableLoader = catalogResource.tableLoader();
}
- @Test
+ @TestTemplate
public void testWriteRowWithTableSchema() throws Exception {
testWriteRow(SimpleDataUtil.FLINK_SCHEMA, DistributionMode.NONE);
verifyOtherBranchUnmodified();
@@ -129,9 +129,9 @@ public class TestFlinkIcebergSinkBranch extends TestFlinkIcebergSinkBase {
String otherBranch =
branch.equals(SnapshotRef.MAIN_BRANCH) ? "test-branch" :
SnapshotRef.MAIN_BRANCH;
if (otherBranch.equals(SnapshotRef.MAIN_BRANCH)) {
- Assert.assertNull(table.currentSnapshot());
+ assertThat(table.currentSnapshot()).isNull();
}
- Assert.assertTrue(table.snapshot(otherBranch) == null);
+ assertThat(table.snapshot(otherBranch)).isNull();
}
}
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
index 7712481d33..9cbb9f091e 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
@@ -18,85 +18,53 @@
*/
package org.apache.iceberg.flink.sink;
+import static org.apache.iceberg.flink.TestFixtures.DATABASE;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assumptions.assumeThat;
import java.util.List;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.types.Row;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.Record;
-import org.apache.iceberg.flink.HadoopCatalogResource;
+import org.apache.iceberg.flink.HadoopCatalogExtension;
import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.MiniFlinkClusterExtension;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.source.BoundedTestSource;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.rules.Timeout;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+@ExtendWith(ParameterizedTestExtension.class)
+@Timeout(value = 60)
public class TestFlinkIcebergSinkV2 extends TestFlinkIcebergSinkV2Base {
+ @RegisterExtension
+ public static MiniClusterExtension miniClusterResource =
+ MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();
- @ClassRule
- public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
- MiniClusterResource.createWithClassloaderCheckDisabled();
-
- @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new
TemporaryFolder();
-
- @Rule
- public final HadoopCatalogResource catalogResource =
- new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE,
TestFixtures.TABLE);
-
- @Rule public final Timeout globalTimeout = Timeout.seconds(60);
-
- @Parameterized.Parameters(
- name = "FileFormat = {0}, Parallelism = {1}, Partitioned={2},
WriteDistributionMode ={3}")
- public static Object[][] parameters() {
- return new Object[][] {
- new Object[] {"avro", 1, true,
TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
- new Object[] {"avro", 1, false,
TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
- new Object[] {"avro", 4, true,
TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
- new Object[] {"avro", 4, false,
TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
- new Object[] {"orc", 1, true,
TableProperties.WRITE_DISTRIBUTION_MODE_HASH},
- new Object[] {"orc", 1, false,
TableProperties.WRITE_DISTRIBUTION_MODE_HASH},
- new Object[] {"orc", 4, true,
TableProperties.WRITE_DISTRIBUTION_MODE_HASH},
- new Object[] {"orc", 4, false,
TableProperties.WRITE_DISTRIBUTION_MODE_HASH},
- new Object[] {"parquet", 1, true,
TableProperties.WRITE_DISTRIBUTION_MODE_RANGE},
- new Object[] {"parquet", 1, false,
TableProperties.WRITE_DISTRIBUTION_MODE_RANGE},
- new Object[] {"parquet", 4, true,
TableProperties.WRITE_DISTRIBUTION_MODE_RANGE},
- new Object[] {"parquet", 4, false,
TableProperties.WRITE_DISTRIBUTION_MODE_RANGE}
- };
- }
-
- public TestFlinkIcebergSinkV2(
- String format, int parallelism, boolean partitioned, String
writeDistributionMode) {
- this.format = FileFormat.fromString(format);
- this.parallelism = parallelism;
- this.partitioned = partitioned;
- this.writeDistributionMode = writeDistributionMode;
- }
+ @RegisterExtension
+ private static final HadoopCatalogExtension catalogResource =
+ new HadoopCatalogExtension(DATABASE, TestFixtures.TABLE);
- @Before
+ @BeforeEach
public void setupTable() {
table =
catalogResource
@@ -129,7 +97,7 @@ public class TestFlinkIcebergSinkV2 extends TestFlinkIcebergSinkV2Base {
tableLoader = catalogResource.tableLoader();
}
- @Test
+ @TestTemplate
public void testCheckAndGetEqualityFieldIds() {
table
.updateSchema()
@@ -144,28 +112,25 @@ public class TestFlinkIcebergSinkV2 extends TestFlinkIcebergSinkV2Base {
FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA).table(table);
// Use schema identifier field IDs as equality field id list by default
- Assert.assertEquals(
- table.schema().identifierFieldIds(),
- Sets.newHashSet(builder.checkAndGetEqualityFieldIds()));
+ assertThat(builder.checkAndGetEqualityFieldIds())
+
.containsExactlyInAnyOrderElementsOf(table.schema().identifierFieldIds());
// Use user-provided equality field column as equality field id list
builder.equalityFieldColumns(Lists.newArrayList("id"));
- Assert.assertEquals(
- Sets.newHashSet(table.schema().findField("id").fieldId()),
- Sets.newHashSet(builder.checkAndGetEqualityFieldIds()));
+ assertThat(builder.checkAndGetEqualityFieldIds())
+ .containsExactlyInAnyOrder(table.schema().findField("id").fieldId());
builder.equalityFieldColumns(Lists.newArrayList("type"));
- Assert.assertEquals(
- Sets.newHashSet(table.schema().findField("type").fieldId()),
- Sets.newHashSet(builder.checkAndGetEqualityFieldIds()));
+ assertThat(builder.checkAndGetEqualityFieldIds())
+ .containsExactlyInAnyOrder(table.schema().findField("type").fieldId());
}
- @Test
+ @TestTemplate
public void testChangeLogOnIdKey() throws Exception {
testChangeLogOnIdKey(SnapshotRef.MAIN_BRANCH);
}
- @Test
+ @TestTemplate
public void testUpsertOnlyDeletesOnDataKey() throws Exception {
List<List<Row>> elementsPerCheckpoint =
ImmutableList.of(
@@ -184,22 +149,22 @@ public class TestFlinkIcebergSinkV2 extends TestFlinkIcebergSinkV2Base {
SnapshotRef.MAIN_BRANCH);
}
- @Test
+ @TestTemplate
public void testChangeLogOnDataKey() throws Exception {
testChangeLogOnDataKey(SnapshotRef.MAIN_BRANCH);
}
- @Test
+ @TestTemplate
public void testChangeLogOnIdDataKey() throws Exception {
testChangeLogOnIdDataKey(SnapshotRef.MAIN_BRANCH);
}
- @Test
+ @TestTemplate
public void testChangeLogOnSameKey() throws Exception {
testChangeLogOnSameKey(SnapshotRef.MAIN_BRANCH);
}
- @Test
+ @TestTemplate
public void testUpsertModeCheck() throws Exception {
DataStream<Row> dataStream =
env.addSource(new BoundedTestSource<>(ImmutableList.of()),
ROW_TYPE_INFO);
@@ -227,22 +192,22 @@ public class TestFlinkIcebergSinkV2 extends TestFlinkIcebergSinkV2Base {
"Equality field columns shouldn't be empty when configuring to use
UPSERT data stream.");
}
- @Test
+ @TestTemplate
public void testUpsertOnIdKey() throws Exception {
testUpsertOnIdKey(SnapshotRef.MAIN_BRANCH);
}
- @Test
+ @TestTemplate
public void testUpsertOnDataKey() throws Exception {
testUpsertOnDataKey(SnapshotRef.MAIN_BRANCH);
}
- @Test
+ @TestTemplate
public void testUpsertOnIdDataKey() throws Exception {
testUpsertOnIdDataKey(SnapshotRef.MAIN_BRANCH);
}
- @Test
+ @TestTemplate
public void testDeleteStats() throws Exception {
assumeThat(format).isNotEqualTo(FileFormat.AVRO);
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Base.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Base.java
index 9cdf7743c4..fc33c2fea5 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Base.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Base.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.flink.sink;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.io.IOException;
@@ -31,6 +32,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.Parameters;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
@@ -44,7 +47,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.StructLikeSet;
-import org.junit.Assert;
public class TestFlinkIcebergSinkV2Base {
@@ -55,14 +57,40 @@ public class TestFlinkIcebergSinkV2Base {
protected static final int ROW_ID_POS = 0;
protected static final int ROW_DATA_POS = 1;
- protected int parallelism = 1;
protected TableLoader tableLoader;
protected Table table;
protected StreamExecutionEnvironment env;
+
+ @Parameter(index = 0)
protected FileFormat format;
+
+ @Parameter(index = 1)
+ protected int parallelism = 1;
+
+ @Parameter(index = 2)
protected boolean partitioned;
+
+ @Parameter(index = 3)
protected String writeDistributionMode;
+ @Parameters(name = "FileFormat={0}, Parallelism={1}, Partitioned={2},
WriteDistributionMode={3}")
+ public static Object[][] parameters() {
+ return new Object[][] {
+ new Object[] {FileFormat.AVRO, 1, true,
TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
+ new Object[] {FileFormat.AVRO, 1, false,
TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
+ new Object[] {FileFormat.AVRO, 4, true,
TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
+ new Object[] {FileFormat.AVRO, 4, false,
TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
+ new Object[] {FileFormat.ORC, 1, true,
TableProperties.WRITE_DISTRIBUTION_MODE_HASH},
+ new Object[] {FileFormat.ORC, 1, false,
TableProperties.WRITE_DISTRIBUTION_MODE_HASH},
+ new Object[] {FileFormat.ORC, 4, true,
TableProperties.WRITE_DISTRIBUTION_MODE_HASH},
+ new Object[] {FileFormat.ORC, 4, false,
TableProperties.WRITE_DISTRIBUTION_MODE_HASH},
+ new Object[] {FileFormat.PARQUET, 1, true,
TableProperties.WRITE_DISTRIBUTION_MODE_RANGE},
+ new Object[] {FileFormat.PARQUET, 1, false,
TableProperties.WRITE_DISTRIBUTION_MODE_RANGE},
+ new Object[] {FileFormat.PARQUET, 4, true,
TableProperties.WRITE_DISTRIBUTION_MODE_RANGE},
+ new Object[] {FileFormat.PARQUET, 4, false,
TableProperties.WRITE_DISTRIBUTION_MODE_RANGE}
+ };
+ }
+
protected static final Map<String, RowKind> ROW_KIND_MAP =
ImmutableMap.of(
"+I", RowKind.INSERT,
@@ -319,16 +347,14 @@ public class TestFlinkIcebergSinkV2Base {
table.refresh();
List<Snapshot> snapshots = findValidSnapshots();
int expectedSnapshotNum = expectedRecordsPerCheckpoint.size();
- Assert.assertEquals(
- "Should have the expected snapshot number", expectedSnapshotNum,
snapshots.size());
+ assertThat(snapshots).hasSize(expectedSnapshotNum);
for (int i = 0; i < expectedSnapshotNum; i++) {
long snapshotId = snapshots.get(i).snapshotId();
List<Record> expectedRecords = expectedRecordsPerCheckpoint.get(i);
- Assert.assertEquals(
- "Should have the expected records for the checkpoint#" + i,
- expectedRowSet(expectedRecords.toArray(new Record[0])),
- actualRowSet(snapshotId, "*"));
+ assertThat(actualRowSet(snapshotId, "*"))
+ .as("Should have the expected records for the checkpoint#" + i)
+ .isEqualTo(expectedRowSet(expectedRecords.toArray(new Record[0])));
}
}
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Branch.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Branch.java
index fed3338482..1c5c97b58d 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Branch.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Branch.java
@@ -18,52 +18,43 @@
*/
package org.apache.iceberg.flink.sink;
+import static org.apache.iceberg.flink.TestFixtures.DATABASE;
+import static org.assertj.core.api.Assertions.assertThat;
+
import java.io.IOException;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.flink.HadoopCatalogResource;
+import org.apache.iceberg.flink.HadoopCatalogExtension;
import org.apache.iceberg.flink.MiniClusterResource;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class TestFlinkIcebergSinkV2Branch extends TestFlinkIcebergSinkV2Base {
-
- @ClassRule
- public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
- MiniClusterResource.createWithClassloaderCheckDisabled();
-
- @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
- @Rule
- public final HadoopCatalogResource catalogResource =
- new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);
-
- private final String branch;
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestFlinkIcebergSinkV2Branch extends TestFlinkIcebergSinkV2Base {
+ @RegisterExtension
+ private static final HadoopCatalogExtension catalogResource =
+ new HadoopCatalogExtension(DATABASE, TestFixtures.TABLE);
- @Parameterized.Parameters(name = "branch = {0}")
- public static Object[] parameters() {
- return new Object[] {"main", "testBranch"};
- }
+ @Parameter(index = 0)
+ private String branch;
- public TestFlinkIcebergSinkV2Branch(String branch) {
- this.branch = branch;
+ @Parameters(name = "branch = {0}")
+ public static Object[][] parameters() {
+ return new Object[][] {new Object[] {"main"}, new Object[] {"testBranch"}};
}
- @Before
+ @BeforeEach
public void before() throws IOException {
table =
catalogResource
@@ -86,37 +77,37 @@ public class TestFlinkIcebergSinkV2Branch extends TestFlinkIcebergSinkV2Base {
tableLoader = catalogResource.tableLoader();
}
- @Test
+ @TestTemplate
public void testChangeLogOnIdKey() throws Exception {
testChangeLogOnIdKey(branch);
verifyOtherBranchUnmodified();
}
- @Test
+ @TestTemplate
public void testChangeLogOnDataKey() throws Exception {
testChangeLogOnDataKey(branch);
verifyOtherBranchUnmodified();
}
- @Test
+ @TestTemplate
public void testChangeLogOnIdDataKey() throws Exception {
testChangeLogOnIdDataKey(branch);
verifyOtherBranchUnmodified();
}
- @Test
+ @TestTemplate
public void testUpsertOnIdKey() throws Exception {
testUpsertOnIdKey(branch);
verifyOtherBranchUnmodified();
}
- @Test
+ @TestTemplate
public void testUpsertOnDataKey() throws Exception {
testUpsertOnDataKey(branch);
verifyOtherBranchUnmodified();
}
- @Test
+ @TestTemplate
public void testUpsertOnIdDataKey() throws Exception {
testUpsertOnIdDataKey(branch);
verifyOtherBranchUnmodified();
@@ -126,9 +117,9 @@ public class TestFlinkIcebergSinkV2Branch extends TestFlinkIcebergSinkV2Base {
String otherBranch =
branch.equals(SnapshotRef.MAIN_BRANCH) ? "test-branch" : SnapshotRef.MAIN_BRANCH;
if (otherBranch.equals(SnapshotRef.MAIN_BRANCH)) {
- Assert.assertNull(table.currentSnapshot());
+ assertThat(table.currentSnapshot()).isNull();
}
- Assert.assertTrue(table.snapshot(otherBranch) == null);
+ assertThat(table.snapshot(otherBranch)).isNull();
}
}
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSourceSql.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSourceSql.java
index affd90c347..6857e0a7a3 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSourceSql.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSourceSql.java
@@ -33,7 +33,7 @@ import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
/** Use the FlinkSource */
public class TestFlinkSourceSql extends TestSqlBase {
@@ -61,7 +61,7 @@ public class TestFlinkSourceSql extends TestSqlBase {
.createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA, null);
GenericAppenderHelper helper =
- new GenericAppenderHelper(table, FileFormat.PARQUET, TEMPORARY_FOLDER);
+ new GenericAppenderHelper(table, FileFormat.PARQUET, temporaryFolder);
List<Record> expectedRecords = Lists.newArrayList();
long maxFileLen = 0;
for (int i = 0; i < 5; i++) {
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
index 4250460d27..645af7cfa3 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
@@ -40,7 +40,7 @@ import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
/** Use the IcebergSource (FLIP-27) */
public class TestIcebergSourceSql extends TestSqlBase {
@@ -78,7 +78,7 @@ public class TestIcebergSourceSql extends TestSqlBase {
long baseTime = 1702382109000L;
GenericAppenderHelper helper =
- new GenericAppenderHelper(table, FileFormat.PARQUET, TEMPORARY_FOLDER);
+ new GenericAppenderHelper(table, FileFormat.PARQUET, temporaryFolder);
Record file1Record1 =
generateRecord(Instant.ofEpochMilli(baseTime), baseTime + (1000 * 60 * 60 * 24 * 30L));
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java
index dda4603314..94962e02bb 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java
@@ -18,13 +18,17 @@
*/
package org.apache.iceberg.flink.source;
+import static org.apache.iceberg.flink.TestFixtures.DATABASE;
+import static org.assertj.core.api.Assertions.assertThat;
+
import java.io.IOException;
+import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.types.Row;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
@@ -34,30 +38,28 @@ import org.apache.iceberg.data.GenericAppenderHelper;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.FlinkConfigOptions;
-import org.apache.iceberg.flink.HadoopCatalogResource;
-import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.HadoopCatalogExtension;
+import org.apache.iceberg.flink.MiniFlinkClusterExtension;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
/** Test other more advanced usage of SQL. They don't need to run for every file format. */
public abstract class TestSqlBase {
- @ClassRule
- public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
- MiniClusterResource.createWithClassloaderCheckDisabled();
+ @RegisterExtension
+ public static MiniClusterExtension miniClusterResource =
+ MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();
- @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+ @RegisterExtension
+ public static final HadoopCatalogExtension catalogResource =
+ new HadoopCatalogExtension(DATABASE, TestFixtures.TABLE);
- @Rule
- public final HadoopCatalogResource catalogResource =
- new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);
+ @TempDir protected Path temporaryFolder;
private volatile TableEnvironment tEnv;
@@ -73,7 +75,7 @@ public abstract class TestSqlBase {
return tEnv;
}
- @Before
+ @BeforeEach
public abstract void before() throws IOException;
@Test
@@ -90,7 +92,7 @@ public abstract class TestSqlBase {
writeRecords.get(1).set(2, "2020-03-20");
GenericAppenderHelper helper =
- new GenericAppenderHelper(table, FileFormat.PARQUET, TEMPORARY_FOLDER);
+ new GenericAppenderHelper(table, FileFormat.PARQUET, temporaryFolder);
List<Record> expectedRecords = Lists.newArrayList();
expectedRecords.add(writeRecords.get(0));
@@ -120,7 +122,7 @@ public abstract class TestSqlBase {
expectedRecords.forEach(expectedRecord -> expectedRecord.set(2, "2020-03-20"));
GenericAppenderHelper helper =
- new GenericAppenderHelper(table, FileFormat.PARQUET, TEMPORARY_FOLDER);
+ new GenericAppenderHelper(table, FileFormat.PARQUET, temporaryFolder);
DataFile dataFile = helper.writeFile(TestHelpers.Row.of("2020-03-20", 0), expectedRecords);
helper.appendToTable(dataFile);
@@ -140,9 +142,9 @@ public abstract class TestSqlBase {
// When running with CI or local, `localityEnabled` will be false even if this configuration is
// enabled
- Assert.assertFalse(
- "Expose split locality info should be false.",
- SourceUtil.isLocalityEnabled(table, tableConf, true));
+ assertThat(SourceUtil.isLocalityEnabled(table, tableConf, true))
+ .as("Expose split locality info should be false.")
+ .isFalse();
results = run(Maps.newHashMap(), "where dt='2020-03-20'", "*");
org.apache.iceberg.flink.TestHelpers.assertRecords(
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java
index f06d9b83bc..e3e341ca2c 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.flink.source.reader;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import org.apache.flink.table.data.RowData;
@@ -106,6 +107,7 @@ public class ReaderUtil {
return Lists.partition(records, batchCount);
}
+ // Only for JUnit4 tests. Keep this method for test migration from JUnit4 to JUnit5
public static CombinedScanTask createCombinedScanTask(
List<List<Record>> recordBatchList,
TemporaryFolder temporaryFolder,
@@ -122,4 +124,24 @@ public class ReaderUtil {
return new BaseCombinedScanTask(fileTasks);
}
+
+ public static CombinedScanTask createCombinedScanTask(
+ List<List<Record>> recordBatchList,
+ Path temporaryFolder,
+ FileFormat fileFormat,
+ GenericAppenderFactory appenderFactory)
+ throws IOException {
+ List<FileScanTask> fileTasks = Lists.newArrayListWithCapacity(recordBatchList.size());
+ for (List<Record> recordBatch : recordBatchList) {
+ FileScanTask fileTask =
+ ReaderUtil.createFileTask(
+ recordBatch,
+ File.createTempFile("junit", null, temporaryFolder.toFile()),
+ fileFormat,
+ appenderFactory);
+ fileTasks.add(fileTask);
+ }
+
+ return new BaseCombinedScanTask(fileTasks);
+ }
}
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestColumnStatsWatermarkExtractor.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestColumnStatsWatermarkExtractor.java
index 604bc09619..7033fd30e8 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestColumnStatsWatermarkExtractor.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestColumnStatsWatermarkExtractor.java
@@ -18,10 +18,14 @@
*/
package org.apache.iceberg.flink.source.reader;
+import static org.apache.iceberg.flink.TestFixtures.DATABASE;
import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assumptions.assumeThat;
import java.io.IOException;
+import java.nio.file.Path;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
@@ -30,27 +34,26 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
-import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.HadoopTableExtension;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
-import org.junit.Assert;
-import org.junit.Assume;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+@ExtendWith(ParameterizedTestExtension.class)
public class TestColumnStatsWatermarkExtractor {
public static final Schema SCHEMA =
new Schema(
@@ -68,15 +71,16 @@ public class TestColumnStatsWatermarkExtractor {
private static final List<Map<String, Long>> MIN_VALUES =
ImmutableList.of(Maps.newHashMapWithExpectedSize(3), Maps.newHashMapWithExpectedSize(3));
- @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+ @TempDir protected Path temporaryFolder;
- @Rule
- public final HadoopTableResource sourceTableResource =
- new HadoopTableResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, SCHEMA);
+ @RegisterExtension
+ private static final HadoopTableExtension sourceTableResource =
+ new HadoopTableExtension(DATABASE, TestFixtures.TABLE, SCHEMA);
- private final String columnName;
+ @Parameter(index = 0)
+ private String columnName;
- @BeforeClass
+ @BeforeAll
public static void updateMinValue() {
for (int i = 0; i < TEST_RECORDS.size(); ++i) {
for (Record r : TEST_RECORDS.get(i)) {
@@ -94,7 +98,7 @@ public class TestColumnStatsWatermarkExtractor {
}
}
- @Parameterized.Parameters(name = "{0}")
+ @Parameters(name = "columnName = {0}")
public static Collection<Object[]> data() {
return ImmutableList.of(
new Object[] {"timestamp_column"},
@@ -102,62 +106,56 @@ public class TestColumnStatsWatermarkExtractor {
new Object[] {"long_column"});
}
- public TestColumnStatsWatermarkExtractor(String columnName) {
- this.columnName = columnName;
- }
-
- @Test
+ @TestTemplate
public void testSingle() throws IOException {
ColumnStatsWatermarkExtractor extractor =
new ColumnStatsWatermarkExtractor(SCHEMA, columnName, TimeUnit.MILLISECONDS);
- Assert.assertEquals(
- MIN_VALUES.get(0).get(columnName).longValue(), extractor.extractWatermark(split(0)));
+ assertThat(extractor.extractWatermark(split(0)))
+ .isEqualTo(MIN_VALUES.get(0).get(columnName).longValue());
}
- @Test
+ @TestTemplate
public void testTimeUnit() throws IOException {
- Assume.assumeTrue("Run only for long column", columnName.equals("long_column"));
+ assumeThat(columnName).isEqualTo("long_column");
ColumnStatsWatermarkExtractor extractor =
new ColumnStatsWatermarkExtractor(SCHEMA, columnName, TimeUnit.MICROSECONDS);
- Assert.assertEquals(
- MIN_VALUES.get(0).get(columnName).longValue() / 1000L,
- extractor.extractWatermark(split(0)));
+ assertThat(extractor.extractWatermark(split(0)))
+ .isEqualTo(MIN_VALUES.get(0).get(columnName).longValue() / 1000L);
}
- @Test
+ @TestTemplate
public void testMultipleFiles() throws IOException {
- Assume.assumeTrue("Run only for the timestamp column", columnName.equals("timestamp_column"));
+ assumeThat(columnName).isEqualTo("timestamp_column");
IcebergSourceSplit combinedSplit =
IcebergSourceSplit.fromCombinedScanTask(
ReaderUtil.createCombinedScanTask(
- TEST_RECORDS, TEMPORARY_FOLDER, FileFormat.PARQUET, APPENDER_FACTORY));
+ TEST_RECORDS, temporaryFolder, FileFormat.PARQUET, APPENDER_FACTORY));
ColumnStatsWatermarkExtractor extractor =
new ColumnStatsWatermarkExtractor(SCHEMA, columnName, null);
- Assert.assertEquals(
- MIN_VALUES.get(0).get(columnName).longValue(), extractor.extractWatermark(split(0)));
- Assert.assertEquals(
- MIN_VALUES.get(1).get(columnName).longValue(), extractor.extractWatermark(split(1)));
- Assert.assertEquals(
- Math.min(MIN_VALUES.get(0).get(columnName), MIN_VALUES.get(1).get(columnName)),
- extractor.extractWatermark(combinedSplit));
+ assertThat(extractor.extractWatermark(split(0)))
+ .isEqualTo(MIN_VALUES.get(0).get(columnName).longValue());
+ assertThat(extractor.extractWatermark(split(1)))
+ .isEqualTo(MIN_VALUES.get(1).get(columnName).longValue());
+ assertThat(extractor.extractWatermark(combinedSplit))
+ .isEqualTo(Math.min(MIN_VALUES.get(0).get(columnName), MIN_VALUES.get(1).get(columnName)));
}
- @Test
+ @TestTemplate
public void testWrongColumn() {
- Assume.assumeTrue("Run only for string column", columnName.equals("string_column"));
+ assumeThat(columnName).isEqualTo("string_column");
assertThatThrownBy(() -> new ColumnStatsWatermarkExtractor(SCHEMA, columnName, null))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining(
"Found STRING, expected a LONG or TIMESTAMP column for watermark generation.");
}
- @Test
+ @TestTemplate
public void testEmptyStatistics() throws IOException {
- Assume.assumeTrue("Run only for timestamp column", columnName.equals("timestamp_column"));
+ assumeThat(columnName).isEqualTo("timestamp_column");
// Create an extractor for a column we do not have statistics
ColumnStatsWatermarkExtractor extractor =
@@ -171,7 +169,7 @@ public class TestColumnStatsWatermarkExtractor {
return IcebergSourceSplit.fromCombinedScanTask(
ReaderUtil.createCombinedScanTask(
ImmutableList.of(TEST_RECORDS.get(id)),
- TEMPORARY_FOLDER,
+ temporaryFolder,
FileFormat.PARQUET,
APPENDER_FACTORY));
}