cloud-fan commented on a change in pull request #32301:
URL: https://github.com/apache/spark/pull/32301#discussion_r622729279



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
##########
@@ -17,71 +17,148 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 /**
- * This aims to handle a nested column aliasing pattern inside the 
`ColumnPruning` optimizer rule.
- * If a project or its child references to nested fields, and not all the 
fields
- * in a nested attribute are used, we can substitute them by alias attributes; 
then a project
- * of the nested fields as aliases on the children of the child will be 
created.
+ * This aims to handle a nested column aliasing pattern inside the 
[[ColumnPruning]] optimizer rule.
+ * If:
+ * - A [[Project]] or its child references nested fields
+ * - Not all of the fields in a nested attribute are used
+ * Then:
+ * - Substitute the nested field references with alias attributes
+ * - Add grandchild [[Project]]s transforming the nested fields to aliases
+ *
+ * Example 1: Project
+ * ------------------
+ * Before:
+ * +- Project [concat_ws(s#0.a, s#0.b) AS concat_ws(s.a, s.b)#1]
+ *   +- GlobalLimit 5
+ *     +- LocalLimit 5
+ *       +- LocalRelation <empty>, [s#0]
+ * After:
+ * +- Project [concat_ws(_gen_alias_2#2, _gen_alias_3#3) AS concat_ws(s.a, 
s.b)#1]
+ *   +- GlobalLimit 5
+ *     +- LocalLimit 5
+ *       +- Project [s#0.a AS _gen_alias_2#2, s#0.b AS _gen_alias_3#3]
+ *         +- LocalRelation <empty>, [s#0]
+ *
+ * Example 2: Project above Filter
+ * -------------------------------
+ * Before:
+ * +- Project [s#0.a AS s.a#1]
+ *   +- Filter (length(s#0.b) > 2)
+ *     +- GlobalLimit 5
+ *       +- LocalLimit 5
+ *         +- LocalRelation <empty>, [s#0]
+ * After:
+ * +- Project [_gen_alias_2#2 AS s.a#1]
+ *   +- Filter (length(_gen_alias_3#3) > 2)
+ *     +- GlobalLimit 5
+ *       +- LocalLimit 5
+ *         +- Project [s#0.a AS _gen_alias_2#2, s#0.b AS _gen_alias_3#3]
+ *           +- LocalRelation <empty>, [s#0]
+ *
+ * Example 3: Nested columns in nested columns
+ * -------------------------------------------
+ * Before:
+ * +- Project [s#0.a AS s.a#1, s#0.a.a1 AS s.a.a1#2]
+ *   +- GlobalLimit 5
+ *     +- LocalLimit 5
+ *       +- LocalRelation <empty>, [s#0]
+ * After:
+ * +- Project [_gen_alias_3#3 AS s.a#1, _gen_alias_3#3.name AS s.a.a1#2]
+ *   +- GlobalLimit 5
+ *     +- LocalLimit 5
+ *       +- Project [s#0.a AS _gen_alias_3#3]
+ *         +- LocalRelation <empty>, [s#0]
+ *
+ * The schema of the datasource relation will be pruned in the 
[[SchemaPruning]] optimizer rule.
  */
 object NestedColumnAliasing {
 
   def unapply(plan: LogicalPlan): Option[LogicalPlan] = plan match {
     /**
      * This pattern is needed to support [[Filter]] plan cases like
-     * [[Project]]->[[Filter]]->listed plan in `canProjectPushThrough` (e.g., 
[[Window]]).
-     * The reason why we don't simply add [[Filter]] in 
`canProjectPushThrough` is that
+     * [[Project]]->[[Filter]]->listed plan in [[canProjectPushThrough]] 
(e.g., [[Window]]).
+     * The reason why we don't simply add [[Filter]] in 
[[canProjectPushThrough]] is that
      * the optimizer can hit an infinite loop during the 
[[PushDownPredicates]] rule.
      */
-    case Project(projectList, Filter(condition, child))
-        if SQLConf.get.nestedSchemaPruningEnabled && 
canProjectPushThrough(child) =>
-      val exprCandidatesToPrune = projectList ++ Seq(condition) ++ 
child.expressions
-      getAliasSubMap(exprCandidatesToPrune, 
child.producedAttributes.toSeq).map {
-        case (nestedFieldToAlias, attrToAliases) =>
-          NestedColumnAliasing.replaceToAliases(plan, nestedFieldToAlias, 
attrToAliases)
-      }
+    case Project(projectList, Filter(condition, child)) if
+        SQLConf.get.nestedSchemaPruningEnabled && canProjectPushThrough(child) 
=>
+      rewritePlanIfSubsetFieldsUsed(
+        plan, projectList ++ Seq(condition) ++ child.expressions, 
child.producedAttributes.toSeq)
 
-    case Project(projectList, child)
-        if SQLConf.get.nestedSchemaPruningEnabled && 
canProjectPushThrough(child) =>
-      val exprCandidatesToPrune = projectList ++ child.expressions
-      getAliasSubMap(exprCandidatesToPrune, 
child.producedAttributes.toSeq).map {
-        case (nestedFieldToAlias, attrToAliases) =>
-          NestedColumnAliasing.replaceToAliases(plan, nestedFieldToAlias, 
attrToAliases)
-      }
+    case Project(projectList, child) if
+        SQLConf.get.nestedSchemaPruningEnabled && canProjectPushThrough(child) 
=>
+      rewritePlanIfSubsetFieldsUsed(
+        plan, projectList ++ child.expressions, child.producedAttributes.toSeq)
 
     case p if SQLConf.get.nestedSchemaPruningEnabled && canPruneOn(p) =>
-      val exprCandidatesToPrune = p.expressions
-      getAliasSubMap(exprCandidatesToPrune, p.producedAttributes.toSeq).map {
-        case (nestedFieldToAlias, attrToAliases) =>
-          NestedColumnAliasing.replaceToAliases(p, nestedFieldToAlias, 
attrToAliases)
-      }
+      rewritePlanIfSubsetFieldsUsed(
+        plan, p.expressions, p.producedAttributes.toSeq)
 
     case _ => None
   }
 
+  /**
+   * Rewrites a plan with aliases if only a subset of the nested fields are 
used.
+   */
+  def rewritePlanIfSubsetFieldsUsed(
+      plan: LogicalPlan,
+      exprList: Seq[Expression],
+      exclusiveAttrs: Seq[Attribute]): Option[LogicalPlan] = {
+    val attrToExtractValues = getAttributeToExtractValues(exprList, 
exclusiveAttrs)
+    if (attrToExtractValues.isEmpty) {
+      None
+    } else {
+      Some(rewritePlanWithAliases(plan, attrToExtractValues))
+    }
+  }
+
   /**
    * Replace nested columns to prune unused nested columns later.
    */
-  private def replaceToAliases(
+  def rewritePlanWithAliases(
       plan: LogicalPlan,
-      nestedFieldToAlias: Map[ExtractValue, Alias],
-      attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = plan match {
-    case Project(projectList, child) =>
-      Project(
-        getNewProjectList(projectList, nestedFieldToAlias),
-        replaceWithAliases(child, nestedFieldToAlias, attrToAliases))
-
-    // The operators reaching here was already guarded by `canPruneOn`.
-    case other =>
-      replaceWithAliases(other, nestedFieldToAlias, attrToAliases)
+      attributeToExtractValues: Map[Attribute, Seq[ExtractValue]]): 
LogicalPlan = {
+    // Each expression can contain multiple nested fields.
+    // Note that we keep the original names to deliver to parquet in a 
case-sensitive way.
+    // A new alias is created for each nested field.
+    val nestedFieldToAlias = attributeToExtractValues.flatMap { case (_, 
nestedFields) =>
+      nestedFields.map { f =>
+        val exprId = NamedExpression.newExprId
+        val fieldName = f match {
+          case g: GetStructField => g.extractFieldName
+          case g: GetArrayStructFields => g.field.name
+        }
+        f -> Alias(f, s"_extract_${fieldName}")(exprId, Seq.empty, None)
+      }
+    }
+
+    // A reference attribute can have multiple aliases for nested fields.
+    val attrToAliases = attributeToExtractValues.map { case (attr, 
nestedFields) =>
+      attr.exprId -> nestedFields.map(nestedFieldToAlias)

Review comment:
       This map lookup looks fragile, as it uses Java `equals` rather than 
semantic equality. How about:
   ```
   val attributeToAliasesExtractValues = 
AttributeMap(attributeToExtractValues.map ...)
   
   val nestedFieldToAlias = attributeToAliasesExtractValues.flatMap...
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to