[ https://issues.apache.org/jira/browse/SPARK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17157128#comment-17157128 ]
Martin Loncaric commented on SPARK-31356:
-----------------------------------------

Actually, there seem to be 3 separate performance issues:
1. an unnecessary appendColumns when the groupByKey function just returns a subset of columns (though this is hard to avoid in a type-safe way)
2. an unnecessary serialize + deserialize
3. the RDD API is roughly 2x faster overall

It seems there's a lot of room to improve aggregations.

> Splitting Aggregate node into separate Aggregate and Serialize for Optimizer
> ----------------------------------------------------------------------------
>
>                 Key: SPARK-31356
>                 URL: https://issues.apache.org/jira/browse/SPARK-31356
>             Project: Spark
>          Issue Type: Improvement
>      Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Martin Loncaric
>            Priority: Major
>
> Problem: in the Datasets API, it is a very common pattern to do something like this whenever a complex reduce function is needed:
> {code:scala}
> ds
>   .groupByKey(_.y)
>   .reduceGroups((a, b) => {...})
>   .map(_._2)
> {code}
> However, the .map(_._2) step (taking the values and throwing the keys away) unfortunately often ends up as an unnecessary serialization during the aggregation step, followed by {{DeserializeToObject + MapElements (from (K, V) => V) + SerializeFromObject}} in the optimized logical plan. In this example, it would be more ideal to either skip the deserialization/serialization or use a {{Project (from (K, V) => V)}}. Even manually replacing the {{.map}} with a {{.select(...).as[T]}} is quite tricky, because
> * the columns are complicated, like {{[value, ReduceAggregator(my.data.type)]}}, and seem to be impossible to {{.select}}
> * it breaks the nice type checking of Datasets
>
> Proposal:
> Change the {{KeyValueGroupedDataset.aggUntyped}} method to (like {{KeyValueGroupedDataset.cogroup}}) append both an {{Aggregate}} node and a {{SerializeFromObject}} node, so that the Optimizer can eliminate the serialization when it is redundant.
> Change aggregations to emit deserialized results.
>
> I had 2 ideas for what we could change: either add a new {{.reduceGroupValues}} feature that projects to only the necessary columns, or make this improvement. I thought this would be a better solution because
> * it will improve the performance of existing Spark applications with no modifications
> * feature growth is undesirable
>
> Uncertainties:
> Affects Version: I'm not sure - if I submit a PR soon, can we get this into 3.0? Or only 3.1? And I assume we're not adding new features to 2.4?
>
> Complications: Are there any hazards in splitting Aggregation into Aggregation + SerializeFromObject that I'm not aware of?

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
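For reference, what the quoted .groupByKey / .reduceGroups / .map(_._2) pipeline computes can be sketched in plain Scala collections, with no Spark involved (the {{Rec}} type and the sum reducer here are hypothetical stand-ins, not from the issue):

{code:scala}
// Hypothetical record type: `y` is the grouping key, `v` a payload to reduce.
case class Rec(y: Int, v: Int)

val ds = Seq(Rec(1, 2), Rec(1, 3), Rec(2, 5))

val result = ds
  .groupBy(_.y)                                   // ~ groupByKey(_.y)
  .map { case (k, rs) =>                          // ~ reduceGroups((a, b) => ...)
    (k, rs.reduce((a, b) => Rec(a.y, a.v + b.v)))
  }
  .map(_._2)                                      // ~ .map(_._2): drop the key
  .toSeq
{code}

The collections version drops the key with a trivial projection; the issue is that the Dataset version instead pays for a serialize/deserialize round trip at the same step.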