Lukas Grasmann created SPARK-39022:
--------------------------------------

             Summary: Spark SQL - Combination of HAVING and SORT not resolved correctly
                 Key: SPARK-39022
                 URL: https://issues.apache.org/jira/browse/SPARK-39022
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.2.1, 3.1.2, 3.4
            Reporter: Lukas Grasmann
         Attachments: explain_new.txt, explain_old.txt

h1. Spark SQL - Combination of HAVING and SORT not resolved correctly

Example: Consider a simple relation {{test}} with two relevant columns, {{hotel}} and {{price}}, where {{hotel}} is a unique identifier of a hotel and {{price}} is the cost of a night's stay. We would like to order the {{hotel}}s by their cumulative price, but only include hotels whose cumulative price is higher than {{150}}.
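The original report reads the {{test}} relation from CSV and registers it as a view (hence the {{Relation ... csv}} nodes and the casts to double in the plans below); a minimal in-memory sketch with made-up values is enough to reproduce the same behavior:
{code:scala}
// Hypothetical sample data; any hotel/price pairs trigger the issue.
// In spark-shell the implicits for toDF are already in scope;
// otherwise add `import spark.implicits._`.
val test = Seq(
  ("A", 100.0), ("A", 120.0),  // cumulative price 220.0 -> kept by HAVING
  ("B",  60.0), ("B",  70.0),  // cumulative price 130.0 -> filtered out
  ("C",  90.0), ("C",  95.0)   // cumulative price 185.0 -> kept by HAVING
).toDF("hotel", "price")

test.createOrReplaceTempView("test")
{code}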
h2. Current Behavior

To achieve the goal specified above, we use a simple query that works in most common database systems. Note that we only retrieve {{hotel}} in the {{SELECT ... FROM}} clause, which means that the aggregate has to be removed from the result attributes using a {{Project}} node.
{code:scala}
spark.sql("SELECT hotel FROM test GROUP BY hotel HAVING sum(price) > 150 ORDER BY sum(price)").show
{code}
Currently, this yields an {{AnalysisException}} since the aggregate {{sum(price)}} in {{Sort}} is not resolved correctly. Note that the child of {{Sort}} is a (premature) {{Project}} node, which only provides {{hotel}} as its output. This prevents the aggregate values from being passed to {{Sort}}.
{code:scala}
org.apache.spark.sql.AnalysisException: Column 'price' does not exist. Did you mean one of the following? [test.hotel]; line 1 pos 75;
'Sort ['sum('price) ASC NULLS FIRST], true
+- Project [hotel#17]
   +- Filter (sum(cast(price#18 as double))#22 > cast(150 as double))
      +- Aggregate [HOTEL#17], [hotel#17, sum(cast(price#18 as double)) AS sum(cast(price#18 as double))#22]
         +- SubqueryAlias test
            +- View (`test`, [hotel#17,price#18])
               +- Relation [hotel#17,price#18] csv
{code}
The {{AnalysisException}} itself, however, is not caused by the introduced {{Project}}, as can be seen in the following example. Here, {{sum(price)}} is part of the result and therefore *not* removed using a {{Project}} node.
{code:scala}
spark.sql("SELECT hotel, sum(price) FROM test GROUP BY hotel HAVING sum(price) > 150 ORDER BY sum(price)").show
{code}
Resolving the aggregate {{sum(price)}} (i.e., resolving it to the aggregate introduced by the {{Aggregate}} node) is still not successful even if there is no {{Project}}. Spark still throws the following {{AnalysisException}}, which is similar to the exception from before. It follows that there is a second error in the analyzer that prevents successful resolution even if the problem regarding the {{Project}} node is fixed.
{code:scala}
org.apache.spark.sql.AnalysisException: Column 'price' does not exist. Did you mean one of the following? [sum(price), test.hotel]; line 1 pos 87;
'Sort ['sum('price) ASC NULLS FIRST], true
+- Filter (sum(price)#24 > cast(150 as double))
   +- Aggregate [HOTEL#17], [hotel#17, sum(cast(price#18 as double)) AS sum(price)#24]
      +- SubqueryAlias test
         +- View (`test`, [hotel#17,price#18])
            +- Relation [hotel#17,price#18] csv
{code}

This error occurs (at least) in Spark versions 3.1.2 and 3.2.1, as well as in the latest version from the GitHub {{master}} branch.
h2. Current Workaround

The issue can currently be worked around with a subquery that first retrieves only the hotels fulfilling the condition; the outer query then orders them:
{code:sql}
SELECT hotel, sum_price FROM
    (SELECT hotel, sum(price) AS sum_price FROM test GROUP BY hotel HAVING sum(price) > 150) sub
ORDER BY sum_price;
{code}
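For completeness, the workaround can be run from the Scala shell in the same way as the failing queries above (e.g., against the sample data sketched earlier):
{code:scala}
// The subquery computes the filtered aggregates first;
// the outer query then only sorts and projects them.
spark.sql("""
  SELECT hotel, sum_price FROM
      (SELECT hotel, sum(price) AS sum_price FROM test GROUP BY hotel HAVING sum(price) > 150) sub
  ORDER BY sum_price
""").show()
{code}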
h2. Proposed Solution(s)

The first change fixes the (premature) insertion of a {{Project}} below the {{Sort}} by moving the {{Project}} up in the plan, so that the {{Project}} becomes the parent of the {{Sort}} instead of its child. This does not change the results of the computation, since neither {{Sort}} nor {{Project}} adds or removes tuples from the result.
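Schematically, the rule below rewrites the analyzed operator order as follows (operator names only; attribute lists omitted):
{code}
Before (Sort cannot see the aggregate):      After (Project moved above Sort):
Sort                                         Project
+- Project                                   +- Sort
   +- Filter      (from HAVING)                 +- Filter      (from HAVING)
      +- Aggregate                                 +- Aggregate
{code}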

There are two potential side-effects to this solution:
 * It may change some plans generated via the DataFrame/Dataset API that previously failed with similar errors, such that they now yield a result instead. However, this is unlikely to produce unexpected or undesired results (see above).
 * Moving the projection above the {{Sort}} might reduce sort performance, since the {{Sort}} input rows are potentially wider.

{code:scala}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LogicalPlan, Project, Sort}
import org.apache.spark.sql.catalyst.rules.Rule

object PreventPrematureProjections extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
    // Match Sort -> Project -> Filter (from HAVING) -> Aggregate and pull the
    // Project above the Sort, so the aggregate output stays visible to the Sort.
    case sort @ Sort(_, _,
        project @ Project(_,
          filter @ Filter(_,
            aggregate: Aggregate))) =>
      project.copy(
        child = sort.copy(
          child = filter.copy(
            child = aggregate)))
  }
}
{code}

As the second change, to solve the remaining problem of aggregates not being resolved, we introduce a new case in {{ResolveAggregateFunctions}}. The newly introduced code is similar to the cases already in place. It ensures that aggregate functions can also be resolved when a {{Filter}} (introduced by resolving the {{UnresolvedHaving}} node generated when parsing the original query) sits "between" the {{Sort}} and the {{Aggregate}} nodes.
{code:scala}
case Sort(sortOrder, global, filter@Filter(_, agg: Aggregate)) if agg.resolved 
=>
    val maybeResolved = 
sortOrder.map(_.child).map(resolveExpressionByPlanOutput(_, agg))
    resolveOperatorWithAggregate(maybeResolved, agg, (newExprs, newChild) => {
        val newSortOrder = sortOrder.zip(newExprs).map {
        case (sortOrder, expr) => sortOrder.copy(child = expr)
        }
        Sort(newSortOrder, global, filter.copy(child = newChild))
    })
{code}
h2. Changed Behavior

The behavior of one of the TPCDS v2.7 plan stability suite tests changed. For {{tpcds-v2.7.0/q6.sql}}, the resolved plan is now slightly different because the sort expressions are resolved differently. This should, however, not affect the results of the query, as its output consists of only the two attributes {{state}} and {{cnt}}.

Old {{explain.txt}}:
{code:java}
(37) HashAggregate [codegen id : 8]
Input [2]: [ca_state#2, count#27]
Keys [1]: [ca_state#2]
Functions [1]: [count(1)]
Aggregate Attributes [1]: [count(1)#29]
Results [3]: [ca_state#2 AS state#30, count(1)#29 AS cnt#31, ca_state#2]

(38) Filter [codegen id : 8]
Input [3]: [state#30, cnt#31, ca_state#2]
Condition : (cnt#31 >= 10)

(39) TakeOrderedAndProject
Input [3]: [state#30, cnt#31, ca_state#2]
Arguments: 100, [cnt#31 ASC NULLS FIRST, ca_state#2 ASC NULLS FIRST], [state#30, cnt#31]{code}

New {{explain.txt}}:
{code:java}
(37) HashAggregate [codegen id : 8]
Input [2]: [ca_state#2, count#27]
Keys [1]: [ca_state#2]
Functions [1]: [count(1)]
Aggregate Attributes [1]: [count(1)#29]
Results [2]: [ca_state#2 AS state#30, count(1)#29 AS cnt#31]

(38) Filter [codegen id : 8]
Input [2]: [state#30, cnt#31]
Condition : (cnt#31 >= 10)

(39) TakeOrderedAndProject
Input [2]: [state#30, cnt#31]
Arguments: 100, [cnt#31 ASC NULLS FIRST, state#30 ASC NULLS FIRST], [state#30, cnt#31]{code}


