This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new b9a4478b0f Spark 3.5: Remove AssertHelpers usage (#8948)
b9a4478b0f is described below
commit b9a4478b0f8f5eeae553eb3900b08a7a08dbdb40
Author: Ashok <[email protected]>
AuthorDate: Tue Oct 31 15:33:31 2023 +0530
Spark 3.5: Remove AssertHelpers usage (#8948)
---
.../spark/extensions/TestChangelogTable.java | 11 +-
.../apache/iceberg/spark/extensions/TestMerge.java | 1036 +++++++++-----------
.../TestRequiredDistributionAndOrdering.java | 29 +-
.../iceberg/spark/extensions/TestTagDDL.java | 118 +--
.../iceberg/spark/extensions/TestUpdate.java | 182 ++--
5 files changed, 629 insertions(+), 747 deletions(-)
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java
index cc81b4b3d3..fe44fa3dec 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java
@@ -18,7 +18,6 @@
*/
package org.apache.iceberg.spark.extensions;
-import static org.apache.iceberg.AssertHelpers.assertThrows;
import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
import static org.apache.iceberg.TableProperties.MANIFEST_MERGE_ENABLED;
import static org.apache.iceberg.TableProperties.MANIFEST_MIN_MERGE_COUNT;
@@ -35,6 +34,7 @@ import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.source.SparkChangelogTable;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Row;
+import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
@@ -201,11 +201,10 @@ public class TestChangelogTable extends
SparkExtensionsTestBase {
table.refresh();
Snapshot snap3 = table.currentSnapshot();
long rightAfterSnap3 = waitUntilAfter(snap3.timestampMillis());
-
- assertThrows(
- "Should fail if start time is after end time",
- IllegalArgumentException.class,
- () -> changelogRecords(snap3.timestampMillis(),
snap2.timestampMillis()));
+ Assertions.assertThatThrownBy(
+ () -> changelogRecords(snap3.timestampMillis(),
snap2.timestampMillis()))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Cannot set start-timestamp to be greater than
end-timestamp for changelogs");
}
@Test
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
index b8439a399f..38ad8b0da5 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
@@ -43,7 +43,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.AppendFiles;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.PlanningMode;
@@ -831,7 +830,8 @@ public abstract class TestMerge extends
SparkRowLevelOperationsTestBase {
+ "{ \"id\": 1, \"state\": \"off\" }\n"
+ "{ \"id\": 10, \"state\": \"on\" }");
- String errorMsg = "a single row from the target table with multiple rows
of the source table";
+ String errorMsg =
+ "MERGE statement matched a single row from the target table with
multiple rows of the source table.";
Assertions.assertThatThrownBy(
() ->
sql(
@@ -865,23 +865,23 @@ public abstract class TestMerge extends
SparkRowLevelOperationsTestBase {
Dataset<Integer> ds = spark.createDataset(sourceIds, Encoders.INT());
ds.union(ds).createOrReplaceTempView("source");
- String errorMsg = "a single row from the target table with multiple rows
of the source table";
- AssertHelpers.assertThrowsCause(
- "Should complain about multiple matches",
- SparkRuntimeException.class,
- errorMsg,
- () -> {
- sql(
- "MERGE INTO %s AS t USING source AS s "
- + "ON t.id == s.value "
- + "WHEN MATCHED AND t.id = 1 THEN "
- + " UPDATE SET id = 10 "
- + "WHEN MATCHED AND t.id = 6 THEN "
- + " DELETE "
- + "WHEN NOT MATCHED AND s.value = 2 THEN "
- + " INSERT (id, dep) VALUES (s.value, null)",
- commitTarget());
- });
+ String errorMsg =
+ "MERGE statement matched a single row from the target table with
multiple rows of the source table.";
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s AS t USING source AS s "
+ + "ON t.id == s.value "
+ + "WHEN MATCHED AND t.id = 1 THEN "
+ + " UPDATE SET id = 10 "
+ + "WHEN MATCHED AND t.id = 6 THEN "
+ + " DELETE "
+ + "WHEN NOT MATCHED AND s.value = 2 THEN "
+ + " INSERT (id, dep) VALUES (s.value, null)",
+ commitTarget()))
+ .cause()
+ .isInstanceOf(SparkRuntimeException.class)
+ .hasMessageContaining(errorMsg);
assertEquals(
"Target should be unchanged",
@@ -907,23 +907,22 @@ public abstract class TestMerge extends
SparkRowLevelOperationsTestBase {
ImmutableMap.of(SQLConf.PREFER_SORTMERGEJOIN().key(), "false"),
() -> {
String errorMsg =
- "a single row from the target table with multiple rows of the
source table";
- AssertHelpers.assertThrowsCause(
- "Should complain about multiple matches",
- SparkRuntimeException.class,
- errorMsg,
- () -> {
- sql(
- "MERGE INTO %s AS t USING source AS s "
- + "ON t.id == s.value "
- + "WHEN MATCHED AND t.id = 1 THEN "
- + " UPDATE SET id = 10 "
- + "WHEN MATCHED AND t.id = 6 THEN "
- + " DELETE "
- + "WHEN NOT MATCHED AND s.value = 2 THEN "
- + " INSERT (id, dep) VALUES (s.value, null)",
- commitTarget());
- });
+ "MERGE statement matched a single row from the target table with
multiple rows of the source table.";
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s AS t USING source AS s "
+ + "ON t.id == s.value "
+ + "WHEN MATCHED AND t.id = 1 THEN "
+ + " UPDATE SET id = 10 "
+ + "WHEN MATCHED AND t.id = 6 THEN "
+ + " DELETE "
+ + "WHEN NOT MATCHED AND s.value = 2 THEN "
+ + " INSERT (id, dep) VALUES (s.value, null)",
+ commitTarget()))
+ .cause()
+ .isInstanceOf(SparkRuntimeException.class)
+ .hasMessageContaining(errorMsg);
});
assertEquals(
@@ -947,23 +946,22 @@ public abstract class TestMerge extends
SparkRowLevelOperationsTestBase {
ImmutableMap.of(SQLConf.PREFER_SORTMERGEJOIN().key(), "false"),
() -> {
String errorMsg =
- "a single row from the target table with multiple rows of the
source table";
- AssertHelpers.assertThrowsCause(
- "Should complain about multiple matches",
- SparkRuntimeException.class,
- errorMsg,
- () -> {
- sql(
- "MERGE INTO %s AS t USING source AS s "
- + "ON t.id > s.value "
- + "WHEN MATCHED AND t.id = 1 THEN "
- + " UPDATE SET id = 10 "
- + "WHEN MATCHED AND t.id = 6 THEN "
- + " DELETE "
- + "WHEN NOT MATCHED AND s.value = 2 THEN "
- + " INSERT (id, dep) VALUES (s.value, null)",
- commitTarget());
- });
+ "MERGE statement matched a single row from the target table with
multiple rows of the source table.";
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s AS t USING source AS s "
+ + "ON t.id > s.value "
+ + "WHEN MATCHED AND t.id = 1 THEN "
+ + " UPDATE SET id = 10 "
+ + "WHEN MATCHED AND t.id = 6 THEN "
+ + " DELETE "
+ + "WHEN NOT MATCHED AND s.value = 2 THEN "
+ + " INSERT (id, dep) VALUES (s.value, null)",
+ commitTarget()))
+ .cause()
+ .isInstanceOf(SparkRuntimeException.class)
+ .hasMessageContaining(errorMsg);
});
assertEquals(
@@ -985,22 +983,21 @@ public abstract class TestMerge extends
SparkRowLevelOperationsTestBase {
Dataset<Integer> ds = spark.createDataset(sourceIds, Encoders.INT());
ds.union(ds).createOrReplaceTempView("source");
- String errorMsg = "a single row from the target table with multiple rows
of the source table";
- AssertHelpers.assertThrowsCause(
- "Should complain about multiple matches",
- SparkRuntimeException.class,
- errorMsg,
- () -> {
- sql(
- "MERGE INTO %s AS t USING source AS s "
- + "ON t.id == s.value "
- + "WHEN MATCHED AND t.id = 1 THEN "
- + " UPDATE SET id = 10 "
- + "WHEN MATCHED AND t.id = 6 THEN "
- + " DELETE",
- commitTarget());
- });
-
+ String errorMsg =
+ "MERGE statement matched a single row from the target table with
multiple rows of the source table.";
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s AS t USING source AS s "
+ + "ON t.id == s.value "
+ + "WHEN MATCHED AND t.id = 1 THEN "
+ + " UPDATE SET id = 10 "
+ + "WHEN MATCHED AND t.id = 6 THEN "
+ + " DELETE",
+ commitTarget()))
+ .cause()
+ .isInstanceOf(SparkRuntimeException.class)
+ .hasMessageContaining(errorMsg);
assertEquals(
"Target should be unchanged",
ImmutableList.of(row(1, "emp-id-one"), row(6, "emp-id-6")),
@@ -1019,21 +1016,21 @@ public abstract class TestMerge extends
SparkRowLevelOperationsTestBase {
Dataset<Integer> ds = spark.createDataset(sourceIds, Encoders.INT());
ds.union(ds).createOrReplaceTempView("source");
- String errorMsg = "a single row from the target table with multiple rows
of the source table";
- AssertHelpers.assertThrowsCause(
- "Should complain about multiple matches",
- SparkRuntimeException.class,
- errorMsg,
- () -> {
- sql(
- "MERGE INTO %s AS t USING source AS s "
- + "ON t.id > s.value "
- + "WHEN MATCHED AND t.id = 1 THEN "
- + " UPDATE SET id = 10 "
- + "WHEN MATCHED AND t.id = 6 THEN "
- + " DELETE",
- commitTarget());
- });
+ String errorMsg =
+ "MERGE statement matched a single row from the target table with
multiple rows of the source table.";
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s AS t USING source AS s "
+ + "ON t.id > s.value "
+ + "WHEN MATCHED AND t.id = 1 THEN "
+ + " UPDATE SET id = 10 "
+ + "WHEN MATCHED AND t.id = 6 THEN "
+ + " DELETE",
+ commitTarget()))
+ .cause()
+ .isInstanceOf(SparkRuntimeException.class)
+ .hasMessageContaining(errorMsg);
assertEquals(
"Target should be unchanged",
@@ -1055,23 +1052,23 @@ public abstract class TestMerge extends
SparkRowLevelOperationsTestBase {
+ "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n"
+ "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
- String errorMsg = "a single row from the target table with multiple rows
of the source table";
- AssertHelpers.assertThrowsCause(
- "Should complain about multiple matches",
- SparkRuntimeException.class,
- errorMsg,
- () -> {
- sql(
- "MERGE INTO %s AS t USING source AS s "
- + "ON t.id == s.id "
- + "WHEN MATCHED AND t.id = 1 THEN "
- + " UPDATE SET * "
- + "WHEN MATCHED AND t.id = 6 THEN "
- + " DELETE "
- + "WHEN NOT MATCHED AND s.id = 2 THEN "
- + " INSERT *",
- commitTarget());
- });
+ String errorMsg =
+ "MERGE statement matched a single row from the target table with
multiple rows of the source table.";
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s AS t USING source AS s "
+ + "ON t.id == s.id "
+ + "WHEN MATCHED AND t.id = 1 THEN "
+ + " UPDATE SET * "
+ + "WHEN MATCHED AND t.id = 6 THEN "
+ + " DELETE "
+ + "WHEN NOT MATCHED AND s.id = 2 THEN "
+ + " INSERT *",
+ commitTarget()))
+ .cause()
+ .isInstanceOf(SparkRuntimeException.class)
+ .hasMessageContaining(errorMsg);
assertEquals(
"Target should be unchanged",
@@ -1126,21 +1123,21 @@ public abstract class TestMerge extends
SparkRowLevelOperationsTestBase {
+ "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n"
+ "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
- String errorMsg = "a single row from the target table with multiple rows
of the source table";
- AssertHelpers.assertThrowsCause(
- "Should complain about multiple matches",
- SparkRuntimeException.class,
- errorMsg,
- () -> {
- sql(
- "MERGE INTO %s AS t USING source AS s "
- + "ON t.id == s.id "
- + "WHEN MATCHED AND t.id = 1 THEN "
- + " DELETE "
- + "WHEN NOT MATCHED AND s.id = 2 THEN "
- + " INSERT *",
- commitTarget());
- });
+ String errorMsg =
+ "MERGE statement matched a single row from the target table with
multiple rows of the source table.";
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s AS t USING source AS s "
+ + "ON t.id == s.id "
+ + "WHEN MATCHED AND t.id = 1 THEN "
+ + " DELETE "
+ + "WHEN NOT MATCHED AND s.id = 2 THEN "
+ + " INSERT *",
+ commitTarget()))
+ .cause()
+ .isInstanceOf(SparkRuntimeException.class)
+ .hasMessageContaining(errorMsg);
assertEquals(
"Target should be unchanged",
@@ -2270,47 +2267,42 @@ public abstract class TestMerge extends
SparkRowLevelOperationsTestBase {
"id INT, c STRUCT<n1:INT,n2:STRUCT<dn1:INT,dn2:INT>>",
"{ \"id\": 1, \"c\": { \"n1\": 2, \"n2\": { \"dn1\": 3, \"dn2\": 4 } }
}");
createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET t.invalid_col = s.c2",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining(
+ "A column or function parameter with name `t`.`invalid_col` cannot
be resolved");
- AssertHelpers.assertThrows(
- "Should complain about the invalid top-level column",
- AnalysisException.class,
- "A column or function parameter with name `t`.`invalid_col` cannot be
resolved",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN MATCHED THEN "
- + " UPDATE SET t.invalid_col = s.c2",
- commitTarget());
- });
-
- AssertHelpers.assertThrows(
- "Should complain about the invalid nested column",
- AnalysisException.class,
- "No such struct field `invalid_col`",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN MATCHED THEN "
- + " UPDATE SET t.c.n2.invalid_col = s.c2",
- commitTarget());
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET t.c.n2.invalid_col = s.c2",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("No such struct field `invalid_col`");
- AssertHelpers.assertThrows(
- "Should complain about the invalid top-level column",
- AnalysisException.class,
- "A column or function parameter with name `invalid_col` cannot be
resolved",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN MATCHED THEN "
- + " UPDATE SET t.c.n2.dn1 = s.c2 "
- + "WHEN NOT MATCHED THEN "
- + " INSERT (id, invalid_col) VALUES (s.c1, null)",
- commitTarget());
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET t.c.n2.dn1 = s.c2 "
+ + "WHEN NOT MATCHED THEN "
+ + " INSERT (id, invalid_col) VALUES (s.c1, null)",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining(
+ "A column or function parameter with name `invalid_col` cannot be
resolved");
}
@Test
@@ -2319,49 +2311,41 @@ public abstract class TestMerge extends
SparkRowLevelOperationsTestBase {
"id INT, c STRUCT<n1:INT,n2:STRUCT<dn1:INT,dn2:INT>> NOT NULL",
"{ \"id\": 1, \"c\": { \"n1\": 2, \"n2\": { \"dn1\": 3, \"dn2\": 4 } }
}");
createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET t.c.n2.dn1 = s.c2 "
+ + "WHEN NOT MATCHED THEN "
+ + " INSERT (id, c.n2) VALUES (s.c1, null)",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("INSERT assignment keys cannot be nested
fields");
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET t.c.n2.dn1 = s.c2 "
+ + "WHEN NOT MATCHED THEN "
+ + " INSERT (id, id) VALUES (s.c1, null)",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("Multiple assignments for 'id'");
- AssertHelpers.assertThrows(
- "Should complain about the nested column",
- AnalysisException.class,
- "INSERT assignment keys cannot be nested fields",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN MATCHED THEN "
- + " UPDATE SET t.c.n2.dn1 = s.c2 "
- + "WHEN NOT MATCHED THEN "
- + " INSERT (id, c.n2) VALUES (s.c1, null)",
- commitTarget());
- });
-
- AssertHelpers.assertThrows(
- "Should complain about duplicate columns",
- AnalysisException.class,
- "Multiple assignments for 'id'",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN MATCHED THEN "
- + " UPDATE SET t.c.n2.dn1 = s.c2 "
- + "WHEN NOT MATCHED THEN "
- + " INSERT (id, id) VALUES (s.c1, null)",
- commitTarget());
- });
-
- AssertHelpers.assertThrows(
- "Should complain about missing columns",
- AnalysisException.class,
- "No assignment for 'c'",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN NOT MATCHED THEN "
- + " INSERT (id) VALUES (s.c1)",
- commitTarget());
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN NOT MATCHED THEN "
+ + " INSERT (id) VALUES (s.c1)",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("No assignment for 'c'");
}
@Test
@@ -2390,32 +2374,27 @@ public abstract class TestMerge extends
SparkRowLevelOperationsTestBase {
"id INT, a ARRAY<STRUCT<c1:INT,c2:INT>>, m MAP<STRING,STRING>",
"{ \"id\": 1, \"a\": [ { \"c1\": 2, \"c2\": 3 } ], \"m\": { \"k\":
\"v\"} }");
createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET t.a.c1 = s.c2",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("Updating nested fields is only supported for
StructType");
- AssertHelpers.assertThrows(
- "Should complain about updating an array column",
- AnalysisException.class,
- "Updating nested fields is only supported for StructType",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN MATCHED THEN "
- + " UPDATE SET t.a.c1 = s.c2",
- commitTarget());
- });
-
- AssertHelpers.assertThrows(
- "Should complain about updating a map column",
- AnalysisException.class,
- "Updating nested fields is only supported for StructType",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN MATCHED THEN "
- + " UPDATE SET t.m.key = 'new_key'",
- commitTarget());
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET t.m.key = 'new_key'",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("Updating nested fields is only supported for
StructType");
}
@Test
@@ -2424,45 +2403,37 @@ public abstract class TestMerge extends
SparkRowLevelOperationsTestBase {
"id INT, c STRUCT<n1:INT,n2:STRUCT<dn1:INT,dn2:INT>>",
"{ \"id\": 1, \"c\": { \"n1\": 2, \"n2\": { \"dn1\": 3, \"dn2\": 4 } }
}");
createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET t.id = 1, t.c.n1 = 2, t.id = 2",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("Multiple assignments for 'id");
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET t.c.n1 = 1, t.id = 2, t.c.n1 = 2",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("Multiple assignments for 'c.n1'");
- AssertHelpers.assertThrows(
- "Should complain about conflicting updates to a top-level column",
- AnalysisException.class,
- "Multiple assignments for 'id",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN MATCHED THEN "
- + " UPDATE SET t.id = 1, t.c.n1 = 2, t.id = 2",
- commitTarget());
- });
-
- AssertHelpers.assertThrows(
- "Should complain about conflicting updates to a nested column",
- AnalysisException.class,
- "Multiple assignments for 'c.n1'",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN MATCHED THEN "
- + " UPDATE SET t.c.n1 = 1, t.id = 2, t.c.n1 = 2",
- commitTarget());
- });
-
- AssertHelpers.assertThrows(
- "Should complain about conflicting updates to a nested column",
- AnalysisException.class,
- "Conflicting assignments for 'c'",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN MATCHED THEN "
- + " UPDATE SET c.n1 = 1, c = named_struct('n1', 1, 'n2',
named_struct('dn1', 1, 'dn2', 2))",
- commitTarget());
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET c.n1 = 1, c = named_struct('n1', 1,
'n2', named_struct('dn1', 1, 'dn2', 2))",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("Conflicting assignments for 'c'");
}
@Test
@@ -2478,70 +2449,57 @@ public abstract class TestMerge extends
SparkRowLevelOperationsTestBase {
withSQLConf(
ImmutableMap.of(SQLConf.STORE_ASSIGNMENT_POLICY().key(), "ansi"),
() -> {
- AssertHelpers.assertThrows(
- "Should complain about writing nulls to a top-level column",
- SparkException.class,
- "Null value appeared in non-nullable field",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN MATCHED THEN "
- + " UPDATE SET t.id = cast(NULL as int)",
- commitTarget());
- });
-
- AssertHelpers.assertThrows(
- "Should complain about writing nulls to a nested column",
- SparkException.class,
- "Null value appeared in non-nullable field",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN MATCHED THEN "
- + " UPDATE SET t.s.n1 = NULL",
- commitTarget());
- });
-
- AssertHelpers.assertThrows(
- "Should complain about writing missing fields in structs",
- AnalysisException.class,
- "Cannot find data for the output column `s`.`n2`",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN MATCHED THEN "
- + " UPDATE SET t.s = s.c2",
- commitTarget());
- });
-
- AssertHelpers.assertThrows(
- "Should complain about writing invalid data types",
- AnalysisException.class,
- "Cannot safely cast",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN MATCHED THEN "
- + " UPDATE SET t.s.n1 = s.c3",
- commitTarget());
- });
-
- AssertHelpers.assertThrows(
- "Should complain about writing incompatible structs",
- AnalysisException.class,
- "Cannot find data for the output column `s`.`n2`.`dn2`",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN MATCHED THEN "
- + " UPDATE SET t.s.n2 = s.c4",
- commitTarget());
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET t.id = cast(NULL as int)",
+ commitTarget()))
+ .isInstanceOf(SparkException.class)
+ .hasMessageContaining("Null value appeared in non-nullable
field");
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET t.s.n1 = NULL",
+ commitTarget()))
+ .isInstanceOf(SparkException.class)
+ .hasMessageContaining("Null value appeared in non-nullable
field");
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET t.s = s.c2",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("Cannot find data for the output column
`s`.`n2`");
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET t.s.n1 = s.c3",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageEndingWith("Cannot safely cast `s`.`n1` \"STRING\" to
\"INT\".");
+
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET t.s.n2 = s.c4",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("Cannot find data for the output column
`s`.`n2`.`dn2`");
});
}
@@ -2558,131 +2516,113 @@ public abstract class TestMerge extends
SparkRowLevelOperationsTestBase {
withSQLConf(
ImmutableMap.of(SQLConf.STORE_ASSIGNMENT_POLICY().key(), "strict"),
() -> {
- AssertHelpers.assertThrows(
- "Should complain about writing nulls to a top-level column",
- AnalysisException.class,
- "Cannot safely cast `id` \"VOID\" to \"INT\"",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN MATCHED THEN "
- + " UPDATE SET t.id = NULL",
- commitTarget());
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET t.id = NULL",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("Cannot safely cast `id` \"VOID\" to
\"INT\"");
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET t.s.n1 = NULL",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("Cannot safely cast `s`.`n1` \"VOID\" to
\"INT\"");
+
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET t.s = s.c2",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("Cannot find data for the output column
`s`.`n2`");
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET t.s.n1 = s.c3",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageEndingWith("Cannot safely cast `s`.`n1` \"STRING\" to
\"INT\".");
+
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET t.s.n2 = s.c4",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("Cannot find data for the output column
`s`.`n2`.`dn2`");
+ });
+ }
- AssertHelpers.assertThrows(
- "Should complain about writing nulls to a nested column",
- AnalysisException.class,
- "Cannot safely cast `s`.`n1` \"VOID\" to \"INT\"",
- () -> {
+ @Test
+ public void testMergeWithNonDeterministicConditions() {
+ createAndInitTable(
+ "id INT, c STRUCT<n1:INT,n2:STRUCT<dn1:INT,dn2:INT>>",
+ "{ \"id\": 1, \"c\": { \"n1\": 2, \"n2\": { \"dn1\": 3, \"dn2\": 4 } }
}");
+ createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
+ Assertions.assertThatThrownBy(
+ () ->
sql(
"MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
+ + "ON t.id == s.c1 AND rand() > t.id "
+ "WHEN MATCHED THEN "
- + " UPDATE SET t.s.n1 = NULL",
- commitTarget());
- });
-
- AssertHelpers.assertThrows(
- "Should complain about writing missing fields in structs",
- AnalysisException.class,
- "Cannot find data for the output column `s`.`n2`",
- () -> {
+ + " UPDATE SET t.c.n1 = -1",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining(
+ "MERGE operation contains unsupported SEARCH condition.
Non-deterministic expressions are not allowed");
+ Assertions.assertThatThrownBy(
+ () ->
sql(
"MERGE INTO %s t USING source s "
+ "ON t.id == s.c1 "
- + "WHEN MATCHED THEN "
- + " UPDATE SET t.s = s.c2",
- commitTarget());
- });
+ + "WHEN MATCHED AND rand() > t.id THEN "
+ + " UPDATE SET t.c.n1 = -1",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining(
+ "MERGE operation contains unsupported UPDATE condition.
Non-deterministic expressions are not allowed");
- AssertHelpers.assertThrows(
- "Should complain about writing invalid data types",
- AnalysisException.class,
- "Cannot safely cast",
- () -> {
+ Assertions.assertThatThrownBy(
+ () ->
sql(
"MERGE INTO %s t USING source s "
+ "ON t.id == s.c1 "
- + "WHEN MATCHED THEN "
- + " UPDATE SET t.s.n1 = s.c3",
- commitTarget());
- });
+ + "WHEN MATCHED AND rand() > t.id THEN "
+ + " DELETE",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining(
+ "MERGE operation contains unsupported DELETE condition.
Non-deterministic expressions are not allowed");
- AssertHelpers.assertThrows(
- "Should complain about writing incompatible structs",
- AnalysisException.class,
- "annot find data for the output column `s`.`n2`.`dn2`",
- () -> {
+ Assertions.assertThatThrownBy(
+ () ->
sql(
"MERGE INTO %s t USING source s "
+ "ON t.id == s.c1 "
- + "WHEN MATCHED THEN "
- + " UPDATE SET t.s.n2 = s.c4",
- commitTarget());
- });
- });
- }
-
- @Test
- public void testMergeWithNonDeterministicConditions() {
- createAndInitTable(
- "id INT, c STRUCT<n1:INT,n2:STRUCT<dn1:INT,dn2:INT>>",
- "{ \"id\": 1, \"c\": { \"n1\": 2, \"n2\": { \"dn1\": 3, \"dn2\": 4 } }
}");
- createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
-
- AssertHelpers.assertThrows(
- "Should complain about non-deterministic search conditions",
- AnalysisException.class,
- "MERGE operation contains unsupported SEARCH condition.
Non-deterministic expressions are not allowed",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 AND rand() > t.id "
- + "WHEN MATCHED THEN "
- + " UPDATE SET t.c.n1 = -1",
- commitTarget());
- });
-
- AssertHelpers.assertThrows(
- "Should complain about non-deterministic update conditions",
- AnalysisException.class,
- "MERGE operation contains unsupported UPDATE condition.
Non-deterministic expressions are not allowed",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN MATCHED AND rand() > t.id THEN "
- + " UPDATE SET t.c.n1 = -1",
- commitTarget());
- });
-
- AssertHelpers.assertThrows(
- "Should complain about non-deterministic delete conditions",
- AnalysisException.class,
- "MERGE operation contains unsupported DELETE condition.
Non-deterministic expressions are not allowed",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN MATCHED AND rand() > t.id THEN "
- + " DELETE",
- commitTarget());
- });
-
- AssertHelpers.assertThrows(
- "Should complain about non-deterministic insert conditions",
- AnalysisException.class,
- "MERGE operation contains unsupported INSERT condition.
Non-deterministic expressions are not allowed",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN NOT MATCHED AND rand() > c1 THEN "
- + " INSERT (id, c) VALUES (1, null)",
- commitTarget());
- });
+ + "WHEN NOT MATCHED AND rand() > c1 THEN "
+ + " INSERT (id, c) VALUES (1, null)",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining(
+ "MERGE operation contains unsupported INSERT condition.
Non-deterministic expressions are not allowed");
}
@Test
@@ -2691,58 +2631,53 @@ public abstract class TestMerge extends
SparkRowLevelOperationsTestBase {
"id INT, c STRUCT<n1:INT,n2:STRUCT<dn1:INT,dn2:INT>>",
"{ \"id\": 1, \"c\": { \"n1\": 2, \"n2\": { \"dn1\": 3, \"dn2\": 4 } }
}");
createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 AND max(t.id) == 1 "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET t.c.n1 = -1",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining(
+ "MERGE operation contains unsupported SEARCH condition. Aggregates
are not allowed");
- AssertHelpers.assertThrows(
- "Should complain about agg expressions in search conditions",
- AnalysisException.class,
- "MERGE operation contains unsupported SEARCH condition. Aggregates are
not allowed",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 AND max(t.id) == 1 "
- + "WHEN MATCHED THEN "
- + " UPDATE SET t.c.n1 = -1",
- commitTarget());
- });
-
- AssertHelpers.assertThrows(
- "Should complain about agg expressions in update conditions",
- AnalysisException.class,
- "MERGE operation contains unsupported UPDATE condition. Aggregates are
not allowed",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN MATCHED AND sum(t.id) < 1 THEN "
- + " UPDATE SET t.c.n1 = -1",
- commitTarget());
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN MATCHED AND sum(t.id) < 1 THEN "
+ + " UPDATE SET t.c.n1 = -1",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining(
+ "MERGE operation contains unsupported UPDATE condition. Aggregates
are not allowed");
- AssertHelpers.assertThrows(
- "Should complain about agg expressions in delete conditions",
- AnalysisException.class,
- "MERGE operation contains unsupported DELETE condition. Aggregates are
not allowed",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN MATCHED AND sum(t.id) THEN "
- + " DELETE",
- commitTarget());
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN MATCHED AND sum(t.id) THEN "
+ + " DELETE",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining(
+ "MERGE operation contains unsupported DELETE condition. Aggregates
are not allowed");
- AssertHelpers.assertThrows(
- "Should complain about agg expressions in insert conditions",
- AnalysisException.class,
- "MERGE operation contains unsupported INSERT condition. Aggregates are
not allowed",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN NOT MATCHED AND sum(c1) < 1 THEN "
- + " INSERT (id, c) VALUES (1, null)",
- commitTarget());
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN NOT MATCHED AND sum(c1) < 1 THEN "
+ + " INSERT (id, c) VALUES (1, null)",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining(
+ "MERGE operation contains unsupported INSERT condition. Aggregates
are not allowed");
}
@Test
@@ -2751,58 +2686,53 @@ public abstract class TestMerge extends
SparkRowLevelOperationsTestBase {
"id INT, c STRUCT<n1:INT,n2:STRUCT<dn1:INT,dn2:INT>>",
"{ \"id\": 1, \"c\": { \"n1\": 2, \"n2\": { \"dn1\": 3, \"dn2\": 4 } }
}");
createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 AND t.id < (SELECT max(c2) FROM
source) "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET t.c.n1 = s.c2",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining(
+ "MERGE operation contains unsupported SEARCH condition. Subqueries
are not allowed");
- AssertHelpers.assertThrows(
- "Should complain about subquery expressions",
- AnalysisException.class,
- "MERGE operation contains unsupported SEARCH condition. Subqueries are
not allowed",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 AND t.id < (SELECT max(c2) FROM source) "
- + "WHEN MATCHED THEN "
- + " UPDATE SET t.c.n1 = s.c2",
- commitTarget());
- });
-
- AssertHelpers.assertThrows(
- "Should complain about subquery expressions",
- AnalysisException.class,
- "MERGE operation contains unsupported UPDATE condition. Subqueries are
not allowed",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN MATCHED AND t.id < (SELECT max(c2) FROM source) THEN
"
- + " UPDATE SET t.c.n1 = s.c2",
- commitTarget());
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN MATCHED AND t.id < (SELECT max(c2) FROM
source) THEN "
+ + " UPDATE SET t.c.n1 = s.c2",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining(
+ "MERGE operation contains unsupported UPDATE condition. Subqueries
are not allowed");
- AssertHelpers.assertThrows(
- "Should complain about subquery expressions",
- AnalysisException.class,
- "MERGE operation contains unsupported DELETE condition. Subqueries are
not allowed",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN MATCHED AND t.id NOT IN (SELECT c2 FROM source) THEN
"
- + " DELETE",
- commitTarget());
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN MATCHED AND t.id NOT IN (SELECT c2 FROM
source) THEN "
+ + " DELETE",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining(
+ "MERGE operation contains unsupported DELETE condition. Subqueries
are not allowed");
- AssertHelpers.assertThrows(
- "Should complain about subquery expressions",
- AnalysisException.class,
- "MERGE operation contains unsupported INSERT condition. Subqueries are
not allowed",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN NOT MATCHED AND s.c1 IN (SELECT c2 FROM source) THEN
"
- + " INSERT (id, c) VALUES (1, null)",
- commitTarget());
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN NOT MATCHED AND s.c1 IN (SELECT c2 FROM
source) THEN "
+ + " INSERT (id, c) VALUES (1, null)",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining(
+ "MERGE operation contains unsupported INSERT condition. Subqueries
are not allowed");
}
@Test
@@ -2810,18 +2740,16 @@ public abstract class TestMerge extends
SparkRowLevelOperationsTestBase {
createAndInitTable("id INT, c2 INT", "{ \"id\": 1, \"c2\": 2 }");
createOrReplaceView("source", "{ \"id\": 1, \"value\": 11 }");
- AssertHelpers.assertThrows(
- "Should complain about the target column",
- AnalysisException.class,
- "A column or function parameter with name `c2` cannot be resolved",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.id "
- + "WHEN NOT MATCHED AND c2 = 1 THEN "
- + " INSERT (id, c2) VALUES (s.id, null)",
- commitTarget());
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.id "
+ + "WHEN NOT MATCHED AND c2 = 1 THEN "
+ + " INSERT (id, c2) VALUES (s.id, null)",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("A column or function parameter with name `c2`
cannot be resolved");
}
@Test
@@ -2829,17 +2757,15 @@ public abstract class TestMerge extends
SparkRowLevelOperationsTestBase {
createOrReplaceView("target", "{ \"c1\": -100, \"c2\": -200 }");
createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
- AssertHelpers.assertThrows(
- "Should complain non iceberg target table",
- UnsupportedOperationException.class,
- "MERGE INTO TABLE is not supported temporarily.",
- () -> {
- sql(
- "MERGE INTO target t USING source s "
- + "ON t.c1 == s.c1 "
- + "WHEN MATCHED THEN "
- + " UPDATE SET *");
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO target t USING source s "
+ + "ON t.c1 == s.c1 "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET *"))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessage("MERGE INTO TABLE is not supported temporarily.");
}
/**
diff --git
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRequiredDistributionAndOrdering.java
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRequiredDistributionAndOrdering.java
index 809de08379..4c678ce9b7 100644
---
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRequiredDistributionAndOrdering.java
+++
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRequiredDistributionAndOrdering.java
@@ -21,13 +21,13 @@ package org.apache.iceberg.spark.extensions;
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.iceberg.spark.source.ThreeColumnRecord;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Test;
@@ -186,21 +186,18 @@ public class TestRequiredDistributionAndOrdering extends
SparkExtensionsTestBase
Dataset<Row> inputDF = ds.coalesce(1).sortWithinPartitions("c1");
// should fail if ordering is disabled
- AssertHelpers.assertThrowsCause(
- "Should reject writes without ordering",
- IllegalStateException.class,
- "Encountered records that belong to already closed files",
- () -> {
- try {
- inputDF
- .writeTo(tableName)
- .option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING,
"false")
- .option(SparkWriteOptions.FANOUT_ENABLED, "false")
- .append();
- } catch (NoSuchTableException e) {
- throw new RuntimeException(e);
- }
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ inputDF
+ .writeTo(tableName)
+
.option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false")
+ .option(SparkWriteOptions.FANOUT_ENABLED, "false")
+ .append())
+ .cause()
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageStartingWith(
+ "Incoming records violate the writer assumption that records are
clustered by spec "
+ + "and by partition within each spec. Either cluster the
incoming records or switch to fanout writers.");
}
@Test
diff --git
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java
index 866a965e33..52b9134089 100644
---
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java
+++
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java
@@ -24,7 +24,6 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.ValidationException;
@@ -97,29 +96,26 @@ public class TestTagDDL extends SparkExtensionsTestBase {
}
String tagName = "t1";
- AssertHelpers.assertThrows(
- "Illegal statement",
- IcebergParseException.class,
- "mismatched input",
- () ->
- sql(
- "ALTER TABLE %s CREATE TAG %s AS OF VERSION %d RETAIN",
- tableName, tagName, firstSnapshotId, maxRefAge));
-
- AssertHelpers.assertThrows(
- "Illegal statement",
- IcebergParseException.class,
- "mismatched input",
- () -> sql("ALTER TABLE %s CREATE TAG %s RETAIN %s DAYS", tableName,
tagName, "abc"));
-
- AssertHelpers.assertThrows(
- "Illegal statement",
- IcebergParseException.class,
- "mismatched input 'SECONDS' expecting {'DAYS', 'HOURS', 'MINUTES'}",
- () ->
- sql(
- "ALTER TABLE %s CREATE TAG %s AS OF VERSION %d RETAIN %d
SECONDS",
- tableName, tagName, firstSnapshotId, maxRefAge));
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "ALTER TABLE %s CREATE TAG %s AS OF VERSION %d RETAIN",
+ tableName, tagName, firstSnapshotId, maxRefAge))
+ .isInstanceOf(IcebergParseException.class)
+ .hasMessageContaining("mismatched input");
+
+ Assertions.assertThatThrownBy(
+ () -> sql("ALTER TABLE %s CREATE TAG %s RETAIN %s DAYS",
tableName, tagName, "abc"))
+ .isInstanceOf(IcebergParseException.class)
+ .hasMessageContaining("mismatched input");
+
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "ALTER TABLE %s CREATE TAG %s AS OF VERSION %d RETAIN %d
SECONDS",
+ tableName, tagName, firstSnapshotId, maxRefAge))
+ .isInstanceOf(IcebergParseException.class)
+ .hasMessageContaining("mismatched input 'SECONDS' expecting {'DAYS',
'HOURS', 'MINUTES'}");
}
@Test
@@ -137,11 +133,10 @@ public class TestTagDDL extends SparkExtensionsTestBase {
long snapshotId = table.currentSnapshot().snapshotId();
String tagName = "t1";
- AssertHelpers.assertThrows(
- "unknown snapshot",
- ValidationException.class,
- "unknown snapshot: -1",
- () -> sql("ALTER TABLE %s CREATE TAG %s AS OF VERSION %d", tableName,
tagName, -1));
+ Assertions.assertThatThrownBy(
+ () -> sql("ALTER TABLE %s CREATE TAG %s AS OF VERSION %d",
tableName, tagName, -1))
+ .isInstanceOf(ValidationException.class)
+ .hasMessage("Cannot set " + tagName + " to unknown snapshot: -1");
sql("ALTER TABLE %s CREATE TAG %s", tableName, tagName);
table.refresh();
@@ -151,17 +146,13 @@ public class TestTagDDL extends SparkExtensionsTestBase {
Assert.assertNull(
"The tag needs to have the default max ref age, which is null.",
ref.maxRefAgeMs());
- AssertHelpers.assertThrows(
- "Cannot create an exist tag",
- IllegalArgumentException.class,
- "already exists",
- () -> sql("ALTER TABLE %s CREATE TAG %s", tableName, tagName));
+ Assertions.assertThatThrownBy(() -> sql("ALTER TABLE %s CREATE TAG %s",
tableName, tagName))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("already exists");
- AssertHelpers.assertThrows(
- "Non-conforming tag name",
- IcebergParseException.class,
- "mismatched input '123'",
- () -> sql("ALTER TABLE %s CREATE TAG %s", tableName, "123"));
+ Assertions.assertThatThrownBy(() -> sql("ALTER TABLE %s CREATE TAG %s",
tableName, "123"))
+ .isInstanceOf(IcebergParseException.class)
+ .hasMessageContaining("mismatched input '123'");
table.manageSnapshots().removeTag(tagName).commit();
List<SimpleRecord> records =
@@ -207,11 +198,10 @@ public class TestTagDDL extends SparkExtensionsTestBase {
insertRows();
long second = table.currentSnapshot().snapshotId();
- AssertHelpers.assertThrows(
- "Cannot perform replace tag on branches",
- IllegalArgumentException.class,
- "Ref branch1 is a branch not a tag",
- () -> sql("ALTER TABLE %s REPLACE Tag %s", tableName, branchName,
second));
+ Assertions.assertThatThrownBy(
+ () -> sql("ALTER TABLE %s REPLACE Tag %s", tableName, branchName,
second))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Ref branch1 is a branch not a tag");
}
@Test
@@ -244,14 +234,13 @@ public class TestTagDDL extends SparkExtensionsTestBase {
public void testReplaceTagDoesNotExist() throws NoSuchTableException {
Table table = insertRows();
- AssertHelpers.assertThrows(
- "Cannot perform replace tag on tag which does not exist",
- IllegalArgumentException.class,
- "Tag does not exist",
- () ->
- sql(
- "ALTER TABLE %s REPLACE Tag %s AS OF VERSION %d",
- tableName, "someTag", table.currentSnapshot().snapshotId()));
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "ALTER TABLE %s REPLACE Tag %s AS OF VERSION %d",
+ tableName, "someTag",
table.currentSnapshot().snapshotId()))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Tag does not exist");
}
@Test
@@ -316,20 +305,17 @@ public class TestTagDDL extends SparkExtensionsTestBase {
@Test
public void testDropTagNonConformingName() {
- AssertHelpers.assertThrows(
- "Non-conforming tag name",
- IcebergParseException.class,
- "mismatched input '123'",
- () -> sql("ALTER TABLE %s DROP TAG %s", tableName, "123"));
+ Assertions.assertThatThrownBy(() -> sql("ALTER TABLE %s DROP TAG %s",
tableName, "123"))
+ .isInstanceOf(IcebergParseException.class)
+ .hasMessageContaining("mismatched input '123'");
}
@Test
public void testDropTagDoesNotExist() {
- AssertHelpers.assertThrows(
- "Cannot perform drop tag on tag which does not exist",
- IllegalArgumentException.class,
- "Tag does not exist: nonExistingTag",
- () -> sql("ALTER TABLE %s DROP TAG %s", tableName, "nonExistingTag"));
+ Assertions.assertThatThrownBy(
+ () -> sql("ALTER TABLE %s DROP TAG %s", tableName,
"nonExistingTag"))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Tag does not exist: nonExistingTag");
}
@Test
@@ -338,11 +324,9 @@ public class TestTagDDL extends SparkExtensionsTestBase {
Table table = insertRows();
table.manageSnapshots().createBranch(branchName,
table.currentSnapshot().snapshotId()).commit();
- AssertHelpers.assertThrows(
- "Cannot perform drop tag on branch",
- IllegalArgumentException.class,
- "Ref b1 is a branch not a tag",
- () -> sql("ALTER TABLE %s DROP TAG %s", tableName, branchName));
+ Assertions.assertThatThrownBy(() -> sql("ALTER TABLE %s DROP TAG %s",
tableName, branchName))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Ref b1 is a branch not a tag");
}
@Test
diff --git
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
index 5633b30ee5..4cc8845bef 100644
---
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
+++
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
@@ -45,7 +45,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.AppendFiles;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.PlanningMode;
@@ -1333,17 +1332,13 @@ public abstract class TestUpdate extends
SparkRowLevelOperationsTestBase {
"id INT, a ARRAY<STRUCT<c1:INT,c2:INT>>, m MAP<STRING,STRING>",
"{ \"id\": 0, \"a\": null, \"m\": null }");
- AssertHelpers.assertThrows(
- "Should complain about updating an array column",
- AnalysisException.class,
- "Updating nested fields is only supported for StructType",
- () -> sql("UPDATE %s SET a.c1 = 1", commitTarget()));
-
- AssertHelpers.assertThrows(
- "Should complain about updating a map column",
- AnalysisException.class,
- "Updating nested fields is only supported for StructType",
- () -> sql("UPDATE %s SET m.key = 'new_key'", commitTarget()));
+ Assertions.assertThatThrownBy(() -> sql("UPDATE %s SET a.c1 = 1",
commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("Updating nested fields is only supported for
StructType");
+
+ Assertions.assertThatThrownBy(() -> sql("UPDATE %s SET m.key = 'new_key'",
commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("Updating nested fields is only supported for
StructType");
}
@Test
@@ -1351,27 +1346,23 @@ public abstract class TestUpdate extends
SparkRowLevelOperationsTestBase {
createAndInitTable(
"id INT, c STRUCT<n1:INT,n2:STRUCT<dn1:INT,dn2:INT>>", "{ \"id\": 0,
\"s\": null }");
- AssertHelpers.assertThrows(
- "Should complain about conflicting updates to a top-level column",
- AnalysisException.class,
- "Multiple assignments for 'id'",
- () -> sql("UPDATE %s t SET t.id = 1, t.c.n1 = 2, t.id = 2",
commitTarget()));
-
- AssertHelpers.assertThrows(
- "Should complain about conflicting updates to a nested column",
- AnalysisException.class,
- "Multiple assignments for 'c.n1",
- () -> sql("UPDATE %s t SET t.c.n1 = 1, t.id = 2, t.c.n1 = 2",
commitTarget()));
-
- AssertHelpers.assertThrows(
- "Should complain about conflicting updates to a nested column",
- AnalysisException.class,
- "Conflicting assignments for 'c'",
- () -> {
- sql(
- "UPDATE %s SET c.n1 = 1, c = named_struct('n1', 1, 'n2',
named_struct('dn1', 1, 'dn2', 2))",
- commitTarget());
- });
+ Assertions.assertThatThrownBy(
+ () -> sql("UPDATE %s t SET t.id = 1, t.c.n1 = 2, t.id = 2",
commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("Multiple assignments for 'id'");
+
+ Assertions.assertThatThrownBy(
+ () -> sql("UPDATE %s t SET t.c.n1 = 1, t.id = 2, t.c.n1 = 2",
commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("Multiple assignments for 'c.n1");
+
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "UPDATE %s SET c.n1 = 1, c = named_struct('n1', 1, 'n2',
named_struct('dn1', 1, 'dn2', 2))",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("Conflicting assignments for 'c'");
}
@Test
@@ -1383,36 +1374,30 @@ public abstract class TestUpdate extends
SparkRowLevelOperationsTestBase {
withSQLConf(
ImmutableMap.of("spark.sql.storeAssignmentPolicy", "ansi"),
() -> {
- AssertHelpers.assertThrows(
- "Should complain about writing nulls to a top-level column",
- SparkException.class,
- "Null value appeared in non-nullable field",
- () -> sql("UPDATE %s t SET t.id = NULL", commitTarget()));
-
- AssertHelpers.assertThrows(
- "Should complain about writing nulls to a nested column",
- SparkException.class,
- "Null value appeared in non-nullable field",
- () -> sql("UPDATE %s t SET t.s.n1 = NULL", commitTarget()));
-
- AssertHelpers.assertThrows(
- "Should complain about writing missing fields in structs",
- AnalysisException.class,
- "Cannot find data for the output column `s`.`n2`",
- () -> sql("UPDATE %s t SET t.s = named_struct('n1', 1)",
commitTarget()));
-
- AssertHelpers.assertThrows(
- "Should complain about writing invalid data types",
- AnalysisException.class,
- "Cannot safely cast",
- () -> sql("UPDATE %s t SET t.s.n1 = 'str'", commitTarget()));
-
- AssertHelpers.assertThrows(
- "Should complain about writing incompatible structs",
- AnalysisException.class,
- "Cannot find data for the output column `s`.`n2`.`dn2`",
- () ->
- sql("UPDATE %s t SET t.s.n2 = named_struct('dn3', 1, 'dn1',
2)", commitTarget()));
+ Assertions.assertThatThrownBy(() -> sql("UPDATE %s t SET t.id =
NULL", commitTarget()))
+ .isInstanceOf(SparkException.class)
+ .hasMessageContaining("Null value appeared in non-nullable
field");
+
+ Assertions.assertThatThrownBy(() -> sql("UPDATE %s t SET t.s.n1 =
NULL", commitTarget()))
+ .isInstanceOf(SparkException.class)
+ .hasMessageContaining("Null value appeared in non-nullable
field");
+
+ Assertions.assertThatThrownBy(
+ () -> sql("UPDATE %s t SET t.s = named_struct('n1', 1)",
commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("Cannot find data for the output column
`s`.`n2`");
+
+ Assertions.assertThatThrownBy(() -> sql("UPDATE %s t SET t.s.n1 =
'str'", commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("Cannot safely cast");
+
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "UPDATE %s t SET t.s.n2 = named_struct('dn3', 1,
'dn1', 2)",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("Cannot find data for the output column
`s`.`n2`.`dn2`");
});
}
@@ -1425,36 +1410,30 @@ public abstract class TestUpdate extends
SparkRowLevelOperationsTestBase {
withSQLConf(
ImmutableMap.of("spark.sql.storeAssignmentPolicy", "strict"),
() -> {
- AssertHelpers.assertThrows(
- "Should complain about writing nulls to a top-level column",
- AnalysisException.class,
- "Cannot safely cast `id` \"VOID\" to \"INT\"",
- () -> sql("UPDATE %s t SET t.id = NULL", commitTarget()));
-
- AssertHelpers.assertThrows(
- "Should complain about writing nulls to a nested column",
- AnalysisException.class,
- "Cannot safely cast `s`.`n1` \"VOID\" to \"INT\"",
- () -> sql("UPDATE %s t SET t.s.n1 = NULL", commitTarget()));
-
- AssertHelpers.assertThrows(
- "Should complain about writing missing fields in structs",
- AnalysisException.class,
- "Cannot find data for the output column",
- () -> sql("UPDATE %s t SET t.s = named_struct('n1', 1)",
commitTarget()));
-
- AssertHelpers.assertThrows(
- "Should complain about writing invalid data types",
- AnalysisException.class,
- "Cannot safely cast",
- () -> sql("UPDATE %s t SET t.s.n1 = 'str'", commitTarget()));
-
- AssertHelpers.assertThrows(
- "Should complain about writing incompatible structs",
- AnalysisException.class,
- "Cannot find data for the output column",
- () ->
- sql("UPDATE %s t SET t.s.n2 = named_struct('dn3', 1, 'dn1',
2)", commitTarget()));
+ Assertions.assertThatThrownBy(() -> sql("UPDATE %s t SET t.id =
NULL", commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("Cannot safely cast `id` \"VOID\" to
\"INT\"");
+
+ Assertions.assertThatThrownBy(() -> sql("UPDATE %s t SET t.s.n1 =
NULL", commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("Cannot safely cast `s`.`n1` \"VOID\" to
\"INT\"");
+
+ Assertions.assertThatThrownBy(
+ () -> sql("UPDATE %s t SET t.s = named_struct('n1', 1)",
commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("Cannot find data for the output column");
+
+ Assertions.assertThatThrownBy(() -> sql("UPDATE %s t SET t.s.n1 =
'str'", commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("Cannot safely cast");
+
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "UPDATE %s t SET t.s.n2 = named_struct('dn3', 1,
'dn1', 2)",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("Cannot find data for the output column");
});
}
@@ -1462,22 +1441,19 @@ public abstract class TestUpdate extends
SparkRowLevelOperationsTestBase {
public void testUpdateWithNonDeterministicCondition() {
createAndInitTable("id INT, dep STRING", "{ \"id\": 1, \"dep\": \"hr\" }");
- AssertHelpers.assertThrows(
- "Should complain about non-deterministic expressions",
- AnalysisException.class,
- "The operator expects a deterministic expression",
- () -> sql("UPDATE %s SET id = -1 WHERE id = 1 AND rand() > 0.5",
commitTarget()));
+ Assertions.assertThatThrownBy(
+ () -> sql("UPDATE %s SET id = -1 WHERE id = 1 AND rand() > 0.5",
commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("The operator expects a deterministic
expression");
}
@Test
public void testUpdateOnNonIcebergTableNotSupported() {
createOrReplaceView("testtable", "{ \"c1\": -100, \"c2\": -200 }");
- AssertHelpers.assertThrows(
- "UPDATE is not supported for non iceberg table",
- UnsupportedOperationException.class,
- "not supported temporarily",
- () -> sql("UPDATE %s SET c1 = -1 WHERE c2 = 1", "testtable"));
+ Assertions.assertThatThrownBy(() -> sql("UPDATE %s SET c1 = -1 WHERE c2 =
1", "testtable"))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessage("UPDATE TABLE is not supported temporarily.");
}
@Test