[ https://issues.apache.org/jira/browse/SPARK-39022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17528071#comment-17528071 ]

Lukas Grasmann commented on SPARK-39022:
----------------------------------------

I am working on a pull request for this issue, in case the described solution is 
acceptable.

> Spark SQL - Combination of HAVING and SORT not resolved correctly
> -----------------------------------------------------------------
>
>                 Key: SPARK-39022
>                 URL: https://issues.apache.org/jira/browse/SPARK-39022
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.1.2, 3.2.1, 3.4
>            Reporter: Lukas Grasmann
>            Priority: Major
>         Attachments: explain_new.txt, explain_old.txt
>
>
> Example: Given a simple relation {{test}} with two relevant columns {{hotel}} 
> and {{price}}, where {{hotel}} is a unique identifier of a hotel and {{price}} 
> is the cost of a night's stay. We would like to order the {{{}hotel{}}}s by 
> their cumulative price, but only for hotels whose cumulative price is higher 
> than {{{}150{}}}.
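> To make the example reproducible, the {{test}} view can be created as follows. 
> This is a minimal sketch; the column types and sample rows are assumptions and 
> not part of the original report:
> {code:sql}
> CREATE OR REPLACE TEMPORARY VIEW test AS
> SELECT * FROM VALUES
>     ('Grand Hotel', 100.0),
>     ('Grand Hotel', 80.0),
>     ('City Inn', 60.0)
> AS test(hotel, price);
> {code}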
> h2. Current Behavior
> To achieve the goal specified above, we give a simple query that works in 
> most common database systems. Note that we only retrieve {{hotel}} in the 
> {{SELECT ... FROM}} statement which means that the aggregate has to be 
> removed from the result attributes using a {{Project}} node.
> {code:scala}
> sqlContext.sql("SELECT hotel FROM test GROUP BY hotel HAVING sum(price) > 150 
> ORDER BY sum(price)").show{code}
> Currently, this yields an {{AnalysisException}} since the aggregate 
> {{sum(price)}} in {{Sort}} is not resolved correctly. Note that the child of 
> {{Sort}} is a (premature) {{Project}} node which only provides {{hotel}} as 
> its output. This prevents the aggregate values from being passed to 
> {{{}Sort{}}}.
> {code:scala}
> org.apache.spark.sql.AnalysisException: Column 'price' does not exist. Did 
> you mean one of the following? [test.hotel]; line 1 pos 75;
> 'Sort ['sum('price) ASC NULLS FIRST], true
> +- Project [hotel#17]
>    +- Filter (sum(cast(price#18 as double))#22 > cast(150 as double))
>       +- Aggregate [HOTEL#17], [hotel#17, sum(cast(price#18 as double)) AS 
> sum(cast(price#18 as double))#22]
>          +- SubqueryAlias test
>             +- View (`test`, [hotel#17,price#18])
>                +- Relation [hotel#17,price#18] csv
> {code}
> The {{AnalysisException}} itself, however, is not caused by the introduced 
> {{Project}} as can be seen in the following example. Here, {{sum(price)}} is 
> part of the result and therefore *not* removed using a {{Project}} node.
> {code:scala}
> sqlContext.sql("SELECT hotel, sum(price) FROM test GROUP BY hotel HAVING 
> sum(price) > 150 ORDER BY sum(price)").show{code}
> Resolving the aggregate {{sum(price)}} (i.e., resolving it to the aggregate 
> introduced by the {{Aggregate}} node) is still not successful even if there 
> is no {{{}Project{}}}. Spark still throws the following {{AnalysisException}} 
> which is similar to the exception from before. It follows that there is a 
> second error in the analyzer that still prevents successful resolution even 
> if the problem regarding the {{Project}} node is fixed.
> {code:scala}
> org.apache.spark.sql.AnalysisException: Column 'price' does not exist. Did 
> you mean one of the following? [sum(price), test.hotel]; line 1 pos 87;
> 'Sort ['sum('price) ASC NULLS FIRST], true
> +- Filter (sum(price)#24 > cast(150 as double))
>    +- Aggregate [HOTEL#17], [hotel#17, sum(cast(price#18 as double)) AS 
> sum(price)#24]
>       +- SubqueryAlias test
>          +- View (`test`, [hotel#17,price#18])
>             +- Relation [hotel#17,price#18] csv
> {code}
>  
> This error occurs (at least) in Spark versions 3.1.2 and 3.2.1, as well as in 
> the latest version from the GitHub {{master}} branch.
> h2. Current Workaround
> The issue can currently be worked around by using a subquery to first 
> retrieve only the hotels which fulfill the condition and then ordering them 
> in the outer query:
> {code:sql}
> SELECT hotel, sum_price FROM
>     (SELECT hotel, sum(price) AS sum_price FROM test GROUP BY hotel HAVING 
> sum(price) > 150) sub
> ORDER BY sum_price;
> {code}
> h2. Proposed Solution(s)
> The first change fixes the (premature) insertion of {{Project}} before a 
> {{Sort}} by moving the {{Project}} up in the plan such that the {{Project}} 
> becomes the parent of the {{Sort}} instead of vice versa. This does not change 
> the results of the computation since neither {{Sort}} nor {{Project}} adds or 
> removes tuples from the result.
> There are two potential side effects to this solution:
>  * It may change some plans generated through the DataFrame/Dataset API which 
> previously failed with similar errors, such that they now yield a result 
> instead. However, this is unlikely to produce unexpected or undesired results 
> (see above).
>  * Moving the projection up might reduce the performance of {{Sort}}, since 
> its input rows are potentially wider (they still contain the aggregate 
> columns).
> {code:scala}
> object PreventPrematureProjections extends Rule[LogicalPlan] {
>   // Rotate Sort(Project(Filter(Aggregate))) into Project(Sort(Filter(Aggregate)))
>   // so that the Sort still sees the full output of the Aggregate below it.
>   def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
>     case sort @ Sort(_, _, project @ Project(_, filter @ Filter(_, aggregate: Aggregate))) =>
>       project.copy(child = sort.copy(child = filter.copy(child = aggregate)))
>   }
> }
> {code}
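> The effect of this rule can be illustrated on a toy plan representation. The 
> case classes below are hypothetical stand-ins for Spark's logical plan nodes, 
> not the actual API:
> {code:scala}
> // Toy stand-ins for Spark's logical plan nodes (illustration only).
> sealed trait Plan
> case class Aggregate(output: Seq[String]) extends Plan
> case class Filter(child: Plan) extends Plan
> case class Project(output: Seq[String], child: Plan) extends Plan
> case class Sort(order: Seq[String], child: Plan) extends Plan
> 
> // Rotate Sort(Project(Filter(Aggregate))) into Project(Sort(Filter(Aggregate)))
> // so the Sort sees the aggregate's full output instead of the pruned projection.
> def rotate(plan: Plan): Plan = plan match {
>   case Sort(order, Project(out, f @ Filter(_: Aggregate))) =>
>     Project(out, Sort(order, f))
>   case other => other
> }
> 
> val before = Sort(Seq("sum(price)"),
>   Project(Seq("hotel"), Filter(Aggregate(Seq("hotel", "sum(price)")))))
> // rotate(before) == Project(Seq("hotel"),
> //   Sort(Seq("sum(price)"), Filter(Aggregate(Seq("hotel", "sum(price)")))))
> {code}
> After the rotation, the {{Sort}}'s child is the {{Filter}} over the 
> {{Aggregate}}, which still exposes {{sum(price)}}; the pruning to {{hotel}} 
> happens only afterwards in the {{Project}}.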
>  
> To solve the second problem, aggregates not being resolved, the second change 
> introduces a new case in {{ResolveAggregateFunctions}}. The newly introduced 
> code is similar to the cases already in place. It ensures that aggregate 
> functions in the plan can also be resolved if there is a {{Filter}} 
> (introduced by resolving an {{UnresolvedHaving}} node generated when parsing 
> the original query) "between" the {{Sort}} and {{Aggregate}} nodes.
> {code:scala}
> // Resolve the sort expressions against the Aggregate even when a Filter
> // (from a resolved HAVING clause) sits between the Sort and the Aggregate.
> case Sort(sortOrder, global, filter @ Filter(_, agg: Aggregate)) if agg.resolved =>
>   val maybeResolved = sortOrder.map(_.child).map(resolveExpressionByPlanOutput(_, agg))
>   resolveOperatorWithAggregate(maybeResolved, agg, (newExprs, newChild) => {
>     // Rebuild each SortOrder with its (possibly) resolved child expression.
>     val newSortOrder = sortOrder.zip(newExprs).map {
>       case (order, expr) => order.copy(child = expr)
>     }
>     Sort(newSortOrder, global, filter.copy(child = newChild))
>   })
> {code}
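> The intent of this case can be sketched with a toy model of the resolution 
> step. All names here are hypothetical illustrations, not Spark internals:
> {code:scala}
> // Toy model: a Filter produced from HAVING passes its child's attributes
> // through unchanged, so sort expressions can be resolved directly against
> // the output of the Aggregate below it.
> case class SortOrder(child: String)
> 
> // Attributes exposed by the Aggregate, e.g. for
> // SELECT hotel, sum(price) ... GROUP BY hotel
> val aggOutput = Set("hotel", "sum(price)")
> 
> // Mirrors resolveExpressionByPlanOutput(_, agg) in the proposed case.
> val sortOrder = Seq(SortOrder("sum(price)"))
> val maybeResolved = sortOrder.map(_.child).map { expr =>
>   if (aggOutput.contains(expr)) Some(expr) else None
> }
> 
> // Mirrors the zip/copy step that rebuilds the sort order.
> val newSortOrder = sortOrder.zip(maybeResolved.flatten).map {
>   case (order, expr) => order.copy(child = expr)
> }
> // newSortOrder == Seq(SortOrder("sum(price)"))
> {code}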
> h2. Changed Behavior
> The behavior of one test in the TPC-DS v2.7 plan stability suite changed. For 
> {{{}tpcds-v2.7.0/q6.sql{}}}, the resolved plan is now slightly different 
> since the resolution has changed. This should, however, not affect the 
> results of the query, since its result consists of only the two attributes 
> {{{}state{}}} and {{{}cnt{}}}.
>  
> Old {{{}explain.txt{}}}:
>  
> {code:none}
> (37) HashAggregate [codegen id : 8]
> Input [2]: [ca_state#2, count#27]
> Keys [1]: [ca_state#2]
> Functions [1]: [count(1)]
> Aggregate Attributes [1]: [count(1)#29]
> Results [3]: [ca_state#2 AS state#30, count(1)#29 AS cnt#31, ca_state#2]
>
> (38) Filter [codegen id : 8]
> Input [3]: [state#30, cnt#31, ca_state#2]
> Condition : (cnt#31 >= 10)
>
> (39) TakeOrderedAndProject
> Input [3]: [state#30, cnt#31, ca_state#2]
> Arguments: 100, [cnt#31 ASC NULLS FIRST, ca_state#2 ASC NULLS FIRST], 
> [state#30, cnt#31]{code}
>  
> New {{{}explain.txt{}}}:
>  
> {code:none}
> (37) HashAggregate [codegen id : 8]
> Input [2]: [ca_state#2, count#27]
> Keys [1]: [ca_state#2]
> Functions [1]: [count(1)]
> Aggregate Attributes [1]: [count(1)#29]
> Results [2]: [ca_state#2 AS state#30, count(1)#29 AS cnt#31]
>
> (38) Filter [codegen id : 8]
> Input [2]: [state#30, cnt#31]
> Condition : (cnt#31 >= 10)
>
> (39) TakeOrderedAndProject
> Input [2]: [state#30, cnt#31]
> Arguments: 100, [cnt#31 ASC NULLS FIRST, state#30 ASC NULLS FIRST], 
> [state#30, cnt#31]{code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
