viirya commented on a change in pull request #32470:
URL: https://github.com/apache/spark/pull/32470#discussion_r650251603



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
##########
@@ -652,3 +652,15 @@ case object UnresolvedSeed extends LeafExpression with Unevaluable {
   override def dataType: DataType = throw new UnresolvedException("dataType")
   override lazy val resolved = false
 }
+
+/**
+ * An intermediate expression to hold a resolved (nested) column. Some rules may need to undo the
+ * column resolution and use this expression to keep the original column name.
+ */
+case class TempResolvedColumn(child: Expression, nameParts: Seq[String]) extends UnaryExpression

Review comment:
       Is its child always a named expression?
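    A minimal sketch (not from the PR; the attribute names and types are made up for the illustration) of why the question arises: for a top-level column the resolved child would be a `NamedExpression` such as an `AttributeReference`, while for a nested column it could be a `GetStructField`, which is not a `NamedExpression`.

    ```scala
    import org.apache.spark.sql.catalyst.analysis.TempResolvedColumn
    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GetStructField}
    import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

    // Hypothetical attributes: `a` is a top-level int column, `s` is a struct with a nested field `b`.
    val a = AttributeReference("a", IntegerType)()
    val s = AttributeReference("s", StructType(Seq(StructField("b", IntegerType))))()

    // Top-level column: the child is a NamedExpression.
    val topLevel = TempResolvedColumn(a, Seq("a"))

    // Nested column: the child could be a GetStructField, which is not a NamedExpression.
    val nested = TempResolvedColumn(GetStructField(s, 0, Some("b")), Seq("s", "b"))
    ```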

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -2457,164 +2451,133 @@ class Analyzer(override val catalogManager: CatalogManager)
       _.containsPattern(AGGREGATE), ruleId) {
       // Resolve aggregate with having clause to Filter(..., Aggregate()). Note, to avoid wrongly
       // resolve the having condition expression, here we skip resolving it in ResolveReferences
-      // and transform it to Filter after aggregate is resolved. See more details in SPARK-31519.
+      // and transform it to Filter after aggregate is resolved. Basically columns in HAVING should
+      // be resolved with `agg.child.output` first. See more details in SPARK-31519.
       case UnresolvedHaving(cond, agg: Aggregate) if agg.resolved =>
-        resolveHaving(Filter(cond, agg), agg)
-
-      case f @ Filter(_, agg: Aggregate) if agg.resolved =>
-        resolveHaving(f, agg)
-
-      case sort @ Sort(sortOrder, global, aggregate: Aggregate) if aggregate.resolved =>
-
-        // Try resolving the ordering as though it is in the aggregate clause.
-        try {
-          // If a sort order is unresolved, containing references not in aggregate, or containing
-          // `AggregateExpression`, we need to push down it to the underlying aggregate operator.
-          val unresolvedSortOrders = sortOrder.filter { s =>
-            !s.resolved || !s.references.subsetOf(aggregate.outputSet) || containsAggregate(s)
-          }
-          val aliasedOrdering = unresolvedSortOrders.map(o => Alias(o.child, "aggOrder")())
-
-          val aggregateWithExtraOrdering = aggregate.copy(
-            aggregateExpressions = aggregate.aggregateExpressions ++ aliasedOrdering)
-
-          val resolvedAggregate: Aggregate =
-            executeSameContext(aggregateWithExtraOrdering).asInstanceOf[Aggregate]
-
-          val (reResolvedAggExprs, resolvedAliasedOrdering) =
-            resolvedAggregate.aggregateExpressions.splitAt(aggregate.aggregateExpressions.length)
-
-          // If we pass the analysis check, then the ordering expressions should only reference to
-          // aggregate expressions or grouping expressions, and it's safe to push them down to
-          // Aggregate.
-          checkAnalysis(resolvedAggregate)
-
-          val originalAggExprs = aggregate.aggregateExpressions.map(trimNonTopLevelAliases)
-
-          // If the ordering expression is same with original aggregate expression, we don't need
-          // to push down this ordering expression and can reference the original aggregate
-          // expression instead.
-          val needsPushDown = ArrayBuffer.empty[NamedExpression]
-          val orderToAlias = unresolvedSortOrders.zip(aliasedOrdering)
-          val evaluatedOrderings =
-            resolvedAliasedOrdering.asInstanceOf[Seq[Alias]].zip(orderToAlias).map {
-              case (evaluated, (order, aliasOrder)) =>
-                val index = reResolvedAggExprs.indexWhere {
-                  case Alias(child, _) => child semanticEquals evaluated.child
-                  case other => other semanticEquals evaluated.child
-                }
-
-                if (index == -1) {
-                  if (hasCharVarchar(evaluated)) {
-                    needsPushDown += aliasOrder
-                    order.copy(child = aliasOrder)
-                  } else {
-                    needsPushDown += evaluated
-                    order.copy(child = evaluated.toAttribute)
-                  }
-                } else {
-                  order.copy(child = originalAggExprs(index).toAttribute)
-                }
+        resolveOperatorWithAggregate(Seq(cond), agg, (newExprs, newChild) => {
+          Filter(newExprs.head, newChild)
+        })
+
+      case Filter(cond, agg: Aggregate) if agg.resolved =>
+        // We should resolve the references normally based on child.output first.
+        val maybeResolved = resolveExpressionByPlanOutput(cond, agg)

Review comment:
       > We should resolve the references normally based on child.output first.
   
   But you resolve them using `agg`?
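    For reference, a self-contained sketch of the case the code comment describes (table name, data, and session setup are made up, and this shows the SQL-level behavior rather than the analyzer code itself): the HAVING condition references `v`, which is not in the aggregate's output, so it can only be resolved against the aggregate child's output and then evaluated inside the Aggregate.

    ```scala
    import org.apache.spark.sql.SparkSession

    object HavingResolutionSketch extends App {
      val spark = SparkSession.builder().master("local[1]").appName("having-sketch").getOrCreate()
      import spark.implicits._

      Seq(("a", 1), ("a", 5), ("b", 2)).toDF("k", "v").createOrReplaceTempView("t")

      // The Aggregate's output is only (k, sum_v), yet HAVING mentions `v`.
      // `v` is resolved against the child output of the aggregate, and SUM(v)
      // is evaluated by the Aggregate rather than on top of it.
      spark.sql("SELECT k, SUM(v) AS sum_v FROM t GROUP BY k HAVING SUM(v) > 3").show()

      spark.stop()
    }
    ```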

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -2457,164 +2451,133 @@ class Analyzer(override val catalogManager: CatalogManager)
       _.containsPattern(AGGREGATE), ruleId) {
       // Resolve aggregate with having clause to Filter(..., Aggregate()). Note, to avoid wrongly
       // resolve the having condition expression, here we skip resolving it in ResolveReferences
-      // and transform it to Filter after aggregate is resolved. See more details in SPARK-31519.
+      // and transform it to Filter after aggregate is resolved. Basically columns in HAVING should
+      // be resolved with `agg.child.output` first. See more details in SPARK-31519.
       case UnresolvedHaving(cond, agg: Aggregate) if agg.resolved =>
-        resolveHaving(Filter(cond, agg), agg)
-
-      case f @ Filter(_, agg: Aggregate) if agg.resolved =>
-        resolveHaving(f, agg)
-
-      case sort @ Sort(sortOrder, global, aggregate: Aggregate) if aggregate.resolved =>
-
-        // Try resolving the ordering as though it is in the aggregate clause.
-        try {
-          // If a sort order is unresolved, containing references not in aggregate, or containing
-          // `AggregateExpression`, we need to push down it to the underlying aggregate operator.
-          val unresolvedSortOrders = sortOrder.filter { s =>
-            !s.resolved || !s.references.subsetOf(aggregate.outputSet) || containsAggregate(s)
-          }
-          val aliasedOrdering = unresolvedSortOrders.map(o => Alias(o.child, "aggOrder")())
-
-          val aggregateWithExtraOrdering = aggregate.copy(
-            aggregateExpressions = aggregate.aggregateExpressions ++ aliasedOrdering)
-
-          val resolvedAggregate: Aggregate =
-            executeSameContext(aggregateWithExtraOrdering).asInstanceOf[Aggregate]
-
-          val (reResolvedAggExprs, resolvedAliasedOrdering) =
-            resolvedAggregate.aggregateExpressions.splitAt(aggregate.aggregateExpressions.length)
-
-          // If we pass the analysis check, then the ordering expressions should only reference to
-          // aggregate expressions or grouping expressions, and it's safe to push them down to
-          // Aggregate.
-          checkAnalysis(resolvedAggregate)
-
-          val originalAggExprs = aggregate.aggregateExpressions.map(trimNonTopLevelAliases)
-
-          // If the ordering expression is same with original aggregate expression, we don't need
-          // to push down this ordering expression and can reference the original aggregate
-          // expression instead.
-          val needsPushDown = ArrayBuffer.empty[NamedExpression]
-          val orderToAlias = unresolvedSortOrders.zip(aliasedOrdering)
-          val evaluatedOrderings =
-            resolvedAliasedOrdering.asInstanceOf[Seq[Alias]].zip(orderToAlias).map {
-              case (evaluated, (order, aliasOrder)) =>
-                val index = reResolvedAggExprs.indexWhere {
-                  case Alias(child, _) => child semanticEquals evaluated.child
-                  case other => other semanticEquals evaluated.child
-                }
-
-                if (index == -1) {
-                  if (hasCharVarchar(evaluated)) {
-                    needsPushDown += aliasOrder
-                    order.copy(child = aliasOrder)
-                  } else {
-                    needsPushDown += evaluated
-                    order.copy(child = evaluated.toAttribute)
-                  }
-                } else {
-                  order.copy(child = originalAggExprs(index).toAttribute)
-                }
+        resolveOperatorWithAggregate(Seq(cond), agg, (newExprs, newChild) => {
+          Filter(newExprs.head, newChild)
+        })
+
+      case Filter(cond, agg: Aggregate) if agg.resolved =>
+        // We should resolve the references normally based on child.output first.
+        val maybeResolved = resolveExpressionByPlanOutput(cond, agg)

Review comment:
       Do you mean to call `resolveExprsWithAggregate` here?
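    As a reference point for the removed Sort handling in the hunks above, a self-contained sketch (names and data are made up) of the case it covered: the ORDER BY expression is an aggregate that does not appear in the SELECT list, so it cannot be resolved from the Aggregate's output alone and has to be evaluated by an extra aggregate expression that is projected away after the Sort.

    ```scala
    import org.apache.spark.sql.SparkSession

    object SortOverAggregateSketch extends App {
      val spark = SparkSession.builder().master("local[1]").appName("sort-sketch").getOrCreate()
      import spark.implicits._

      Seq(("a", 1), ("a", 5), ("b", 2)).toDF("k", "v").createOrReplaceTempView("t")

      // SUM(v) is not in the SELECT list, so the ordering cannot be resolved from
      // the Aggregate's output; inspect the analyzed plan to see how the ordering
      // expression is pushed into the Aggregate and dropped again afterwards.
      val df = spark.sql("SELECT k FROM t GROUP BY k ORDER BY SUM(v) DESC")
      df.explain(true)
      df.show()

      spark.stop()
    }
    ```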




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


