gaoyajun02 created SPARK-36339:
----------------------------------

             Summary: aggsBuffer should collect AggregateExpression in the map 
range
                 Key: SPARK-36339
                 URL: https://issues.apache.org/jira/browse/SPARK-36339
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.1.2, 3.0.3, 2.4.8
            Reporter: gaoyajun02


show demo for this ISSUE:

This SQL 

This SQL can be executed normally

 
{code:java}
// SQL without error

SELECT name, count(name) c
FROM VALUES ('Alice'), ('Bob') people(name)
GROUP BY name GROUPING SETS(name);

// An error is reported after exchanging the order of the query columns:

SELECT count(name) c, name
FROM VALUES ('Alice'), ('Bob') people(name)
GROUP BY name GROUPING SETS(name);

{code}
The error message is:

 
{code:java}

Error in query: expression 'people.`name`' is neither present in the group by, 
nor is it an aggregate function. Add to group by or wrap in first() (or 
first_value) if you don't care which value you get.;;
Aggregate [name#5, spark_grouping_id#3], [count(name#1) AS c#0L, name#1]
+- Expand [List(name#1, name#4, 0)], [name#1, name#5, spark_grouping_id#3]
   +- Project [name#1, name#1 AS name#4]
      +- SubqueryAlias `people`
         +- LocalRelation [name#1]

{code}
So far, I have checked that there is no problem before version 2.3. 

 

During debugging, I found that the behavior of constructAggregateExprs in 
ResolveGroupingAnalytics has changed. 
{code:java}
    /*
     * Construct new aggregate expressions by replacing grouping functions.
     */
    private def constructAggregateExprs(
        groupByExprs: Seq[Expression],
        aggregations: Seq[NamedExpression],
        groupByAliases: Seq[Alias],
        groupingAttrs: Seq[Expression],
        gid: Attribute): Seq[NamedExpression] = aggregations.map {
      // collect all the found AggregateExpression, so we can check an 
expression is part of
      // any AggregateExpression or not.
      val aggsBuffer = ArrayBuffer[Expression]()
      // Returns whether the expression belongs to any expressions in 
`aggsBuffer` or not.
      def isPartOfAggregation(e: Expression): Boolean = {
        aggsBuffer.exists(a => a.find(_ eq e).isDefined)
      }
      replaceGroupingFunc(_, groupByExprs, gid).transformDown {
        // AggregateExpression should be computed on the unmodified value of 
its argument
        // expressions, so we should not replace any references to grouping 
expression
        // inside it.
        case e: AggregateExpression =>
          aggsBuffer += e
          e
        case e if isPartOfAggregation(e) => e
        case e =>
          // Replace expression by expand output attribute.
          val index = groupByAliases.indexWhere(_.child.semanticEquals(e))
          if (index == -1) {
            e
          } else {
            groupingAttrs(index)
          }
      }.asInstanceOf[NamedExpression]
    }

{code}
When performing aggregations.map, the aggsBuffer here seems to be outside the 
scope of the map. It can store the AggregateExpression of all the elements 
processed by the map function, but this is not before 2.3.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to