Hi Takeshi,

Thanks for the answer. My UDAF aggregates data into an array of rows.
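Concretely, the buffer schema looks roughly like this (a simplified sketch
of my UDAF; the class and field names here are made up, not my actual code):

  import org.apache.spark.sql.Row
  import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
  import org.apache.spark.sql.types._

  class CollectRowsUdaf extends UserDefinedAggregateFunction {
    private val rowType = new StructType().add("value", LongType)

    override def inputSchema: StructType = new StructType().add("value", LongType)

    // The buffer is an array of rows -- this is the part that is not a
    // mutable UnsafeRow field type.
    override def bufferSchema: StructType = new StructType().add("rows", ArrayType(rowType))

    override def dataType: DataType = ArrayType(rowType)
    override def deterministic: Boolean = true

    override def initialize(buffer: MutableAggregationBuffer): Unit =
      buffer(0) = Seq.empty[Row]

    override def update(buffer: MutableAggregationBuffer, input: Row): Unit =
      buffer(0) = buffer.getSeq[Row](0) :+ input

    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
      buffer1(0) = buffer1.getSeq[Row](0) ++ buffer2.getSeq[Row](0)

    override def evaluate(buffer: Row): Any = buffer.getSeq[Row](0)
  }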
Apparently this makes it ineligible for the hash-based aggregate, based on
the logic at:
https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java#L74
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java#L108

The list of supported data types is VERY limited, unfortunately. It doesn't
make sense to me that the buffer data types must be mutable for the UDAF to
use the hash-based aggregate, but I could be missing something here :). I
could achieve a hash-based aggregate by turning this query into RDD mode
(rough sketch at the bottom of this mail), but that is counter-intuitive IMO.

-------
Regards,
Andy

On Mon, Jan 9, 2017 at 2:05 PM, Takeshi Yamamuro <linguin....@gmail.com> wrote:

> Hi,
>
> Spark always uses hash-based aggregates if the types of the aggregated
> data are supported there; otherwise, it cannot use the hash-based ones
> and falls back to the sort-based ones.
> See: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala#L38
>
> I'm not sure about your query, but it seems the types of the aggregated
> data in it are not supported by hash-based aggregates.
>
> // maropu
>
>
> On Mon, Jan 9, 2017 at 10:52 PM, Andy Dang <nam...@gmail.com> wrote:
>
>> Hi all,
>>
>> It appears to me that Dataset.groupBy().agg(udaf) requires a full sort,
>> which is very inefficient for certain aggregations.
>>
>> The code is very simple:
>> - I have a UDAF
>> - What I want to do is: dataset.groupBy(cols).agg(udaf).count()
>>
>> The physical plan I got was:
>>
>> *HashAggregate(keys=[], functions=[count(1)], output=[count#67L])
>> +- Exchange SinglePartition
>>    +- *HashAggregate(keys=[], functions=[partial_count(1)], output=[count#71L])
>>       +- *Project
>>          +- Generate explode(internal_col#31), false, false, [internal_col#42]
>>             +- SortAggregate(key=[key#0], functions=[aggregatefunction(key#0, nested#1, nestedArray#2, nestedObjectArray#3, value#4L, com.[...]uDf@108b121f, 0, 0)], output=[internal_col#31])
>>                +- *Sort [key#0 ASC], false, 0
>>                   +- Exchange hashpartitioning(key#0, 200)
>>                      +- SortAggregate(key=[key#0], functions=[partial_aggregatefunction(key#0, nested#1, nestedArray#2, nestedObjectArray#3, value#4L, com.[...]uDf@108b121f, 0, 0)], output=[key#0,internal_col#37])
>>                         +- *Sort [key#0 ASC], false, 0
>>                            +- Scan ExistingRDD[key#0,nested#1,nestedArray#2,nestedObjectArray#3,value#4L]
>>
>> How can I make Spark use HashAggregate (like it does for the count(*)
>> expression) instead of SortAggregate with my UDAF?
>>
>> Is it intentional? Is there an issue tracking this?
>>
>> -------
>> Regards,
>> Andy
>
> --
> ---
> Takeshi Yamamuro
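P.S. The RDD-mode workaround I mentioned above is roughly the following
(a sketch only: the "key" column name, its String type, and the Seq[Row]
buffer are stand-ins for my actual key column and UDAF merge logic):

  import org.apache.spark.sql.Row

  val grouped = dataset.rdd
    .keyBy(row => row.getAs[String]("key"))   // assumes the key is a string
    .aggregateByKey(Seq.empty[Row])(
      (buf, row) => buf :+ row,               // fold one row into the buffer
      (b1, b2) => b1 ++ b2)                   // merge two partial buffers

  // aggregateByKey combines values with a hash map on both the map and
  // reduce sides, so no per-key sort is required.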