Hi all,
It appears to me that Dataset.groupBy().agg(udaf) requires a full sort, which is very inefficient for certain aggregations.
The code is very simple:
- I have a UDAF
- What I want to do is: dataset.groupBy(cols).agg(udaf).count() (see the sketch below)
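For context, here is a minimal sketch of the shape of the code. The class name, column names, and the trivial "sum" logic are placeholders, not my actual UDAF (which operates on the nested/array columns shown in the plan further down); "dataset" is the Dataset mentioned above.

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._

// Placeholder UDAF: sums a long column per group.
class MyUdaf extends UserDefinedAggregateFunction {
  override def inputSchema: StructType = new StructType().add("value", LongType)
  override def bufferSchema: StructType = new StructType().add("sum", LongType)
  override def dataType: DataType = LongType
  override def deterministic: Boolean = true
  override def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = 0L
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    buffer(0) = buffer.getLong(0) + input.getLong(0)
  override def merge(b1: MutableAggregationBuffer, b2: Row): Unit =
    b1(0) = b1.getLong(0) + b2.getLong(0)
  override def evaluate(buffer: Row): Any = buffer.getLong(0)
}

val udaf = new MyUdaf
val result = dataset.groupBy("key").agg(udaf(col("value")))
result.count()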
The physical plan I got was:
*HashAggregate(keys=[], functions=[count(1)], output=[count#67L])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_count(1)], output=[count#71L])
      +- *Project
         +- Generate explode(internal_col#31), false, false, [internal_col#42]
            +- SortAggregate(key=[key#0], functions=[aggregatefunction(key#0, nested#1, nestedArray#2, nestedObjectArray#3, value#4L, com.[...]uDf@108b121f, 0, 0)], output=[internal_col#31])
               +- *Sort [key#0 ASC], false, 0
                  +- Exchange hashpartitioning(key#0, 200)
                     +- SortAggregate(key=[key#0], functions=[partial_aggregatefunction(key#0, nested#1, nestedArray#2, nestedObjectArray#3, value#4L, com.[...]uDf@108b121f, 0, 0)], output=[key#0,internal_col#37])
                        +- *Sort [key#0 ASC], false, 0
                           +- Scan ExistingRDD[key#0,nested#1,nestedArray#2,nestedObjectArray#3,value#4L]
How can I make Spark use HashAggregate (as it does for the count(*) expression) instead of SortAggregate with my UDAF?
Is this intentional? Is there an issue tracking it?
-------
Regards,
Andy