This is an automated email from the ASF dual-hosted git repository.
zhli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 9432d374ee [GLUTEN-11088][VL] Fix GlutenDeltaBasedMergeIntoTableSuite in Spark-4.0 (#11204)
9432d374ee is described below
commit 9432d374ee85f6db2a3245af5dd991f785a73cda
Author: Zhen Li <[email protected]>
AuthorDate: Thu Nov 27 21:48:40 2025 +0800
[GLUTEN-11088][VL] Fix GlutenDeltaBasedMergeIntoTableSuite in Spark-4.0 (#11204)
[VL] Fix GlutenDeltaBasedMergeIntoTableSuite in Spark-4.0.
---
.../gluten/utils/velox/VeloxTestSettings.scala | 6 +-
.../GlutenDeltaBasedMergeIntoTableSuite.scala | 3 +-
...ergeIntoTableUpdateAsDeleteAndInsertSuite.scala | 3 +-
.../GlutenGroupBasedMergeIntoTableSuite.scala | 3 +-
.../connector/GlutenMergeIntoTableSuiteBase.scala | 218 +++++++++++++++++++++
5 files changed, 227 insertions(+), 6 deletions(-)
diff --git a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index ce10ddec49..b889422fc4 100644
--- a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -984,10 +984,10 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenRuntimeNullChecksV2Writes]
enableSuite[GlutenTableOptionsConstantFoldingSuite]
enableSuite[GlutenDeltaBasedMergeIntoTableSuite]
- // TODO: fix in Spark-4.0
+ // Replaced by Gluten versions that handle wrapped exceptions
.excludeByPrefix("merge cardinality check with")
enableSuite[GlutenDeltaBasedMergeIntoTableUpdateAsDeleteAndInsertSuite]
- // TODO: fix in Spark-4.0
+ // Replaced by Gluten versions that handle wrapped exceptions
.excludeByPrefix("merge cardinality check with")
enableSuite[GlutenDeltaBasedUpdateAsDeleteAndInsertTableSuite]
// FIXME: complex type result mismatch
@@ -995,7 +995,7 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("update char/varchar columns")
enableSuite[GlutenDeltaBasedUpdateTableSuite]
enableSuite[GlutenGroupBasedMergeIntoTableSuite]
- // TODO: fix in Spark-4.0
+ // Replaced by Gluten versions that handle wrapped exceptions
.excludeByPrefix("merge cardinality check with")
enableSuite[GlutenFileSourceCustomMetadataStructSuite]
enableSuite[GlutenParquetFileMetadataStructRowIndexSuite]
diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/connector/GlutenDeltaBasedMergeIntoTableSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/connector/GlutenDeltaBasedMergeIntoTableSuite.scala
index 2ca5d06f99..ca25311868 100644
--- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/connector/GlutenDeltaBasedMergeIntoTableSuite.scala
+++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/connector/GlutenDeltaBasedMergeIntoTableSuite.scala
@@ -20,4 +20,5 @@ import org.apache.spark.sql.GlutenSQLTestsBaseTrait
+/**
+ * Gluten variant of Spark's DeltaBasedMergeIntoTableSuite. GlutenMergeIntoTableSuiteBase adds
+ * Gluten versions of the "merge cardinality check with" tests; those versions search the exception
+ * cause chain because Gluten wraps the SparkRuntimeException that Spark's tests intercept directly.
+ */
class GlutenDeltaBasedMergeIntoTableSuite
extends DeltaBasedMergeIntoTableSuite
- with GlutenSQLTestsBaseTrait {}
+ with GlutenSQLTestsBaseTrait
+ with GlutenMergeIntoTableSuiteBase {}
diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/connector/GlutenDeltaBasedMergeIntoTableUpdateAsDeleteAndInsertSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/connector/GlutenDeltaBasedMergeIntoTableUpdateAsDeleteAndInsertSuite.scala
index 47a3670d06..458ee89ffd 100644
--- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/connector/GlutenDeltaBasedMergeIntoTableUpdateAsDeleteAndInsertSuite.scala
+++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/connector/GlutenDeltaBasedMergeIntoTableUpdateAsDeleteAndInsertSuite.scala
@@ -20,4 +20,5 @@ import org.apache.spark.sql.GlutenSQLTestsBaseTrait
+/**
+ * Gluten variant of Spark's DeltaBasedMergeIntoTableUpdateAsDeleteAndInsertSuite.
+ * GlutenMergeIntoTableSuiteBase adds Gluten versions of the "merge cardinality check with" tests;
+ * those versions search the exception cause chain because Gluten wraps the SparkRuntimeException
+ * that Spark's tests intercept directly.
+ */
class GlutenDeltaBasedMergeIntoTableUpdateAsDeleteAndInsertSuite
extends DeltaBasedMergeIntoTableUpdateAsDeleteAndInsertSuite
- with GlutenSQLTestsBaseTrait {}
+ with GlutenSQLTestsBaseTrait
+ with GlutenMergeIntoTableSuiteBase {}
diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/connector/GlutenGroupBasedMergeIntoTableSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/connector/GlutenGroupBasedMergeIntoTableSuite.scala
index 9bf7abb2b7..791a0d7a86 100644
--- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/connector/GlutenGroupBasedMergeIntoTableSuite.scala
+++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/connector/GlutenGroupBasedMergeIntoTableSuite.scala
@@ -20,4 +20,5 @@ import org.apache.spark.sql.GlutenSQLTestsBaseTrait
+/**
+ * Gluten variant of Spark's GroupBasedMergeIntoTableSuite. GlutenMergeIntoTableSuiteBase adds
+ * Gluten versions of the "merge cardinality check with" tests; those versions search the exception
+ * cause chain because Gluten wraps the SparkRuntimeException that Spark's tests intercept directly.
+ */
class GlutenGroupBasedMergeIntoTableSuite
extends GroupBasedMergeIntoTableSuite
- with GlutenSQLTestsBaseTrait {}
+ with GlutenSQLTestsBaseTrait
+ with GlutenMergeIntoTableSuiteBase {}
diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/connector/GlutenMergeIntoTableSuiteBase.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/connector/GlutenMergeIntoTableSuiteBase.scala
new file mode 100644
index 0000000000..11d80e20e6
--- /dev/null
+++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/connector/GlutenMergeIntoTableSuiteBase.scala
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connector
+
+import org.apache.spark.sql.GlutenSQLTestsTrait
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * A trait that provides Gluten-compatible cardinality error assertions for MERGE operations.
+ *
+ * In Gluten, SparkRuntimeException is wrapped inside GlutenException, so the tests defined here
+ * search the exception cause chain for the expected error message instead of matching the exact
+ * exception type. They are Gluten counterparts of Spark's "merge cardinality check with ..." tests
+ * that expect a cardinality violation.
+ */
+trait GlutenMergeIntoTableSuiteBase extends MergeIntoTableSuiteBase with GlutenSQLTestsTrait {
+
+  import testImplicits._
+
+  /**
+   * Returns true if `e` or any throwable in its cause chain has a non-null message containing
+   * `expectedMessage`. A throwable seen twice ends the walk, so a cyclic cause chain (which
+   * `Throwable.initCause` does not forbid beyond self-causation) cannot loop forever.
+   */
+  private def findInExceptionChain(e: Throwable, expectedMessage: String): Boolean = {
+    @scala.annotation.tailrec
+    def loop(current: Throwable, visited: Set[Throwable]): Boolean = current match {
+      case null => false
+      case t if visited.contains(t) => false
+      case t if Option(t.getMessage).exists(_.contains(expectedMessage)) => true
+      case t => loop(t.getCause, visited + t)
+    }
+    loop(e, Set.empty)
+  }
+
+  /**
+   * Gluten-compatible version of `assertCardinalityError`. The original method expects a
+   * SparkRuntimeException directly, but Gluten wraps it in GlutenException, so this intercepts any
+   * Exception and checks its cause chain for the cardinality violation message.
+   *
+   * @param query the MERGE statement expected to fail
+   */
+  protected def assertGlutenCardinalityError(query: String): Unit = {
+    val e = intercept[Exception] {
+      sql(query)
+    }
+    assert(
+      findInExceptionChain(e, "ON search condition of the MERGE statement"),
+      s"Expected cardinality violation error but got: ${e.getMessage}")
+  }
+
+  /**
+   * Shared body of the "small target and large source" style tests. It creates a two-row target
+   * table and a 1000-row `source` temp view. Then, under `confs`, it asserts that `query` fails
+   * with a cardinality violation and that the target table still has its two rows.
+   *
+   * @param confs SQL configuration overrides applied while running `query`
+   * @param query MERGE statement; by-name so it is evaluated inside the temp-view and conf scope
+   */
+  private def assertCardinalityErrorWithLargeSource(confs: (String, String)*)(
+      query: => String): Unit = {
+    withTempView("source") {
+      createAndInitTable(
+        "pk INT NOT NULL, salary INT, dep STRING",
+        """{ "pk": 1, "salary": 100, "dep": "hr" }
+          |{ "pk": 2, "salary": 200, "dep": "software" }
+          |""".stripMargin
+      )
+
+      val sourceRows = (1 to 1000).map(pk => (pk, pk * 100, "support"))
+      sourceRows.toDF("pk", "salary", "dep").createOrReplaceTempView("source")
+
+      withSQLConf(confs: _*) {
+        assertGlutenCardinalityError(query)
+        // The failed MERGE must not have modified the target table.
+        assert(sql(s"SELECT * FROM $tableNameAsString").count() == 2)
+      }
+    }
+  }
+
+  testGluten("merge cardinality check with conditional MATCHED clause (delete)") {
+    withTempView("source") {
+      createAndInitTable(
+        "pk INT NOT NULL, salary INT, dep STRING",
+        """{ "pk": 1, "salary": 100, "dep": "hr" }
+          |{ "pk": 6, "salary": 600, "dep": "software" }
+          |""".stripMargin
+      )
+
+      val sourceRows = Seq((1, 101, "support"), (1, 102, "support"), (2, 201, "support"))
+      sourceRows.toDF("pk", "salary", "dep").createOrReplaceTempView("source")
+
+      assertGlutenCardinalityError(
+        s"""MERGE INTO $tableNameAsString AS t
+           |USING source AS s
+           |ON t.pk = s.pk
+           |WHEN MATCHED AND s.salary = 101 THEN
+           | DELETE
+           |""".stripMargin
+      )
+    }
+  }
+
+  testGluten("merge cardinality check with small target and large source (broadcast enabled)") {
+    assertCardinalityErrorWithLargeSource(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> Long.MaxValue.toString) {
+      s"""MERGE INTO $tableNameAsString AS t
+         |USING (SELECT * FROM source UNION ALL SELECT * FROM source) AS s
+         |ON t.pk = s.pk
+         |WHEN MATCHED THEN
+         | UPDATE SET *
+         |""".stripMargin
+    }
+  }
+
+  testGluten("merge cardinality check with small target and large source (broadcast disabled)") {
+    assertCardinalityErrorWithLargeSource(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+      s"""MERGE INTO $tableNameAsString AS t
+         |USING (SELECT * FROM source UNION ALL SELECT * FROM source) AS s
+         |ON t.pk = s.pk
+         |WHEN MATCHED THEN
+         | UPDATE SET *
+         |""".stripMargin
+    }
+  }
+
+  testGluten("merge cardinality check with small target and large source (shuffle hash enabled)") {
+    assertCardinalityErrorWithLargeSource(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
+      s"""MERGE INTO $tableNameAsString AS t
+         |USING (SELECT * FROM source UNION ALL SELECT * FROM source) AS s
+         |ON t.pk = s.pk
+         |WHEN MATCHED THEN
+         | UPDATE SET *
+         |""".stripMargin
+    }
+  }
+
+  testGluten("merge cardinality check without equality condition and only MATCHED clauses") {
+    assertCardinalityErrorWithLargeSource(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+      s"""MERGE INTO $tableNameAsString AS t
+         |USING (SELECT * FROM source UNION ALL SELECT * FROM source) AS s
+         |ON t.pk > s.pk
+         |WHEN MATCHED THEN
+         | UPDATE SET *
+         |""".stripMargin
+    }
+  }
+
+  testGluten("merge cardinality check without equality condition") {
+    assertCardinalityErrorWithLargeSource(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+      s"""MERGE INTO $tableNameAsString AS t
+         |USING (SELECT * FROM source UNION ALL SELECT * FROM source) AS s
+         |ON t.pk > s.pk
+         |WHEN MATCHED THEN
+         | UPDATE SET *
+         |WHEN NOT MATCHED THEN
+         | INSERT *
+         |""".stripMargin
+    }
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]