[ https://issues.apache.org/jira/browse/SPARK-8641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-8641:
-----------------------------------

    Assignee: Apache Spark

> Native Spark Window Functions
> -----------------------------
>
>                 Key: SPARK-8641
>                 URL: https://issues.apache.org/jira/browse/SPARK-8641
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>    Affects Versions: 1.5.0
>            Reporter: Herman van Hovell
>            Assignee: Apache Spark
>
> *Rationale*
> The window operator currently uses Hive UDAFs for all aggregation operations.
> This is fine in terms of performance and functionality. However, they limit
> extensibility and are quite opaque in terms of processing and memory
> usage. The latter blocks advanced optimizations such as code generation and
> Tungsten-style (advanced) memory management.
> *Requirements*
> We want to address this by replacing the Hive UDAFs with native Spark SQL
> UDAFs. A redesign of the Spark UDAFs is currently underway, see SPARK-4366.
> The new window UDAFs should use this new standard, in order to make them as
> future-proof as possible. Although we are replacing the standard Hive UDAFs,
> other existing Hive UDAFs should still be supported.
> The new window UDAFs should, at least, cover all existing Hive standard
> window UDAFs:
> # FIRST_VALUE
> # LAST_VALUE
> # LEAD
> # LAG
> # ROW_NUMBER
> # RANK
> # DENSE_RANK
> # PERCENT_RANK
> # NTILE
> # CUME_DIST
> All these functions imply a row order; this means that in order to use these
> functions properly an ORDER BY clause must be defined.
> The first and last value UDAFs are already present in Spark SQL. The only
> thing that needs to be added is the option to skip NULL values.
> LEAD and LAG are not aggregates. These expressions return the value of an
> expression a number of rows before (LAG) or ahead (LEAD) of the current row.
> These expressions constrain the window frame in which they are executed:
> it can only be a row frame whose lower and upper bounds use the same offset.
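The LEAD/LAG semantics described above can be illustrated with a minimal sketch over plain Scala collections. This is not Spark code: the object `OffsetSketch` and its methods are hypothetical names introduced here, and the input sequence is assumed to be a single partition that is already sorted by the ORDER BY expression. Each output value reads exactly one row at a fixed distance from the current row, which is what a row frame with equal lower and upper offsets expresses.

```scala
// Illustrative only: plain Scala collections, not Spark's Window operator.
// `OffsetSketch`, `lag` and `lead` are hypothetical names for this sketch.
object OffsetSketch {
  // LAG: value found `offset` rows before the current row, or `default`
  // when that row falls outside the partition.
  def lag[A](rows: Seq[A], offset: Int, default: A): Seq[A] =
    rows.indices.map { i =>
      val j = i - offset
      if (j >= 0 && j < rows.length) rows(j) else default
    }

  // LEAD: value found `offset` rows after the current row; this is LAG
  // with the sign of the offset flipped.
  def lead[A](rows: Seq[A], offset: Int, default: A): Seq[A] =
    lag(rows, -offset, default)
}
```

For a partition `Seq(10, 20, 30, 40)`, `lag(rows, 1, 0)` shifts every value one row down and fills the first row with the default, while `lead(rows, 2, -1)` shifts two rows up and fills the last two rows.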
> The ROW_NUMBER() function can be seen as a count in a running row frame (ROWS
> BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW).
> RANK(), DENSE_RANK(), PERCENT_RANK(), NTILE(..) & CUME_DIST() depend on the
> actual values of the expressions in the ORDER BY clause. The ORDER BY
> expression(s) must be made available before these functions are evaluated.
> All these functions will have a fixed frame, but this will be dependent on
> the implementation (probably a running row frame).
> PERCENT_RANK(), NTILE(..) & CUME_DIST() also depend on the size of the
> partition being evaluated. The partition size must either be made available
> during evaluation (this is perfectly feasible in the current implementation)
> or the expression must be split into two window expressions and a merging
> expression; for instance, PERCENT_RANK() would look like this:
> {noformat}
> (RANK() OVER (PARTITION BY x ORDER BY y) - 1) / (COUNT(*) OVER (PARTITION BY x) - 1)
> {noformat}
> *Design*
> The old WindowFunction interface will be replaced by the following
> (initial/very early) design (including sub-classes):
> {noformat}
> /**
>  * A window function is a function that can only be evaluated in the context
>  * of a window operator.
>  */
> trait WindowFunction {
>   self: Expression =>
>   /**
>    * Define the frame in which the window operator must be executed.
>    */
>   def frame: WindowFrame = UnspecifiedFrame
> }
> /**
>  * Base class for LEAD/LAG offset window functions.
>  *
>  * These are ordinary expressions; the idea is that the Window operator will
>  * process these in a separate (specialized) window frame.
>  */
> abstract class OffsetWindowFunction(val child: Expression, val offset: Int,
>     val default: Expression) {
>   override def deterministic: Boolean = false
>   ...
> }
> case class Lead(child: Expression, offset: Int, default: Expression) extends
>     OffsetWindowFunction(child, offset, default) {
>   // LEAD reads the row `offset` rows after the current row.
>   override val frame = SpecifiedWindowFrame(RowFrame, ValueFollowing(offset),
>     ValueFollowing(offset))
>   ...
> }
> case class Lag(child: Expression, offset: Int, default: Expression) extends
>     OffsetWindowFunction(child, offset, default) {
>   // LAG reads the row `offset` rows before the current row.
>   override val frame = SpecifiedWindowFrame(RowFrame, ValuePreceding(offset),
>     ValuePreceding(offset))
>   ...
> }
> case class RowNumber() extends AlgebraicAggregate with WindowFunction {
>   override def deterministic: Boolean = false
>   override val frame = SpecifiedWindowFrame(RowFrame, UnboundedPreceding,
>     CurrentRow)
>   ...
> }
> abstract class RankLike(val order: Seq[Expression] = Nil) extends
>     AlgebraicAggregate with WindowFunction {
>   override def deterministic: Boolean = true
>   // This can be injected by either the Planner or the Window operator.
>   def withOrderSpec(orderSpec: Seq[Expression]): AggregateWindowFunction
>   // This will be injected by the Window operator.
>   // Only needed by: PERCENT_RANK(), NTILE(..) & CUME_DIST(). Maybe put this
>   // in a subclass.
>   def withPartitionSize(size: MutableLiteral): AggregateWindowFunction
>   // We can do this as long as partition size is available before execution...
>   override val frame = SpecifiedWindowFrame(RowFrame, UnboundedPreceding,
>     CurrentRow)
>   ...
> }
> case class Rank(order: Seq[Expression] = Nil) extends RankLike(order) {
>   ...
> }
> case class DenseRank(order: Seq[Expression] = Nil) extends RankLike(order) {
>   ...
> }
> case class PercentRank(order: Seq[Expression] = Nil) extends RankLike(order) {
>   ...
> }
> case class NTile(order: Seq[Expression] = Nil, buckets: Int) extends
>     RankLike(order) {
>   override def deterministic: Boolean = false
>   ...
> }
> case class CumeDist(order: Seq[Expression] = Nil) extends RankLike(order) {
>   ...
> }
> {noformat}
> This change will affect quite a few other classes as well:
> * org.apache.spark.sql.catalyst.expressions.WindowExpression
> * org.apache.spark.sql.catalyst.analysis.FunctionRegistry (Add Functions)
> * org.apache.spark.sql.execution.Window (Add another window frame processor,
> Add support for new UDAFs)
> * org.apache.spark.sql.expressions.Window (Remove Hive-Only stuff)
> * org.apache.spark.sql.hive.HiveQl (Use regular UnresolvedFunction)
> * org.apache.spark.sql.hive.HiveWindowFunction (Remove Most of this)
> * org.apache.spark.sql.hive.ResolveHiveWindowFunction (Remove Most of this)
> *Unknowns & Challenges*
> There are still a few unknowns and challenges, mainly because the work on
> SPARK-4366 is still in full swing:
> * How will we retain Hive UDAF functionality?
> * What will a WindowExpression containing an AggregateFunction look like?
> Will there be an intermediate AggregateExpression2? Or is this only present
> when distinct values and/or a non-Complete processing mode is requested?
> * The new implementation moves the responsibility of distinct processing to
> the operator. It also adds two aggregate evaluation paths:
> AggregateFunction2 & AlgebraicAggregate (it is assumed that the current
> AggregateFunction doesn't require a third). Are there possibilities for
> code reuse? Or do we have to implement everything from scratch?
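The ranking requirements in the issue (ROW_NUMBER() as a running count, RANK() depending on the ORDER BY values, and PERCENT_RANK() as a rank combined with the partition size) can be checked with a small sketch over plain Scala collections. This is not the proposed Spark implementation: `RankSketch` and its methods are hypothetical names, and the input is assumed to be the ORDER BY values of one partition, already sorted. Returning 0.0 for a single-row partition is also an assumption of this sketch, since the formula in the issue would otherwise divide by zero.

```scala
// Illustrative only: plain Scala collections, not Spark's Window operator.
// `RankSketch` and its methods are hypothetical names for this sketch.
object RankSketch {
  // ROW_NUMBER(): running count over
  // ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW.
  def rowNumber[A](sorted: Seq[A]): Seq[Int] =
    sorted.indices.map(_ + 1)

  // RANK(): 1 + the number of rows that sort strictly before the current
  // row. In a sorted partition that is the index of the first equal value,
  // so ties share a rank and leave a gap after them.
  def rank[A](sorted: Seq[A]): Seq[Int] =
    sorted.map(v => sorted.indexOf(v) + 1)

  // PERCENT_RANK() = (RANK() - 1) / (COUNT(*) - 1), i.e. one ordered window
  // expression merged with one partition-wide count.
  def percentRank[A](sorted: Seq[A]): Seq[Double] = {
    val n = sorted.length
    // Assumption: a single-row partition yields 0.0 instead of dividing by 0.
    rank(sorted).map(r => if (n > 1) (r - 1).toDouble / (n - 1) else 0.0)
  }
}
```

The quadratic `indexOf` lookup keeps the sketch short; a running implementation would instead carry the previous ORDER BY value and the current rank forward from row to row.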