This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new d4d747d55f Flink 1.16: Remove usage of AssertHelpers (#8946)
d4d747d55f is described below
commit d4d747d55f767410b71a1baf6dba742ecc194c07
Author: Ashok <[email protected]>
AuthorDate: Tue Oct 31 20:32:13 2023 +0530
Flink 1.16: Remove usage of AssertHelpers (#8946)
---
.../iceberg/flink/TestFlinkCatalogFactory.java | 22 ++++-----
.../iceberg/flink/TestFlinkCatalogTable.java | 28 +++++------
.../flink/TestFlinkCatalogTablePartitions.java | 9 ++--
.../apache/iceberg/flink/TestFlinkSchemaUtil.java | 11 ++---
.../apache/iceberg/flink/TestIcebergConnector.java | 32 +++++++------
.../iceberg/flink/sink/TestFlinkIcebergSink.java | 41 ++++------------
.../iceberg/flink/sink/TestFlinkIcebergSinkV2.java | 29 ++++++------
.../flink/sink/TestFlinkIcebergSinkV2Base.java | 54 ++++++++++------------
.../apache/iceberg/flink/source/TestFlinkScan.java | 44 +++++++++---------
.../flink/source/TestFlinkSourceConfig.java | 14 ++----
.../iceberg/flink/source/TestFlinkTableSource.java | 10 ++--
.../iceberg/flink/source/TestStreamScanSql.java | 36 +++++++--------
.../flink/source/TestStreamingMonitorFunction.java | 25 +++-------
.../enumerator/TestContinuousSplitPlannerImpl.java | 38 ++++++---------
...estContinuousSplitPlannerImplStartStrategy.java | 38 ++++++++-------
15 files changed, 183 insertions(+), 248 deletions(-)
diff --git
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogFactory.java
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogFactory.java
index 6373d3a67b..c3f8bf92e4 100644
---
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogFactory.java
+++
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogFactory.java
@@ -20,7 +20,6 @@ package org.apache.iceberg.flink;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.hadoop.HadoopCatalog;
@@ -86,24 +85,21 @@ public class TestFlinkCatalogFactory {
props.put(CatalogProperties.CATALOG_IMPL,
CustomHadoopCatalog.class.getName());
props.put(
FlinkCatalogFactory.ICEBERG_CATALOG_TYPE,
FlinkCatalogFactory.ICEBERG_CATALOG_TYPE_HIVE);
-
- AssertHelpers.assertThrows(
- "Should throw when both catalog-type and catalog-impl are set",
- IllegalArgumentException.class,
- "both catalog-type and catalog-impl are set",
- () -> FlinkCatalogFactory.createCatalogLoader(catalogName, props, new
Configuration()));
+ Assertions.assertThatThrownBy(
+ () -> FlinkCatalogFactory.createCatalogLoader(catalogName, props,
new Configuration()))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageStartingWith(
+ "Cannot create catalog customCatalog, both catalog-type and
catalog-impl are set");
}
@Test
public void testLoadCatalogUnknown() {
String catalogName = "unknownCatalog";
props.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "fooType");
-
- AssertHelpers.assertThrows(
- "Should throw when an unregistered / unknown catalog is set as the
catalog factor's`type` setting",
- UnsupportedOperationException.class,
- "Unknown catalog-type",
- () -> FlinkCatalogFactory.createCatalogLoader(catalogName, props, new
Configuration()));
+ Assertions.assertThatThrownBy(
+ () -> FlinkCatalogFactory.createCatalogLoader(catalogName, props,
new Configuration()))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessageStartingWith("Unknown catalog-type: fooType");
}
public static class CustomHadoopCatalog extends HadoopCatalog {
diff --git
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
index 7f47b70286..472cf70b44 100644
---
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
+++
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
@@ -34,7 +34,6 @@ import
org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
@@ -55,6 +54,7 @@ import
org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
@@ -105,11 +105,9 @@ public class TestFlinkCatalogTable extends
FlinkCatalogTestBase {
new Schema(Types.NestedField.optional(0, "id", Types.LongType.get()));
validationCatalog.createTable(TableIdentifier.of(icebergNamespace, "tl"),
tableSchema);
sql("ALTER TABLE tl RENAME TO tl2");
- AssertHelpers.assertThrows(
- "Should fail if trying to get a nonexistent table",
- ValidationException.class,
- "Table `tl` was not found.",
- () -> getTableEnv().from("tl"));
+ Assertions.assertThatThrownBy(() -> getTableEnv().from("tl"))
+ .isInstanceOf(ValidationException.class)
+ .hasMessage("Table `tl` was not found.");
Schema actualSchema =
FlinkSchemaUtil.convert(getTableEnv().from("tl2").getSchema());
Assert.assertEquals(tableSchema.asStruct(), actualSchema.asStruct());
}
@@ -178,11 +176,9 @@ public class TestFlinkCatalogTable extends
FlinkCatalogTestBase {
assertThat(table("tl")).isNotNull();
sql("DROP TABLE tl");
- AssertHelpers.assertThrows(
- "Table 'tl' should be dropped",
- NoSuchTableException.class,
- "Table does not exist: " + getFullQualifiedTableName("tl"),
- () -> table("tl"));
+ Assertions.assertThatThrownBy(() -> table("tl"))
+ .isInstanceOf(NoSuchTableException.class)
+ .hasMessage("Table does not exist: " +
getFullQualifiedTableName("tl"));
sql("CREATE TABLE IF NOT EXISTS tl(id BIGINT)");
assertThat(table("tl").properties()).doesNotContainKey("key");
@@ -277,12 +273,10 @@ public class TestFlinkCatalogTable extends
FlinkCatalogTestBase {
Table table = table("tl");
TableOperations ops = ((BaseTable) table).operations();
Assert.assertEquals("should create table using format v2", 2,
ops.refresh().formatVersion());
-
- AssertHelpers.assertThrowsRootCause(
- "should fail to downgrade to v1",
- IllegalArgumentException.class,
- "Cannot downgrade v2 table to v1",
- () -> sql("ALTER TABLE tl SET('format-version'='1')"));
+ Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl
SET('format-version'='1')"))
+ .rootCause()
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Cannot downgrade v2 table to v1");
}
@Test
diff --git
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
index e5332e8f30..fad65f4c63 100644
---
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
+++
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
@@ -23,12 +23,12 @@ import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -90,10 +90,9 @@ public class TestFlinkCatalogTablePartitions extends
FlinkCatalogTestBase {
ObjectPath objectPath = new ObjectPath(DATABASE, tableName);
FlinkCatalog flinkCatalog = (FlinkCatalog)
getTableEnv().getCatalog(catalogName).get();
- AssertHelpers.assertThrows(
- "Should not list partitions for unpartitioned table.",
- TableNotPartitionedException.class,
- () -> flinkCatalog.listPartitions(objectPath));
+ Assertions.assertThatThrownBy(() ->
flinkCatalog.listPartitions(objectPath))
+ .isInstanceOf(TableNotPartitionedException.class)
+ .hasMessage("Table " + objectPath + " in catalog " + catalogName + "
is not partitioned.");
}
@Test
diff --git
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkSchemaUtil.java
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkSchemaUtil.java
index b5dfb9cb2f..16b220ba67 100644
---
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkSchemaUtil.java
+++
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkSchemaUtil.java
@@ -30,13 +30,13 @@ import org.apache.flink.table.types.logical.TimeType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.logical.VarCharType;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Test;
@@ -407,10 +407,9 @@ public class TestFlinkSchemaUtil {
Types.StructType.of(
Types.NestedField.required(2, "inner",
Types.IntegerType.get())))),
Sets.newHashSet(2));
- AssertHelpers.assertThrows(
- "Does not support the nested columns in flink schema's primary keys",
- ValidationException.class,
- "Column 'struct.inner' does not exist",
- () -> FlinkSchemaUtil.toSchema(icebergSchema));
+ Assertions.assertThatThrownBy(() ->
FlinkSchemaUtil.toSchema(icebergSchema))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageStartingWith("Could not create a PRIMARY KEY")
+ .hasMessageContaining("Column 'struct.inner' does not exist");
}
}
diff --git
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
index a12fe2507f..4f71b5fe8d 100644
---
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
+++
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
@@ -31,13 +31,13 @@ import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.types.Row;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.thrift.TException;
+import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.ClassRule;
@@ -261,11 +261,10 @@ public class TestIcebergConnector extends FlinkTestBase {
try {
testCreateConnectorTable();
// Ensure that the table was created under the specific database.
- AssertHelpers.assertThrows(
- "Table should already exists",
- org.apache.flink.table.api.TableException.class,
- "Could not execute CreateTable in path",
- () -> sql("CREATE TABLE `default_catalog`.`%s`.`%s`",
databaseName(), TABLE_NAME));
+ Assertions.assertThatThrownBy(
+ () -> sql("CREATE TABLE `default_catalog`.`%s`.`%s`",
databaseName(), TABLE_NAME))
+ .isInstanceOf(org.apache.flink.table.api.TableException.class)
+ .hasMessageStartingWith("Could not execute CreateTable in path");
} finally {
sql("DROP TABLE IF EXISTS `%s`.`%s`", databaseName(), TABLE_NAME);
if (!isDefaultDatabaseName()) {
@@ -293,14 +292,19 @@ public class TestIcebergConnector extends FlinkTestBase {
// Create a connector table in an iceberg catalog.
sql("CREATE CATALOG `test_catalog` WITH %s", toWithClause(catalogProps));
try {
- AssertHelpers.assertThrowsCause(
- "Cannot create the iceberg connector table in iceberg catalog",
- IllegalArgumentException.class,
- "Cannot create the table with 'connector'='iceberg' table property
in an iceberg catalog",
- () ->
- sql(
- "CREATE TABLE `test_catalog`.`%s`.`%s` (id BIGINT, data
STRING) WITH %s",
- FlinkCatalogFactory.DEFAULT_DATABASE_NAME, TABLE_NAME,
toWithClause(tableProps)));
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "CREATE TABLE `test_catalog`.`%s`.`%s` (id BIGINT, data
STRING) WITH %s",
+ FlinkCatalogFactory.DEFAULT_DATABASE_NAME,
+ TABLE_NAME,
+ toWithClause(tableProps)))
+ .cause()
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage(
+ "Cannot create the table with 'connector'='iceberg' table
property in an iceberg catalog, "
+ + "Please create table with 'connector'='iceberg' property
in a non-iceberg catalog or "
+ + "create table without 'connector'='iceberg' related
properties in an iceberg table.");
} finally {
sql("DROP CATALOG IF EXISTS `test_catalog`");
}
diff --git
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
index d771fe140b..11a73d2cc1 100644
---
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
+++
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
@@ -29,7 +29,6 @@ import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
@@ -46,6 +45,7 @@ import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
@@ -198,14 +198,9 @@ public class TestFlinkIcebergSink extends
TestFlinkIcebergSinkBase {
.set(TableProperties.WRITE_DISTRIBUTION_MODE,
DistributionMode.HASH.modeName())
.commit();
- AssertHelpers.assertThrows(
- "Does not support range distribution-mode now.",
- IllegalArgumentException.class,
- "Flink does not support 'range' write distribution mode now.",
- () -> {
- testWriteRow(null, DistributionMode.RANGE);
- return null;
- });
+ Assertions.assertThatThrownBy(() -> testWriteRow(null,
DistributionMode.RANGE))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Flink does not support 'range' write distribution mode
now.");
}
@Test
@@ -351,17 +346,9 @@ public class TestFlinkIcebergSink extends
TestFlinkIcebergSinkBase {
.writeParallelism(parallelism)
.setAll(newProps);
- AssertHelpers.assertThrows(
- "Should fail with invalid distribution mode.",
- IllegalArgumentException.class,
- "Invalid distribution mode: UNRECOGNIZED",
- () -> {
- builder.append();
-
- // Execute the program.
- env.execute("Test Iceberg DataStream.");
- return null;
- });
+ Assertions.assertThatThrownBy(builder::append)
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Invalid distribution mode: UNRECOGNIZED");
}
@Test
@@ -379,17 +366,9 @@ public class TestFlinkIcebergSink extends
TestFlinkIcebergSinkBase {
.writeParallelism(parallelism)
.setAll(newProps);
- AssertHelpers.assertThrows(
- "Should fail with invalid file format.",
- IllegalArgumentException.class,
- "Invalid file format: UNRECOGNIZED",
- () -> {
- builder.append();
-
- // Execute the program.
- env.execute("Test Iceberg DataStream.");
- return null;
- });
+ Assertions.assertThatThrownBy(builder::append)
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Invalid file format: UNRECOGNIZED");
}
@Test
diff --git
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
index 6552fe834c..b5c3bcf417 100644
---
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
+++
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
@@ -23,7 +23,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.SnapshotRef;
@@ -39,6 +38,7 @@ import
org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
@@ -202,18 +202,21 @@ public class TestFlinkIcebergSinkV2 extends
TestFlinkIcebergSinkV2Base {
.writeParallelism(parallelism)
.upsert(true);
- AssertHelpers.assertThrows(
- "Should be error because upsert mode and overwrite mode enable at the
same time.",
- IllegalStateException.class,
- "OVERWRITE mode shouldn't be enable",
- () ->
- builder.equalityFieldColumns(ImmutableList.of("id",
"data")).overwrite(true).append());
-
- AssertHelpers.assertThrows(
- "Should be error because equality field columns are empty.",
- IllegalStateException.class,
- "Equality field columns shouldn't be empty",
- () ->
builder.equalityFieldColumns(ImmutableList.of()).overwrite(false).append());
+ Assertions.assertThatThrownBy(
+ () ->
+ builder
+ .equalityFieldColumns(ImmutableList.of("id", "data"))
+ .overwrite(true)
+ .append())
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessage(
+ "OVERWRITE mode shouldn't be enable when configuring to use UPSERT
data stream.");
+
+ Assertions.assertThatThrownBy(
+ () ->
builder.equalityFieldColumns(ImmutableList.of()).overwrite(false).append())
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessage(
+ "Equality field columns shouldn't be empty when configuring to use
UPSERT data stream.");
}
@Test
diff --git
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Base.java
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Base.java
index 15380408e4..507df9e352 100644
---
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Base.java
+++
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Base.java
@@ -28,7 +28,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
@@ -43,6 +42,7 @@ import
org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.StructLikeSet;
+import org.assertj.core.api.Assertions;
import org.junit.Assert;
public class TestFlinkIcebergSinkV2Base {
@@ -231,20 +231,19 @@ public class TestFlinkIcebergSinkV2Base {
ImmutableList.of(record(1, "ddd"), record(2, "ddd")));
if (partitioned &&
writeDistributionMode.equals(TableProperties.WRITE_DISTRIBUTION_MODE_HASH)) {
- AssertHelpers.assertThrows(
- "Should be error because equality field columns don't include all
partition keys",
- IllegalStateException.class,
- "should be included in equality fields",
- () -> {
- testChangeLogs(
- ImmutableList.of("id"),
- row -> row.getField(ROW_ID_POS),
- false,
- elementsPerCheckpoint,
- expectedRecords,
- branch);
- return null;
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ testChangeLogs(
+ ImmutableList.of("id"),
+ row -> row.getField(ROW_ID_POS),
+ false,
+ elementsPerCheckpoint,
+ expectedRecords,
+ branch))
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageStartingWith(
+ "In 'hash' distribution mode with equality fields set, partition
field")
+ .hasMessageContaining("should be included in equality fields:");
} else {
testChangeLogs(
ImmutableList.of("id"),
@@ -278,20 +277,17 @@ public class TestFlinkIcebergSinkV2Base {
expectedRecords,
branch);
} else {
- AssertHelpers.assertThrows(
- "Should be error because equality field columns don't include all
partition keys",
- IllegalStateException.class,
- "should be included in equality fields",
- () -> {
- testChangeLogs(
- ImmutableList.of("id"),
- row -> row.getField(ROW_ID_POS),
- true,
- elementsPerCheckpoint,
- expectedRecords,
- branch);
- return null;
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ testChangeLogs(
+ ImmutableList.of("id"),
+ row -> row.getField(ROW_ID_POS),
+ true,
+ elementsPerCheckpoint,
+ expectedRecords,
+ branch))
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining("should be included in equality fields");
}
}
diff --git
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
index aa5b51eddf..b537efa727 100644
---
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
+++
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
@@ -28,7 +28,6 @@ import java.util.Map;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
import org.apache.iceberg.AppendFiles;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
@@ -49,6 +48,7 @@ import
org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.DateTimeUtil;
+import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Rule;
@@ -355,27 +355,27 @@ public abstract class TestFlinkScan {
expected,
TestFixtures.SCHEMA);
- AssertHelpers.assertThrows(
- "START_SNAPSHOT_ID and START_TAG cannot both be set.",
- Exception.class,
- () ->
- runWithOptions(
- ImmutableMap.<String, String>builder()
- .put("start-tag", startTag)
- .put("end-tag", endTag)
- .put("start-snapshot-id", Long.toString(snapshotId1))
- .buildOrThrow()));
-
- AssertHelpers.assertThrows(
- "END_SNAPSHOT_ID and END_TAG cannot both be set.",
- Exception.class,
- () ->
- runWithOptions(
- ImmutableMap.<String, String>builder()
- .put("start-tag", startTag)
- .put("end-tag", endTag)
- .put("end-snapshot-id", Long.toString(snapshotId3))
- .buildOrThrow()));
+ Assertions.assertThatThrownBy(
+ () ->
+ runWithOptions(
+ ImmutableMap.<String, String>builder()
+ .put("start-tag", startTag)
+ .put("end-tag", endTag)
+ .put("start-snapshot-id", Long.toString(snapshotId1))
+ .buildOrThrow()))
+ .isInstanceOf(Exception.class)
+ .hasMessage("START_SNAPSHOT_ID and START_TAG cannot both be set.");
+
+ Assertions.assertThatThrownBy(
+ () ->
+ runWithOptions(
+ ImmutableMap.<String, String>builder()
+ .put("start-tag", startTag)
+ .put("end-tag", endTag)
+ .put("end-snapshot-id", Long.toString(snapshotId3))
+ .buildOrThrow()))
+ .isInstanceOf(Exception.class)
+ .hasMessage("END_SNAPSHOT_ID and END_TAG cannot both be set.");
}
@Test
diff --git
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSourceConfig.java
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSourceConfig.java
index 974b8539b3..1814ff8f85 100644
---
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSourceConfig.java
+++
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSourceConfig.java
@@ -20,8 +20,8 @@ package org.apache.iceberg.flink.source;
import java.util.List;
import org.apache.flink.types.Row;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.flink.FlinkReadOptions;
+import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Test;
@@ -31,14 +31,10 @@ public class TestFlinkSourceConfig extends
TestFlinkTableSource {
@Test
public void testFlinkSessionConfig() {
getTableEnv().getConfig().set(FlinkReadOptions.STREAMING_OPTION, true);
- AssertHelpers.assertThrows(
- "Should throw exception because of cannot set snapshot-id option for
streaming reader",
- IllegalArgumentException.class,
- "Cannot set as-of-timestamp option for streaming reader",
- () -> {
- sql("SELECT * FROM %s /*+ OPTIONS('as-of-timestamp'='1')*/", TABLE);
- return null;
- });
+ Assertions.assertThatThrownBy(
+ () -> sql("SELECT * FROM %s /*+ OPTIONS('as-of-timestamp'='1')*/",
TABLE))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Cannot set as-of-timestamp option for streaming reader");
}
@Test
diff --git
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java
index f240e564a7..b56e804d14 100644
---
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java
+++
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java
@@ -25,7 +25,6 @@ import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.table.api.SqlParserException;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.types.Row;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.events.Listeners;
import org.apache.iceberg.events.ScanEvent;
@@ -102,11 +101,10 @@ public class TestFlinkTableSource extends FlinkTestBase {
@Test
public void testLimitPushDown() {
-
- AssertHelpers.assertThrows(
- "Invalid limit number: -1 ",
- SqlParserException.class,
- () -> sql("SELECT * FROM %s LIMIT -1", TABLE_NAME));
+ Assertions.assertThatThrownBy(() -> sql("SELECT * FROM %s LIMIT -1",
TABLE_NAME))
+ .as("Invalid limit number: -1 ")
+ .isInstanceOf(SqlParserException.class)
+ .hasMessageContaining("SQL parse failed. Encountered \"-\"");
Assert.assertEquals(
"Should have 0 record", 0, sql("SELECT * FROM %s LIMIT 0",
TABLE_NAME).size());
diff --git
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
index abcce11e36..eaba615ce9 100644
---
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
+++
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
@@ -30,7 +30,6 @@ import
org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.TestHelpers;
@@ -43,6 +42,7 @@ import org.apache.iceberg.flink.FlinkCatalogTestBase;
import org.apache.iceberg.flink.MiniClusterResource;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -219,15 +219,13 @@ public class TestStreamScanSql extends
FlinkCatalogTestBase {
Row row1 = Row.of(1, "aaa", "2021-01-01");
Row row2 = Row.of(2, "bbb", "2021-01-01");
insertRows(table, row1, row2);
-
- AssertHelpers.assertThrows(
- "Cannot scan table using ref for stream yet",
- IllegalArgumentException.class,
- "Cannot scan table using ref",
- () ->
- exec(
- "SELECT * FROM %s /*+ OPTIONS('streaming'='true',
'monitor-interval'='1s', 'branch'='b1')*/",
- TABLE));
+ Assertions.assertThatThrownBy(
+ () ->
+ exec(
+ "SELECT * FROM %s /*+ OPTIONS('streaming'='true',
'monitor-interval'='1s', 'branch'='b1')*/",
+ TABLE))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Cannot scan table using ref b1 configured for streaming
reader yet");
}
@Test
@@ -306,15 +304,13 @@ public class TestStreamScanSql extends
FlinkCatalogTestBase {
assertRows(ImmutableList.of(row7), iterator);
}
result.getJobClient().ifPresent(JobClient::cancel);
-
- AssertHelpers.assertThrows(
- "START_SNAPSHOT_ID and START_TAG cannot both be set.",
- IllegalArgumentException.class,
- "START_SNAPSHOT_ID and START_TAG cannot both be set.",
- () ->
- exec(
- "SELECT * FROM %s /*+ OPTIONS('streaming'='true',
'monitor-interval'='1s', 'start-tag'='%s', "
- + "'start-snapshot-id'='%d' )*/",
- TABLE, tagName, startSnapshotId));
+ Assertions.assertThatThrownBy(
+ () ->
+ exec(
+ "SELECT * FROM %s /*+ OPTIONS('streaming'='true',
'monitor-interval'='1s', 'start-tag'='%s', "
+ + "'start-snapshot-id'='%d' )*/",
+ TABLE, tagName, startSnapshotId))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("START_SNAPSHOT_ID and START_TAG cannot both be set.");
}
}
diff --git
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
index 65e2b1864f..0c3f54cc72 100644
---
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
+++
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
@@ -32,7 +32,6 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.Row;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
@@ -48,6 +47,7 @@ import
org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.ThreadPools;
+import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.Before;
@@ -255,15 +255,9 @@ public class TestStreamingMonitorFunction extends
TableTestBase {
.monitorInterval(Duration.ofMillis(100))
.maxPlanningSnapshotCount(0)
.build();
-
- AssertHelpers.assertThrows(
- "Should throw exception because of invalid config",
- IllegalArgumentException.class,
- "must be greater than zero",
- () -> {
- createFunction(scanContext1);
- return null;
- });
+ Assertions.assertThatThrownBy(() -> createFunction(scanContext1))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("The max-planning-snapshot-count must be greater than
zero");
ScanContext scanContext2 =
ScanContext.builder()
@@ -271,14 +265,9 @@ public class TestStreamingMonitorFunction extends
TableTestBase {
.maxPlanningSnapshotCount(-10)
.build();
- AssertHelpers.assertThrows(
- "Should throw exception because of invalid config",
- IllegalArgumentException.class,
- "must be greater than zero",
- () -> {
- createFunction(scanContext2);
- return null;
- });
+ Assertions.assertThatThrownBy(() -> createFunction(scanContext2))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("The max-planning-snapshot-count must be greater than zero");
}
@Test
diff --git
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImpl.java
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImpl.java
index fef61bc048..85174b4ab2 100644
---
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImpl.java
+++
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImpl.java
@@ -23,7 +23,6 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Snapshot;
@@ -37,6 +36,7 @@ import
org.apache.iceberg.flink.source.StreamingStartingStrategy;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
@@ -337,12 +337,9 @@ public class TestContinuousSplitPlannerImpl {
ContinuousSplitPlannerImpl splitPlanner =
new ContinuousSplitPlannerImpl(
tableResource.tableLoader().clone(),
scanContextWithInvalidSnapshotId, null);
-
- AssertHelpers.assertThrows(
- "Should detect invalid starting snapshot id",
- IllegalArgumentException.class,
- "Start snapshot id not found in history: 1",
- () -> splitPlanner.planSplits(null));
+ Assertions.assertThatThrownBy(() -> splitPlanner.planSplits(null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Start snapshot id not found in history: 1");
}
@Test
@@ -365,12 +362,9 @@ public class TestContinuousSplitPlannerImpl {
ContinuousSplitPlannerImpl splitPlanner =
new ContinuousSplitPlannerImpl(
tableResource.tableLoader().clone(),
scanContextWithInvalidSnapshotId, null);
-
- AssertHelpers.assertThrows(
- "Should detect invalid starting snapshot id",
- IllegalArgumentException.class,
- "Start snapshot id not found in history: " + invalidSnapshotId,
- () -> splitPlanner.planSplits(null));
+ Assertions.assertThatThrownBy(() -> splitPlanner.planSplits(null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Start snapshot id not found in history: " + invalidSnapshotId);
}
@Test
@@ -429,12 +423,9 @@ public class TestContinuousSplitPlannerImpl {
ContinuousSplitPlannerImpl splitPlanner =
new ContinuousSplitPlannerImpl(
tableResource.tableLoader().clone(),
scanContextWithInvalidSnapshotId, null);
-
- AssertHelpers.assertThrows(
- "Should detect invalid starting snapshot timestamp",
- IllegalArgumentException.class,
- "Cannot find a snapshot after: ",
- () -> splitPlanner.planSplits(null));
+ Assertions.assertThatThrownBy(() -> splitPlanner.planSplits(null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Cannot find a snapshot after: 1");
}
@Test
@@ -452,12 +443,9 @@ public class TestContinuousSplitPlannerImpl {
ContinuousSplitPlannerImpl splitPlanner =
new ContinuousSplitPlannerImpl(
tableResource.tableLoader().clone(),
scanContextWithInvalidSnapshotId, null);
-
- AssertHelpers.assertThrows(
- "Should detect invalid starting snapshot timestamp",
- IllegalArgumentException.class,
- "Cannot find a snapshot after: ",
- () -> splitPlanner.planSplits(null));
+ Assertions.assertThatThrownBy(() -> splitPlanner.planSplits(null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageStartingWith("Cannot find a snapshot after: ");
}
@Test
diff --git
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImplStartStrategy.java
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImplStartStrategy.java
index 2df2846f7e..2c94f21590 100644
---
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImplStartStrategy.java
+++
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImplStartStrategy.java
@@ -20,7 +20,6 @@ package org.apache.iceberg.flink.source.enumerator;
import java.io.IOException;
import java.util.List;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.data.GenericAppenderHelper;
@@ -30,6 +29,7 @@ import org.apache.iceberg.flink.HadoopTableResource;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.source.ScanContext;
import org.apache.iceberg.flink.source.StreamingStartingStrategy;
+import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
@@ -79,7 +79,7 @@ public class TestContinuousSplitPlannerImplStartStrategy {
.startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
.build();
- // emtpy table
+ // empty table
Assert.assertFalse(
ContinuousSplitPlannerImpl.startSnapshot(tableResource.table(),
scanContext).isPresent());
@@ -97,7 +97,7 @@ public class TestContinuousSplitPlannerImplStartStrategy {
.startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
.build();
- // emtpy table
+ // empty table
Assert.assertFalse(
ContinuousSplitPlannerImpl.startSnapshot(tableResource.table(),
scanContext).isPresent());
@@ -115,7 +115,7 @@ public class TestContinuousSplitPlannerImplStartStrategy {
.startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
.build();
- // emtpy table
+ // empty table
Assert.assertFalse(
ContinuousSplitPlannerImpl.startSnapshot(tableResource.table(),
scanContext).isPresent());
@@ -134,14 +134,13 @@ public class TestContinuousSplitPlannerImplStartStrategy {
.startSnapshotId(1L)
.build();
- // emtpy table
- AssertHelpers.assertThrows(
- "Should detect invalid starting snapshot id",
- IllegalArgumentException.class,
- "Start snapshot id not found in history: 1",
- () ->
- ContinuousSplitPlannerImpl.startSnapshot(
- tableResource.table(), scanContextInvalidSnapshotId));
+ // empty table
+ Assertions.assertThatThrownBy(
+ () ->
+ ContinuousSplitPlannerImpl.startSnapshot(
+ tableResource.table(), scanContextInvalidSnapshotId))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Start snapshot id not found in history: 1");
appendThreeSnapshots();
@@ -166,14 +165,13 @@ public class TestContinuousSplitPlannerImplStartStrategy {
.startSnapshotTimestamp(1L)
.build();
- // emtpy table
- AssertHelpers.assertThrows(
- "Should detect invalid starting snapshot timestamp",
- IllegalArgumentException.class,
- "Cannot find a snapshot after: ",
- () ->
- ContinuousSplitPlannerImpl.startSnapshot(
- tableResource.table(), scanContextInvalidSnapshotTimestamp));
+ // empty table
+ Assertions.assertThatThrownBy(
+ () ->
+ ContinuousSplitPlannerImpl.startSnapshot(
+ tableResource.table(), scanContextInvalidSnapshotTimestamp))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageStartingWith("Cannot find a snapshot after: ");
appendThreeSnapshots();