Hi all,

It appears to me that Dataset.groupBy().agg(udaf) requires a full sort,
which is very inefficient for certain aggregations:

The code is very simple:
- I have a UDAF
- What I want to compute is: dataset.groupBy(cols).agg(udaf).count()
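To illustrate why a sort should not be needed here: as long as the aggregation buffer's merge is commutative and associative, per-key aggregation can be done in a single pass with a hash map keyed by the group key, with no ordering of the input at all. A minimal plain-Scala sketch (no Spark dependency; the function name and signature are my own, purely for illustration):

```scala
import scala.collection.mutable

// Hypothetical one-pass hash aggregation: no sort of the input is required,
// only a mutable buffer per distinct key.
def hashAggregate[K, V, B](rows: Seq[(K, V)],
                           zero: => B,
                           update: (B, V) => B): Map[K, B] = {
  val buffers = mutable.Map.empty[K, B]
  for ((k, v) <- rows)
    buffers(k) = update(buffers.getOrElse(k, zero), v)
  buffers.toMap
}

// Example: per-key sum, computed without ordering the input.
val result = hashAggregate(Seq(("a", 1L), ("b", 2L), ("a", 3L)),
                           0L,
                           (b: Long, v: Long) => b + v)
// result == Map("a" -> 4L, "b" -> 2L)
```

This is essentially what HashAggregate does for built-in aggregates like count(1), which is why I expected the same strategy to apply to a UDAF.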

The physical plan I got was:
*HashAggregate(keys=[], functions=[count(1)], output=[count#67L])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_count(1)], output=[count#71L])
      +- *Project
         +- Generate explode(internal_col#31), false, false, [internal_col#42]
            +- SortAggregate(key=[key#0], functions=[aggregatefunction(key#0, nested#1, nestedArray#2, nestedObjectArray#3, value#4L, com.[...]uDf@108b121f, 0, 0)], output=[internal_col#31])
               +- *Sort [key#0 ASC], false, 0
                  +- Exchange hashpartitioning(key#0, 200)
                     +- SortAggregate(key=[key#0], functions=[partial_aggregatefunction(key#0, nested#1, nestedArray#2, nestedObjectArray#3, value#4L, com.[...]uDf@108b121f, 0, 0)], output=[key#0,internal_col#37])
                        +- *Sort [key#0 ASC], false, 0
                           +- Scan ExistingRDD[key#0,nested#1,nestedArray#2,nestedObjectArray#3,value#4L]

How can I make Spark use HashAggregate (as it does for the count(*)
expression) instead of SortAggregate with my UDAF?

Is this behavior intentional? Is there an issue tracking it?

-------
Regards,
Andy
