[ 
https://issues.apache.org/jira/browse/SPARK-8641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-8641:
-----------------------------------

    Assignee: Apache Spark

> Native Spark Window Functions
> -----------------------------
>
>                 Key: SPARK-8641
>                 URL: https://issues.apache.org/jira/browse/SPARK-8641
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>    Affects Versions: 1.5.0
>            Reporter: Herman van Hovell
>            Assignee: Apache Spark
>
> *Rationale*
> The window operator currently uses Hive UDAFs for all aggregation operations. 
> This is fine in terms of performance and functionality. However, Hive UDAFs 
> limit extensibility, and they are quite opaque in terms of processing and 
> memory usage. The latter blocks advanced optimizations such as code 
> generation and Tungsten-style (advanced) memory management.
> *Requirements*
> We want to address this by replacing the Hive UDAFs with native Spark SQL 
> UDAFs. A redesign of the Spark UDAFs is currently underway, see SPARK-4366. 
> The new window UDAFs should use this new standard, in order to make them as 
> future-proof as possible. Although we are replacing the standard Hive UDAFs, 
> other existing Hive UDAFs should still be supported.
> The new window UDAFs should, at least, cover all existing Hive standard 
> window UDAFs:
> # FIRST_VALUE
> # LAST_VALUE
> # LEAD
> # LAG
> # ROW_NUMBER
> # RANK
> # DENSE_RANK
> # PERCENT_RANK
> # NTILE
> # CUME_DIST
> All these functions imply a row order; this means that in order to use 
> them properly an ORDER BY clause must be defined.
> The first and last value UDAFs are already present in Spark SQL. The only 
> thing that needs to be added is skip-NULL functionality.
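
A minimal sketch, in plain Scala without any Spark dependency, of what skip-NULL behaviour could mean for the first and last value functions. NULL is modelled as None; the object name, method names, and the ignoreNulls flag are hypothetical and not part of the proposed design.

```scala
// Hypothetical illustration only: a frame is a sequence of nullable values.
object FirstLastSketch {
  /** First value in the frame; with ignoreNulls, the first non-NULL value. */
  def firstValue[A](frame: Seq[Option[A]], ignoreNulls: Boolean): Option[A] =
    if (ignoreNulls) frame.flatten.headOption
    else frame.headOption.flatten

  /** Last value in the frame; with ignoreNulls, the last non-NULL value. */
  def lastValue[A](frame: Seq[Option[A]], ignoreNulls: Boolean): Option[A] =
    if (ignoreNulls) frame.flatten.lastOption
    else frame.lastOption.flatten
}
```

Without the flag a leading NULL is returned as-is; with it, the NULL is skipped and the next non-NULL value in the frame is used.
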
> LEAD and LAG are not aggregates. These expressions return the value of an 
> expression a number of rows before (LAG) or after (LEAD) the current row. 
> These expressions put a constraint on the window frame in which they are 
> executed: this can only be a row frame with equal offsets.
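
The LEAD/LAG semantics described above can be sketched in plain Scala over one already-ordered partition. This is an illustration under stated assumptions, not the proposed implementation: the object and method names are hypothetical, and the default value is a plain value rather than an Expression.

```scala
// Hypothetical illustration only: `rows` is one partition, already sorted.
object OffsetSketch {
  /** Value `offset` rows before each row, or `default` if that row does not exist. */
  def lag[A](rows: IndexedSeq[A], offset: Int, default: A): IndexedSeq[A] =
    rows.indices.map(i => rows.lift(i - offset).getOrElse(default))

  /** Value `offset` rows after each row, or `default` if that row does not exist. */
  def lead[A](rows: IndexedSeq[A], offset: Int, default: A): IndexedSeq[A] =
    rows.indices.map(i => rows.lift(i + offset).getOrElse(default))
}
```

Each output row reads exactly one input row at a fixed distance, which is why the frame can be expressed as a row frame whose lower and upper bounds use the same offset.
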
> The ROW_NUMBER() function can be seen as a count in a running row frame (ROWS 
> BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW).
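
To illustrate the "count in a running row frame" view of ROW_NUMBER(), here is a small plain-Scala sketch. The names are hypothetical; it only shows that a running count over rows from the partition start up to the current row yields 1, 2, 3, and so on.

```scala
// Hypothetical illustration only: running count over one ordered partition.
object RowNumberSketch {
  /** For each row, count the rows from the partition start up to and including it. */
  def rowNumbers[A](partition: Seq[A]): Seq[Int] =
    partition.scanLeft(0)((count, _) => count + 1).tail
}
```
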
> RANK(), DENSE_RANK(), PERCENT_RANK(), NTILE(..) & CUME_DIST() depend on the 
> actual values of the ORDER BY expression(s). The ORDER BY expression(s) must 
> therefore be made available before these functions are evaluated. 
> All these functions will have a fixed frame, but this will be dependent on 
> the implementation (probably a running row frame).
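
A rough plain-Scala sketch of why RANK() and DENSE_RANK() need the ORDER BY values: both compare each row's ordering key with the previous row's key to detect ties. The names are hypothetical, and the input is assumed to be the ORDER BY keys of one partition, already sorted.

```scala
// Hypothetical illustration only: `sortedKeys` are the ORDER BY values of one partition.
object RankSketch {
  /** RANK(): tied rows share a rank; the next distinct key jumps to its row position. */
  def rank[K](sortedKeys: IndexedSeq[K]): IndexedSeq[Int] = {
    var current = 0
    sortedKeys.indices.map { i =>
      if (i == 0 || sortedKeys(i) != sortedKeys(i - 1)) current = i + 1
      current
    }
  }

  /** DENSE_RANK(): tied rows share a rank; the next distinct key increments by one. */
  def denseRank[K](sortedKeys: IndexedSeq[K]): IndexedSeq[Int] = {
    var current = 0
    sortedKeys.indices.map { i =>
      if (i == 0 || sortedKeys(i) != sortedKeys(i - 1)) current += 1
      current
    }
  }
}
```

Both functions only look at rows up to the current one, which is consistent with evaluating them in a running row frame.
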
> PERCENT_RANK(), NTILE(..) & CUME_DIST() also depend on the size of the 
> partition being evaluated. The partition size must either be made available 
> during evaluation (this is perfectly feasible in the current implementation) 
> or the expression must be split into two window expressions and a merging 
> expression; for instance, PERCENT_RANK() would look like this:
> {noformat}
> (RANK() OVER (PARTITION BY x ORDER BY y) - 1) / (COUNT(*) OVER (PARTITION BY x) - 1)
> {noformat}
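
The dependence on partition size can be sketched in plain Scala as follows. This is a sketch under assumptions, not the proposed implementation: all names are hypothetical, the rank and row number are assumed to be computed elsewhere, and returning 0.0 for a one-row partition (where the formula above would divide by zero) is an assumed convention.

```scala
// Hypothetical illustration only: per-row formulas given the partition size.
object PartitionSizeSketch {
  /** PERCENT_RANK() = (rank - 1) / (size - 1); assumed to be 0.0 for a one-row partition. */
  def percentRank(rank: Int, partitionSize: Int): Double =
    if (partitionSize <= 1) 0.0 else (rank - 1).toDouble / (partitionSize - 1)

  /** CUME_DIST() = rows up to and including the current row's peers / size. */
  def cumeDist(rowsUpToPeers: Int, partitionSize: Int): Double =
    rowsUpToPeers.toDouble / partitionSize

  /** NTILE(buckets) for a 1-based row number; the first (size % buckets) buckets get one extra row. */
  def ntile(rowNumber: Int, partitionSize: Int, buckets: Int): Int = {
    val base = partitionSize / buckets
    val extra = partitionSize % buckets
    val threshold = extra * (base + 1) // rows covered by the larger buckets
    if (rowNumber <= threshold) (rowNumber - 1) / (base + 1) + 1
    else (rowNumber - 1 - threshold) / base + extra + 1
  }
}
```

None of these can be finished with the running frame alone: each needs the total row count of the partition, either injected into the function or obtained from a second window expression as in the PERCENT_RANK() rewrite above.
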
> *Design*
> The old WindowFunction interface will be replaced by the following 
> (initial/very early) design (including sub-classes):
> {noformat}
> /**
>  * A window function is a function that can only be evaluated in the context of a window operator.
>  */
> trait WindowFunction {
>   self: Expression =>
>   /**
>    * Define the frame in which the window operator must be executed.
>    */
>   def frame: WindowFrame = UnspecifiedFrame
> }
> /**
>  * Base class for LEAD/LAG offset window functions.
>  *
>  * These are ordinary expressions, the idea is that the Window operator will process these in a
>  * separate (specialized) window frame.
>  */
> abstract class OffsetWindowFunction(val child: Expression, val offset: Int, val default: Expression)
>   extends Expression with WindowFunction {
>   override def deterministic: Boolean = false
>   ...
> }
> case class Lead(child: Expression, offset: Int, default: Expression) extends OffsetWindowFunction(child, offset, default) {
>   // LEAD reads the row `offset` rows after the current row.
>   override val frame = SpecifiedWindowFrame(RowFrame, ValueFollowing(offset), ValueFollowing(offset))
>   ...
> }
> case class Lag(child: Expression, offset: Int, default: Expression) extends OffsetWindowFunction(child, offset, default) {
>   // LAG reads the row `offset` rows before the current row.
>   override val frame = SpecifiedWindowFrame(RowFrame, ValuePreceding(offset), ValuePreceding(offset))
>   ...
> }
> case class RowNumber() extends AlgebraicAggregate with WindowFunction {
>   override def deterministic: Boolean = false
>   override val frame = SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow)
>   ...
> }
> abstract class RankLike(val order: Seq[Expression] = Nil) extends AlgebraicAggregate with WindowFunction {
>   override def deterministic: Boolean = true
>   // This can be injected by either the Planner or the Window operator.
>   def withOrderSpec(orderSpec: Seq[Expression]): AggregateWindowFunction
>   // This will be injected by the Window operator.
>   // Only needed by: PERCENT_RANK(), NTILE(..) & CUME_DIST(). Maybe put this in a subclass.
>   def withPartitionSize(size: MutableLiteral): AggregateWindowFunction
>   // We can do this as long as partition size is available before execution...
>   override val frame = SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow)
>   ...
> }
> case class Rank(order: Seq[Expression] = Nil) extends RankLike(order) {
>   ...
> }
> case class DenseRank(order: Seq[Expression] = Nil) extends RankLike(order) {
>   ...
> }
> case class PercentRank(order: Seq[Expression] = Nil) extends RankLike(order) {
>   ...
> }
> case class NTile(order: Seq[Expression] = Nil, buckets: Int) extends RankLike(order) {
>   override def deterministic: Boolean = false
>   ...
> }
> case class CumeDist(order: Seq[Expression] = Nil) extends RankLike(order) {
>   ...
> }
> {noformat}
> This change will affect quite a few other classes as well:
> * org.apache.spark.sql.catalyst.expressions.WindowExpression
> * org.apache.spark.sql.catalyst.analysis.FunctionRegistry (Add Functions)
> * org.apache.spark.sql.execution.Window (Add another window frame processor, 
> Add support for new UDAFs)
> * org.apache.spark.sql.expressions.Window (Remove Hive-Only stuff)
> * org.apache.spark.sql.hive.HiveQl (Use regular UnresolvedFunction)
> * org.apache.spark.sql.hive.HiveWindowFunction (Remove Most of this)
> * org.apache.spark.sql.hive.ResolveHiveWindowFunction (Remove Most of this)
> *Unknowns & Challenges*
> There are still a few unknowns and challenges, mainly because the work on 
> SPARK-4366 is still in full swing:
> * How will we retain Hive UDAF functionality?
> * What will a WindowExpression containing an AggregateFunction look like? 
> Will there be an intermediate AggregateExpression2? Or is this only present 
> when distinct values and/or a non-Complete processing mode is requested?
> * The new implementation moves the responsibility of distinct processing to 
> the operator. It also adds two aggregate evaluation paths: 
> AggregateFunction2 & AlgebraicAggregate (it is assumed that the current 
> AggregateFunction doesn't require a third). Are there possibilities for 
> code reuse? Or do we have to implement everything from scratch?


