[ https://issues.apache.org/jira/browse/SPARK-27296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17083098#comment-17083098 ]
Patrick Cording edited comment on SPARK-27296 at 4/14/20, 10:42 AM:
--------------------------------------------------------------------

I've been trying this out, and I have a couple of questions. First, here's a snippet showing what I did.
{code:java}
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.expressions.{Aggregator, UserDefinedAggregateFunction}
import org.apache.spark.sql.functions.{col, udaf}

class LongestRunUdaf extends UserDefinedAggregateFunction {
  // ...
}

class LongestRunAggregator extends Aggregator[String, LongestRunBuffer, Option[Run]] {
  // ...
}

// This is to get a reference. It is slow.
val lrUdaf = new LongestRunUdaf
val result = df.select(lrUdaf(col("value")))

// This is many times faster, but `res` is now a `DataFrame`
val longestRunAggregator = udaf(new LongestRunAggregator, Encoders.STRING)
val res = dataset.select(longestRunAggregator(col("value")))
res.show()

// This creates a `Dataset[Option[Run]]` as needed, but now it is as slow as the old UDAF
res.as(Encoders.kryo[Option[Run]])

// Furthermore, this is still as slow as before
// (renamed from `longestRunAggregator` so the snippet compiles in a single scope)
val longestRunColumn = (new LongestRunAggregator).toColumn
dataset.select(longestRunColumn).first
{code}
I am confused by the fact that the performance improvement is only visible when using UDAFs, but in order to get there, I need to define an Aggregator, which is still slow if I use it directly. Is it not possible to also achieve the improvement for the type-safe cases where Aggregators are used directly?

Also, wrapping an Aggregator whose output is of a non-standard type leaves you with a DataFrame containing binary data. Converting it to a Dataset seems to negate the performance improvement. Is this intended behaviour? Or am I just using this in the wrong way?

My full experiment is here: [https://github.com/patrickcording/udaf-benchmark]

> Efficient User Defined Aggregators
> -----------------------------------
>
>                 Key: SPARK-27296
>                 URL: https://issues.apache.org/jira/browse/SPARK-27296
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, SQL, Structured Streaming
>    Affects Versions: 2.3.3, 2.4.0, 3.0.0
>            Reporter: Erik Erlandson
>            Assignee: Erik Erlandson
>            Priority: Major
>              Labels: performance, usability
>             Fix For: 3.0.0
>
>
> Spark's UDAFs appear to be serializing and de-serializing to/from the
> MutableAggregationBuffer for each row.
> This gist shows a small reproducing UDAF and a Spark shell session:
> [https://gist.github.com/erikerlandson/3c4d8c6345d1521d89e0d894a423046f]
> The UDAF and its companion UDT are designed to count the number of times
> that ser/de is invoked for the aggregator. The Spark shell session
> demonstrates that it is executing ser/de on every row of the data frame.
> Note, Spark's pre-defined aggregators do not have this problem, as they are
> based on an internal aggregating trait that does the correct thing and only
> calls ser/de at points such as partition boundaries, presenting final
> results, etc.
> This is a major problem for UDAFs, as it means that every UDAF is doing a
> massive amount of unnecessary work per row, including but not limited to Row
> object allocations. For a more realistic UDAF having its own non-trivial
> internal structure it is obviously that much worse.
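
The difference between per-row ser/de and ser/de only at partition boundaries, as described in the issue, can be illustrated with a toy model. This is only a sketch in plain Scala with no Spark dependency: `SerDeCountSketch`, `SumBuffer`, `serialize`, `deserialize`, `aggregatePerRow` and `aggregateAtBoundary` are hypothetical names invented for illustration, and the string round-trip merely stands in for whatever encoding the real aggregation buffer uses. It does not reproduce Spark's internals.

```scala
// Toy model only: NOT Spark's implementation. All names here are hypothetical.
object SerDeCountSketch {
  // Counts how many times the buffer is serialized or deserialized.
  var serDeCalls = 0

  // Stand-in for an aggregation buffer.
  final case class SumBuffer(total: Long)

  // Stand-ins for the buffer's encoding and decoding; each call is counted.
  def serialize(b: SumBuffer): String = { serDeCalls += 1; b.total.toString }
  def deserialize(s: String): SumBuffer = { serDeCalls += 1; SumBuffer(s.toLong) }

  // Per-row style: the buffer round-trips through its serialized form on every update.
  def aggregatePerRow(rows: Seq[Long]): Long = {
    var stored = serialize(SumBuffer(0L))
    for (r <- rows) {
      val buf = deserialize(stored)
      stored = serialize(SumBuffer(buf.total + r))
    }
    deserialize(stored).total
  }

  // Boundary style: the buffer stays a live object and is serialized and
  // deserialized once, at the end of the (simulated) partition.
  def aggregateAtBoundary(rows: Seq[Long]): Long = {
    var buf = SumBuffer(0L)
    for (r <- rows) buf = SumBuffer(buf.total + r)
    deserialize(serialize(buf)).total
  }

  def main(args: Array[String]): Unit = {
    val rows = 1L to 1000L

    serDeCalls = 0
    val perRowResult = aggregatePerRow(rows)
    val perRowCalls = serDeCalls

    serDeCalls = 0
    val boundaryResult = aggregateAtBoundary(rows)
    val boundaryCalls = serDeCalls

    println(s"per-row:  result=$perRowResult serDeCalls=$perRowCalls")
    println(s"boundary: result=$boundaryResult serDeCalls=$boundaryCalls")
  }
}
```

Both functions compute the same sum; only the number of counted ser/de calls differs. In the per-row variant it grows linearly with the number of rows, while in the boundary variant it stays constant per partition.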