[ https://issues.apache.org/jira/browse/SPARK-18591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16165148#comment-16165148 ]

Maryann Xue commented on SPARK-18591:
-------------------------------------

Hey, I came up with this idea of doing sort-aggregate on sorted input while 
debugging a performance issue, before I saw this JIRA. I tried implementing 
it and found that I couldn't do it in physical planning because planning is 
top-down, so I ended up doing the same thing as 
https://github.com/maropu/spark/commit/32b716cf02dfe8cba5b08b2dc3297bc061156630#diff-7d06cf071190dcbeda2fed6b039ec5d0R55.
I totally agree with [~maropu] that we need to make physical planning 
bottom-up; then we can solve this in a better way.
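For reference, a minimal sketch of what such a rule could look like as a 
physical preparation rule, modeled on the commit above. Everything here is 
hypothetical: the rule name is made up and the ordering check is simplified 
(a real patch would also need to handle sort directions and aggregate modes 
carefully).
{code}
import org.apache.spark.sql.catalyst.expressions.{Ascending, SortOrder}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec}

// Hypothetical rule: swap a hash aggregate for a sort aggregate when the
// child already emits rows sorted by the grouping keys.
object ReplaceHashWithSortAggregate extends Rule[SparkPlan] {

  override def apply(plan: SparkPlan): SparkPlan = plan transformUp {
    case agg: HashAggregateExec if sortedOnGroupingKeys(agg) =>
      SortAggregateExec(
        agg.requiredChildDistributionExpressions,
        agg.groupingExpressions,
        agg.aggregateExpressions,
        agg.aggregateAttributes,
        agg.initialInputBufferOffset,
        agg.resultExpressions,
        agg.child)
  }

  // true if the child's output ordering starts with the grouping keys
  // (ascending, which is what SortAggregateExec requires of its child)
  private def sortedOnGroupingKeys(agg: HashAggregateExec): Boolean = {
    val required = agg.groupingExpressions.map(SortOrder(_, Ascending))
    val actual = agg.child.outputOrdering
    required.nonEmpty && required.length <= actual.length &&
      required.zip(actual).forall { case (r, a) => r.semanticEquals(a) }
  }
}
{code}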
But aside from this, I found another issue with a slightly more sophisticated 
query example:
{{SELECT t1.a, count(*) FROM t1 JOIN t2 ON t1.a = t2.b GROUP BY t1.a}}
This would only work if we put the {{ReplaceSortAggregate}} rule after 
{{EnsureRequirements}}, because SortMergeJoinExec's outputOrdering is not 
correct before EnsureRequirements runs. Please refer to SPARK-21998 for 
details.
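This case is easy to reproduce in spark-shell (a sketch; the table sizes are 
arbitrary and don't affect the plan shape):
{code}
// reproduce the JOIN + GROUP BY example and inspect the physical plan
spark.range(100).selectExpr("id AS a").createOrReplaceTempView("t1")
spark.range(100).selectExpr("id AS b").createOrReplaceTempView("t2")
sql("SELECT t1.a, count(*) FROM t1 JOIN t2 ON t1.a = t2.b GROUP BY t1.a").explain()
// The aggregate sits on top of SortMergeJoinExec, whose outputOrdering on the
// join keys only becomes correct after EnsureRequirements has inserted the
// SortExec nodes below the join, so a replacement rule has to run after it.
{code}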

> Replace hash-based aggregates with sort-based ones if inputs already sorted
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-18591
>                 URL: https://issues.apache.org/jira/browse/SPARK-18591
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.0.2
>            Reporter: Takeshi Yamamuro
>
> Spark currently uses sort-based aggregates only under a limited condition: 
> the cases where Spark cannot use partial aggregation and hash-based 
> aggregates. However, if the input ordering already satisfies the 
> requirements of sort-based aggregates, the sort-based ones seem to be 
> faster than the hash-based ones.
> {code}
> ./bin/spark-shell --conf spark.sql.shuffle.partitions=1
> val df = spark.range(10000000).selectExpr("id AS key", "id % 10 AS value").sort($"key").cache
> def timer[R](block: => R): R = {
>   val t0 = System.nanoTime()
>   val result = block
>   val t1 = System.nanoTime()
>   println("Elapsed time: " + ((t1 - t0) / 1e9) + "s")
>   result
> }
> timer {
>   df.groupBy("key").count().count
> }
> // codegen'd hash aggregate
> Elapsed time: 7.116962977s
> // non-codegen'd sort aggregate
> Elapsed time: 3.088816662s
> {code}
> If codegen'd sort-based aggregates (proposed in SPARK-16844) are supported, 
> the performance gap seems to get even bigger:
> {code}
> - codegen'd sort aggregate
> Elapsed time: 1.645234684s
> {code} 
> Therefore, it'd be better to use sort-based ones in this case.
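> A quick way to verify which physical aggregate Spark currently picks for 
> the sorted input above (a verification sketch):
> {code}
> // inspect the chosen physical aggregate for the sorted, cached input
> df.groupBy("key").count().explain()
> // currently prints a plan with HashAggregate at the top even though df is
> // already sorted by "key"; this issue proposes choosing SortAggregate there
> {code}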



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
