Hi all,

It appears to me that Dataset.groupBy().agg(udaf) requires a full sort, which is very inefficient for certain aggregations.

The code is very simple: I have a UDAF, and what I want to do is dataset.groupBy(cols).agg(udaf).count().
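For concreteness, here is a minimal sketch of the kind of UDAF and query I mean (the class name, schema, and aggregation logic are placeholders, not my real code, which aggregates several nested columns):

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Placeholder UDAF that collects long values into an array.
class CollectValuesUdaf extends UserDefinedAggregateFunction {
  override def inputSchema: StructType = StructType(StructField("value", LongType) :: Nil)
  // The aggregation buffer holds a single array-typed column.
  override def bufferSchema: StructType = StructType(StructField("values", ArrayType(LongType)) :: Nil)
  override def dataType: DataType = ArrayType(LongType)
  override def deterministic: Boolean = true
  override def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = Seq.empty[Long]
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    buffer(0) = buffer.getSeq[Long](0) :+ input.getLong(0)
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    buffer1(0) = buffer1.getSeq[Long](0) ++ buffer2.getSeq[Long](0)
  override def evaluate(buffer: Row): Any = buffer.getSeq[Long](0)
}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val dataset = Seq(("a", 1L), ("a", 2L), ("b", 3L)).toDF("key", "value")
val udaf = new CollectValuesUdaf

val agged = dataset.groupBy($"key").agg(udaf($"value"))
agged.explain()        // on my setup this shows SortAggregate + Sort nodes
println(agged.count())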
The physical plan I got was:

*HashAggregate(keys=[], functions=[count(1)], output=[count#67L])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_count(1)], output=[count#71L])
      +- *Project
         +- Generate explode(internal_col#31), false, false, [internal_col#42]
            +- SortAggregate(key=[key#0], functions=[aggregatefunction(key#0, nested#1, nestedArray#2, nestedObjectArray#3, value#4L, com.[...]uDf@108b121f, 0, 0)], output=[internal_col#31])
               +- *Sort [key#0 ASC], false, 0
                  +- Exchange hashpartitioning(key#0, 200)
                     +- SortAggregate(key=[key#0], functions=[partial_aggregatefunction(key#0, nested#1, nestedArray#2, nestedObjectArray#3, value#4L, com.[...]uDf@108b121f, 0, 0)], output=[key#0,internal_col#37])
                        +- *Sort [key#0 ASC], false, 0
                           +- Scan ExistingRDD[key#0,nested#1,nestedArray#2,nestedObjectArray#3,value#4L]

How can I make Spark use HashAggregate (as it does for the count(*) expression) instead of SortAggregate with my UDAF? Is the sort intentional? Is there an issue tracking this?

-------
Regards,
Andy