[ https://issues.apache.org/jira/browse/SPARK-18591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16165148#comment-16165148 ]
Maryann Xue commented on SPARK-18591:
-------------------------------------

Hey, I came up with this idea of doing sort-aggregate on sorted input while debugging a performance issue, before I saw this JIRA. I tried implementing it and found that I couldn't do it in physical planning because planning is top-down, so I ended up doing the same thing as https://github.com/maropu/spark/commit/32b716cf02dfe8cba5b08b2dc3297bc061156630#diff-7d06cf071190dcbeda2fed6b039ec5d0R55.

I totally agree with [~maropu] that we need to make physical planning bottom-up, and then we can solve this in a better way. Aside from this, I found another issue with a slightly more sophisticated query:

{{SELECT t1.a, count(*) FROM t1 JOIN t2 ON t1.a = t2.b GROUP BY t1.a}}

This only works if we put the {{ReplaceSortAggregate}} rule after {{EnsureRequirements}}, because SortMergeJoinExec's outputOrdering is not correct before EnsureRequirements runs. Please refer to SPARK-21998 for details.

> Replace hash-based aggregates with sort-based ones if inputs already sorted
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-18591
>                 URL: https://issues.apache.org/jira/browse/SPARK-18591
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.0.2
>            Reporter: Takeshi Yamamuro
>
> Spark currently uses sort-based aggregates only in limited conditions: the
> cases where Spark cannot use partial aggregates and hash-based ones.
> However, if the input ordering already satisfies the requirements of
> sort-based aggregates, sort-based ones seem to be faster than hash-based ones.
> {code}
> ./bin/spark-shell --conf spark.sql.shuffle.partitions=1
> val df = spark.range(10000000).selectExpr("id AS key", "id % 10 AS value").sort($"key").cache
> def timer[R](block: => R): R = {
>   val t0 = System.nanoTime()
>   val result = block
>   val t1 = System.nanoTime()
>   println("Elapsed time: " + ((t1 - t0 + 0.0) / 1000000000.0) + "s")
>   result
> }
> timer {
>   df.groupBy("key").count().count
> }
> // codegen'd hash aggregate
> Elapsed time: 7.116962977s
> // non-codegen'd sort aggregate
> Elapsed time: 3.088816662s
> {code}
> If codegen'd sort-based aggregates are supported in SPARK-16844, the
> performance gap seems to get even bigger:
> {code}
> - codegen'd sort aggregate
> Elapsed time: 1.645234684s
> {code}
> Therefore, it'd be better to use sort-based ones in this case.
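
For illustration only, here is a minimal plain-Scala sketch of why already-sorted input suits a sort-based aggregate. It is not Spark's operator code, and the names (SortedCountSketch, hashCount, sortedCount) are hypothetical. It only assumes that rows with equal keys arrive next to each other, which is what sorting on the grouping key guarantees.

```scala
object SortedCountSketch {
  // Hash-based count: keeps one map entry per distinct key until the input ends.
  def hashCount[K](rows: Iterator[K]): Map[K, Long] = {
    val counts = scala.collection.mutable.HashMap.empty[K, Long]
    rows.foreach(k => counts.update(k, counts.getOrElse(k, 0L) + 1L))
    counts.toMap
  }

  // Sort-based count: assumes equal keys are adjacent (input sorted on the key).
  // Only the current key and its running count are held in memory, and a group
  // is emitted as soon as the next key differs.
  def sortedCount[K](rows: Iterator[K]): Iterator[(K, Long)] =
    new Iterator[(K, Long)] {
      private val in = rows.buffered
      def hasNext: Boolean = in.hasNext
      def next(): (K, Long) = {
        val key = in.next()
        var n = 1L
        // Consume every following row that carries the same key.
        while (in.hasNext && in.head == key) { in.next(); n += 1L }
        (key, n)
      }
    }
}
```

With this sketch, SortedCountSketch.sortedCount(Iterator(1, 1, 2, 3, 3, 3)).toList yields List((1,2), (2,1), (3,3)), the same groups hashCount produces, without building a hash table. If the input is not grouped by key, sortedCount emits the same key more than once, which is why the rewrite discussed here is only valid when the child's output ordering can be trusted.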