Hi Andy,

Because hash-based aggregate uses UnsafeRow as the aggregation state, the
aggregation buffer schema must contain only types that are mutable in an
UnsafeRow.
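
For reference, that check boils down to UnsafeRow.isMutable (called from
UnsafeFixedWidthAggregationMap.supportsAggregationBufferSchema). A minimal
sketch of what passes and what does not, using Spark's internal API (for
illustration only, this is not a stable public API):

    import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    import org.apache.spark.sql.types._

    // Fixed-width types that can be updated in place inside an UnsafeRow
    // qualify for hash-based aggregation.
    UnsafeRow.isMutable(LongType)            // true
    UnsafeRow.isMutable(DecimalType(10, 2))  // true
    // Variable-length types such as arrays, maps and strings do not,
    // so the planner falls back to sort-based aggregation.
    UnsafeRow.isMutable(ArrayType(LongType)) // false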

If you can use TypedImperativeAggregate to implement your aggregation
function, Spark SQL has ObjectHashAggregateExec, which supports hash-based
aggregation using arbitrary JVM objects as aggregation states.
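
To give a rough idea, below is a minimal sketch of a TypedImperativeAggregate
that collects long values into an array, written against the Spark 2.1
internal API (CollectLongs is a made-up name, and the exact method signatures
may differ between Spark versions):

    import java.io._
    import scala.collection.mutable.ArrayBuffer

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.Expression
    import org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate
    import org.apache.spark.sql.catalyst.util.GenericArrayData
    import org.apache.spark.sql.types._

    // Collects the non-null values of a long column into an array, keeping
    // the buffer as a plain JVM object instead of an UnsafeRow field.
    case class CollectLongs(
        child: Expression,
        mutableAggBufferOffset: Int = 0,
        inputAggBufferOffset: Int = 0)
      extends TypedImperativeAggregate[ArrayBuffer[Long]] {

      override def children: Seq[Expression] = child :: Nil
      override def nullable: Boolean = false
      override def dataType: DataType = ArrayType(LongType)

      override def createAggregationBuffer(): ArrayBuffer[Long] = ArrayBuffer.empty

      override def update(buffer: ArrayBuffer[Long], input: InternalRow): ArrayBuffer[Long] = {
        val v = child.eval(input)
        if (v != null) buffer += v.asInstanceOf[Long]
        buffer
      }

      override def merge(buffer: ArrayBuffer[Long], other: ArrayBuffer[Long]): ArrayBuffer[Long] =
        buffer ++= other

      override def eval(buffer: ArrayBuffer[Long]): Any =
        new GenericArrayData(buffer.toArray)

      // The buffer still has to be serializable so it can be spilled/shuffled.
      override def serialize(buffer: ArrayBuffer[Long]): Array[Byte] = {
        val bos = new ByteArrayOutputStream()
        val oos = new ObjectOutputStream(bos)
        oos.writeObject(buffer)
        oos.close()
        bos.toByteArray
      }

      override def deserialize(bytes: Array[Byte]): ArrayBuffer[Long] = {
        val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
        try ois.readObject().asInstanceOf[ArrayBuffer[Long]] finally ois.close()
      }

      override def withNewMutableAggBufferOffset(newOffset: Int): CollectLongs =
        copy(mutableAggBufferOffset = newOffset)

      override def withNewInputAggBufferOffset(newOffset: Int): CollectLongs =
        copy(inputAggBufferOffset = newOffset)
    }

You would then wrap it in a Column yourself, e.g.
new Column(CollectLongs(df("value").expr).toAggregateExpression()), and, if I
remember correctly, ObjectHashAggregateExec is picked when
spark.sql.execution.useObjectHashAggregateExec is enabled in current master.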



Andy Dang wrote
> Hi Takeshi,
> 
> Thanks for the answer. My UDAF aggregates data into an array of rows.
> 
> Apparently this makes it ineligible for hash-based aggregate, based on
> the logic at:
> https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java#L74
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java#L108
> 
> The list of supported data types is VERY limited, unfortunately.
> 
> It doesn't make sense to me that the data type must be mutable for the
> UDAF to use hash-based aggregate, but I could be missing something here
> :). I could achieve hash-based aggregation by turning this query into
> RDD mode, but that is counter-intuitive IMO.
> 
> -------
> Regards,
> Andy
> 
> On Mon, Jan 9, 2017 at 2:05 PM, Takeshi Yamamuro <linguin.m.s@> wrote:
> 
>> Hi,
>>
>> Spark always uses hash-based aggregates if the types of the aggregated
>> data are supported there; otherwise, Spark falls back to sort-based
>> ones.
>> See: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala#L38
>>
>> So, although I'm not sure about your query, it seems the types of the
>> aggregated data in your query are not supported for hash-based
>> aggregates.
>>
>> // maropu
>>
>>
>>
>> On Mon, Jan 9, 2017 at 10:52 PM, Andy Dang <namd88@> wrote:
>>
>>> Hi all,
>>>
>>> It appears to me that Dataset.groupBy().agg(udaf) requires a full sort,
>>> which is very inefficient for certain aggregations:
>>>
>>> The code is very simple:
>>> - I have a UDAF
>>> - What I want to do is: dataset.groupBy(cols).agg(udaf).count()
>>>
>>> The physical plan I got was:
>>> *HashAggregate(keys=[], functions=[count(1)], output=[count#67L])
>>> +- Exchange SinglePartition
>>>    +- *HashAggregate(keys=[], functions=[partial_count(1)],
>>> output=[count#71L])
>>>       +- *Project
>>>          +- Generate explode(internal_col#31), false, false,
>>> [internal_col#42]
>>>             +- SortAggregate(key=[key#0],
>>> functions=[aggregatefunction(key#0,
>>> nested#1, nestedArray#2, nestedObjectArray#3, value#4L,
>>> com.[...]uDf@108b121f, 0, 0)], output=[internal_col#31])
>>>                +- *Sort [key#0 ASC], false, 0
>>>                   +- Exchange hashpartitioning(key#0, 200)
>>>                      +- SortAggregate(key=[key#0],
>>> functions=[partial_aggregatefunction(key#0, nested#1, nestedArray#2,
>>> nestedObjectArray#3, value#4L, com.[...]uDf@108b121f, 0, 0)],
>>> output=[key#0,internal_col#37])
>>>                         +- *Sort [key#0 ASC], false, 0
>>>                            +- Scan ExistingRDD[key#0,nested#1,nestedArray#2,nestedObjectArray#3,value#4L]
>>>
>>> How can I make Spark use HashAggregate (like the count(*) expression)
>>> instead of SortAggregate with my UDAF?
>>>
>>> Is it intentional? Is there an issue tracking this?
>>>
>>> -------
>>> Regards,
>>> Andy
>>>
>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>





-----
Liang-Chi Hsieh | @viirya 
Spark Technology Center 
http://www.spark.tc/ 