How to hint Spark to use HashAggregate() for UDAF
Hi all,

It appears to me that Dataset.groupBy().agg(udaf) requires a full sort, which is very inefficient for certain aggregations.

The code is very simple:
- I have a UDAF
- What I want to do is: dataset.groupBy(cols).agg(udaf).count()

The physical plan I got was:

*HashAggregate(keys=[], functions=[count(1)], output=[count#67L])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_count(1)], output=[count#71L])
      +- *Project
         +- Generate explode(internal_col#31), false, false, [internal_col#42]
            +- SortAggregate(key=[key#0], functions=[aggregatefunction(key#0, nested#1, nestedArray#2, nestedObjectArray#3, value#4L, com.[...]uDf@108b121f, 0, 0)], output=[internal_col#31])
               +- *Sort [key#0 ASC], false, 0
                  +- Exchange hashpartitioning(key#0, 200)
                     +- SortAggregate(key=[key#0], functions=[partial_aggregatefunction(key#0, nested#1, nestedArray#2, nestedObjectArray#3, value#4L, com.[...]uDf@108b121f, 0, 0)], output=[key#0,internal_col#37])
                        +- *Sort [key#0 ASC], false, 0
                           +- Scan ExistingRDD[key#0,nested#1,nestedArray#2,nestedObjectArray#3,value#4L]

How can I make Spark use HashAggregate (as it does for the count(*) expression) instead of SortAggregate with my UDAF?

Is this intentional? Is there an issue tracking it?
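For context, the UDAF has roughly the following shape (a minimal sketch: the class name, column names, and buffer layout are illustrative placeholders, not my real implementation):

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Sketch of a UDAF whose aggregation buffer is an array of rows --
// the kind of non-primitive buffer type this question is about.
class CollectPairsUdaf extends UserDefinedAggregateFunction {
  private val entry = new StructType().add("v", LongType)

  override def inputSchema: StructType = new StructType().add("value", LongType)
  // The buffer holds an array of structs, not a fixed-width primitive.
  override def bufferSchema: StructType = new StructType().add("pairs", ArrayType(entry))
  override def dataType: DataType = ArrayType(entry)
  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit =
    buffer.update(0, Seq.empty[Row])

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    buffer.update(0, buffer.getSeq[Row](0) :+ Row(input.getLong(0)))

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    buffer1.update(0, buffer1.getSeq[Row](0) ++ buffer2.getSeq[Row](0))

  override def evaluate(buffer: Row): Any = buffer.getSeq[Row](0)
}

// The query shape from above:
// dataset.groupBy($"key").agg(new CollectPairsUdaf()($"value")).count()

---
Regards,
Andy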
Re: How to hint Spark to use HashAggregate() for UDAF
Hi,

Spark always uses hash-based aggregation when the types of the aggregated data are supported there; otherwise, it falls back to sort-based aggregation.
See: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala#L38

I can't be sure without seeing your UDAF, but it seems the types of the aggregated data in your query are not supported by hash-based aggregation.
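If you want to see the check directly, AggUtils delegates to UnsafeFixedWidthAggregationMap.supportsAggregationBufferSchema, which requires every buffer field to be a mutable (fixed-width) type in UnsafeRow. A quick sketch you can paste into spark-shell (the schemas below are assumed stand-ins for a UDAF's bufferSchema):

import org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap
import org.apache.spark.sql.types._

// Assumed stand-in for a UDAF bufferSchema that stores an array of rows.
val arrayOfRowsBuffer = new StructType()
  .add("pairs", ArrayType(new StructType().add("v", LongType)))

// A buffer made only of fixed-width primitive fields.
val primitiveBuffer = new StructType()
  .add("count", LongType)
  .add("sum", DoubleType)

// true only when every buffer field is a mutable type in UnsafeRow
println(UnsafeFixedWidthAggregationMap.supportsAggregationBufferSchema(arrayOfRowsBuffer))  // false
println(UnsafeFixedWidthAggregationMap.supportsAggregationBufferSchema(primitiveBuffer))    // true

// maropu

---
Takeshi Yamamuro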
Re: How to hint Spark to use HashAggregate() for UDAF
Hi Takeshi,

Thanks for the answer. My UDAF aggregates data into an array of rows. Apparently this makes it ineligible for hash-based aggregation, based on the logic at:
https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java#L74
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java#L108

The list of supported data types is VERY limited, unfortunately. It doesn't make sense to me that the buffer's data types must be mutable for a UDAF to use hash-based aggregation, but I could be missing something here :).

I can get a hash-based aggregate by dropping down to the RDD API, but that is counter-intuitive IMO.
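For reference, the RDD fallback I mean is roughly this (a sketch under assumed column names; the key/value columns and the Seq buffer are placeholders for the real UDAF logic):

import org.apache.spark.sql.{Dataset, Row}

// Sketch of the RDD fallback: aggregateByKey combines values with
// hash-based map-side aggregation, so no sort step is required.
// Column names and the Seq[Long] buffer are placeholders.
def aggregateViaRdd(dataset: Dataset[Row]): Long = {
  dataset.rdd
    .map(row => (row.getAs[String]("key"), row.getAs[Long]("value")))
    .aggregateByKey(Seq.empty[Long])(
      (buf, v) => buf :+ v,    // update: fold one value into the buffer
      (b1, b2) => b1 ++ b2     // merge: combine two partial buffers
    )
    .count()                   // mirrors the original .agg(udaf).count()
}

---
Regards,
Andy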