This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 850cd5c3bc Flink: Migrate subclasses of FlinkCatalogTestBase to JUnit5 (#9381)
850cd5c3bc is described below
commit 850cd5c3bcaac86b392d8ab9fa586f355f6e2d76
Author: vinitpatni <[email protected]>
AuthorDate: Sat Jan 13 19:34:45 2024 +0530
Flink: Migrate subclasses of FlinkCatalogTestBase to JUnit5 (#9381)
---
.../apache/iceberg/flink/FlinkCatalogTestBase.java | 155 -------
.../java/org/apache/iceberg/flink/TestBase.java | 2 +-
.../iceberg/flink/TestFlinkCatalogTable.java | 492 ++++++++++-----------
.../flink/TestFlinkCatalogTablePartitions.java | 2 +-
.../apache/iceberg/flink/TestFlinkHiveCatalog.java | 8 +-
.../apache/iceberg/flink/TestFlinkTableSink.java | 132 +++---
.../org/apache/iceberg/flink/TestFlinkUpsert.java | 56 +--
.../apache/iceberg/flink/TestIcebergConnector.java | 6 +-
.../flink/actions/TestRewriteDataFilesAction.java | 146 +++---
.../flink/source/TestFlinkMetaDataTable.java | 454 +++++++++----------
.../iceberg/flink/source/TestStreamScanSql.java | 50 +--
11 files changed, 618 insertions(+), 885 deletions(-)
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
deleted file mode 100644
index 74c5d343e9..0000000000
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg.flink;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import org.apache.flink.util.ArrayUtils;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.iceberg.CatalogProperties;
-import org.apache.iceberg.catalog.Catalog;
-import org.apache.iceberg.catalog.Namespace;
-import org.apache.iceberg.catalog.SupportsNamespaces;
-import org.apache.iceberg.hadoop.HadoopCatalog;
-import org.apache.iceberg.relocated.com.google.common.base.Joiner;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public abstract class FlinkCatalogTestBase extends FlinkTestBase {
-
- protected static final String DATABASE = "db";
- private static TemporaryFolder hiveWarehouse = new TemporaryFolder();
- private static TemporaryFolder hadoopWarehouse = new TemporaryFolder();
-
- @BeforeClass
- public static void createWarehouse() throws IOException {
- hiveWarehouse.create();
- hadoopWarehouse.create();
- }
-
- @AfterClass
- public static void dropWarehouse() {
- hiveWarehouse.delete();
- hadoopWarehouse.delete();
- }
-
- @Before
- public void before() {
- sql("CREATE CATALOG %s WITH %s", catalogName, toWithClause(config));
- }
-
- @After
- public void clean() {
- dropCatalog(catalogName, true);
- }
-
- @Parameterized.Parameters(name = "catalogName = {0} baseNamespace = {1}")
- public static Iterable<Object[]> parameters() {
- return Lists.newArrayList(
- new Object[] {"testhive", Namespace.empty()},
- new Object[] {"testhadoop", Namespace.empty()},
- new Object[] {"testhadoop_basenamespace", Namespace.of("l0", "l1")});
- }
-
- protected final String catalogName;
- protected final Namespace baseNamespace;
- protected final Catalog validationCatalog;
- protected final SupportsNamespaces validationNamespaceCatalog;
- protected final Map<String, String> config = Maps.newHashMap();
-
- protected final String flinkDatabase;
- protected final Namespace icebergNamespace;
- protected final boolean isHadoopCatalog;
-
- public FlinkCatalogTestBase(String catalogName, Namespace baseNamespace) {
- this.catalogName = catalogName;
- this.baseNamespace = baseNamespace;
- this.isHadoopCatalog = catalogName.startsWith("testhadoop");
- this.validationCatalog =
- isHadoopCatalog
- ? new HadoopCatalog(hiveConf, "file:" + hadoopWarehouse.getRoot())
- : catalog;
- this.validationNamespaceCatalog = (SupportsNamespaces) validationCatalog;
-
- config.put("type", "iceberg");
- if (!baseNamespace.isEmpty()) {
- config.put(FlinkCatalogFactory.BASE_NAMESPACE, baseNamespace.toString());
- }
- if (isHadoopCatalog) {
- config.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hadoop");
- } else {
- config.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hive");
- config.put(CatalogProperties.URI, getURI(hiveConf));
- }
- config.put(CatalogProperties.WAREHOUSE_LOCATION,
String.format("file://%s", warehouseRoot()));
-
- this.flinkDatabase = catalogName + "." + DATABASE;
- this.icebergNamespace =
- Namespace.of(ArrayUtils.concat(baseNamespace.levels(), new String[]
{DATABASE}));
- }
-
- protected String warehouseRoot() {
- if (isHadoopCatalog) {
- return hadoopWarehouse.getRoot().getAbsolutePath();
- } else {
- return hiveWarehouse.getRoot().getAbsolutePath();
- }
- }
-
- protected String getFullQualifiedTableName(String tableName) {
- final List<String> levels = Lists.newArrayList(icebergNamespace.levels());
- levels.add(tableName);
- return Joiner.on('.').join(levels);
- }
-
- static String getURI(HiveConf conf) {
- return conf.get(HiveConf.ConfVars.METASTOREURIS.varname);
- }
-
- static String toWithClause(Map<String, String> props) {
- StringBuilder builder = new StringBuilder();
- builder.append("(");
- int propCount = 0;
- for (Map.Entry<String, String> entry : props.entrySet()) {
- if (propCount > 0) {
- builder.append(",");
- }
- builder
- .append("'")
- .append(entry.getKey())
- .append("'")
- .append("=")
- .append("'")
- .append(entry.getValue())
- .append("'");
- propCount++;
- }
- builder.append(")");
- return builder.toString();
- }
-}
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestBase.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestBase.java
index 4fc0207f26..3986f1a796 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestBase.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestBase.java
@@ -45,7 +45,7 @@ public abstract class TestBase extends TestBaseUtils {
public static MiniClusterExtension miniClusterResource =
MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();
- @TempDir Path temporaryDirectory;
+ @TempDir protected Path temporaryDirectory;
private static TestHiveMetastore metastore = null;
protected static HiveConf hiveConf = null;
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
index 8f5ddde918..ef0802d869 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
@@ -19,6 +19,8 @@
package org.apache.iceberg.flink;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+import static org.assertj.core.api.Assumptions.assumeThat;
import java.util.Arrays;
import java.util.Collections;
@@ -46,30 +48,21 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
-import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
import org.assertj.core.api.Assertions;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
-public class TestFlinkCatalogTable extends FlinkCatalogTestBase {
-
- public TestFlinkCatalogTable(String catalogName, Namespace baseNamespace) {
- super(catalogName, baseNamespace);
- }
+public class TestFlinkCatalogTable extends CatalogTestBase {
@Override
- @Before
+ @BeforeEach
public void before() {
super.before();
sql("CREATE DATABASE %s", flinkDatabase);
@@ -77,7 +70,7 @@ public class TestFlinkCatalogTable extends FlinkCatalogTestBase {
sql("USE %s", DATABASE);
}
- @After
+ @AfterEach
public void cleanNamespaces() {
sql("DROP TABLE IF EXISTS %s.tl", flinkDatabase);
sql("DROP TABLE IF EXISTS %s.tl2", flinkDatabase);
@@ -85,7 +78,7 @@ public class TestFlinkCatalogTable extends FlinkCatalogTestBase {
super.clean();
}
- @Test
+ @TestTemplate
public void testGetTable() {
sql("CREATE TABLE tl(id BIGINT, strV STRING)");
@@ -94,84 +87,73 @@ public class TestFlinkCatalogTable extends FlinkCatalogTestBase {
new Schema(
Types.NestedField.optional(1, "id", Types.LongType.get()),
Types.NestedField.optional(2, "strV", Types.StringType.get()));
- Assert.assertEquals(
- "Should load the expected iceberg schema", iSchema.toString(),
table.schema().toString());
+ assertThat(table.schema().toString())
+ .as("Should load the expected iceberg schema")
+ .isEqualTo(iSchema.toString());
}
- @Test
+ @TestTemplate
public void testRenameTable() {
- Assume.assumeFalse("HadoopCatalog does not support rename table",
isHadoopCatalog);
-
+ assumeThat(isHadoopCatalog).as("HadoopCatalog does not support rename
table").isFalse();
final Schema tableSchema =
new Schema(Types.NestedField.optional(0, "id", Types.LongType.get()));
validationCatalog.createTable(TableIdentifier.of(icebergNamespace, "tl"),
tableSchema);
sql("ALTER TABLE tl RENAME TO tl2");
- Assertions.assertThatThrownBy(() -> getTableEnv().from("tl"))
+ assertThatThrownBy(() -> getTableEnv().from("tl"))
.isInstanceOf(ValidationException.class)
.hasMessage("Table `tl` was not found.");
Schema actualSchema =
FlinkSchemaUtil.convert(getTableEnv().from("tl2").getSchema());
- Assert.assertEquals(tableSchema.asStruct(), actualSchema.asStruct());
+ assertThat(tableSchema.asStruct()).isEqualTo(actualSchema.asStruct());
}
- @Test
+ @TestTemplate
public void testCreateTable() throws TableNotExistException {
sql("CREATE TABLE tl(id BIGINT)");
Table table = table("tl");
- Assert.assertEquals(
- new Schema(Types.NestedField.optional(1, "id",
Types.LongType.get())).asStruct(),
- table.schema().asStruct());
-
+ assertThat(table.schema().asStruct())
+ .isEqualTo(
+ new Schema(Types.NestedField.optional(1, "id",
Types.LongType.get())).asStruct());
CatalogTable catalogTable = catalogTable("tl");
- Assert.assertEquals(
- TableSchema.builder().field("id", DataTypes.BIGINT()).build(),
catalogTable.getSchema());
+ assertThat(catalogTable.getSchema())
+ .isEqualTo(TableSchema.builder().field("id",
DataTypes.BIGINT()).build());
}
- @Test
+ @TestTemplate
public void testCreateTableWithPrimaryKey() throws Exception {
sql("CREATE TABLE tl(id BIGINT, data STRING, key STRING PRIMARY KEY NOT
ENFORCED)");
Table table = table("tl");
- Assert.assertEquals(
- "Should have the expected row key.",
- Sets.newHashSet(table.schema().findField("key").fieldId()),
- table.schema().identifierFieldIds());
-
+ assertThat(table.schema().identifierFieldIds())
+ .as("Should have the expected row key.")
+ .isEqualTo(Sets.newHashSet(table.schema().findField("key").fieldId()));
CatalogTable catalogTable = catalogTable("tl");
Optional<UniqueConstraint> uniqueConstraintOptional =
catalogTable.getSchema().getPrimaryKey();
- Assert.assertTrue(
- "Should have the expected unique constraint",
uniqueConstraintOptional.isPresent());
- Assert.assertEquals(
- "Should have the expected columns",
- ImmutableList.of("key"),
- uniqueConstraintOptional.get().getColumns());
+ assertThat(uniqueConstraintOptional).isPresent();
+
assertThat(uniqueConstraintOptional.get().getColumns()).containsExactly("key");
}
- @Test
+ @TestTemplate
public void testCreateTableWithMultiColumnsInPrimaryKey() throws Exception {
sql(
"CREATE TABLE tl(id BIGINT, data STRING, CONSTRAINT pk_constraint
PRIMARY KEY(data, id) NOT ENFORCED)");
Table table = table("tl");
- Assert.assertEquals(
- "Should have the expected RowKey",
- Sets.newHashSet(
- table.schema().findField("id").fieldId(),
table.schema().findField("data").fieldId()),
- table.schema().identifierFieldIds());
-
+ assertThat(table.schema().identifierFieldIds())
+ .as("Should have the expected RowKey")
+ .isEqualTo(
+ Sets.newHashSet(
+ table.schema().findField("id").fieldId(),
+ table.schema().findField("data").fieldId()));
CatalogTable catalogTable = catalogTable("tl");
Optional<UniqueConstraint> uniqueConstraintOptional =
catalogTable.getSchema().getPrimaryKey();
- Assert.assertTrue(
- "Should have the expected unique constraint",
uniqueConstraintOptional.isPresent());
- Assert.assertEquals(
- "Should have the expected columns",
- ImmutableSet.of("data", "id"),
- ImmutableSet.copyOf(uniqueConstraintOptional.get().getColumns()));
+ assertThat(uniqueConstraintOptional).isPresent();
+
assertThat(uniqueConstraintOptional.get().getColumns()).containsExactly("id",
"data");
}
- @Test
+ @TestTemplate
public void testCreateTableIfNotExists() {
sql("CREATE TABLE tl(id BIGINT)");
@@ -193,97 +175,96 @@ public class TestFlinkCatalogTable extends FlinkCatalogTestBase {
assertThat(table("tl").properties()).containsEntry("key", "value");
}
- @Test
+ @TestTemplate
public void testCreateTableLike() throws TableNotExistException {
sql("CREATE TABLE tl(id BIGINT)");
sql("CREATE TABLE tl2 LIKE tl");
Table table = table("tl2");
- Assert.assertEquals(
- new Schema(Types.NestedField.optional(1, "id",
Types.LongType.get())).asStruct(),
- table.schema().asStruct());
-
+ assertThat(table.schema().asStruct())
+ .isEqualTo(
+ new Schema(Types.NestedField.optional(1, "id",
Types.LongType.get())).asStruct());
CatalogTable catalogTable = catalogTable("tl2");
- Assert.assertEquals(
- TableSchema.builder().field("id", DataTypes.BIGINT()).build(),
catalogTable.getSchema());
+ assertThat(catalogTable.getSchema())
+ .isEqualTo(TableSchema.builder().field("id",
DataTypes.BIGINT()).build());
}
- @Test
+ @TestTemplate
public void testCreateTableLocation() {
- Assume.assumeFalse(
- "HadoopCatalog does not support creating table with location",
isHadoopCatalog);
-
+ assumeThat(isHadoopCatalog)
+ .as("HadoopCatalog does not support creating table with location")
+ .isFalse();
sql("CREATE TABLE tl(id BIGINT) WITH ('location'='file:///tmp/location')");
Table table = table("tl");
- Assert.assertEquals(
- new Schema(Types.NestedField.optional(1, "id",
Types.LongType.get())).asStruct(),
- table.schema().asStruct());
- Assert.assertEquals("file:///tmp/location", table.location());
+ assertThat(table.schema().asStruct())
+ .isEqualTo(
+ new Schema(Types.NestedField.optional(1, "id",
Types.LongType.get())).asStruct());
+ assertThat(table.location()).isEqualTo("file:///tmp/location");
}
- @Test
+ @TestTemplate
public void testCreatePartitionTable() throws TableNotExistException {
sql("CREATE TABLE tl(id BIGINT, dt STRING) PARTITIONED BY(dt)");
Table table = table("tl");
- Assert.assertEquals(
- new Schema(
- Types.NestedField.optional(1, "id", Types.LongType.get()),
- Types.NestedField.optional(2, "dt", Types.StringType.get()))
- .asStruct(),
- table.schema().asStruct());
- Assert.assertEquals(
- PartitionSpec.builderFor(table.schema()).identity("dt").build(),
table.spec());
-
+ assertThat(table.schema().asStruct())
+ .isEqualTo(
+ new Schema(
+ Types.NestedField.optional(1, "id", Types.LongType.get()),
+ Types.NestedField.optional(2, "dt",
Types.StringType.get()))
+ .asStruct());
+ assertThat(table.spec())
+
.isEqualTo(PartitionSpec.builderFor(table.schema()).identity("dt").build());
CatalogTable catalogTable = catalogTable("tl");
- Assert.assertEquals(
- TableSchema.builder()
- .field("id", DataTypes.BIGINT())
- .field("dt", DataTypes.STRING())
- .build(),
- catalogTable.getSchema());
- Assert.assertEquals(Collections.singletonList("dt"),
catalogTable.getPartitionKeys());
+ assertThat(catalogTable.getSchema())
+ .isEqualTo(
+ TableSchema.builder()
+ .field("id", DataTypes.BIGINT())
+ .field("dt", DataTypes.STRING())
+ .build());
+
assertThat(catalogTable.getPartitionKeys()).isEqualTo(Collections.singletonList("dt"));
}
- @Test
+ @TestTemplate
public void testCreateTableWithFormatV2ThroughTableProperty() throws
Exception {
sql("CREATE TABLE tl(id BIGINT) WITH ('format-version'='2')");
Table table = table("tl");
- Assert.assertEquals(
- "should create table using format v2",
- 2,
- ((BaseTable) table).operations().current().formatVersion());
+ assertThat(((BaseTable)
table).operations().current().formatVersion()).isEqualTo(2);
}
- @Test
+ @TestTemplate
public void testUpgradeTableWithFormatV2ThroughTableProperty() throws
Exception {
sql("CREATE TABLE tl(id BIGINT) WITH ('format-version'='1')");
Table table = table("tl");
TableOperations ops = ((BaseTable) table).operations();
- Assert.assertEquals("should create table using format v1", 1,
ops.refresh().formatVersion());
-
+ assertThat(ops.refresh().formatVersion())
+ .as("should create table using format v1")
+ .isEqualTo(1);
sql("ALTER TABLE tl SET('format-version'='2')");
- Assert.assertEquals("should update table to use format v2", 2,
ops.refresh().formatVersion());
+ assertThat(ops.refresh().formatVersion())
+ .as("should update table to use format v2")
+ .isEqualTo(2);
}
- @Test
+ @TestTemplate
public void testDowngradeTableToFormatV1ThroughTablePropertyFails() throws
Exception {
sql("CREATE TABLE tl(id BIGINT) WITH ('format-version'='2')");
Table table = table("tl");
TableOperations ops = ((BaseTable) table).operations();
- Assert.assertEquals("should create table using format v2", 2,
ops.refresh().formatVersion());
-
+ assertThat(ops.refresh().formatVersion())
+ .as("should create table using format v2")
+ .isEqualTo(2);
Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl
SET('format-version'='1')"))
.rootCause()
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Cannot downgrade v2 table to v1");
}
- @Test
+ @TestTemplate
public void testLoadTransformPartitionTable() throws TableNotExistException {
Schema schema = new Schema(Types.NestedField.optional(0, "id",
Types.LongType.get()));
validationCatalog.createTable(
@@ -292,12 +273,12 @@ public class TestFlinkCatalogTable extends FlinkCatalogTestBase {
PartitionSpec.builderFor(schema).bucket("id", 100).build());
CatalogTable catalogTable = catalogTable("tl");
- Assert.assertEquals(
- TableSchema.builder().field("id", DataTypes.BIGINT()).build(),
catalogTable.getSchema());
- Assert.assertEquals(Collections.emptyList(),
catalogTable.getPartitionKeys());
+ assertThat(catalogTable.getSchema())
+ .isEqualTo(TableSchema.builder().field("id",
DataTypes.BIGINT()).build());
+ assertThat(catalogTable.getPartitionKeys()).isEmpty();
}
- @Test
+ @TestTemplate
public void testAlterTableProperties() throws TableNotExistException {
sql("CREATE TABLE tl(id BIGINT) WITH ('oldK'='oldV')");
Map<String, String> properties = Maps.newHashMap();
@@ -319,35 +300,32 @@ public class TestFlinkCatalogTable extends FlinkCatalogTestBase {
assertThat(table("tl").properties()).containsAllEntriesOf(properties);
}
- @Test
+ @TestTemplate
public void testAlterTableAddColumn() {
sql("CREATE TABLE tl(id BIGINT)");
Schema schemaBefore = table("tl").schema();
- Assert.assertEquals(
- new Schema(Types.NestedField.optional(1, "id",
Types.LongType.get())).asStruct(),
- schemaBefore.asStruct());
-
+ assertThat(schemaBefore.asStruct())
+ .isEqualTo(
+ new Schema(Types.NestedField.optional(1, "id",
Types.LongType.get())).asStruct());
sql("ALTER TABLE tl ADD (dt STRING)");
Schema schemaAfter1 = table("tl").schema();
- Assert.assertEquals(
- new Schema(
- Types.NestedField.optional(1, "id", Types.LongType.get()),
- Types.NestedField.optional(2, "dt", Types.StringType.get()))
- .asStruct(),
- schemaAfter1.asStruct());
-
+ assertThat(schemaAfter1.asStruct())
+ .isEqualTo(
+ new Schema(
+ Types.NestedField.optional(1, "id", Types.LongType.get()),
+ Types.NestedField.optional(2, "dt",
Types.StringType.get()))
+ .asStruct());
// Add multiple columns
sql("ALTER TABLE tl ADD (col1 STRING, col2 BIGINT)");
Schema schemaAfter2 = table("tl").schema();
- Assert.assertEquals(
- new Schema(
- Types.NestedField.optional(1, "id", Types.LongType.get()),
- Types.NestedField.optional(2, "dt", Types.StringType.get()),
- Types.NestedField.optional(3, "col1", Types.StringType.get()),
- Types.NestedField.optional(4, "col2", Types.LongType.get()))
- .asStruct(),
- schemaAfter2.asStruct());
-
+ assertThat(schemaAfter2.asStruct())
+ .isEqualTo(
+ new Schema(
+ Types.NestedField.optional(1, "id", Types.LongType.get()),
+ Types.NestedField.optional(2, "dt",
Types.StringType.get()),
+ Types.NestedField.optional(3, "col1",
Types.StringType.get()),
+ Types.NestedField.optional(4, "col2",
Types.LongType.get()))
+ .asStruct());
// Adding a required field should fail because Iceberg's SchemaUpdate does
not allow
// incompatible changes.
Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl ADD (pk STRING NOT
NULL)"))
@@ -360,36 +338,33 @@ public class TestFlinkCatalogTable extends FlinkCatalogTestBase {
.hasMessageContaining("Try to add a column `id` which already exists
in the table.");
}
- @Test
+ @TestTemplate
public void testAlterTableDropColumn() {
sql("CREATE TABLE tl(id BIGINT, dt STRING, col1 STRING, col2 BIGINT)");
Schema schemaBefore = table("tl").schema();
- Assert.assertEquals(
- new Schema(
- Types.NestedField.optional(1, "id", Types.LongType.get()),
- Types.NestedField.optional(2, "dt", Types.StringType.get()),
- Types.NestedField.optional(3, "col1", Types.StringType.get()),
- Types.NestedField.optional(4, "col2", Types.LongType.get()))
- .asStruct(),
- schemaBefore.asStruct());
-
+ assertThat(schemaBefore.asStruct())
+ .isEqualTo(
+ new Schema(
+ Types.NestedField.optional(1, "id", Types.LongType.get()),
+ Types.NestedField.optional(2, "dt",
Types.StringType.get()),
+ Types.NestedField.optional(3, "col1",
Types.StringType.get()),
+ Types.NestedField.optional(4, "col2",
Types.LongType.get()))
+ .asStruct());
sql("ALTER TABLE tl DROP (dt)");
Schema schemaAfter1 = table("tl").schema();
- Assert.assertEquals(
- new Schema(
- Types.NestedField.optional(1, "id", Types.LongType.get()),
- Types.NestedField.optional(3, "col1", Types.StringType.get()),
- Types.NestedField.optional(4, "col2", Types.LongType.get()))
- .asStruct(),
- schemaAfter1.asStruct());
-
+ assertThat(schemaAfter1.asStruct())
+ .isEqualTo(
+ new Schema(
+ Types.NestedField.optional(1, "id", Types.LongType.get()),
+ Types.NestedField.optional(3, "col1",
Types.StringType.get()),
+ Types.NestedField.optional(4, "col2",
Types.LongType.get()))
+ .asStruct());
// Drop multiple columns
sql("ALTER TABLE tl DROP (col1, col2)");
Schema schemaAfter2 = table("tl").schema();
- Assert.assertEquals(
- new Schema(Types.NestedField.optional(1, "id",
Types.LongType.get())).asStruct(),
- schemaAfter2.asStruct());
-
+ assertThat(schemaAfter2.asStruct())
+ .isEqualTo(
+ new Schema(Types.NestedField.optional(1, "id",
Types.LongType.get())).asStruct());
// Dropping an non-existing field should fail due to Flink's internal
validation.
Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl DROP (foo)"))
.isInstanceOf(ValidationException.class)
@@ -401,48 +376,45 @@ public class TestFlinkCatalogTable extends FlinkCatalogTestBase {
.hasMessageContaining("The column `dt` does not exist in the base
table.");
}
- @Test
+ @TestTemplate
public void testAlterTableModifyColumnName() {
sql("CREATE TABLE tl(id BIGINT, dt STRING)");
Schema schemaBefore = table("tl").schema();
- Assert.assertEquals(
- new Schema(
- Types.NestedField.optional(1, "id", Types.LongType.get()),
- Types.NestedField.optional(2, "dt", Types.StringType.get()))
- .asStruct(),
- schemaBefore.asStruct());
-
+ assertThat(schemaBefore.asStruct())
+ .isEqualTo(
+ new Schema(
+ Types.NestedField.optional(1, "id", Types.LongType.get()),
+ Types.NestedField.optional(2, "dt",
Types.StringType.get()))
+ .asStruct());
sql("ALTER TABLE tl RENAME dt TO data");
Schema schemaAfter = table("tl").schema();
- Assert.assertEquals(
- new Schema(
- Types.NestedField.optional(1, "id", Types.LongType.get()),
- Types.NestedField.optional(2, "data", Types.StringType.get()))
- .asStruct(),
- schemaAfter.asStruct());
+ assertThat(schemaAfter.asStruct())
+ .isEqualTo(
+ new Schema(
+ Types.NestedField.optional(1, "id", Types.LongType.get()),
+ Types.NestedField.optional(2, "data",
Types.StringType.get()))
+ .asStruct());
}
- @Test
+ @TestTemplate
public void testAlterTableModifyColumnType() {
sql("CREATE TABLE tl(id INTEGER, dt STRING)");
Schema schemaBefore = table("tl").schema();
- Assert.assertEquals(
- new Schema(
- Types.NestedField.optional(1, "id", Types.IntegerType.get()),
- Types.NestedField.optional(2, "dt", Types.StringType.get()))
- .asStruct(),
- schemaBefore.asStruct());
-
+ assertThat(schemaBefore.asStruct())
+ .isEqualTo(
+ new Schema(
+ Types.NestedField.optional(1, "id",
Types.IntegerType.get()),
+ Types.NestedField.optional(2, "dt",
Types.StringType.get()))
+ .asStruct());
// Promote type from Integer to Long
sql("ALTER TABLE tl MODIFY (id BIGINT)");
Schema schemaAfter = table("tl").schema();
- Assert.assertEquals(
- new Schema(
- Types.NestedField.optional(1, "id", Types.LongType.get()),
- Types.NestedField.optional(2, "dt", Types.StringType.get()))
- .asStruct(),
- schemaAfter.asStruct());
-
+ assertThat(schemaAfter.asStruct())
+ .isEqualTo(
+ new Schema(
+ Types.NestedField.optional(1, "id", Types.LongType.get()),
+ Types.NestedField.optional(2, "dt",
Types.StringType.get()))
+ .asStruct());
// Type change that doesn't follow the type-promotion rule should fail due
to Iceberg's
// validation.
Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl MODIFY (dt
INTEGER)"))
@@ -451,17 +423,16 @@ public class TestFlinkCatalogTable extends FlinkCatalogTestBase {
.hasRootCauseMessage("Cannot change column type: dt: string -> int");
}
- @Test
+ @TestTemplate
public void testAlterTableModifyColumnNullability() {
sql("CREATE TABLE tl(id INTEGER NOT NULL, dt STRING)");
Schema schemaBefore = table("tl").schema();
- Assert.assertEquals(
- new Schema(
- Types.NestedField.required(1, "id", Types.IntegerType.get()),
- Types.NestedField.optional(2, "dt", Types.StringType.get()))
- .asStruct(),
- schemaBefore.asStruct());
-
+ assertThat(schemaBefore.asStruct())
+ .isEqualTo(
+ new Schema(
+ Types.NestedField.required(1, "id",
Types.IntegerType.get()),
+ Types.NestedField.optional(2, "dt",
Types.StringType.get()))
+ .asStruct());
// Changing nullability from optional to required should fail
// because Iceberg's SchemaUpdate does not allow incompatible changes.
Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl MODIFY (dt STRING
NOT NULL)"))
@@ -472,43 +443,42 @@ public class TestFlinkCatalogTable extends FlinkCatalogTestBase {
// Set nullability from required to optional
sql("ALTER TABLE tl MODIFY (id INTEGER)");
Schema schemaAfter = table("tl").schema();
- Assert.assertEquals(
- new Schema(
- Types.NestedField.optional(1, "id", Types.IntegerType.get()),
- Types.NestedField.optional(2, "dt", Types.StringType.get()))
- .asStruct(),
- schemaAfter.asStruct());
+ assertThat(schemaAfter.asStruct())
+ .isEqualTo(
+ new Schema(
+ Types.NestedField.optional(1, "id",
Types.IntegerType.get()),
+ Types.NestedField.optional(2, "dt",
Types.StringType.get()))
+ .asStruct());
}
- @Test
+ @TestTemplate
public void testAlterTableModifyColumnPosition() {
sql("CREATE TABLE tl(id BIGINT, dt STRING)");
Schema schemaBefore = table("tl").schema();
- Assert.assertEquals(
- new Schema(
- Types.NestedField.optional(1, "id", Types.LongType.get()),
- Types.NestedField.optional(2, "dt", Types.StringType.get()))
- .asStruct(),
- schemaBefore.asStruct());
+ assertThat(schemaBefore.asStruct())
+ .isEqualTo(
+ new Schema(
+ Types.NestedField.optional(1, "id", Types.LongType.get()),
+ Types.NestedField.optional(2, "dt",
Types.StringType.get()))
+ .asStruct());
sql("ALTER TABLE tl MODIFY (dt STRING FIRST)");
Schema schemaAfter = table("tl").schema();
- Assert.assertEquals(
- new Schema(
- Types.NestedField.optional(2, "dt", Types.StringType.get()),
- Types.NestedField.optional(1, "id", Types.LongType.get()))
- .asStruct(),
- schemaAfter.asStruct());
+ assertThat(schemaAfter.asStruct())
+ .isEqualTo(
+ new Schema(
+ Types.NestedField.optional(2, "dt",
Types.StringType.get()),
+ Types.NestedField.optional(1, "id", Types.LongType.get()))
+ .asStruct());
sql("ALTER TABLE tl MODIFY (dt STRING AFTER id)");
Schema schemaAfterAfter = table("tl").schema();
- Assert.assertEquals(
- new Schema(
- Types.NestedField.optional(1, "id", Types.LongType.get()),
- Types.NestedField.optional(2, "dt", Types.StringType.get()))
- .asStruct(),
- schemaAfterAfter.asStruct());
-
+ assertThat(schemaAfterAfter.asStruct())
+ .isEqualTo(
+ new Schema(
+ Types.NestedField.optional(1, "id", Types.LongType.get()),
+ Types.NestedField.optional(2, "dt",
Types.StringType.get()))
+ .asStruct());
// Modifying the position of a non-existing column should fail due to
Flink's internal
// validation.
Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl MODIFY
(non_existing STRING FIRST)"))
@@ -523,67 +493,64 @@ public class TestFlinkCatalogTable extends FlinkCatalogTestBase {
"Referenced column `non_existing` by 'AFTER' does not exist in the
table.");
}
- @Test
+ @TestTemplate
public void testAlterTableModifyColumnComment() {
sql("CREATE TABLE tl(id BIGINT, dt STRING)");
Schema schemaBefore = table("tl").schema();
- Assert.assertEquals(
- new Schema(
- Types.NestedField.optional(1, "id", Types.LongType.get()),
- Types.NestedField.optional(2, "dt", Types.StringType.get()))
- .asStruct(),
- schemaBefore.asStruct());
+ assertThat(schemaBefore.asStruct())
+ .isEqualTo(
+ new Schema(
+ Types.NestedField.optional(1, "id", Types.LongType.get()),
+ Types.NestedField.optional(2, "dt",
Types.StringType.get()))
+ .asStruct());
sql("ALTER TABLE tl MODIFY (dt STRING COMMENT 'comment for dt field')");
Schema schemaAfter = table("tl").schema();
- Assert.assertEquals(
- new Schema(
- Types.NestedField.optional(1, "id", Types.LongType.get()),
- Types.NestedField.optional(2, "dt", Types.StringType.get(),
"comment for dt field"))
- .asStruct(),
- schemaAfter.asStruct());
+ assertThat(schemaAfter.asStruct())
+ .isEqualTo(
+ new Schema(
+ Types.NestedField.optional(1, "id", Types.LongType.get()),
+ Types.NestedField.optional(
+ 2, "dt", Types.StringType.get(), "comment for dt
field"))
+ .asStruct());
}
- @Test
+ @TestTemplate
public void testAlterTableConstraint() {
sql("CREATE TABLE tl(id BIGINT NOT NULL, dt STRING NOT NULL, col1
STRING)");
Schema schemaBefore = table("tl").schema();
- Assert.assertEquals(
- new Schema(
- Types.NestedField.required(1, "id", Types.LongType.get()),
- Types.NestedField.required(2, "dt", Types.StringType.get()),
- Types.NestedField.optional(3, "col1", Types.StringType.get()))
- .asStruct(),
- schemaBefore.asStruct());
- Assert.assertEquals(ImmutableSet.of(),
schemaBefore.identifierFieldNames());
-
+ assertThat(schemaBefore.asStruct())
+ .isEqualTo(
+ new Schema(
+ Types.NestedField.required(1, "id", Types.LongType.get()),
+ Types.NestedField.required(2, "dt",
Types.StringType.get()),
+ Types.NestedField.optional(3, "col1",
Types.StringType.get()))
+ .asStruct());
+ assertThat(schemaBefore.identifierFieldNames()).isEmpty();
sql("ALTER TABLE tl ADD (PRIMARY KEY (id) NOT ENFORCED)");
Schema schemaAfterAdd = table("tl").schema();
- Assert.assertEquals(ImmutableSet.of("id"),
schemaAfterAdd.identifierFieldNames());
-
+ assertThat(schemaAfterAdd.identifierFieldNames()).containsExactly("id");
sql("ALTER TABLE tl MODIFY (PRIMARY KEY (dt) NOT ENFORCED)");
Schema schemaAfterModify = table("tl").schema();
- Assert.assertEquals(
- new Schema(
- Types.NestedField.required(1, "id", Types.LongType.get()),
- Types.NestedField.required(2, "dt", Types.StringType.get()),
- Types.NestedField.optional(3, "col1", Types.StringType.get()))
- .asStruct(),
- schemaAfterModify.asStruct());
- Assert.assertEquals(ImmutableSet.of("dt"),
schemaAfterModify.identifierFieldNames());
-
+ assertThat(schemaAfterModify.asStruct())
+ .isEqualTo(
+ new Schema(
+ Types.NestedField.required(1, "id", Types.LongType.get()),
+ Types.NestedField.required(2, "dt",
Types.StringType.get()),
+ Types.NestedField.optional(3, "col1",
Types.StringType.get()))
+ .asStruct());
+ assertThat(schemaAfterModify.identifierFieldNames()).containsExactly("dt");
// Composite primary key
sql("ALTER TABLE tl MODIFY (PRIMARY KEY (id, dt) NOT ENFORCED)");
Schema schemaAfterComposite = table("tl").schema();
- Assert.assertEquals(
- new Schema(
- Types.NestedField.required(1, "id", Types.LongType.get()),
- Types.NestedField.required(2, "dt", Types.StringType.get()),
- Types.NestedField.optional(3, "col1", Types.StringType.get()))
- .asStruct(),
- schemaAfterComposite.asStruct());
- Assert.assertEquals(ImmutableSet.of("id", "dt"),
schemaAfterComposite.identifierFieldNames());
-
+ assertThat(schemaAfterComposite.asStruct())
+ .isEqualTo(
+ new Schema(
+ Types.NestedField.required(1, "id", Types.LongType.get()),
+ Types.NestedField.required(2, "dt",
Types.StringType.get()),
+ Types.NestedField.optional(3, "col1",
Types.StringType.get()))
+ .asStruct());
+
assertThat(schemaAfterComposite.identifierFieldNames()).containsExactlyInAnyOrder("id",
"dt");
// Setting an optional field as primary key should fail
// because Iceberg's SchemaUpdate does not allow incompatible changes.
Assertions.assertThatThrownBy(
@@ -607,16 +574,15 @@ public class TestFlinkCatalogTable extends
FlinkCatalogTestBase {
.hasRootCauseMessage("Unsupported table change: DropConstraint.");
}
- @Test
+ @TestTemplate
public void testRelocateTable() {
- Assume.assumeFalse("HadoopCatalog does not support relocate table",
isHadoopCatalog);
-
+ assumeThat(isHadoopCatalog).as("HadoopCatalog does not support relocate
table").isFalse();
sql("CREATE TABLE tl(id BIGINT)");
sql("ALTER TABLE tl SET('location'='file:///tmp/location')");
- Assert.assertEquals("file:///tmp/location", table("tl").location());
+ assertThat(table("tl").location()).isEqualTo("file:///tmp/location");
}
- @Test
+ @TestTemplate
public void testSetCurrentAndCherryPickSnapshotId() {
sql("CREATE TABLE tl(c1 INT, c2 STRING, c3 STRING) PARTITIONED BY (c1)");
@@ -651,9 +617,9 @@ public class TestFlinkCatalogTable extends
FlinkCatalogTestBase {
table.newReplacePartitions().addFile(replacementFile).stageOnly().commit();
Snapshot staged = Iterables.getLast(table.snapshots());
- Assert.assertEquals(
- "Should find the staged overwrite snapshot", DataOperations.OVERWRITE,
staged.operation());
-
+ assertThat(staged.operation())
+ .as("Should find the staged overwrite snapshot")
+ .isEqualTo(DataOperations.OVERWRITE);
// add another append so that the original commit can't be fast-forwarded
table.newAppend().appendFile(fileB).commit();
@@ -675,7 +641,7 @@ public class TestFlinkCatalogTable extends
FlinkCatalogTestBase {
.map(FileScanTask::file)
.map(ContentFile::path)
.collect(Collectors.toSet());
- Assert.assertEquals("Files should match", expectedFilePaths,
actualFilePaths);
+ assertThat(actualFilePaths).as("Files should
match").isEqualTo(expectedFilePaths);
}
private Table table(String name) {
diff --git
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
index 05fd1bad1d..b32be379ca 100644
---
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
+++
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
@@ -53,7 +53,7 @@ public class TestFlinkCatalogTablePartitions extends
CatalogTestBase {
for (FileFormat format :
new FileFormat[] {FileFormat.ORC, FileFormat.AVRO,
FileFormat.PARQUET}) {
for (Boolean cacheEnabled : new Boolean[] {true, false}) {
- for (Object[] catalogParams : FlinkCatalogTestBase.parameters()) {
+ for (Object[] catalogParams : CatalogTestBase.parameters()) {
String catalogName = (String) catalogParams[0];
Namespace baseNamespace = (Namespace) catalogParams[1];
parameters.add(new Object[] {catalogName, baseNamespace, format,
cacheEnabled});
diff --git
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkHiveCatalog.java
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkHiveCatalog.java
index 8f238587d3..47ee2afceb 100644
---
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkHiveCatalog.java
+++
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkHiveCatalog.java
@@ -42,7 +42,7 @@ public class TestFlinkHiveCatalog extends FlinkTestBase {
Map<String, String> props = Maps.newHashMap();
props.put("type", "iceberg");
props.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hive");
- props.put(CatalogProperties.URI, FlinkCatalogTestBase.getURI(hiveConf));
+ props.put(CatalogProperties.URI, CatalogTestBase.getURI(hiveConf));
File warehouseDir = tempFolder.newFolder();
props.put(CatalogProperties.WAREHOUSE_LOCATION, "file://" +
warehouseDir.getAbsolutePath());
@@ -69,7 +69,7 @@ public class TestFlinkHiveCatalog extends FlinkTestBase {
Map<String, String> props = Maps.newHashMap();
props.put("type", "iceberg");
props.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hive");
- props.put(CatalogProperties.URI, FlinkCatalogTestBase.getURI(hiveConf));
+ props.put(CatalogProperties.URI, CatalogTestBase.getURI(hiveConf));
// Set the 'hive-conf-dir' instead of 'warehouse'
props.put(FlinkCatalogFactory.HIVE_CONF_DIR,
hiveConfDir.getAbsolutePath());
@@ -78,9 +78,7 @@ public class TestFlinkHiveCatalog extends FlinkTestBase {
private void checkSQLQuery(Map<String, String> catalogProperties, File
warehouseDir)
throws IOException {
- sql(
- "CREATE CATALOG test_catalog WITH %s",
- FlinkCatalogTestBase.toWithClause(catalogProperties));
+ sql("CREATE CATALOG test_catalog WITH %s",
CatalogTestBase.toWithClause(catalogProperties));
sql("USE CATALOG test_catalog");
sql("CREATE DATABASE test_db");
sql("USE test_db");
diff --git
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
index 7540627989..b7fce104f4 100644
---
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
+++
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
@@ -18,6 +18,9 @@
*/
package org.apache.iceberg.flink;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
+
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -32,11 +35,12 @@ import
org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.planner.delegation.PlannerBase;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.Parameters;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Namespace;
@@ -45,42 +49,30 @@ import org.apache.iceberg.flink.source.BoundedTableFactory;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class TestFlinkTableSink extends FlinkCatalogTestBase {
-
- @ClassRule
- public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
- MiniClusterResource.createWithClassloaderCheckDisabled();
-
- @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new
TemporaryFolder();
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+
+public class TestFlinkTableSink extends CatalogTestBase {
private static final String SOURCE_TABLE =
"default_catalog.default_database.bounded_source";
private static final String TABLE_NAME = "test_table";
private TableEnvironment tEnv;
private Table icebergTable;
- private final FileFormat format;
- private final boolean isStreamingJob;
+ @Parameter(index = 2)
+ private FileFormat format;
- @Parameterized.Parameters(
- name = "catalogName={0}, baseNamespace={1}, format={2}, isStreaming={3}")
- public static Iterable<Object[]> parameters() {
+ @Parameter(index = 3)
+ private boolean isStreamingJob;
+
+ @Parameters(name = "catalogName={0}, baseNamespace={1}, format={2},
isStreaming={3}")
+ public static List<Object[]> parameters() {
List<Object[]> parameters = Lists.newArrayList();
for (FileFormat format :
new FileFormat[] {FileFormat.ORC, FileFormat.AVRO,
FileFormat.PARQUET}) {
for (Boolean isStreaming : new Boolean[] {true, false}) {
- for (Object[] catalogParams : FlinkCatalogTestBase.parameters()) {
+ for (Object[] catalogParams : CatalogTestBase.parameters()) {
String catalogName = (String) catalogParams[0];
Namespace baseNamespace = (Namespace) catalogParams[1];
parameters.add(new Object[] {catalogName, baseNamespace, format,
isStreaming});
@@ -90,13 +82,6 @@ public class TestFlinkTableSink extends FlinkCatalogTestBase
{
return parameters;
}
- public TestFlinkTableSink(
- String catalogName, Namespace baseNamespace, FileFormat format, Boolean
isStreamingJob) {
- super(catalogName, baseNamespace);
- this.format = format;
- this.isStreamingJob = isStreamingJob;
- }
-
@Override
protected TableEnvironment getTableEnv() {
if (tEnv == null) {
@@ -121,7 +106,7 @@ public class TestFlinkTableSink extends
FlinkCatalogTestBase {
}
@Override
- @Before
+ @BeforeEach
public void before() {
super.before();
sql("CREATE DATABASE %s", flinkDatabase);
@@ -134,7 +119,7 @@ public class TestFlinkTableSink extends
FlinkCatalogTestBase {
}
@Override
- @After
+ @AfterEach
public void clean() {
sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME);
sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
@@ -142,7 +127,7 @@ public class TestFlinkTableSink extends
FlinkCatalogTestBase {
super.clean();
}
- @Test
+ @TestTemplate
public void testInsertFromSourceTable() throws Exception {
// Register the rows into a temporary table.
getTableEnv()
@@ -169,10 +154,11 @@ public class TestFlinkTableSink extends
FlinkCatalogTestBase {
SimpleDataUtil.createRecord(null, "bar")));
}
- @Test
+ @TestTemplate
public void testOverwriteTable() throws Exception {
- Assume.assumeFalse(
- "Flink unbounded streaming does not support overwrite operation",
isStreamingJob);
+ assumeThat(isStreamingJob)
+ .as("Flink unbounded streaming does not support overwrite operation")
+ .isFalse();
sql("INSERT INTO %s SELECT 1, 'a'", TABLE_NAME);
SimpleDataUtil.assertTableRecords(
@@ -183,7 +169,7 @@ public class TestFlinkTableSink extends
FlinkCatalogTestBase {
icebergTable, Lists.newArrayList(SimpleDataUtil.createRecord(2, "b")));
}
- @Test
+ @TestTemplate
public void testWriteParallelism() throws Exception {
List<Row> dataSet =
IntStream.range(1, 1000)
@@ -206,22 +192,21 @@ public class TestFlinkTableSink extends
FlinkCatalogTestBase {
Transformation<?> committer = dummySink.getInputs().get(0);
Transformation<?> writer = committer.getInputs().get(0);
- Assert.assertEquals("Should have the expected 1 parallelism.", 1,
writer.getParallelism());
-
+ assertThat(writer.getParallelism()).as("Should have the expected 1
parallelism.").isEqualTo(1);
writer
.getInputs()
.forEach(
input ->
- Assert.assertEquals(
- "Should have the expected parallelism.",
- isStreamingJob ? 2 : 4,
- input.getParallelism()));
+ assertThat(input.getParallelism())
+ .as("Should have the expected parallelism.")
+ .isEqualTo(isStreamingJob ? 2 : 4));
}
- @Test
+ @TestTemplate
public void testReplacePartitions() throws Exception {
- Assume.assumeFalse(
- "Flink unbounded streaming does not support overwrite operation",
isStreamingJob);
+ assumeThat(isStreamingJob)
+ .as("Flink unbounded streaming does not support overwrite operation")
+ .isFalse();
String tableName = "test_partition";
sql(
"CREATE TABLE %s(id INT, data VARCHAR) PARTITIONED BY (data) WITH
('write.format.default'='%s')",
@@ -265,7 +250,7 @@ public class TestFlinkTableSink extends
FlinkCatalogTestBase {
}
}
- @Test
+ @TestTemplate
public void testInsertIntoPartition() throws Exception {
String tableName = "test_insert_into_partition";
sql(
@@ -305,7 +290,7 @@ public class TestFlinkTableSink extends
FlinkCatalogTestBase {
}
}
- @Test
+ @TestTemplate
public void testHashDistributeMode() throws Exception {
String tableName = "test_hash_distribution_mode";
Map<String, String> tableProps =
@@ -326,10 +311,10 @@ public class TestFlinkTableSink extends
FlinkCatalogTestBase {
"CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL)"
+ " WITH ('connector'='BoundedSource', 'data-id'='%s')",
SOURCE_TABLE, dataId);
- Assert.assertEquals(
- "Should have the expected rows in source table.",
- Sets.newHashSet(dataSet),
- Sets.newHashSet(sql("SELECT * FROM %s", SOURCE_TABLE)));
+
+ assertThat(sql("SELECT * FROM %s", SOURCE_TABLE))
+ .as("Should have the expected rows in source table.")
+ .containsExactlyInAnyOrderElementsOf(dataSet);
sql(
"CREATE TABLE %s(id INT, data VARCHAR) PARTITIONED BY (data) WITH %s",
@@ -339,10 +324,9 @@ public class TestFlinkTableSink extends
FlinkCatalogTestBase {
// Insert data set.
sql("INSERT INTO %s SELECT * FROM %s", tableName, SOURCE_TABLE);
- Assert.assertEquals(
- "Should have the expected rows in sink table.",
- Sets.newHashSet(dataSet),
- Sets.newHashSet(sql("SELECT * FROM %s", tableName)));
+ assertThat(sql("SELECT * FROM %s", tableName))
+ .as("Should have the expected rows in sink table.")
+ .containsExactlyInAnyOrderElementsOf(dataSet);
// Sometimes we will have more than one checkpoint if we pass the auto
checkpoint interval,
// thus producing multiple snapshots. Here we assert that each snapshot
has only 1 file per
@@ -354,24 +338,18 @@ public class TestFlinkTableSink extends
FlinkCatalogTestBase {
continue;
}
- Assert.assertEquals(
- "There should be 1 data file in partition 'aaa'",
- 1,
- SimpleDataUtil.matchingPartitions(
- dataFiles, table.spec(), ImmutableMap.of("data", "aaa"))
- .size());
- Assert.assertEquals(
- "There should be 1 data file in partition 'bbb'",
- 1,
- SimpleDataUtil.matchingPartitions(
- dataFiles, table.spec(), ImmutableMap.of("data", "bbb"))
- .size());
- Assert.assertEquals(
- "There should be 1 data file in partition 'ccc'",
- 1,
- SimpleDataUtil.matchingPartitions(
- dataFiles, table.spec(), ImmutableMap.of("data", "ccc"))
- .size());
+ assertThat(
+ SimpleDataUtil.matchingPartitions(
+ dataFiles, table.spec(), ImmutableMap.of("data", "aaa")))
+ .hasSize(1);
+ assertThat(
+ SimpleDataUtil.matchingPartitions(
+ dataFiles, table.spec(), ImmutableMap.of("data", "bbb")))
+ .hasSize(1);
+ assertThat(
+ SimpleDataUtil.matchingPartitions(
+ dataFiles, table.spec(), ImmutableMap.of("data", "ccc")))
+ .hasSize(1);
}
} finally {
sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
diff --git
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
index a25ebab6c4..5674c83e40 100644
---
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
+++
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
@@ -25,47 +25,32 @@ import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.Parameters;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
-@RunWith(Parameterized.class)
-public class TestFlinkUpsert extends FlinkCatalogTestBase {
+public class TestFlinkUpsert extends CatalogTestBase {
- @ClassRule
- public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
- MiniClusterResource.createWithClassloaderCheckDisabled();
+ @Parameter(index = 2)
+ private FileFormat format;
- @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new
TemporaryFolder();
+ @Parameter(index = 3)
+ private boolean isStreamingJob;
- private final boolean isStreamingJob;
private final Map<String, String> tableUpsertProps = Maps.newHashMap();
private TableEnvironment tEnv;
- public TestFlinkUpsert(
- String catalogName, Namespace baseNamespace, FileFormat format, Boolean
isStreamingJob) {
- super(catalogName, baseNamespace);
- this.isStreamingJob = isStreamingJob;
- tableUpsertProps.put(TableProperties.FORMAT_VERSION, "2");
- tableUpsertProps.put(TableProperties.UPSERT_ENABLED, "true");
- tableUpsertProps.put(TableProperties.DEFAULT_FILE_FORMAT, format.name());
- }
-
- @Parameterized.Parameters(
- name = "catalogName={0}, baseNamespace={1}, format={2}, isStreaming={3}")
- public static Iterable<Object[]> parameters() {
+ @Parameters(name = "catalogName={0}, baseNamespace={1}, format={2},
isStreaming={3}")
+ public static List<Object[]> parameters() {
List<Object[]> parameters = Lists.newArrayList();
for (FileFormat format :
new FileFormat[] {FileFormat.PARQUET, FileFormat.AVRO,
FileFormat.ORC}) {
@@ -105,22 +90,25 @@ public class TestFlinkUpsert extends FlinkCatalogTestBase {
}
@Override
- @Before
+ @BeforeEach
public void before() {
super.before();
sql("CREATE DATABASE IF NOT EXISTS %s", flinkDatabase);
sql("USE CATALOG %s", catalogName);
sql("USE %s", DATABASE);
+ tableUpsertProps.put(TableProperties.FORMAT_VERSION, "2");
+ tableUpsertProps.put(TableProperties.UPSERT_ENABLED, "true");
+ tableUpsertProps.put(TableProperties.DEFAULT_FILE_FORMAT, format.name());
}
@Override
- @After
+ @AfterEach
public void clean() {
sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
super.clean();
}
- @Test
+ @TestTemplate
public void testUpsertAndQuery() {
String tableName = "test_upsert_query";
LocalDate dt20220301 = LocalDate.of(2022, 3, 1);
@@ -164,7 +152,7 @@ public class TestFlinkUpsert extends FlinkCatalogTestBase {
}
}
- @Test
+ @TestTemplate
public void testUpsertOptions() {
String tableName = "test_upsert_options";
LocalDate dt20220301 = LocalDate.of(2022, 3, 1);
@@ -210,7 +198,7 @@ public class TestFlinkUpsert extends FlinkCatalogTestBase {
}
}
- @Test
+ @TestTemplate
public void testPrimaryKeyEqualToPartitionKey() {
// This is an SQL based reproduction of
TestFlinkIcebergSinkV2#testUpsertOnDataKey
String tableName = "upsert_on_id_key";
@@ -243,7 +231,7 @@ public class TestFlinkUpsert extends FlinkCatalogTestBase {
}
}
- @Test
+ @TestTemplate
public void testPrimaryKeyFieldsAtBeginningOfSchema() {
String tableName = "upsert_on_pk_at_schema_start";
LocalDate dt = LocalDate.of(2022, 3, 1);
@@ -292,7 +280,7 @@ public class TestFlinkUpsert extends FlinkCatalogTestBase {
}
}
- @Test
+ @TestTemplate
public void testPrimaryKeyFieldsAtEndOfTableSchema() {
// This is the same test case as testPrimaryKeyFieldsAtBeginningOfSchema,
but the primary key
// fields
diff --git
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
index 4f71b5fe8d..cb409b7843 100644
---
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
+++
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
@@ -280,7 +280,7 @@ public class TestIcebergConnector extends FlinkTestBase {
catalogProps.put("type", "iceberg");
if (isHiveCatalog()) {
catalogProps.put("catalog-type", "hive");
- catalogProps.put(CatalogProperties.URI,
FlinkCatalogTestBase.getURI(hiveConf));
+ catalogProps.put(CatalogProperties.URI,
CatalogTestBase.getURI(hiveConf));
} else {
catalogProps.put("catalog-type", "hadoop");
}
@@ -315,7 +315,7 @@ public class TestIcebergConnector extends FlinkTestBase {
tableProps.put("catalog-name", catalogName);
tableProps.put(CatalogProperties.WAREHOUSE_LOCATION, createWarehouse());
if (isHiveCatalog()) {
- tableProps.put(CatalogProperties.URI,
FlinkCatalogTestBase.getURI(hiveConf));
+ tableProps.put(CatalogProperties.URI, CatalogTestBase.getURI(hiveConf));
}
return tableProps;
}
@@ -337,7 +337,7 @@ public class TestIcebergConnector extends FlinkTestBase {
}
private String toWithClause(Map<String, String> props) {
- return FlinkCatalogTestBase.toWithClause(props);
+ return CatalogTestBase.toWithClause(props);
}
private static String createWarehouse() {
diff --git
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
index 07e5ca051d..4220775f41 100644
---
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
+++
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
@@ -19,9 +19,11 @@
package org.apache.iceberg.flink.actions;
import static org.apache.iceberg.flink.SimpleDataUtil.RECORD;
+import static org.assertj.core.api.Assertions.assertThat;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Path;
import java.util.Collection;
import java.util.List;
import java.util.Set;
@@ -39,6 +41,8 @@ import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Files;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.Parameters;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFilesActionResult;
@@ -49,7 +53,7 @@ import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.flink.FlinkCatalogTestBase;
+import org.apache.iceberg.flink.CatalogTestBase;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
@@ -59,44 +63,36 @@ import
org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
import org.assertj.core.api.Assertions;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class TestRewriteDataFilesAction extends FlinkCatalogTestBase {
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.io.TempDir;
+
+public class TestRewriteDataFilesAction extends CatalogTestBase {
private static final String TABLE_NAME_UNPARTITIONED =
"test_table_unpartitioned";
private static final String TABLE_NAME_PARTITIONED =
"test_table_partitioned";
private static final String TABLE_NAME_WITH_PK = "test_table_with_pk";
- private final FileFormat format;
+
+ @Parameter(index = 2)
+ private FileFormat format;
+
private Table icebergTableUnPartitioned;
private Table icebergTablePartitioned;
private Table icebergTableWithPk;
- public TestRewriteDataFilesAction(
- String catalogName, Namespace baseNamespace, FileFormat format) {
- super(catalogName, baseNamespace);
- this.format = format;
- }
-
@Override
protected TableEnvironment getTableEnv() {
super.getTableEnv().getConfig().getConfiguration().set(CoreOptions.DEFAULT_PARALLELISM,
1);
return super.getTableEnv();
}
- @Parameterized.Parameters(name = "catalogName={0}, baseNamespace={1},
format={2}")
- public static Iterable<Object[]> parameters() {
+ @Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}")
+ public static List<Object[]> parameters() {
List<Object[]> parameters = Lists.newArrayList();
for (FileFormat format :
new FileFormat[] {FileFormat.AVRO, FileFormat.ORC,
FileFormat.PARQUET}) {
- for (Object[] catalogParams : FlinkCatalogTestBase.parameters()) {
+ for (Object[] catalogParams : CatalogTestBase.parameters()) {
String catalogName = (String) catalogParams[0];
Namespace baseNamespace = (Namespace) catalogParams[1];
parameters.add(new Object[] {catalogName, baseNamespace, format});
@@ -105,10 +101,10 @@ public class TestRewriteDataFilesAction extends
FlinkCatalogTestBase {
return parameters;
}
- @Rule public TemporaryFolder temp = new TemporaryFolder();
+ private @TempDir Path temp;
@Override
- @Before
+ @BeforeEach
public void before() {
super.before();
sql("CREATE DATABASE %s", flinkDatabase);
@@ -135,7 +131,7 @@ public class TestRewriteDataFilesAction extends
FlinkCatalogTestBase {
}
@Override
- @After
+ @AfterEach
public void clean() {
sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME_UNPARTITIONED);
sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME_PARTITIONED);
@@ -144,14 +140,14 @@ public class TestRewriteDataFilesAction extends
FlinkCatalogTestBase {
super.clean();
}
- @Test
+ @TestTemplate
public void testRewriteDataFilesEmptyTable() throws Exception {
- Assert.assertNull("Table must be empty",
icebergTableUnPartitioned.currentSnapshot());
+ assertThat(icebergTableUnPartitioned.currentSnapshot()).isNull();
Actions.forTable(icebergTableUnPartitioned).rewriteDataFiles().execute();
- Assert.assertNull("Table must stay empty",
icebergTableUnPartitioned.currentSnapshot());
+ assertThat(icebergTableUnPartitioned.currentSnapshot()).isNull();
}
- @Test
+ @TestTemplate
public void testRewriteDataFilesUnpartitionedTable() throws Exception {
sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_UNPARTITIONED);
sql("INSERT INTO %s SELECT 2, 'world'", TABLE_NAME_UNPARTITIONED);
@@ -161,21 +157,19 @@ public class TestRewriteDataFilesAction extends
FlinkCatalogTestBase {
CloseableIterable<FileScanTask> tasks =
icebergTableUnPartitioned.newScan().planFiles();
List<DataFile> dataFiles =
Lists.newArrayList(CloseableIterable.transform(tasks,
FileScanTask::file));
- Assert.assertEquals("Should have 2 data files before rewrite", 2,
dataFiles.size());
-
+ assertThat(dataFiles).hasSize(2);
RewriteDataFilesActionResult result =
Actions.forTable(icebergTableUnPartitioned).rewriteDataFiles().execute();
- Assert.assertEquals("Action should rewrite 2 data files", 2,
result.deletedDataFiles().size());
- Assert.assertEquals("Action should add 1 data file", 1,
result.addedDataFiles().size());
+ assertThat(result.deletedDataFiles()).hasSize(2);
+ assertThat(result.addedDataFiles()).hasSize(1);
icebergTableUnPartitioned.refresh();
CloseableIterable<FileScanTask> tasks1 =
icebergTableUnPartitioned.newScan().planFiles();
List<DataFile> dataFiles1 =
Lists.newArrayList(CloseableIterable.transform(tasks1,
FileScanTask::file));
- Assert.assertEquals("Should have 1 data files after rewrite", 1,
dataFiles1.size());
-
+ assertThat(dataFiles1).hasSize(1);
// Assert the table records as expected.
SimpleDataUtil.assertTableRecords(
icebergTableUnPartitioned,
@@ -183,7 +177,7 @@ public class TestRewriteDataFilesAction extends
FlinkCatalogTestBase {
SimpleDataUtil.createRecord(1, "hello"),
SimpleDataUtil.createRecord(2, "world")));
}
- @Test
+ @TestTemplate
public void testRewriteDataFilesPartitionedTable() throws Exception {
sql("INSERT INTO %s SELECT 1, 'hello' ,'a'", TABLE_NAME_PARTITIONED);
sql("INSERT INTO %s SELECT 2, 'hello' ,'a'", TABLE_NAME_PARTITIONED);
@@ -195,21 +189,19 @@ public class TestRewriteDataFilesAction extends
FlinkCatalogTestBase {
CloseableIterable<FileScanTask> tasks =
icebergTablePartitioned.newScan().planFiles();
List<DataFile> dataFiles =
Lists.newArrayList(CloseableIterable.transform(tasks,
FileScanTask::file));
- Assert.assertEquals("Should have 4 data files before rewrite", 4,
dataFiles.size());
-
+ assertThat(dataFiles).hasSize(4);
RewriteDataFilesActionResult result =
Actions.forTable(icebergTablePartitioned).rewriteDataFiles().execute();
- Assert.assertEquals("Action should rewrite 4 data files", 4,
result.deletedDataFiles().size());
- Assert.assertEquals("Action should add 2 data file", 2,
result.addedDataFiles().size());
+ assertThat(result.deletedDataFiles()).hasSize(4);
+ assertThat(result.addedDataFiles()).hasSize(2);
icebergTablePartitioned.refresh();
CloseableIterable<FileScanTask> tasks1 =
icebergTablePartitioned.newScan().planFiles();
List<DataFile> dataFiles1 =
Lists.newArrayList(CloseableIterable.transform(tasks1,
FileScanTask::file));
- Assert.assertEquals("Should have 2 data files after rewrite", 2,
dataFiles1.size());
-
+ assertThat(dataFiles1).hasSize(2);
// Assert the table records as expected.
Schema schema =
new Schema(
@@ -227,7 +219,7 @@ public class TestRewriteDataFilesAction extends
FlinkCatalogTestBase {
record.copy("id", 4, "data", "world", "spec", "b")));
}
- @Test
+ @TestTemplate
public void testRewriteDataFilesWithFilter() throws Exception {
sql("INSERT INTO %s SELECT 1, 'hello' ,'a'", TABLE_NAME_PARTITIONED);
sql("INSERT INTO %s SELECT 2, 'hello' ,'a'", TABLE_NAME_PARTITIONED);
@@ -240,25 +232,22 @@ public class TestRewriteDataFilesAction extends
FlinkCatalogTestBase {
CloseableIterable<FileScanTask> tasks =
icebergTablePartitioned.newScan().planFiles();
List<DataFile> dataFiles =
Lists.newArrayList(CloseableIterable.transform(tasks,
FileScanTask::file));
- Assert.assertEquals("Should have 5 data files before rewrite", 5,
dataFiles.size());
-
+ assertThat(dataFiles).hasSize(5);
RewriteDataFilesActionResult result =
Actions.forTable(icebergTablePartitioned)
.rewriteDataFiles()
.filter(Expressions.equal("spec", "a"))
.filter(Expressions.startsWith("data", "he"))
.execute();
-
- Assert.assertEquals("Action should rewrite 2 data files", 2,
result.deletedDataFiles().size());
- Assert.assertEquals("Action should add 1 data file", 1,
result.addedDataFiles().size());
+ assertThat(result.deletedDataFiles()).hasSize(2);
+ assertThat(result.addedDataFiles()).hasSize(1);
icebergTablePartitioned.refresh();
CloseableIterable<FileScanTask> tasks1 =
icebergTablePartitioned.newScan().planFiles();
List<DataFile> dataFiles1 =
Lists.newArrayList(CloseableIterable.transform(tasks1,
FileScanTask::file));
- Assert.assertEquals("Should have 4 data files after rewrite", 4,
dataFiles1.size());
-
+ assertThat(dataFiles1).hasSize(4);
// Assert the table records as expected.
Schema schema =
new Schema(
@@ -277,7 +266,7 @@ public class TestRewriteDataFilesAction extends
FlinkCatalogTestBase {
record.copy("id", 5, "data", "world", "spec", "b")));
}
- @Test
+ @TestTemplate
public void testRewriteLargeTableHasResiduals() throws IOException {
// all records belong to the same partition
List<String> records1 = Lists.newArrayList();
@@ -309,19 +298,19 @@ public class TestRewriteDataFilesAction extends
FlinkCatalogTestBase {
.filter(Expressions.equal("data", "0"))
.planFiles();
for (FileScanTask task : tasks) {
- Assert.assertEquals("Residuals must be ignored",
Expressions.alwaysTrue(), task.residual());
+ assertThat(task.residual())
+ .as("Residuals must be ignored")
+ .isEqualTo(Expressions.alwaysTrue());
}
List<DataFile> dataFiles =
Lists.newArrayList(CloseableIterable.transform(tasks,
FileScanTask::file));
- Assert.assertEquals("Should have 2 data files before rewrite", 2,
dataFiles.size());
-
+ assertThat(dataFiles).hasSize(2);
Actions actions = Actions.forTable(icebergTableUnPartitioned);
RewriteDataFilesActionResult result =
actions.rewriteDataFiles().filter(Expressions.equal("data",
"0")).execute();
- Assert.assertEquals("Action should rewrite 2 data files", 2,
result.deletedDataFiles().size());
- Assert.assertEquals("Action should add 1 data file", 1,
result.addedDataFiles().size());
-
+ assertThat(result.deletedDataFiles()).hasSize(2);
+ assertThat(result.addedDataFiles()).hasSize(1);
// Assert the table records as expected.
SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
}
@@ -339,12 +328,12 @@ public class TestRewriteDataFilesAction extends
FlinkCatalogTestBase {
*
* @throws IOException IOException
*/
- @Test
+ @TestTemplate
public void testRewriteAvoidRepeateCompress() throws IOException {
List<Record> expected = Lists.newArrayList();
Schema schema = icebergTableUnPartitioned.schema();
GenericAppenderFactory genericAppenderFactory = new
GenericAppenderFactory(schema);
- File file = temp.newFile();
+ File file = File.createTempFile("junit", null, temp.toFile());
int count = 0;
try (FileAppender<Record> fileAppender =
genericAppenderFactory.newAppender(Files.localOutput(file), format)) {
@@ -374,8 +363,7 @@ public class TestRewriteDataFilesAction extends
FlinkCatalogTestBase {
CloseableIterable<FileScanTask> tasks =
icebergTableUnPartitioned.newScan().planFiles();
List<DataFile> dataFiles =
Lists.newArrayList(CloseableIterable.transform(tasks,
FileScanTask::file));
- Assert.assertEquals("Should have 3 data files before rewrite", 3,
dataFiles.size());
-
+ assertThat(dataFiles).hasSize(3);
Actions actions = Actions.forTable(icebergTableUnPartitioned);
long targetSizeInBytes = file.length() + 10;
@@ -385,20 +373,18 @@ public class TestRewriteDataFilesAction extends
FlinkCatalogTestBase {
.targetSizeInBytes(targetSizeInBytes)
.splitOpenFileCost(1)
.execute();
- Assert.assertEquals("Action should rewrite 2 data files", 2,
result.deletedDataFiles().size());
- Assert.assertEquals("Action should add 1 data file", 1,
result.addedDataFiles().size());
-
+ assertThat(result.deletedDataFiles()).hasSize(2);
+ assertThat(result.addedDataFiles()).hasSize(1);
icebergTableUnPartitioned.refresh();
CloseableIterable<FileScanTask> tasks1 =
icebergTableUnPartitioned.newScan().planFiles();
List<DataFile> dataFilesRewrote =
Lists.newArrayList(CloseableIterable.transform(tasks1,
FileScanTask::file));
- Assert.assertEquals("Should have 2 data files after rewrite", 2,
dataFilesRewrote.size());
-
+ assertThat(dataFilesRewrote).hasSize(2);
// the biggest file do not be rewrote
List rewroteDataFileNames =
dataFilesRewrote.stream().map(ContentFile::path).collect(Collectors.toList());
- Assert.assertTrue(rewroteDataFileNames.contains(file.getAbsolutePath()));
+ assertThat(rewroteDataFileNames).contains(file.getAbsolutePath());
// Assert the table records as expected.
expected.add(SimpleDataUtil.createRecord(1, "a"));
@@ -406,7 +392,7 @@ public class TestRewriteDataFilesAction extends
FlinkCatalogTestBase {
SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
}
- @Test
+ @TestTemplate
public void testRewriteNoConflictWithEqualityDeletes() throws IOException {
// Add 2 data files
sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_WITH_PK);
@@ -423,11 +409,9 @@ public class TestRewriteDataFilesAction extends
FlinkCatalogTestBase {
sql("INSERT INTO %s /*+ OPTIONS('upsert-enabled'='true')*/ SELECT 1,
'hi'", TABLE_NAME_WITH_PK);
icebergTableWithPk.refresh();
- Assert.assertEquals(
- "The latest sequence number should be greater than that of the stale
snapshot",
- stale1.currentSnapshot().sequenceNumber() + 1,
- icebergTableWithPk.currentSnapshot().sequenceNumber());
-
+ assertThat(icebergTableWithPk.currentSnapshot().sequenceNumber())
+ .as("The latest sequence number should be greater than that of the
stale snapshot")
+ .isEqualTo(stale1.currentSnapshot().sequenceNumber() + 1);
CloseableIterable<FileScanTask> tasks =
icebergTableWithPk.newScan().planFiles();
List<DataFile> dataFiles =
Lists.newArrayList(CloseableIterable.transform(tasks,
FileScanTask::file));
@@ -435,12 +419,10 @@ public class TestRewriteDataFilesAction extends
FlinkCatalogTestBase {
Lists.newArrayList(CloseableIterable.transform(tasks,
FileScanTask::deletes)).stream()
.flatMap(Collection::stream)
.collect(Collectors.toSet());
- Assert.assertEquals("Should have 3 data files before rewrite", 3,
dataFiles.size());
- Assert.assertEquals("Should have 1 delete file before rewrite", 1,
deleteFiles.size());
- Assert.assertSame(
- "The 1 delete file should be an equality-delete file",
- Iterables.getOnlyElement(deleteFiles).content(),
- FileContent.EQUALITY_DELETES);
+ assertThat(dataFiles).hasSize(3);
+ assertThat(deleteFiles).hasSize(1);
+ assertThat(Iterables.getOnlyElement(deleteFiles).content())
+ .isEqualTo(FileContent.EQUALITY_DELETES);
shouldHaveDataAndFileSequenceNumbers(
TABLE_NAME_WITH_PK,
ImmutableList.of(Pair.of(1L, 1L), Pair.of(2L, 2L), Pair.of(3L, 3L),
Pair.of(3L, 3L)));
@@ -459,8 +441,8 @@ public class TestRewriteDataFilesAction extends
FlinkCatalogTestBase {
Actions.forTable(stale2).rewriteDataFiles().useStartingSequenceNumber(true).execute();
// Should not rewrite files from the new commit
- Assert.assertEquals("Action should rewrite 2 data files", 2,
result.deletedDataFiles().size());
- Assert.assertEquals("Action should add 1 data file", 1,
result.addedDataFiles().size());
+ assertThat(result.deletedDataFiles()).hasSize(2);
+ assertThat(result.addedDataFiles()).hasSize(1);
// The 2 older files with file-sequence-number <= 2 should be rewritten
into a new file.
// The new file is the one with file-sequence-number == 4.
// The new file should use rewrite's starting-sequence-number 2 as its
data-sequence-number.
@@ -494,6 +476,6 @@ public class TestRewriteDataFilesAction extends
FlinkCatalogTestBase {
Pair.<Long, Long>of(
row.getFieldAs("sequence_number"),
row.getFieldAs("file_sequence_number")))
.collect(Collectors.toList());
-
Assertions.assertThat(actualSequenceNumbers).hasSameElementsAs(expectedSequenceNumbers);
+
assertThat(actualSequenceNumbers).hasSameElementsAs(expectedSequenceNumbers);
}
}
diff --git
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java
index 5ecf4f4536..f58cc87c6a 100644
---
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java
+++
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java
@@ -18,7 +18,12 @@
*/
package org.apache.iceberg.flink.source;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
+
+import java.io.File;
import java.io.IOException;
+import java.nio.file.Path;
import java.time.Instant;
import java.util.Comparator;
import java.util.Iterator;
@@ -41,6 +46,8 @@ import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.MetricsUtil;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.Parameters;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
@@ -52,7 +59,7 @@ import org.apache.iceberg.data.FileHelpers;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.flink.FlinkCatalogTestBase;
+import org.apache.iceberg.flink.CatalogTestBase;
import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
@@ -60,29 +67,21 @@ import
org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.SnapshotUtil;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class TestFlinkMetaDataTable extends FlinkCatalogTestBase {
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.io.TempDir;
+
+public class TestFlinkMetaDataTable extends CatalogTestBase {
private static final String TABLE_NAME = "test_table";
private final FileFormat format = FileFormat.AVRO;
- private static final TemporaryFolder TEMP = new TemporaryFolder();
- private final boolean isPartition;
+ private @TempDir Path temp;
- public TestFlinkMetaDataTable(String catalogName, Namespace baseNamespace,
Boolean isPartition) {
- super(catalogName, baseNamespace);
- this.isPartition = isPartition;
- }
+ @Parameter(index = 2)
+ private Boolean isPartition;
- @Parameterized.Parameters(name = "catalogName={0}, baseNamespace={1},
isPartition={2}")
- public static Iterable<Object[]> parameters() {
+ @Parameters(name = "catalogName={0}, baseNamespace={1}, isPartition={2}")
+ protected static List<Object[]> parameters() {
List<Object[]> parameters = Lists.newArrayList();
for (Boolean isPartition : new Boolean[] {true, false}) {
@@ -100,7 +99,7 @@ public class TestFlinkMetaDataTable extends
FlinkCatalogTestBase {
return super.getTableEnv();
}
- @Before
+ @BeforeEach
public void before() {
super.before();
sql("USE CATALOG %s", catalogName);
@@ -124,14 +123,14 @@ public class TestFlinkMetaDataTable extends
FlinkCatalogTestBase {
}
@Override
- @After
+ @AfterEach
public void clean() {
sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME);
sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
super.clean();
}
- @Test
+ @TestTemplate
public void testSnapshots() {
String sql = String.format("SELECT * FROM %s$snapshots ", TABLE_NAME);
List<Row> result = sql(sql);
@@ -140,22 +139,22 @@ public class TestFlinkMetaDataTable extends
FlinkCatalogTestBase {
Iterator<Snapshot> snapshots = table.snapshots().iterator();
for (Row row : result) {
Snapshot next = snapshots.next();
- Assert.assertEquals(
- "Should have expected timestamp",
- ((Instant) row.getField(0)).toEpochMilli(),
- next.timestampMillis());
- Assert.assertEquals("Should have expected snapshot id",
next.snapshotId(), row.getField(1));
- Assert.assertEquals("Should have expected parent id", next.parentId(),
row.getField(2));
- Assert.assertEquals("Should have expected operation", next.operation(),
row.getField(3));
- Assert.assertEquals(
- "Should have expected manifest list location",
- row.getField(4),
- next.manifestListLocation());
- Assert.assertEquals("Should have expected summary", next.summary(),
row.getField(5));
+ assertThat(((Instant) row.getField(0)).toEpochMilli())
+ .as("Should have expected timestamp")
+ .isEqualTo(next.timestampMillis());
+ assertThat(row.getField(1))
+ .as("Should have expected snapshot id")
+ .isEqualTo(next.snapshotId());
+ assertThat(row.getField(2)).as("Should have expected parent
id").isEqualTo(next.parentId());
+ assertThat(row.getField(3)).as("Should have expected
operation").isEqualTo(next.operation());
+ assertThat(row.getField(4))
+ .as("Should have expected manifest list location")
+ .isEqualTo(next.manifestListLocation());
+ assertThat(row.getField(5)).as("Should have expected
summary").isEqualTo(next.summary());
}
}
- @Test
+ @TestTemplate
public void testHistory() {
String sql = String.format("SELECT * FROM %s$history ", TABLE_NAME);
List<Row> result = sql(sql);
@@ -164,21 +163,22 @@ public class TestFlinkMetaDataTable extends
FlinkCatalogTestBase {
Iterator<Snapshot> snapshots = table.snapshots().iterator();
for (Row row : result) {
Snapshot next = snapshots.next();
- Assert.assertEquals(
- "Should have expected made_current_at",
- ((Instant) row.getField(0)).toEpochMilli(),
- next.timestampMillis());
- Assert.assertEquals("Should have expected snapshot id",
next.snapshotId(), row.getField(1));
- Assert.assertEquals("Should have expected parent id", next.parentId(),
row.getField(2));
-
- Assert.assertEquals(
- "Should have expected is current ancestor",
- SnapshotUtil.isAncestorOf(table,
table.currentSnapshot().snapshotId(), next.snapshotId()),
- row.getField(3));
+ assertThat(((Instant) row.getField(0)).toEpochMilli())
+ .as("Should have expected made_current_at")
+ .isEqualTo(next.timestampMillis());
+ assertThat(row.getField(1))
+ .as("Should have expected snapshot id")
+ .isEqualTo(next.snapshotId());
+ assertThat(row.getField(2)).as("Should have expected parent
id").isEqualTo(next.parentId());
+ assertThat(row.getField(3))
+ .as("Should have expected is current ancestor")
+ .isEqualTo(
+ SnapshotUtil.isAncestorOf(
+ table, table.currentSnapshot().snapshotId(),
next.snapshotId()));
}
}
- @Test
+ @TestTemplate
public void testManifests() {
String sql = String.format("SELECT * FROM %s$manifests ", TABLE_NAME);
List<Row> result = sql(sql);
@@ -189,32 +189,32 @@ public class TestFlinkMetaDataTable extends
FlinkCatalogTestBase {
for (int i = 0; i < result.size(); i++) {
Row row = result.get(i);
ManifestFile manifestFile = expectedDataManifests.get(i);
- Assert.assertEquals(
- "Should have expected content", manifestFile.content().id(),
row.getField(0));
- Assert.assertEquals("Should have expected path", manifestFile.path(),
row.getField(1));
- Assert.assertEquals("Should have expected length",
manifestFile.length(), row.getField(2));
- Assert.assertEquals(
- "Should have expected partition_spec_id",
- manifestFile.partitionSpecId(),
- row.getField(3));
- Assert.assertEquals(
- "Should have expected added_snapshot_id", manifestFile.snapshotId(),
row.getField(4));
- Assert.assertEquals(
- "Should have expected added_data_files_count",
- manifestFile.addedFilesCount(),
- row.getField(5));
- Assert.assertEquals(
- "Should have expected existing_data_files_count",
- manifestFile.existingFilesCount(),
- row.getField(6));
- Assert.assertEquals(
- "Should have expected deleted_data_files_count",
- manifestFile.deletedFilesCount(),
- row.getField(7));
+ assertThat(row.getField(0))
+ .as("Should have expected content")
+ .isEqualTo(manifestFile.content().id());
+ assertThat(row.getField(1)).as("Should have expected
path").isEqualTo(manifestFile.path());
+ assertThat(row.getField(2))
+ .as("Should have expected length")
+ .isEqualTo(manifestFile.length());
+ assertThat(row.getField(3))
+ .as("Should have expected partition_spec_id")
+ .isEqualTo(manifestFile.partitionSpecId());
+ assertThat(row.getField(4))
+ .as("Should have expected added_snapshot_id")
+ .isEqualTo(manifestFile.snapshotId());
+ assertThat(row.getField(5))
+ .as("Should have expected added_data_files_count")
+ .isEqualTo(manifestFile.addedFilesCount());
+ assertThat(row.getField(6))
+ .as("Should have expected existing_data_files_count")
+ .isEqualTo(manifestFile.existingFilesCount());
+ assertThat(row.getField(7))
+ .as("Should have expected deleted_data_files_count")
+ .isEqualTo(manifestFile.deletedFilesCount());
}
}
- @Test
+ @TestTemplate
public void testAllManifests() {
Table table =
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME));
@@ -223,55 +223,54 @@ public class TestFlinkMetaDataTable extends
FlinkCatalogTestBase {
List<ManifestFile> expectedDataManifests = allDataManifests(table);
- Assert.assertEquals(expectedDataManifests.size(), result.size());
+ assertThat(result).hasSize(expectedDataManifests.size());
for (int i = 0; i < result.size(); i++) {
Row row = result.get(i);
ManifestFile manifestFile = expectedDataManifests.get(i);
- Assert.assertEquals(
- "Should have expected content", manifestFile.content().id(),
row.getField(0));
- Assert.assertEquals("Should have expected path", manifestFile.path(),
row.getField(1));
- Assert.assertEquals("Should have expected length",
manifestFile.length(), row.getField(2));
- Assert.assertEquals(
- "Should have expected partition_spec_id",
- manifestFile.partitionSpecId(),
- row.getField(3));
- Assert.assertEquals(
- "Should have expected added_snapshot_id", manifestFile.snapshotId(),
row.getField(4));
- Assert.assertEquals(
- "Should have expected added_data_files_count",
- manifestFile.addedFilesCount(),
- row.getField(5));
- Assert.assertEquals(
- "Should have expected existing_data_files_count",
- manifestFile.existingFilesCount(),
- row.getField(6));
- Assert.assertEquals(
- "Should have expected deleted_data_files_count",
- manifestFile.deletedFilesCount(),
- row.getField(7));
+ assertThat(row.getField(0))
+ .as("Should have expected content")
+ .isEqualTo(manifestFile.content().id());
+ assertThat(row.getField(1)).as("Should have expected
path").isEqualTo(manifestFile.path());
+ assertThat(row.getField(2))
+ .as("Should have expected length")
+ .isEqualTo(manifestFile.length());
+ assertThat(row.getField(3))
+ .as("Should have expected partition_spec_id")
+ .isEqualTo(manifestFile.partitionSpecId());
+ assertThat(row.getField(4))
+ .as("Should have expected added_snapshot_id")
+ .isEqualTo(manifestFile.snapshotId());
+ assertThat(row.getField(5))
+ .as("Should have expected added_data_files_count")
+ .isEqualTo(manifestFile.addedFilesCount());
+ assertThat(row.getField(6))
+ .as("Should have expected existing_data_files_count")
+ .isEqualTo(manifestFile.existingFilesCount());
+ assertThat(row.getField(7))
+ .as("Should have expected deleted_data_files_count")
+ .isEqualTo(manifestFile.deletedFilesCount());
}
}
- @Test
+ @TestTemplate
public void testUnPartitionedTable() throws IOException {
- Assume.assumeFalse(isPartition);
+ assumeThat(isPartition).isFalse();
Table table =
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME));
Schema deleteRowSchema = table.schema().select("id");
Record dataDelete = GenericRecord.create(deleteRowSchema);
List<Record> dataDeletes = Lists.newArrayList(dataDelete.copy("id", 1));
-
- TEMP.create();
+ File testFile = File.createTempFile("junit", null, temp.toFile());
DeleteFile eqDeletes =
FileHelpers.writeDeleteFile(
- table, Files.localOutput(TEMP.newFile()), dataDeletes,
deleteRowSchema);
+ table, Files.localOutput(testFile), dataDeletes, deleteRowSchema);
table.newRowDelta().addDeletes(eqDeletes).commit();
List<ManifestFile> expectedDataManifests = dataManifests(table);
List<ManifestFile> expectedDeleteManifests = deleteManifests(table);
- Assert.assertEquals("Should have 2 data manifest", 2,
expectedDataManifests.size());
- Assert.assertEquals("Should have 1 delete manifest", 1,
expectedDeleteManifests.size());
+ assertThat(expectedDataManifests).hasSize(2);
+ assertThat(expectedDeleteManifests).hasSize(1);
Schema entriesTableSchema =
MetadataTableUtils.createMetadataTableInstance(table,
MetadataTableType.from("entries"))
@@ -294,12 +293,13 @@ public class TestFlinkMetaDataTable extends
FlinkCatalogTestBase {
deleteFilesTableSchema = deleteFilesTableSchema.select(deleteColumns);
List<Row> actualDeleteFiles = sql("SELECT %s FROM %s$delete_files",
deleteNames, TABLE_NAME);
- Assert.assertEquals("Metadata table should return 1 delete file", 1,
actualDeleteFiles.size());
+ assertThat(actualDeleteFiles).hasSize(1);
+ assertThat(expectedDeleteManifests).as("Should have 1 delete
manifest").hasSize(1);
List<GenericData.Record> expectedDeleteFiles =
expectedEntries(
table, FileContent.EQUALITY_DELETES, entriesTableSchema,
expectedDeleteManifests, null);
- Assert.assertEquals("Should be 1 delete file manifest entry", 1,
expectedDeleteFiles.size());
+ assertThat(expectedDeleteFiles).as("Should be 1 delete file manifest
entry").hasSize(1);
TestHelpers.assertEquals(
deleteFilesTableSchema, expectedDeleteFiles.get(0),
actualDeleteFiles.get(0));
@@ -318,51 +318,50 @@ public class TestFlinkMetaDataTable extends
FlinkCatalogTestBase {
filesTableSchema = filesTableSchema.select(columns);
List<Row> actualDataFiles = sql("SELECT %s FROM %s$data_files", names,
TABLE_NAME);
- Assert.assertEquals("Metadata table should return 2 data file", 2,
actualDataFiles.size());
-
+ assertThat(actualDataFiles).as("Metadata table should return 2 data
file").hasSize(2);
List<GenericData.Record> expectedDataFiles =
expectedEntries(table, FileContent.DATA, entriesTableSchema,
expectedDataManifests, null);
- Assert.assertEquals("Should be 2 data file manifest entry", 2,
expectedDataFiles.size());
+ assertThat(expectedDataFiles).as("Should be 2 data file manifest
entry").hasSize(2);
TestHelpers.assertEquals(filesTableSchema, expectedDataFiles.get(0),
actualDataFiles.get(0));
// check all files table
List<Row> actualFiles = sql("SELECT %s FROM %s$files ORDER BY content",
names, TABLE_NAME);
- Assert.assertEquals("Metadata table should return 3 files", 3,
actualFiles.size());
-
+ assertThat(actualFiles).as("Metadata table should return 3
files").hasSize(3);
List<GenericData.Record> expectedFiles =
Stream.concat(expectedDataFiles.stream(), expectedDeleteFiles.stream())
.collect(Collectors.toList());
- Assert.assertEquals("Should have 3 files manifest entries", 3,
expectedFiles.size());
+ assertThat(expectedFiles).as("Should have 3 files manifest
entries").hasSize(3);
TestHelpers.assertEquals(filesTableSchema, expectedFiles.get(0),
actualFiles.get(0));
TestHelpers.assertEquals(filesTableSchema, expectedFiles.get(1),
actualFiles.get(1));
}
- @Test
+ @TestTemplate
public void testPartitionedTable() throws Exception {
- Assume.assumeFalse(!isPartition);
+ assumeThat(isPartition).isTrue();
Table table =
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME));
Schema deleteRowSchema = table.schema().select("id", "data");
Record dataDelete = GenericRecord.create(deleteRowSchema);
- TEMP.create();
Map<String, Object> deleteRow = Maps.newHashMap();
deleteRow.put("id", 1);
deleteRow.put("data", "a");
+ File testFile = File.createTempFile("junit", null, temp.toFile());
DeleteFile eqDeletes =
FileHelpers.writeDeleteFile(
table,
- Files.localOutput(TEMP.newFile()),
+ Files.localOutput(testFile),
org.apache.iceberg.TestHelpers.Row.of("a"),
Lists.newArrayList(dataDelete.copy(deleteRow)),
deleteRowSchema);
table.newRowDelta().addDeletes(eqDeletes).commit();
deleteRow.put("data", "b");
+ File testFile2 = File.createTempFile("junit", null, temp.toFile());
DeleteFile eqDeletes2 =
FileHelpers.writeDeleteFile(
table,
- Files.localOutput(TEMP.newFile()),
+ Files.localOutput(testFile2),
org.apache.iceberg.TestHelpers.Row.of("b"),
Lists.newArrayList(dataDelete.copy(deleteRow)),
deleteRowSchema);
@@ -375,9 +374,8 @@ public class TestFlinkMetaDataTable extends
FlinkCatalogTestBase {
List<ManifestFile> expectedDataManifests = dataManifests(table);
List<ManifestFile> expectedDeleteManifests = deleteManifests(table);
- Assert.assertEquals("Should have 2 data manifests", 2,
expectedDataManifests.size());
- Assert.assertEquals("Should have 2 delete manifests", 2,
expectedDeleteManifests.size());
-
+ assertThat(expectedDataManifests).hasSize(2);
+ assertThat(expectedDeleteManifests).hasSize(2);
Table deleteFilesTable =
MetadataTableUtils.createMetadataTableInstance(
table, MetadataTableType.from("delete_files"));
@@ -396,75 +394,67 @@ public class TestFlinkMetaDataTable extends
FlinkCatalogTestBase {
List<GenericData.Record> expectedDeleteFiles =
expectedEntries(
table, FileContent.EQUALITY_DELETES, entriesTableSchema,
expectedDeleteManifests, "a");
- Assert.assertEquals(
- "Should have one delete file manifest entry", 1,
expectedDeleteFiles.size());
-
+ assertThat(expectedDeleteFiles).hasSize(1);
List<Row> actualDeleteFiles =
sql("SELECT %s FROM %s$delete_files WHERE `partition`.`data`='a'",
names, TABLE_NAME);
- Assert.assertEquals(
- "Metadata table should return one delete file", 1,
actualDeleteFiles.size());
+ assertThat(actualDeleteFiles).hasSize(1);
TestHelpers.assertEquals(
filesTableSchema, expectedDeleteFiles.get(0),
actualDeleteFiles.get(0));
// Check data files table
List<GenericData.Record> expectedDataFiles =
expectedEntries(table, FileContent.DATA, entriesTableSchema,
expectedDataManifests, "a");
- Assert.assertEquals("Should have one data file manifest entry", 1,
expectedDataFiles.size());
-
+ assertThat(expectedDataFiles).hasSize(1);
List<Row> actualDataFiles =
sql("SELECT %s FROM %s$data_files WHERE `partition`.`data`='a'",
names, TABLE_NAME);
- Assert.assertEquals("Metadata table should return one data file", 1,
actualDataFiles.size());
+ assertThat(actualDataFiles).hasSize(1);
TestHelpers.assertEquals(filesTableSchema, expectedDataFiles.get(0),
actualDataFiles.get(0));
List<Row> actualPartitionsWithProjection =
sql("SELECT file_count FROM %s$partitions ", TABLE_NAME);
- Assert.assertEquals(
- "Metadata table should return two partitions record",
- 2,
- actualPartitionsWithProjection.size());
+ assertThat(actualPartitionsWithProjection).hasSize(2);
for (int i = 0; i < 2; ++i) {
- Assert.assertEquals(1,
actualPartitionsWithProjection.get(i).getField(0));
+
assertThat(actualPartitionsWithProjection.get(i).getField(0)).isEqualTo(1);
}
// Check files table
List<GenericData.Record> expectedFiles =
Stream.concat(expectedDataFiles.stream(), expectedDeleteFiles.stream())
.collect(Collectors.toList());
- Assert.assertEquals("Should have two file manifest entries", 2,
expectedFiles.size());
-
+ assertThat(expectedFiles).hasSize(2);
List<Row> actualFiles =
sql(
"SELECT %s FROM %s$files WHERE `partition`.`data`='a' ORDER BY
content",
names, TABLE_NAME);
- Assert.assertEquals("Metadata table should return two files", 2,
actualFiles.size());
+ assertThat(actualFiles).hasSize(2);
TestHelpers.assertEquals(filesTableSchema, expectedFiles.get(0),
actualFiles.get(0));
TestHelpers.assertEquals(filesTableSchema, expectedFiles.get(1),
actualFiles.get(1));
}
- @Test
+ @TestTemplate
public void testAllFilesUnpartitioned() throws Exception {
- Assume.assumeFalse(isPartition);
+ assumeThat(isPartition).isFalse();
Table table =
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME));
Schema deleteRowSchema = table.schema().select("id", "data");
Record dataDelete = GenericRecord.create(deleteRowSchema);
- TEMP.create();
Map<String, Object> deleteRow = Maps.newHashMap();
deleteRow.put("id", 1);
+ File testFile = File.createTempFile("junit", null, temp.toFile());
DeleteFile eqDeletes =
FileHelpers.writeDeleteFile(
table,
- Files.localOutput(TEMP.newFile()),
+ Files.localOutput(testFile),
Lists.newArrayList(dataDelete.copy(deleteRow)),
deleteRowSchema);
table.newRowDelta().addDeletes(eqDeletes).commit();
List<ManifestFile> expectedDataManifests = dataManifests(table);
- Assert.assertEquals("Should have 2 data manifest", 2,
expectedDataManifests.size());
+ assertThat(expectedDataManifests).hasSize(2);
List<ManifestFile> expectedDeleteManifests = deleteManifests(table);
- Assert.assertEquals("Should have 1 delete manifest", 1,
expectedDeleteManifests.size());
+ assertThat(expectedDeleteManifests).hasSize(1);
// Clear table to test whether 'all_files' can read past files
table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
@@ -492,8 +482,8 @@ public class TestFlinkMetaDataTable extends
FlinkCatalogTestBase {
List<GenericData.Record> expectedDataFiles =
expectedEntries(table, FileContent.DATA, entriesTableSchema,
expectedDataManifests, null);
- Assert.assertEquals("Should be 2 data file manifest entry", 2,
expectedDataFiles.size());
- Assert.assertEquals("Metadata table should return 2 data file", 2,
actualDataFiles.size());
+ assertThat(expectedDataFiles).hasSize(2);
+ assertThat(actualDataFiles).hasSize(2);
TestHelpers.assertEquals(filesTableSchema, expectedDataFiles,
actualDataFiles);
// Check all delete files table
@@ -501,9 +491,8 @@ public class TestFlinkMetaDataTable extends
FlinkCatalogTestBase {
List<GenericData.Record> expectedDeleteFiles =
expectedEntries(
table, FileContent.EQUALITY_DELETES, entriesTableSchema,
expectedDeleteManifests, null);
- Assert.assertEquals("Should be one delete file manifest entry", 1,
expectedDeleteFiles.size());
- Assert.assertEquals(
- "Metadata table should return one delete file", 1,
actualDeleteFiles.size());
+ assertThat(expectedDeleteFiles).hasSize(1);
+ assertThat(actualDeleteFiles).hasSize(1);
TestHelpers.assertEquals(
filesTableSchema, expectedDeleteFiles.get(0),
actualDeleteFiles.get(0));
@@ -513,43 +502,43 @@ public class TestFlinkMetaDataTable extends
FlinkCatalogTestBase {
List<GenericData.Record> expectedFiles =
ListUtils.union(expectedDataFiles, expectedDeleteFiles);
expectedFiles.sort(Comparator.comparing(r -> ((Integer)
r.get("content"))));
- Assert.assertEquals("Metadata table should return 3 files", 3,
actualFiles.size());
+ assertThat(actualFiles).hasSize(3);
TestHelpers.assertEquals(filesTableSchema, expectedFiles, actualFiles);
}
- @Test
+ @TestTemplate
public void testAllFilesPartitioned() throws Exception {
- Assume.assumeFalse(!isPartition);
+ assumeThat(isPartition).isTrue();
Table table =
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME));
// Create delete file
Schema deleteRowSchema = table.schema().select("id");
Record dataDelete = GenericRecord.create(deleteRowSchema);
- TEMP.create();
Map<String, Object> deleteRow = Maps.newHashMap();
deleteRow.put("id", 1);
+ File testFile = File.createTempFile("junit", null, temp.toFile());
DeleteFile eqDeletes =
FileHelpers.writeDeleteFile(
table,
- Files.localOutput(TEMP.newFile()),
+ Files.localOutput(testFile),
org.apache.iceberg.TestHelpers.Row.of("a"),
Lists.newArrayList(dataDelete.copy(deleteRow)),
deleteRowSchema);
+ File testFile2 = File.createTempFile("junit", null, temp.toFile());
DeleteFile eqDeletes2 =
FileHelpers.writeDeleteFile(
table,
- Files.localOutput(TEMP.newFile()),
+ Files.localOutput(testFile2),
org.apache.iceberg.TestHelpers.Row.of("b"),
Lists.newArrayList(dataDelete.copy(deleteRow)),
deleteRowSchema);
table.newRowDelta().addDeletes(eqDeletes).addDeletes(eqDeletes2).commit();
List<ManifestFile> expectedDataManifests = dataManifests(table);
- Assert.assertEquals("Should have 2 data manifests", 2,
expectedDataManifests.size());
+ assertThat(expectedDataManifests).hasSize(2);
List<ManifestFile> expectedDeleteManifests = deleteManifests(table);
- Assert.assertEquals("Should have 1 delete manifest", 1,
expectedDeleteManifests.size());
-
+ assertThat(expectedDeleteManifests).hasSize(1);
// Clear table to test whether 'all_files' can read past files
table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
@@ -575,8 +564,8 @@ public class TestFlinkMetaDataTable extends
FlinkCatalogTestBase {
sql("SELECT %s FROM %s$all_data_files WHERE `partition`.`data`='a'",
names, TABLE_NAME);
List<GenericData.Record> expectedDataFiles =
expectedEntries(table, FileContent.DATA, entriesTableSchema,
expectedDataManifests, "a");
- Assert.assertEquals("Should be one data file manifest entry", 1,
expectedDataFiles.size());
- Assert.assertEquals("Metadata table should return one data file", 1,
actualDataFiles.size());
+ assertThat(expectedDataFiles).hasSize(1);
+ assertThat(actualDataFiles).hasSize(1);
TestHelpers.assertEquals(filesTableSchema, expectedDataFiles.get(0),
actualDataFiles.get(0));
// Check all delete files table
@@ -585,9 +574,8 @@ public class TestFlinkMetaDataTable extends
FlinkCatalogTestBase {
List<GenericData.Record> expectedDeleteFiles =
expectedEntries(
table, FileContent.EQUALITY_DELETES, entriesTableSchema,
expectedDeleteManifests, "a");
- Assert.assertEquals("Should be one data file manifest entry", 1,
expectedDeleteFiles.size());
- Assert.assertEquals("Metadata table should return one data file", 1,
actualDeleteFiles.size());
-
+ assertThat(expectedDeleteFiles).hasSize(1);
+ assertThat(actualDeleteFiles).hasSize(1);
TestHelpers.assertEquals(
filesTableSchema, expectedDeleteFiles.get(0),
actualDeleteFiles.get(0));
@@ -599,11 +587,11 @@ public class TestFlinkMetaDataTable extends
FlinkCatalogTestBase {
List<GenericData.Record> expectedFiles =
ListUtils.union(expectedDataFiles, expectedDeleteFiles);
expectedFiles.sort(Comparator.comparing(r -> ((Integer)
r.get("content"))));
- Assert.assertEquals("Metadata table should return two files", 2,
actualFiles.size());
+ assertThat(actualFiles).hasSize(2);
TestHelpers.assertEquals(filesTableSchema, expectedFiles, actualFiles);
}
- @Test
+ @TestTemplate
public void testMetadataLogEntries() {
Table table =
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME));
@@ -617,55 +605,51 @@ public class TestFlinkMetaDataTable extends
FlinkCatalogTestBase {
// Check metadataLog table
List<Row> metadataLogs = sql("SELECT * FROM %s$metadata_log_entries",
TABLE_NAME);
- Assert.assertEquals("metadataLogEntries table should return 3 row", 3,
metadataLogs.size());
+ assertThat(metadataLogs).hasSize(3);
Row metadataLog = metadataLogs.get(0);
- Assert.assertEquals(
- Instant.ofEpochMilli(metadataLogEntries.get(0).timestampMillis()),
- metadataLog.getField("timestamp"));
- Assert.assertEquals(metadataLogEntries.get(0).file(),
metadataLog.getField("file"));
- Assert.assertNull(metadataLog.getField("latest_snapshot_id"));
- Assert.assertNull(metadataLog.getField("latest_schema_id"));
- Assert.assertNull(metadataLog.getField("latest_sequence_number"));
+ assertThat(metadataLog.getField("timestamp"))
+
.isEqualTo(Instant.ofEpochMilli(metadataLogEntries.get(0).timestampMillis()));
+
assertThat(metadataLog.getField("file")).isEqualTo(metadataLogEntries.get(0).file());
+ assertThat(metadataLog.getField("latest_snapshot_id")).isNull();
+ assertThat(metadataLog.getField("latest_schema_id")).isNull();
+ assertThat(metadataLog.getField("latest_sequence_number")).isNull();
metadataLog = metadataLogs.get(1);
- Assert.assertEquals(
- Instant.ofEpochMilli(metadataLogEntries.get(1).timestampMillis()),
- metadataLog.getField("timestamp"));
- Assert.assertEquals(metadataLogEntries.get(1).file(),
metadataLog.getField("file"));
- Assert.assertEquals(parentSnapshot.snapshotId(),
metadataLog.getField("latest_snapshot_id"));
- Assert.assertEquals(parentSnapshot.schemaId(),
metadataLog.getField("latest_schema_id"));
- Assert.assertEquals(
- parentSnapshot.sequenceNumber(),
metadataLog.getField("latest_sequence_number"));
+ assertThat(metadataLog.getField("timestamp"))
+
.isEqualTo(Instant.ofEpochMilli(metadataLogEntries.get(1).timestampMillis()));
+
assertThat(metadataLog.getField("file")).isEqualTo(metadataLogEntries.get(1).file());
+
assertThat(metadataLog.getField("latest_snapshot_id")).isEqualTo(parentSnapshot.snapshotId());
+
assertThat(metadataLog.getField("latest_schema_id")).isEqualTo(parentSnapshot.schemaId());
+ assertThat(metadataLog.getField("latest_sequence_number"))
+ .isEqualTo(parentSnapshot.sequenceNumber());
+
assertThat(metadataLog.getField("latest_snapshot_id")).isEqualTo(parentSnapshot.snapshotId());
metadataLog = metadataLogs.get(2);
- Assert.assertEquals(
- Instant.ofEpochMilli(currentSnapshot.timestampMillis()),
metadataLog.getField("timestamp"));
- Assert.assertEquals(tableMetadata.metadataFileLocation(),
metadataLog.getField("file"));
- Assert.assertEquals(currentSnapshot.snapshotId(),
metadataLog.getField("latest_snapshot_id"));
- Assert.assertEquals(currentSnapshot.schemaId(),
metadataLog.getField("latest_schema_id"));
- Assert.assertEquals(
- currentSnapshot.sequenceNumber(),
metadataLog.getField("latest_sequence_number"));
+ assertThat(metadataLog.getField("timestamp"))
+ .isEqualTo(Instant.ofEpochMilli(currentSnapshot.timestampMillis()));
+
assertThat(metadataLog.getField("file")).isEqualTo(tableMetadata.metadataFileLocation());
+
assertThat(metadataLog.getField("latest_snapshot_id")).isEqualTo(currentSnapshot.snapshotId());
+
assertThat(metadataLog.getField("latest_schema_id")).isEqualTo(currentSnapshot.schemaId());
+ assertThat(metadataLog.getField("latest_sequence_number"))
+ .isEqualTo(currentSnapshot.sequenceNumber());
// test filtering
List<Row> metadataLogWithFilters =
sql(
"SELECT * FROM %s$metadata_log_entries WHERE latest_snapshot_id =
%s",
TABLE_NAME, currentSnapshotId);
- Assert.assertEquals(
- "metadataLogEntries table should return 1 row", 1,
metadataLogWithFilters.size());
-
+ assertThat(metadataLogWithFilters).hasSize(1);
metadataLog = metadataLogWithFilters.get(0);
- Assert.assertEquals(
-
Instant.ofEpochMilli(tableMetadata.currentSnapshot().timestampMillis()),
- metadataLog.getField("timestamp"));
- Assert.assertEquals(tableMetadata.metadataFileLocation(),
metadataLog.getField("file"));
- Assert.assertEquals(
- tableMetadata.currentSnapshot().snapshotId(),
metadataLog.getField("latest_snapshot_id"));
- Assert.assertEquals(
- tableMetadata.currentSnapshot().schemaId(),
metadataLog.getField("latest_schema_id"));
- Assert.assertEquals(
- tableMetadata.currentSnapshot().sequenceNumber(),
- metadataLog.getField("latest_sequence_number"));
+
assertThat(Instant.ofEpochMilli(tableMetadata.currentSnapshot().timestampMillis()))
+ .isEqualTo(metadataLog.getField("timestamp"));
+
+
assertThat(metadataLog.getField("file")).isEqualTo(tableMetadata.metadataFileLocation());
+ assertThat(metadataLog.getField("latest_snapshot_id"))
+ .isEqualTo(tableMetadata.currentSnapshot().snapshotId());
+ assertThat(metadataLog.getField("latest_schema_id"))
+ .isEqualTo(tableMetadata.currentSnapshot().schemaId());
+ assertThat(metadataLog.getField("latest_sequence_number"))
+ .isEqualTo(tableMetadata.currentSnapshot().sequenceNumber());
// test projection
List<String> metadataFiles =
@@ -675,14 +659,13 @@ public class TestFlinkMetaDataTable extends
FlinkCatalogTestBase {
metadataFiles.add(tableMetadata.metadataFileLocation());
List<Row> metadataLogWithProjection =
sql("SELECT file FROM %s$metadata_log_entries", TABLE_NAME);
- Assert.assertEquals(
- "metadataLogEntries table should return 3 rows", 3,
metadataLogWithProjection.size());
+ assertThat(metadataLogWithProjection).hasSize(3);
for (int i = 0; i < metadataFiles.size(); i++) {
- Assert.assertEquals(metadataFiles.get(i),
metadataLogWithProjection.get(i).getField("file"));
+
assertThat(metadataLogWithProjection.get(i).getField("file")).isEqualTo(metadataFiles.get(i));
}
}
- @Test
+ @TestTemplate
public void testSnapshotReferencesMetatable() {
Table table =
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME));
@@ -704,62 +687,63 @@ public class TestFlinkMetaDataTable extends
FlinkCatalogTestBase {
.commit();
// Check refs table
List<Row> references = sql("SELECT * FROM %s$refs", TABLE_NAME);
- Assert.assertEquals("Refs table should return 3 rows", 3,
references.size());
List<Row> branches = sql("SELECT * FROM %s$refs WHERE type='BRANCH'",
TABLE_NAME);
- Assert.assertEquals("Refs table should return 2 branches", 2,
branches.size());
+ assertThat(references).hasSize(3);
+ assertThat(branches).hasSize(2);
List<Row> tags = sql("SELECT * FROM %s$refs WHERE type='TAG'", TABLE_NAME);
- Assert.assertEquals("Refs table should return 1 tag", 1, tags.size());
-
+ assertThat(tags).hasSize(1);
// Check branch entries in refs table
List<Row> mainBranch =
sql("SELECT * FROM %s$refs WHERE name='main' AND type='BRANCH'",
TABLE_NAME);
- Assert.assertEquals("main", mainBranch.get(0).getFieldAs("name"));
- Assert.assertEquals("BRANCH", mainBranch.get(0).getFieldAs("type"));
- Assert.assertEquals(currentSnapshotId,
mainBranch.get(0).getFieldAs("snapshot_id"));
-
+ assertThat((String)
mainBranch.get(0).getFieldAs("name")).isEqualTo("main");
+ assertThat((String)
mainBranch.get(0).getFieldAs("type")).isEqualTo("BRANCH");
+ assertThat((Long)
mainBranch.get(0).getFieldAs("snapshot_id")).isEqualTo(currentSnapshotId);
List<Row> testBranch =
sql("SELECT * FROM %s$refs WHERE name='testBranch' AND
type='BRANCH'", TABLE_NAME);
- Assert.assertEquals("testBranch", testBranch.get(0).getFieldAs("name"));
- Assert.assertEquals("BRANCH", testBranch.get(0).getFieldAs("type"));
- Assert.assertEquals(currentSnapshotId,
testBranch.get(0).getFieldAs("snapshot_id"));
- Assert.assertEquals(Long.valueOf(10),
testBranch.get(0).getFieldAs("max_reference_age_in_ms"));
- Assert.assertEquals(Integer.valueOf(20),
testBranch.get(0).getFieldAs("min_snapshots_to_keep"));
- Assert.assertEquals(Long.valueOf(30),
testBranch.get(0).getFieldAs("max_snapshot_age_in_ms"));
+ assertThat((String)
testBranch.get(0).getFieldAs("name")).isEqualTo("testBranch");
+ assertThat((String)
testBranch.get(0).getFieldAs("type")).isEqualTo("BRANCH");
+ assertThat((Long)
testBranch.get(0).getFieldAs("snapshot_id")).isEqualTo(currentSnapshotId);
+ assertThat((Long) testBranch.get(0).getFieldAs("max_reference_age_in_ms"))
+ .isEqualTo(Long.valueOf(10));
+ assertThat((Integer) testBranch.get(0).getFieldAs("min_snapshots_to_keep"))
+ .isEqualTo(Integer.valueOf(20));
+ assertThat((Long) testBranch.get(0).getFieldAs("max_snapshot_age_in_ms"))
+ .isEqualTo(Long.valueOf(30));
// Check tag entries in refs table
List<Row> testTag =
sql("SELECT * FROM %s$refs WHERE name='testTag' AND type='TAG'",
TABLE_NAME);
- Assert.assertEquals("testTag", testTag.get(0).getFieldAs("name"));
- Assert.assertEquals("TAG", testTag.get(0).getFieldAs("type"));
- Assert.assertEquals(currentSnapshotId,
testTag.get(0).getFieldAs("snapshot_id"));
- Assert.assertEquals(Long.valueOf(50),
testTag.get(0).getFieldAs("max_reference_age_in_ms"));
-
+ assertThat((String)
testTag.get(0).getFieldAs("name")).isEqualTo("testTag");
+ assertThat((String) testTag.get(0).getFieldAs("type")).isEqualTo("TAG");
+ assertThat((Long)
testTag.get(0).getFieldAs("snapshot_id")).isEqualTo(currentSnapshotId);
+ assertThat((Long) testTag.get(0).getFieldAs("max_reference_age_in_ms"))
+ .isEqualTo(Long.valueOf(50));
// Check projection in refs table
List<Row> testTagProjection =
sql(
"SELECT
name,type,snapshot_id,max_reference_age_in_ms,min_snapshots_to_keep FROM
%s$refs where type='TAG'",
TABLE_NAME);
- Assert.assertEquals("testTag",
testTagProjection.get(0).getFieldAs("name"));
- Assert.assertEquals("TAG", testTagProjection.get(0).getFieldAs("type"));
- Assert.assertEquals(currentSnapshotId,
testTagProjection.get(0).getFieldAs("snapshot_id"));
- Assert.assertEquals(
- Long.valueOf(50),
testTagProjection.get(0).getFieldAs("max_reference_age_in_ms"));
-
Assert.assertNull(testTagProjection.get(0).getFieldAs("min_snapshots_to_keep"));
-
+ assertThat((String)
testTagProjection.get(0).getFieldAs("name")).isEqualTo("testTag");
+ assertThat((String)
testTagProjection.get(0).getFieldAs("type")).isEqualTo("TAG");
+ assertThat((Long) testTagProjection.get(0).getFieldAs("snapshot_id"))
+ .isEqualTo(currentSnapshotId);
+ assertThat((Long)
testTagProjection.get(0).getFieldAs("max_reference_age_in_ms"))
+ .isEqualTo(Long.valueOf(50));
+ assertThat((String)
testTagProjection.get(0).getFieldAs("min_snapshots_to_keep")).isNull();
List<Row> mainBranchProjection =
sql("SELECT name, type FROM %s$refs WHERE name='main' AND type =
'BRANCH'", TABLE_NAME);
- Assert.assertEquals("main",
mainBranchProjection.get(0).getFieldAs("name"));
- Assert.assertEquals("BRANCH",
mainBranchProjection.get(0).getFieldAs("type"));
-
+ assertThat((String)
mainBranchProjection.get(0).getFieldAs("name")).isEqualTo("main");
+ assertThat((String)
mainBranchProjection.get(0).getFieldAs("type")).isEqualTo("BRANCH");
List<Row> testBranchProjection =
sql(
"SELECT type, name, max_reference_age_in_ms, snapshot_id FROM
%s$refs WHERE name='testBranch' AND type = 'BRANCH'",
TABLE_NAME);
- Assert.assertEquals("testBranch",
testBranchProjection.get(0).getFieldAs("name"));
- Assert.assertEquals("BRANCH",
testBranchProjection.get(0).getFieldAs("type"));
- Assert.assertEquals(currentSnapshotId,
testBranchProjection.get(0).getFieldAs("snapshot_id"));
- Assert.assertEquals(
- Long.valueOf(10),
testBranchProjection.get(0).getFieldAs("max_reference_age_in_ms"));
+ assertThat((String)
testBranchProjection.get(0).getFieldAs("name")).isEqualTo("testBranch");
+ assertThat((String)
testBranchProjection.get(0).getFieldAs("type")).isEqualTo("BRANCH");
+ assertThat((Long) testBranchProjection.get(0).getFieldAs("snapshot_id"))
+ .isEqualTo(currentSnapshotId);
+ assertThat((Long)
testBranchProjection.get(0).getFieldAs("max_reference_age_in_ms"))
+ .isEqualTo(Long.valueOf(10));
}
/**
diff --git
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
index 633e11718b..09d5a5481a 100644
---
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
+++
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
@@ -18,6 +18,8 @@
*/
package org.apache.iceberg.flink.source;
+import static org.assertj.core.api.Assertions.assertThat;
+
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
@@ -33,31 +35,25 @@ import org.apache.flink.util.CloseableIterator;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.TestHelpers;
-import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericAppenderHelper;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
-import org.apache.iceberg.flink.FlinkCatalogTestBase;
+import org.apache.iceberg.flink.CatalogTestBase;
import org.apache.iceberg.flink.MiniClusterResource;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.assertj.core.api.Assertions;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
-public class TestStreamScanSql extends FlinkCatalogTestBase {
+public class TestStreamScanSql extends CatalogTestBase {
private static final String TABLE = "test_table";
private static final FileFormat FORMAT = FileFormat.PARQUET;
private TableEnvironment tEnv;
- public TestStreamScanSql(String catalogName, Namespace baseNamespace) {
- super(catalogName, baseNamespace);
- }
-
@Override
protected TableEnvironment getTableEnv() {
if (tEnv == null) {
@@ -85,7 +81,7 @@ public class TestStreamScanSql extends FlinkCatalogTestBase {
}
@Override
- @Before
+ @BeforeEach
public void before() {
super.before();
sql("CREATE DATABASE %s", flinkDatabase);
@@ -94,7 +90,7 @@ public class TestStreamScanSql extends FlinkCatalogTestBase {
}
@Override
- @After
+ @AfterEach
public void clean() {
sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE);
sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
@@ -102,7 +98,7 @@ public class TestStreamScanSql extends FlinkCatalogTestBase {
}
private void insertRows(String partition, Table table, Row... rows) throws
IOException {
- GenericAppenderHelper appender = new GenericAppenderHelper(table, FORMAT,
TEMPORARY_FOLDER);
+ GenericAppenderHelper appender = new GenericAppenderHelper(table, FORMAT,
temporaryDirectory);
GenericRecord gRecord = GenericRecord.create(table.schema());
List<Record> records = Lists.newArrayList();
@@ -127,20 +123,16 @@ public class TestStreamScanSql extends
FlinkCatalogTestBase {
private void assertRows(List<Row> expectedRows, Iterator<Row> iterator) {
for (Row expectedRow : expectedRows) {
- Assert.assertTrue("Should have more records", iterator.hasNext());
-
+ assertThat(iterator).hasNext();
Row actualRow = iterator.next();
- Assert.assertEquals("Should have expected fields", 3,
actualRow.getArity());
- Assert.assertEquals(
- "Should have expected id", expectedRow.getField(0),
actualRow.getField(0));
- Assert.assertEquals(
- "Should have expected data", expectedRow.getField(1),
actualRow.getField(1));
- Assert.assertEquals(
- "Should have expected dt", expectedRow.getField(2),
actualRow.getField(2));
+ assertThat(actualRow.getArity()).isEqualTo(3);
+ assertThat(actualRow.getField(0)).isEqualTo(expectedRow.getField(0));
+ assertThat(actualRow.getField(1)).isEqualTo(expectedRow.getField(1));
+ assertThat(actualRow.getField(2)).isEqualTo(expectedRow.getField(2));
}
}
- @Test
+ @TestTemplate
public void testUnPartitionedTable() throws Exception {
sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
Table table =
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
@@ -160,7 +152,7 @@ public class TestStreamScanSql extends FlinkCatalogTestBase
{
result.getJobClient().ifPresent(JobClient::cancel);
}
- @Test
+ @TestTemplate
public void testPartitionedTable() throws Exception {
sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR) PARTITIONED BY
(dt)", TABLE);
Table table =
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
@@ -187,7 +179,7 @@ public class TestStreamScanSql extends FlinkCatalogTestBase
{
result.getJobClient().ifPresent(JobClient::cancel);
}
- @Test
+ @TestTemplate
public void testConsumeFromBeginning() throws Exception {
sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
Table table =
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
@@ -212,7 +204,7 @@ public class TestStreamScanSql extends FlinkCatalogTestBase
{
result.getJobClient().ifPresent(JobClient::cancel);
}
- @Test
+ @TestTemplate
public void testConsumeFilesWithBranch() throws Exception {
sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
Table table =
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
@@ -229,7 +221,7 @@ public class TestStreamScanSql extends FlinkCatalogTestBase
{
.hasMessage("Cannot scan table using ref b1 configured for streaming
reader yet");
}
- @Test
+ @TestTemplate
public void testConsumeFromStartSnapshotId() throws Exception {
sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
Table table =
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
@@ -267,7 +259,7 @@ public class TestStreamScanSql extends FlinkCatalogTestBase
{
result.getJobClient().ifPresent(JobClient::cancel);
}
- @Test
+ @TestTemplate
public void testConsumeFromStartTag() throws Exception {
sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
Table table =
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));