[ https://issues.apache.org/jira/browse/SPARK-27296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17083098#comment-17083098 ]

Patrick Cording edited comment on SPARK-27296 at 4/14/20, 10:48 AM:
--------------------------------------------------------------------

I've been trying this out, and I have a couple of questions.

First, here's a snippet showing what I did.
{code:java}
class LongestRunUdaf extends UserDefinedAggregateFunction {
  // ...
}

class LongestRunAggregator extends Aggregator[String, LongestRunBuffer, Option[Run]] {
  // ...
}

// This is the old-style UDAF, to get a baseline reference. It is slow.
val lrUdaf = new LongestRunUdaf
val result = df.select(lrUdaf(col("value")))

// This is many times faster, but `res` is now a `DataFrame`
val longestRunAggregator = udaf(new LongestRunAggregator, Encoders.STRING)
val res = dataset.select(longestRunAggregator(col("value")))
res.show()

// This creates a `Dataset[Option[Run]]` as needed, but now it is as slow as
// the old UDAF
res.as(Encoders.kryo[Option[Run]])

// Furthermore, using the Aggregator directly via toColumn is still as slow as before
val longestRunColumn = (new LongestRunAggregator).toColumn
dataset.select(longestRunColumn).first(){code}
I am confused that the performance improvement is only visible when going 
through the udaf() wrapper: to get there I have to define an Aggregator anyway, 
and that same Aggregator is still slow if I use it directly. Is it not possible 
to also achieve the improvement for the type-safe cases where Aggregators are 
used directly?
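
For concreteness, this is the kind of type-safe usage I have in mind (just a 
sketch; the groupByKey variant is only for illustration and is not part of my 
benchmark):
{code:java}
// Type-safe usage of the Aggregator on a Dataset[String]
val agg = new LongestRunAggregator

// Whole-dataset aggregate via toColumn -- this is the path that is still slow for me
val overall: Option[Run] = dataset.select(agg.toColumn).first()

// Per-group aggregate via groupByKey (illustration only, not something I benchmarked)
val perLength = dataset
  .groupByKey(_.length) // arbitrary grouping key, just for the example
  .agg(agg.toColumn)
{code}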

Also, wrapping an Aggregator whose output is of a non-standard type leaves you 
with a DataFrame containing binary data. Converting it to a Dataset seems to 
negate the performance improvement. Is this intended behaviour? In my example, 
only a single row should need to be deserialized.
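
To make the last point concrete, this is roughly the pattern I would expect to 
be cheap, since only the single aggregated row has to pass through the Kryo 
deserializer (same code as above, sketched with the types written out):
{code:java}
// res is a one-row DataFrame whose single column holds the Kryo-serialized Option[Run]
val typed: Dataset[Option[Run]] = res.as(Encoders.kryo[Option[Run]])

// Only this one value should need to be deserialized
val longestRun: Option[Run] = typed.head()
{code}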

Or am I just using this in the wrong way?

My full experiment is here: [https://github.com/patrickcording/udaf-benchmark]


> Efficient User Defined Aggregators 
> -----------------------------------
>
>                 Key: SPARK-27296
>                 URL: https://issues.apache.org/jira/browse/SPARK-27296
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, SQL, Structured Streaming
>    Affects Versions: 2.3.3, 2.4.0, 3.0.0
>            Reporter: Erik Erlandson
>            Assignee: Erik Erlandson
>            Priority: Major
>              Labels: performance, usability
>             Fix For: 3.0.0
>
>
> Spark's UDAFs appear to be serializing and de-serializing to/from the 
> MutableAggregationBuffer for each row.  This gist shows a small reproducing 
> UDAF and a spark shell session:
> [https://gist.github.com/erikerlandson/3c4d8c6345d1521d89e0d894a423046f]
> The UDAF and its companion UDT are designed to count the number of times 
> that ser/de is invoked for the aggregator.  The spark shell session 
> demonstrates that it is executing ser/de on every row of the data frame.
> Note, Spark's pre-defined aggregators do not have this problem, as they are 
> based on an internal aggregating trait that does the correct thing and only 
> calls ser/de at points such as partition boundaries, presenting final 
> results, etc.
> This is a major problem for UDAFs, as it means that every UDAF is doing a 
> massive amount of unnecessary work per row, including but not limited to Row 
> object allocations. For a more realistic UDAF with its own non-trivial 
> internal structure, it is obviously that much worse.


