[ https://issues.apache.org/jira/browse/SPARK-7712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yin Huai updated SPARK-7712:
----------------------------
    Fix Version/s:     (was: 1.5.0)

> Native Spark Window Functions & Performance Improvements
> ---------------------------------------------------------
>
>                 Key: SPARK-7712
>                 URL: https://issues.apache.org/jira/browse/SPARK-7712
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 1.4.0
>            Reporter: Herman van Hovell tot Westerflier
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> Hi All,
> After playing with the current Spark window implementation, I tried to take this to the next level. My main goal is/was to address the following issues: native Spark SQL and performance.
> *Native Spark SQL*
> The current implementation uses Hive UDAFs as its aggregation mechanism. We try to address the following issues by moving to a more 'native' Spark SQL approach:
> - Window functions require Hive. Some people (mostly by accident) use Spark SQL without Hive. Usage of UDAFs is still supported, though.
> - Adding your own aggregates requires you to write them in Hive instead of native Spark SQL.
> - Hive UDAFs are very well written and quite quick, but they are opaque in processing and memory management; this makes them hard to optimize. By using 'native' Spark SQL constructs we can do a lot more optimization, for example AggregateEvaluation-style window processing (this would require us to move some of the code out of the AggregateEvaluation class into a common base class), or Tungsten-style memory management.
> *Performance*
> - Much better performance (10x) in running cases (e.g. BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) and UNBOUNDED FOLLOWING cases. The current implementation in Spark uses a sliding-window approach in these cases. This means that an aggregate is maintained for every row, so space usage is N (N being the number of rows). It also means that all these aggregates need to be updated separately, which takes N*(N-1)/2 updates.
> The running case differs from the sliding case because we are only adding data to an aggregate function (no reset is required): we only need to maintain one aggregate (as in the UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING case), update that aggregate for each row, and get the aggregate value after each update. This is what the new implementation does. This approach uses only 1 buffer and requires only N updates; I am currently working on data with window sizes of 500-1000 doing running sums, and this saves a lot of time. The CURRENT ROW AND UNBOUNDED FOLLOWING case also uses this approach, together with the fact that aggregate operations are commutative; there is one twist, though: it processes the input buffer in reverse.
> - Fewer comparisons in the sliding case. The current implementation determines frame boundaries for every input row. The new implementation makes more use of the fact that the window is sorted, maintains the boundaries, and only moves them when the current row order changes. This is a minor improvement.
> - A single Window node is able to process all types of frames for the same partitioning/ordering. This saves a little time/memory spent buffering and managing partitions.
> - A lot of the staging code is moved from the execution phase to the initialization phase. This is a minor performance improvement, and it improves the readability of the execution code.
> The original work, including some benchmarking code for the running case, can be found here: https://github.com/hvanhovell/spark-window
> A PR has been created; it is still a work in progress, and can be found here: https://github.com/apache/spark/pull/6278
> Comments, feedback and other discussion are much appreciated.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
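[Editor's note] The complexity argument in the description can be sketched outside Spark. The following is a minimal Python illustration of the idea only, not the actual Spark implementation (all function names here are hypothetical): a sliding-style recomputation of the frame for every row versus a single running aggregate for UNBOUNDED PRECEDING AND CURRENT ROW, and the same trick applied to a reversed buffer for CURRENT ROW AND UNBOUNDED FOLLOWING.

```python
def sliding_style_sum(rows):
    """Sliding-style: rebuild the frame [0, i] for every row -> O(N^2) adds."""
    return [sum(rows[:i + 1]) for i in range(len(rows))]

def running_sum(rows):
    """Running-style: one aggregate buffer, one update per row -> O(N) adds."""
    out, acc = [], 0
    for v in rows:
        acc += v          # add the row to the single aggregate (no reset needed)
        out.append(acc)   # emit the aggregate value after each update
    return out

def unbounded_following_sum(rows):
    """CURRENT ROW AND UNBOUNDED FOLLOWING: the same running trick, with the
    twist that the input buffer is processed in reverse. This relies on the
    aggregate update being commutative, so processing order does not matter."""
    out, acc = [], 0
    for v in reversed(rows):  # the twist: walk the buffer backwards
        acc += v
        out.append(acc)
    out.reverse()             # restore the original row order
    return out

rows = [3, 1, 4, 1, 5]
assert sliding_style_sum(rows) == running_sum(rows) == [3, 4, 8, 9, 14]
assert unbounded_following_sum(rows) == [14, 11, 10, 6, 5]
```

For N rows, the running variant performs N aggregate updates against the N*(N-1)/2 updates of the sliding-style recomputation, which is the source of the roughly 10x speedup claimed above.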