[ https://issues.apache.org/jira/browse/SPARK-39022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17528071#comment-17528071 ]
Lukas Grasmann commented on SPARK-39022:
----------------------------------------

I am working on a pull request for this issue, in case the described solution is acceptable.

> Spark SQL - Combination of HAVING and SORT not resolved correctly
> -----------------------------------------------------------------
>
>                 Key: SPARK-39022
>                 URL: https://issues.apache.org/jira/browse/SPARK-39022
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.1.2, 3.2.1, 3.4
>            Reporter: Lukas Grasmann
>            Priority: Major
>         Attachments: explain_new.txt, explain_old.txt
>
> Example: Given a simple relation {{test}} with two relevant columns {{hotel}} and {{price}}, where {{hotel}} is a unique identifier of a hotel and {{price}} is the cost of a night's stay. We would like to order the {{hotel}}s by their cumulative prices, but only for hotels whose cumulative price is higher than {{150}}.
> h2. Current Behavior
> To achieve the goal specified above, we issue a simple query that works in most common database systems. Note that we only retrieve {{hotel}} in the {{SELECT ... FROM}} clause, which means the aggregate has to be removed from the result attributes by a {{Project}} node.
> {code:scala}
> sqlcontext.sql("SELECT hotel FROM test GROUP BY hotel HAVING sum(price) > 150 ORDER BY sum(price)").show
> {code}
> Currently, this yields an {{AnalysisException}} since the aggregate {{sum(price)}} in {{Sort}} is not resolved correctly. Note that the child of {{Sort}} is a (premature) {{Project}} node which only provides {{hotel}} as its output. This prevents the aggregate values from being passed to {{Sort}}.
> {code:scala}
> org.apache.spark.sql.AnalysisException: Column 'price' does not exist. Did you mean one of the following? [test.hotel]; line 1 pos 75;
> 'Sort ['sum('price) ASC NULLS FIRST], true
> +- Project [hotel#17]
>    +- Filter (sum(cast(price#18 as double))#22 > cast(150 as double))
>       +- Aggregate [HOTEL#17], [hotel#17, sum(cast(price#18 as double)) AS sum(cast(price#18 as double))#22]
>          +- SubqueryAlias test
>             +- View (`test`, [hotel#17,price#18])
>                +- Relation [hotel#17,price#18] csv
> {code}
> The {{AnalysisException}} itself, however, is not caused by the introduced {{Project}}, as can be seen in the following example. Here, {{sum(price)}} is part of the result and is therefore *not* removed by a {{Project}} node.
> {code:scala}
> sqlcontext.sql("SELECT hotel, sum(price) FROM test GROUP BY hotel HAVING sum(price) > 150 ORDER BY sum(price)").show
> {code}
> Resolving the aggregate {{sum(price)}} (i.e., resolving it to the aggregate introduced by the {{Aggregate}} node) still fails even when there is no {{Project}}. Spark throws the following {{AnalysisException}}, which is similar to the exception above. It follows that there is a second error in the analyzer that prevents successful resolution even once the problem regarding the {{Project}} node is fixed.
> {code:scala}
> org.apache.spark.sql.AnalysisException: Column 'price' does not exist. Did you mean one of the following? [sum(price), test.hotel]; line 1 pos 87;
> 'Sort ['sum('price) ASC NULLS FIRST], true
> +- Filter (sum(price)#24 > cast(150 as double))
>    +- Aggregate [HOTEL#17], [hotel#17, sum(cast(price#18 as double)) AS sum(price)#24]
>       +- SubqueryAlias test
>          +- View (`test`, [hotel#17,price#18])
>             +- Relation [hotel#17,price#18] csv
> {code}
> This error occurs (at least) in Spark versions 3.1.2 and 3.2.1, as well as in the latest version from the GitHub {{master}} branch.
> h2. Current Workaround
> The issue can currently be worked around by using a subquery to first retrieve only the hotels which fulfill the condition, and then ordering them in the outer query:
> {code:sql}
> SELECT hotel, sum_price FROM
>   (SELECT hotel, sum(price) AS sum_price FROM test GROUP BY hotel HAVING sum(price) > 150) sub
> ORDER BY sum_price;
> {code}
> h2. Proposed Solution(s)
> The first change fixes the (premature) insertion of {{Project}} below a {{Sort}} by moving the {{Project}} up in the plan, such that the {{Project}} becomes the parent of the {{Sort}} instead of vice versa. This does not change the results of the computation, since neither {{Sort}} nor {{Project}} adds or removes tuples from the result.
> There are two potential side effects of this solution:
> * It may change some plans generated via the DataFrame/Dataset API which previously produced similar errors, such that they now yield a result instead. However, this is unlikely to produce unexpected or undesired results (see above).
> * Moving the projection might reduce performance for {{Sort}}, since its input is potentially wider.
> {code:scala}
> object PreventPrematureProjections extends Rule[LogicalPlan] {
>   def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
>     case sort @ Sort(_, _, project @ Project(_, filter @ Filter(_, aggregate: Aggregate))) =>
>       project.copy(
>         child = sort.copy(
>           child = filter.copy(
>             child = aggregate
>           )
>         )
>       )
>   }
> }
> {code}
> To solve the second problem of aggregates not being resolved, we introduce a new case in {{ResolveAggregateFunctions}} as the second change. The newly introduced code is similar to the cases already in place. Here, we ensure that aggregate functions in the plan can also be resolved when there is a {{Filter}} (introduced by resolving an {{UnresolvedHaving}} node generated while parsing the original query) "between" the {{Sort}} and {{Aggregate}} nodes.
> {code:scala}
> case Sort(sortOrder, global, filter @ Filter(_, agg: Aggregate)) if agg.resolved =>
>   val maybeResolved = sortOrder.map(_.child).map(resolveExpressionByPlanOutput(_, agg))
>   resolveOperatorWithAggregate(maybeResolved, agg, (newExprs, newChild) => {
>     val newSortOrder = sortOrder.zip(newExprs).map {
>       case (sortOrder, expr) => sortOrder.copy(child = expr)
>     }
>     Sort(newSortOrder, global, filter.copy(child = newChild))
>   })
> {code}
> h2. Changed Behavior
> The behavior of one test in the TPC-DS v2.7 plan stability suite changed. For {{tpcds-v2.7.0/q6.sql}}, the resolved plan is now slightly different since the resolution has changed. This should, however, not affect the results of the query, which consist of only two attributes ({{state}} and {{cnt}}).
> Old {{explain.txt}}:
> {code:java}
> (37) HashAggregate [codegen id : 8]
> Input [2]: [ca_state#2, count#27]
> Keys [1]: [ca_state#2]
> Functions [1]: [count(1)]
> Aggregate Attributes [1]: [count(1)#29]
> Results [3]: [ca_state#2 AS state#30, count(1)#29 AS cnt#31, ca_state#2]
>
> (38) Filter [codegen id : 8]
> Input [3]: [state#30, cnt#31, ca_state#2]
> Condition : (cnt#31 >= 10)
>
> (39) TakeOrderedAndProject
> Input [3]: [state#30, cnt#31, ca_state#2]
> Arguments: 100, [cnt#31 ASC NULLS FIRST, ca_state#2 ASC NULLS FIRST], [state#30, cnt#31]
> {code}
> New {{explain.txt}}:
> {code:java}
> (37) HashAggregate [codegen id : 8]
> Input [2]: [ca_state#2, count#27]
> Keys [1]: [ca_state#2]
> Functions [1]: [count(1)]
> Aggregate Attributes [1]: [count(1)#29]
> Results [2]: [ca_state#2 AS state#30, count(1)#29 AS cnt#31]
>
> (38) Filter [codegen id : 8]
> Input [2]: [state#30, cnt#31]
> Condition : (cnt#31 >= 10)
>
> (39) TakeOrderedAndProject
> Input [2]: [state#30, cnt#31]
> Arguments: 100, [cnt#31 ASC NULLS FIRST, state#30 ASC NULLS FIRST], [state#30, cnt#31]
> {code}

--
This message was sent by Atlassian Jira
(v8.20.7#820007)
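For reference, the failing query and the subquery workaround from the report can be tried end to end in a local Spark session. This is a minimal sketch: the in-memory sample rows, the {{local[*]}} session setup, and the use of {{SparkSession}} instead of the report's {{sqlcontext}} are illustrative assumptions, not part of the report.

```scala
import org.apache.spark.sql.SparkSession

object Spark39022Repro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SPARK-39022-repro")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Build the `test` relation described in the report from in-memory
    // rows (hypothetical sample data) instead of the original CSV source.
    Seq(("A", 100.0), ("A", 80.0), ("B", 60.0), ("C", 200.0))
      .toDF("hotel", "price")
      .createOrReplaceTempView("test")

    // On affected versions, this throws AnalysisException
    // ("Column 'price' does not exist"):
    // spark.sql("SELECT hotel FROM test GROUP BY hotel HAVING sum(price) > 150 ORDER BY sum(price)").show()

    // Workaround from the report: compute and filter the aggregate in a
    // subquery, then sort on the aliased column in the outer query.
    spark.sql(
      """SELECT hotel, sum_price FROM
        |  (SELECT hotel, sum(price) AS sum_price FROM test
        |   GROUP BY hotel HAVING sum(price) > 150) sub
        |ORDER BY sum_price""".stripMargin
    ).show()

    spark.stop()
  }
}
```

With the sample data above, hotels {{A}} (180.0) and {{C}} (200.0) pass the HAVING filter, while {{B}} (60.0) is dropped.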