This is an automated email from the ASF dual-hosted git repository.

zhli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 9432d374ee [GLUTEN-11088][VL] Fix GlutenDeltaBasedMergeIntoTableSuite in Spark-4.0 (#11204)
9432d374ee is described below

commit 9432d374ee85f6db2a3245af5dd991f785a73cda
Author: Zhen Li <[email protected]>
AuthorDate: Thu Nov 27 21:48:40 2025 +0800

    [GLUTEN-11088][VL] Fix GlutenDeltaBasedMergeIntoTableSuite in Spark-4.0 (#11204)
    
    [VL] Fix GlutenDeltaBasedMergeIntoTableSuite in Spark-4.0.
---
 .../gluten/utils/velox/VeloxTestSettings.scala     |   6 +-
 .../GlutenDeltaBasedMergeIntoTableSuite.scala      |   3 +-
 ...ergeIntoTableUpdateAsDeleteAndInsertSuite.scala |   3 +-
 .../GlutenGroupBasedMergeIntoTableSuite.scala      |   3 +-
 .../connector/GlutenMergeIntoTableSuiteBase.scala  | 218 +++++++++++++++++++++
 5 files changed, 227 insertions(+), 6 deletions(-)

diff --git a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index ce10ddec49..b889422fc4 100644
--- a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -984,10 +984,10 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenRuntimeNullChecksV2Writes]
   enableSuite[GlutenTableOptionsConstantFoldingSuite]
   enableSuite[GlutenDeltaBasedMergeIntoTableSuite]
-    // TODO: fix in Spark-4.0
+    // Replaced by Gluten versions that handle wrapped exceptions
     .excludeByPrefix("merge cardinality check with")
   enableSuite[GlutenDeltaBasedMergeIntoTableUpdateAsDeleteAndInsertSuite]
-    // TODO: fix in Spark-4.0
+    // Replaced by Gluten versions that handle wrapped exceptions
     .excludeByPrefix("merge cardinality check with")
   enableSuite[GlutenDeltaBasedUpdateAsDeleteAndInsertTableSuite]
     // FIXME: complex type result mismatch
@@ -995,7 +995,7 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("update char/varchar columns")
   enableSuite[GlutenDeltaBasedUpdateTableSuite]
   enableSuite[GlutenGroupBasedMergeIntoTableSuite]
-    // TODO: fix in Spark-4.0
+    // Replaced by Gluten versions that handle wrapped exceptions
     .excludeByPrefix("merge cardinality check with")
   enableSuite[GlutenFileSourceCustomMetadataStructSuite]
   enableSuite[GlutenParquetFileMetadataStructRowIndexSuite]
diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/connector/GlutenDeltaBasedMergeIntoTableSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/connector/GlutenDeltaBasedMergeIntoTableSuite.scala
index 2ca5d06f99..ca25311868 100644
--- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/connector/GlutenDeltaBasedMergeIntoTableSuite.scala
+++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/connector/GlutenDeltaBasedMergeIntoTableSuite.scala
@@ -20,4 +20,5 @@ import org.apache.spark.sql.GlutenSQLTestsBaseTrait
 
 class GlutenDeltaBasedMergeIntoTableSuite
   extends DeltaBasedMergeIntoTableSuite
-  with GlutenSQLTestsBaseTrait {}
+  with GlutenSQLTestsBaseTrait
+  with GlutenMergeIntoTableSuiteBase {}
diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/connector/GlutenDeltaBasedMergeIntoTableUpdateAsDeleteAndInsertSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/connector/GlutenDeltaBasedMergeIntoTableUpdateAsDeleteAndInsertSuite.scala
index 47a3670d06..458ee89ffd 100644
--- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/connector/GlutenDeltaBasedMergeIntoTableUpdateAsDeleteAndInsertSuite.scala
+++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/connector/GlutenDeltaBasedMergeIntoTableUpdateAsDeleteAndInsertSuite.scala
@@ -20,4 +20,5 @@ import org.apache.spark.sql.GlutenSQLTestsBaseTrait
 
 class GlutenDeltaBasedMergeIntoTableUpdateAsDeleteAndInsertSuite
   extends DeltaBasedMergeIntoTableUpdateAsDeleteAndInsertSuite
-  with GlutenSQLTestsBaseTrait {}
+  with GlutenSQLTestsBaseTrait
+  with GlutenMergeIntoTableSuiteBase {}
diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/connector/GlutenGroupBasedMergeIntoTableSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/connector/GlutenGroupBasedMergeIntoTableSuite.scala
index 9bf7abb2b7..791a0d7a86 100644
--- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/connector/GlutenGroupBasedMergeIntoTableSuite.scala
+++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/connector/GlutenGroupBasedMergeIntoTableSuite.scala
@@ -20,4 +20,5 @@ import org.apache.spark.sql.GlutenSQLTestsBaseTrait
 
 class GlutenGroupBasedMergeIntoTableSuite
   extends GroupBasedMergeIntoTableSuite
-  with GlutenSQLTestsBaseTrait {}
+  with GlutenSQLTestsBaseTrait
+  with GlutenMergeIntoTableSuiteBase {}
diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/connector/GlutenMergeIntoTableSuiteBase.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/connector/GlutenMergeIntoTableSuiteBase.scala
new file mode 100644
index 0000000000..11d80e20e6
--- /dev/null
+++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/connector/GlutenMergeIntoTableSuiteBase.scala
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connector
+
+import org.apache.spark.sql.GlutenSQLTestsTrait
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * A trait that provides Gluten-compatible cardinality error assertion for merge operations.
+ *
+ * In Gluten, SparkRuntimeException is wrapped inside GlutenException, so we need to check the
+ * exception chain for the expected error message instead of matching the exact exception type.
+ */
+trait GlutenMergeIntoTableSuiteBase extends MergeIntoTableSuiteBase with GlutenSQLTestsTrait {
+
+  import testImplicits._
+
+  /** Helper method to find if any exception in the chain contains the expected message. */
+  private def findInExceptionChain(e: Throwable, expectedMessage: String): Boolean = {
+    var current: Throwable = e
+    while (current != null) {
+      if (current.getMessage != null && current.getMessage.contains(expectedMessage)) {
+        return true
+      }
+      current = current.getCause
+    }
+    false
+  }
+
+  /**
+   * Gluten-compatible version of assertCardinalityError. The original method expects
+   * SparkRuntimeException directly, but Gluten wraps it in GlutenException.
+   */
+  protected def assertGlutenCardinalityError(query: String): Unit = {
+    val e = intercept[Exception] {
+      sql(query)
+    }
+    assert(
+      findInExceptionChain(e, "ON search condition of the MERGE statement"),
+      s"Expected cardinality violation error but got: ${e.getMessage}")
+  }
+
+  testGluten("merge cardinality check with conditional MATCHED clause (delete)") {
+    withTempView("source") {
+      createAndInitTable(
+        "pk INT NOT NULL, salary INT, dep STRING",
+        """{ "pk": 1, "salary": 100, "dep": "hr" }
+          |{ "pk": 6, "salary": 600, "dep": "software" }
+          |""".stripMargin
+      )
+
+      val sourceRows = Seq((1, 101, "support"), (1, 102, "support"), (2, 201, "support"))
+      sourceRows.toDF("pk", "salary", "dep").createOrReplaceTempView("source")
+
+      assertGlutenCardinalityError(
+        s"""MERGE INTO $tableNameAsString AS t
+           |USING source AS s
+           |ON t.pk = s.pk
+           |WHEN MATCHED AND s.salary = 101 THEN
+           | DELETE
+           |""".stripMargin
+      )
+    }
+  }
+
+  testGluten("merge cardinality check with small target and large source (broadcast enabled)") {
+    withTempView("source") {
+      createAndInitTable(
+        "pk INT NOT NULL, salary INT, dep STRING",
+        """{ "pk": 1, "salary": 100, "dep": "hr" }
+          |{ "pk": 2, "salary": 200, "dep": "software" }
+          |""".stripMargin
+      )
+
+      val sourceRows = (1 to 1000).map(pk => (pk, pk * 100, "support"))
+      sourceRows.toDF("pk", "salary", "dep").createOrReplaceTempView("source")
+
+      withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> Long.MaxValue.toString) {
+        assertGlutenCardinalityError(
+          s"""MERGE INTO $tableNameAsString AS t
+             |USING (SELECT * FROM source UNION ALL SELECT * FROM source) AS s
+             |ON t.pk = s.pk
+             |WHEN MATCHED THEN
+             | UPDATE SET *
+             |""".stripMargin
+        )
+
+        assert(sql(s"SELECT * FROM $tableNameAsString").count() == 2)
+      }
+    }
+  }
+
+  testGluten("merge cardinality check with small target and large source (broadcast disabled)") {
+    withTempView("source") {
+      createAndInitTable(
+        "pk INT NOT NULL, salary INT, dep STRING",
+        """{ "pk": 1, "salary": 100, "dep": "hr" }
+          |{ "pk": 2, "salary": 200, "dep": "software" }
+          |""".stripMargin
+      )
+
+      val sourceRows = (1 to 1000).map(pk => (pk, pk * 100, "support"))
+      sourceRows.toDF("pk", "salary", "dep").createOrReplaceTempView("source")
+
+      withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+        assertGlutenCardinalityError(
+          s"""MERGE INTO $tableNameAsString AS t
+             |USING (SELECT * FROM source UNION ALL SELECT * FROM source) AS s
+             |ON t.pk = s.pk
+             |WHEN MATCHED THEN
+             | UPDATE SET *
+             |""".stripMargin
+        )
+
+        assert(sql(s"SELECT * FROM $tableNameAsString").count() == 2)
+      }
+    }
+  }
+
+  testGluten("merge cardinality check with small target and large source (shuffle hash enabled)") {
+    withTempView("source") {
+      createAndInitTable(
+        "pk INT NOT NULL, salary INT, dep STRING",
+        """{ "pk": 1, "salary": 100, "dep": "hr" }
+          |{ "pk": 2, "salary": 200, "dep": "software" }
+          |""".stripMargin
+      )
+
+      val sourceRows = (1 to 1000).map(pk => (pk, pk * 100, "support"))
+      sourceRows.toDF("pk", "salary", "dep").createOrReplaceTempView("source")
+
+      withSQLConf(
+        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+        SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
+        assertGlutenCardinalityError(
+          s"""MERGE INTO $tableNameAsString AS t
+             |USING (SELECT * FROM source UNION ALL SELECT * FROM source) AS s
+             |ON t.pk = s.pk
+             |WHEN MATCHED THEN
+             | UPDATE SET *
+             |""".stripMargin
+        )
+
+        assert(sql(s"SELECT * FROM $tableNameAsString").count() == 2)
+      }
+    }
+  }
+
+  testGluten("merge cardinality check without equality condition and only MATCHED clauses") {
+    withTempView("source") {
+      createAndInitTable(
+        "pk INT NOT NULL, salary INT, dep STRING",
+        """{ "pk": 1, "salary": 100, "dep": "hr" }
+          |{ "pk": 2, "salary": 200, "dep": "software" }
+          |""".stripMargin
+      )
+
+      val sourceRows = (1 to 1000).map(pk => (pk, pk * 100, "support"))
+      sourceRows.toDF("pk", "salary", "dep").createOrReplaceTempView("source")
+
+      withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+        assertGlutenCardinalityError(
+          s"""MERGE INTO $tableNameAsString AS t
+             |USING (SELECT * FROM source UNION ALL SELECT * FROM source) AS s
+             |ON t.pk > s.pk
+             |WHEN MATCHED THEN
+             | UPDATE SET *
+             |""".stripMargin
+        )
+
+        assert(sql(s"SELECT * FROM $tableNameAsString").count() == 2)
+      }
+    }
+  }
+
+  testGluten("merge cardinality check without equality condition") {
+    withTempView("source") {
+      createAndInitTable(
+        "pk INT NOT NULL, salary INT, dep STRING",
+        """{ "pk": 1, "salary": 100, "dep": "hr" }
+          |{ "pk": 2, "salary": 200, "dep": "software" }
+          |""".stripMargin
+      )
+
+      val sourceRows = (1 to 1000).map(pk => (pk, pk * 100, "support"))
+      sourceRows.toDF("pk", "salary", "dep").createOrReplaceTempView("source")
+
+      withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+        assertGlutenCardinalityError(
+          s"""MERGE INTO $tableNameAsString AS t
+             |USING (SELECT * FROM source UNION ALL SELECT * FROM source) AS s
+             |ON t.pk > s.pk
+             |WHEN MATCHED THEN
+             | UPDATE SET *
+             |WHEN NOT MATCHED THEN
+             | INSERT *
+             |""".stripMargin
+        )
+
+        assert(sql(s"SELECT * FROM $tableNameAsString").count() == 2)
+      }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to