This is an automated email from the ASF dual-hosted git repository.

dcoliversun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new b6c278e1ad [VL][DELTA] Reconcile struct field names in CaseWhen/If for 
Velox SWITCH compatibility (#11948)
b6c278e1ad is described below

commit b6c278e1ad3bf7d03a7fe3221f0f007d9d4435f5
Author: Ankita Victor <[email protected]>
AuthorDate: Fri May 1 05:50:13 2026 +0530

    [VL][DELTA] Reconcile struct field names in CaseWhen/If for Velox SWITCH 
compatibility (#11948)
    
    * Use consistent struct field names
    
    * Test minor change
---
 .../gluten/extension/DeltaPostTransformRules.scala |  99 ++++++++++--
 .../org/apache/gluten/execution/DeltaSuite.scala   | 176 +++++++++++++++++++++
 .../columnar/CollapseProjectExecTransformer.scala  |  17 +-
 3 files changed, 280 insertions(+), 12 deletions(-)

diff --git 
a/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaPostTransformRules.scala
 
b/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaPostTransformRules.scala
index da81bdf83f..e16a6d12fd 100644
--- 
a/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaPostTransformRules.scala
+++ 
b/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaPostTransformRules.scala
@@ -20,13 +20,16 @@ import org.apache.gluten.execution.{DeltaScanTransformer, 
ProjectExecTransformer
 import org.apache.gluten.extension.columnar.transition.RemoveTransitions
 
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
AttributeReference, Expression, InputFileBlockLength, InputFileBlockStart, 
InputFileName}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
AttributeReference, CreateNamedStruct, Expression, GetStructField, If, 
InputFileBlockLength, InputFileBlockStart, InputFileName, IsNull, 
LambdaFunction, Literal, NamedLambdaVariable}
+import org.apache.spark.sql.catalyst.expressions.{ArrayTransform, 
TransformKeys, TransformValues}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
 import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaParquetFileFormat, 
NoMapping}
 import org.apache.spark.sql.execution.{ProjectExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
 
+import scala.collection.mutable
 import scala.collection.mutable.ListBuffer
 
 object DeltaPostTransformRules {
@@ -93,6 +96,73 @@ object DeltaPostTransformRules {
     }
   }
 
+  /**
+   * Checks whether two structurally compatible DataTypes have different 
struct field names at any
+   * nesting level.
+   */
+  private def nestedFieldNamesDiffer(logical: DataType, physical: DataType): 
Boolean = {
+    (logical, physical) match {
+      case (l: StructType, p: StructType) if l.length == p.length =>
+        l.zip(p).exists {
+          case (lf, pf) =>
+            lf.name != pf.name || nestedFieldNamesDiffer(lf.dataType, 
pf.dataType)
+        }
+      case (l: ArrayType, p: ArrayType) =>
+        nestedFieldNamesDiffer(l.elementType, p.elementType)
+      case (l: MapType, p: MapType) =>
+        nestedFieldNamesDiffer(l.keyType, p.keyType) ||
+        nestedFieldNamesDiffer(l.valueType, p.valueType)
+      case _ => false
+    }
+  }
+
+  /**
+   * Rebuilds an expression tree so that nested struct field names match the 
logical schema. Uses
+   * positional extraction (GetStructField) and reconstruction 
(CreateNamedStruct) instead of Cast,
+   * so correctness does not depend on Velox's cast_match_struct_by_name 
config.
+   */
+  private def reconcileFieldNames(
+      expr: Expression,
+      logical: DataType,
+      physical: DataType): Expression = {
+    (logical, physical) match {
+      case (l: StructType, p: StructType) if l.length == p.length =>
+        val rebuiltFields = l.zip(p).zipWithIndex.flatMap {
+          case ((lf, pf), i) =>
+            val extracted = GetStructField(expr, i, None)
+            val reconciled = reconcileFieldNames(extracted, lf.dataType, 
pf.dataType)
+            Seq(Literal(lf.name), reconciled)
+        }
+        val rebuilt = CreateNamedStruct(rebuiltFields)
+        If(IsNull(expr), Literal.create(null, l), rebuilt)
+      case (l: ArrayType, p: ArrayType) if 
nestedFieldNamesDiffer(l.elementType, p.elementType) =>
+        val lambdaVar = NamedLambdaVariable("element", p.elementType, 
p.containsNull)
+        val body = reconcileFieldNames(lambdaVar, l.elementType, p.elementType)
+        ArrayTransform(expr, LambdaFunction(body, Seq(lambdaVar)))
+      case (l: MapType, p: MapType) =>
+        val needKeys = nestedFieldNamesDiffer(l.keyType, p.keyType)
+        val needValues = nestedFieldNamesDiffer(l.valueType, p.valueType)
+        var result = expr
+        if (needValues) {
+          val keyVar = NamedLambdaVariable("key", p.keyType, false)
+          val valueVar = NamedLambdaVariable("value", p.valueType, 
p.valueContainsNull)
+          val body = reconcileFieldNames(valueVar, l.valueType, p.valueType)
+          result = TransformValues(result, LambdaFunction(body, Seq(keyVar, 
valueVar)))
+        }
+        if (needKeys) {
+          val keyVar = NamedLambdaVariable("key", p.keyType, false)
+          val valueVar = NamedLambdaVariable(
+            "value",
+            if (needValues) l.valueType else p.valueType,
+            p.valueContainsNull)
+          val body = reconcileFieldNames(keyVar, l.keyType, p.keyType)
+          result = TransformKeys(result, LambdaFunction(body, Seq(keyVar, 
valueVar)))
+        }
+        result
+      case _ => expr
+    }
+  }
+
   /**
    * This method is only used for Delta ColumnMapping FileFormat(e.g. 
nameMapping and idMapping)
    * transform the metadata of Delta into Parquet's, each plan should only be 
transformed once.
@@ -115,8 +185,9 @@ object DeltaPostTransformRules {
       )(SparkSession.active)
       // transform output's name into physical name so Reader can read data 
correctly
       // should keep the columns order the same as the origin output
-      val originColumnNames = ListBuffer.empty[String]
-      val transformedAttrs = ListBuffer.empty[Attribute]
+      case class ColumnMapping(logicalName: String, logicalType: DataType, 
physicalAttr: Attribute)
+      val columnMappings = ListBuffer.empty[ColumnMapping]
+      val seenNames = mutable.Set.empty[String]
       def mapAttribute(attr: Attribute) = {
         val newAttr = if (plan.isMetadataColumn(attr)) {
           attr
@@ -127,9 +198,8 @@ object DeltaPostTransformRules {
             .createPhysicalAttributes(Seq(attr), fmt.referenceSchema, 
fmt.columnMappingMode)
             .head
         }
-        if (!originColumnNames.contains(attr.name)) {
-          transformedAttrs += newAttr
-          originColumnNames += attr.name
+        if (seenNames.add(attr.name)) {
+          columnMappings += ColumnMapping(attr.name, attr.dataType, newAttr)
         }
         newAttr
       }
@@ -169,9 +239,20 @@ object DeltaPostTransformRules {
       scanExecTransformer.copyTagsFrom(plan)
       tagColumnMappingRule(scanExecTransformer)
 
-      // alias physicalName into tableName
-      val expr = (transformedAttrs, originColumnNames).zipped.map {
-        (attr, columnName) => Alias(attr, columnName)(exprId = attr.exprId)
+      // Alias physical names back to logical names. For struct-typed columns, 
Delta column
+      // mapping renames internal field names to physical UUIDs. A top-level 
Alias only restores
+      // the column name, not the struct's internal field names. We rebuild 
the struct with
+      // logical field names using positional extraction 
(GetStructField/CreateNamedStruct)
+      // instead of Cast, so correctness does not depend on any Velox cast 
config.
+      val expr = columnMappings.map {
+        cm =>
+          val projectedExpr: Expression =
+            if (nestedFieldNamesDiffer(cm.logicalType, 
cm.physicalAttr.dataType)) {
+              reconcileFieldNames(cm.physicalAttr, cm.logicalType, 
cm.physicalAttr.dataType)
+            } else {
+              cm.physicalAttr
+            }
+          Alias(projectedExpr, cm.logicalName)(exprId = cm.physicalAttr.exprId)
       }
       val projectExecTransformer = ProjectExecTransformer(expr.toSeq, 
scanExecTransformer)
       projectExecTransformer
diff --git 
a/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala 
b/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala
index 8b4d7b374d..031bf46034 100644
--- a/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala
+++ b/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala
@@ -399,4 +399,180 @@ abstract class DeltaSuite extends 
WholeStageTransformerSuite {
       checkAnswer(df, Seq(Row(2), Row(3)))
     }
   }
+
+  testWithMinSparkVersion(
+    "merge with column mapping handles struct field metadata correctly",
+    "3.4") {
+    withTable("merge_struct_source", "merge_struct_target") {
+      spark.sql("""
+                  |CREATE TABLE merge_struct_target(
+                  |  key INT NOT NULL,
+                  |  value INT NOT NULL,
+                  |  cstruct STRUCT<foo: INT>)
+                  |USING DELTA
+                  |TBLPROPERTIES (
+                  |  'delta.minReaderVersion' = '2',
+                  |  'delta.minWriterVersion' = '5',
+                  |  'delta.columnMapping.mode' = 'name')
+        """.stripMargin)
+      spark.sql("INSERT INTO merge_struct_target VALUES (0, 0, null)")
+      spark.sql("INSERT INTO merge_struct_target VALUES (100, 100, 
named_struct('foo', 42))")
+
+      spark.sql(
+        "CREATE TABLE merge_struct_source (key INT NOT NULL, value INT NOT 
NULL) USING DELTA")
+      spark.sql("INSERT INTO merge_struct_source VALUES (1, 1)")
+
+      // MERGE with updateNotMatched to test CaseWhen else branch
+      spark.sql("""
+                  |MERGE INTO merge_struct_target AS target
+                  |USING merge_struct_source AS source
+                  |ON source.key = target.key
+                  |WHEN MATCHED THEN
+                  |  UPDATE SET target.value = source.value
+                  |WHEN NOT MATCHED BY SOURCE AND target.key = 100 THEN
+                  |  UPDATE SET target.value = 22
+        """.stripMargin)
+
+      val df = runQueryAndCompare(
+        "SELECT key, value, cstruct FROM merge_struct_target ORDER BY key") { 
_ => }
+      checkAnswer(df, Row(0, 0, null) :: Row(100, 22, Row(42)) :: Nil)
+    }
+  }
+
+  testWithMinSparkVersion(
+    "merge with column mapping handles array-of-struct field metadata 
correctly",
+    "3.4") {
+    withTable("merge_arraystruct_source", "merge_arraystruct_target") {
+      spark.sql("""
+                  |CREATE TABLE merge_arraystruct_target(
+                  |  key INT NOT NULL,
+                  |  tags ARRAY<STRUCT<label: STRING, score: INT>>)
+                  |USING DELTA
+                  |TBLPROPERTIES (
+                  |  'delta.minReaderVersion' = '2',
+                  |  'delta.minWriterVersion' = '5',
+                  |  'delta.columnMapping.mode' = 'name')
+        """.stripMargin)
+      spark.sql("INSERT INTO merge_arraystruct_target VALUES (0, null)")
+      spark.sql(
+        "INSERT INTO merge_arraystruct_target VALUES " +
+          "(100, array(named_struct('label', 'a', 'score', 10)))")
+      spark.sql("CREATE TABLE merge_arraystruct_source (key INT NOT NULL) 
USING DELTA")
+      spark.sql("INSERT INTO merge_arraystruct_source VALUES (1)")
+      // MERGE that leaves the array-of-struct column unchanged via CaseWhen
+      spark.sql("""
+                  |MERGE INTO merge_arraystruct_target AS target
+                  |USING merge_arraystruct_source AS source
+                  |ON source.key = target.key
+                  |WHEN NOT MATCHED BY SOURCE AND target.key = 100 THEN
+                  |  UPDATE SET target.key = 101
+        """.stripMargin)
+      val df = runQueryAndCompare("SELECT key, tags FROM 
merge_arraystruct_target ORDER BY key") {
+        _ =>
+      }
+      checkAnswer(df, Row(0, null) :: Row(101, Seq(Row("a", 10))) :: Nil)
+    }
+  }
+
+  testWithMinSparkVersion(
+    "merge with column mapping handles map-of-struct field metadata correctly",
+    "3.4") {
+    withTable("merge_mapstruct_source", "merge_mapstruct_target") {
+      spark.sql("""
+                  |CREATE TABLE merge_mapstruct_target(
+                  |  key INT NOT NULL,
+                  |  props MAP<STRING, STRUCT<val: INT>>)
+                  |USING DELTA
+                  |TBLPROPERTIES (
+                  |  'delta.minReaderVersion' = '2',
+                  |  'delta.minWriterVersion' = '5',
+                  |  'delta.columnMapping.mode' = 'name')
+        """.stripMargin)
+      spark.sql("INSERT INTO merge_mapstruct_target VALUES (0, null)")
+      spark.sql(
+        "INSERT INTO merge_mapstruct_target VALUES " +
+          "(100, map('x', named_struct('val', 99)))")
+      spark.sql("CREATE TABLE merge_mapstruct_source (key INT NOT NULL) USING 
DELTA")
+      spark.sql("INSERT INTO merge_mapstruct_source VALUES (1)")
+      // MERGE that leaves the map-of-struct column unchanged via CaseWhen
+      spark.sql("""
+                  |MERGE INTO merge_mapstruct_target AS target
+                  |USING merge_mapstruct_source AS source
+                  |ON source.key = target.key
+                  |WHEN NOT MATCHED BY SOURCE AND target.key = 100 THEN
+                  |  UPDATE SET target.key = 101
+        """.stripMargin)
+      val df = runQueryAndCompare("SELECT key, props FROM 
merge_mapstruct_target ORDER BY key") {
+        _ =>
+      }
+      checkAnswer(df, Row(0, null) :: Row(101, Map("x" -> Row(99))) :: Nil)
+    }
+  }
+
+  testWithMinSparkVersion(
+    "merge with column mapping handles nested struct-within-struct field 
metadata correctly",
+    "3.4") {
+    withTable("merge_nestedstruct_source", "merge_nestedstruct_target") {
+      spark.sql("""
+                  |CREATE TABLE merge_nestedstruct_target(
+                  |  key INT NOT NULL,
+                  |  nested STRUCT<outer_val: STRUCT<inner_val: INT>>)
+                  |USING DELTA
+                  |TBLPROPERTIES (
+                  |  'delta.minReaderVersion' = '2',
+                  |  'delta.minWriterVersion' = '5',
+                  |  'delta.columnMapping.mode' = 'name')
+        """.stripMargin)
+      spark.sql("INSERT INTO merge_nestedstruct_target VALUES (0, null)")
+      spark.sql(
+        "INSERT INTO merge_nestedstruct_target VALUES " +
+          "(100, named_struct('outer_val', named_struct('inner_val', 42)))")
+      spark.sql("CREATE TABLE merge_nestedstruct_source (key INT NOT NULL) 
USING DELTA")
+      spark.sql("INSERT INTO merge_nestedstruct_source VALUES (1)")
+      spark.sql("""
+                  |MERGE INTO merge_nestedstruct_target AS target
+                  |USING merge_nestedstruct_source AS source
+                  |ON source.key = target.key
+                  |WHEN NOT MATCHED BY SOURCE AND target.key = 100 THEN
+                  |  UPDATE SET target.key = 101
+        """.stripMargin)
+      val df = runQueryAndCompare(
+        "SELECT key, nested FROM merge_nestedstruct_target ORDER BY key") { _ 
=> }
+      checkAnswer(df, Row(0, null) :: Row(101, Row(Row(42))) :: Nil)
+    }
+  }
+
+  testWithMinSparkVersion(
+    "merge with column mapping handles array with null struct elements 
correctly",
+    "3.4") {
+    withTable("merge_arraynull_source", "merge_arraynull_target") {
+      spark.sql("""
+                  |CREATE TABLE merge_arraynull_target(
+                  |  key INT NOT NULL,
+                  |  items ARRAY<STRUCT<name: STRING, qty: INT>>)
+                  |USING DELTA
+                  |TBLPROPERTIES (
+                  |  'delta.minReaderVersion' = '2',
+                  |  'delta.minWriterVersion' = '5',
+                  |  'delta.columnMapping.mode' = 'name')
+        """.stripMargin)
+      spark.sql("INSERT INTO merge_arraynull_target VALUES (0, null)")
+      spark.sql(
+        "INSERT INTO merge_arraynull_target VALUES " +
+          "(100, array(named_struct('name', 'a', 'qty', 1), null))")
+      spark.sql("CREATE TABLE merge_arraynull_source (key INT NOT NULL) USING 
DELTA")
+      spark.sql("INSERT INTO merge_arraynull_source VALUES (1)")
+      spark.sql("""
+                  |MERGE INTO merge_arraynull_target AS target
+                  |USING merge_arraynull_source AS source
+                  |ON source.key = target.key
+                  |WHEN NOT MATCHED BY SOURCE AND target.key = 100 THEN
+                  |  UPDATE SET target.key = 101
+        """.stripMargin)
+      val df = runQueryAndCompare("SELECT key, items FROM 
merge_arraynull_target ORDER BY key") {
+        _ =>
+      }
+      checkAnswer(df, Row(0, null) :: Row(101, Seq(Row("a", 1), null)) :: Nil)
+    }
+  }
 }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/CollapseProjectExecTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/CollapseProjectExecTransformer.scala
index b15d32a6e1..91e3112c4a 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/CollapseProjectExecTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/CollapseProjectExecTransformer.scala
@@ -19,7 +19,7 @@ package org.apache.gluten.extension.columnar
 import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.execution.ProjectExecTransformer
 
-import org.apache.spark.sql.catalyst.expressions.{Alias, CreateNamedStruct, 
NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.{Alias, CreateNamedStruct, 
Expression, If, NamedExpression}
 import org.apache.spark.sql.catalyst.optimizer.CollapseProjectShim
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.SparkPlan
@@ -56,11 +56,22 @@ object CollapseProjectExecTransformer extends 
Rule[SparkPlan] {
 
   /**
    * In Velox, CreateNamedStruct will generate a special output named obj, We 
cannot collapse such
-   * project transformer, otherwise it will result in a bind reference failure.
+   * project transformer, otherwise it will result in a bind reference 
failure. Checks for
+   * CreateNamedStruct as direct Alias child or wrapped in If (null-guard 
pattern from
+   * reconcileFieldNames), but not deeply nested inside arbitrary expressions.
    */
   private def containsNamedStructAlias(projectList: Seq[NamedExpression]): 
Boolean = {
     projectList.exists {
-      case a: Alias => a.child.exists(_.isInstanceOf[CreateNamedStruct])
+      case a: Alias => isOrWrapsCreateNamedStruct(a.child)
+      case _ => false
+    }
+  }
+
+  private def isOrWrapsCreateNamedStruct(expr: Expression): Boolean = {
+    expr match {
+      case _: CreateNamedStruct => true
+      case If(_, trueValue, falseValue) =>
+        isOrWrapsCreateNamedStruct(trueValue) || 
isOrWrapsCreateNamedStruct(falseValue)
       case _ => false
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to