Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5604#discussion_r157933488
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala ---
    @@ -0,0 +1,340 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.expressions
    +
    +import org.apache.spark.sql.catalyst.analysis.UnresolvedException
    +import org.apache.spark.sql.catalyst.errors.TreeNodeException
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.types.{NumericType, DataType}
    +
    +/**
    + * The trait of the Window Specification (specified in the OVER clause or WINDOW clause) for
    + * Window Functions. Implemented by [[WindowSpecDefinition]] (an inline definition) and
    + * [[WindowSpecReference]] (a reference to a definition declared under a name).
    + */
    +sealed trait WindowSpec
    +
    +/**
    + * The specification for a window function.
    + *
    + * @param partitionSpec It defines the way that input rows are partitioned.
    + * @param orderSpec It defines the ordering of rows in a partition.
    + * @param frameSpecification It defines the window frame in a partition.
    + */
    +case class WindowSpecDefinition(
    +    partitionSpec: Seq[Expression],
    +    orderSpec: Seq[SortOrder],
    +    frameSpecification: WindowFrame) extends Expression with WindowSpec {
    +
    +  /**
    +   * Checks whether this window specification is valid.
    +   *
    +   * An [[UnspecifiedFrame]] is always reported as an error, because it is expected to have been
    +   * replaced by a [[SpecifiedWindowFrame]] during analysis. For a [[SpecifiedWindowFrame]], the
    +   * frame's own checks run first. If they pass, a RANGE frame that uses a value-based boundary
    +   * ([[ValuePreceding]] or [[ValueFollowing]]) must additionally have at most one ORDER BY
    +   * expression, and that expression (if present) must have a numeric type.
    +   *
    +   * @return `None` if the specification is valid, otherwise `Some(reason)`.
    +   */
    +  def validate: Option[String] = frameSpecification match {
    +    case UnspecifiedFrame =>
    +      Some("Found a UnspecifiedFrame. It should be converted to a SpecifiedWindowFrame " +
    +        "during analysis. Please file a bug report.")
    +    case frame: SpecifiedWindowFrame => frame.validate.orElse {
    +      // True for boundaries whose value is an offset rather than a fixed position.
    +      def isValueBased(boundary: FrameBoundary): Boolean = boundary match {
    +        case _: ValuePreceding | _: ValueFollowing => true
    +        case _ => false
    +      }
    +
    +      // In a RANGE frame, a value-based offset is applied to the value of the ORDER BY
    +      // expression (see RangeFrame), so at most one such expression is allowed, and it
    +      // must be numeric.
    +      def checkValueBasedBoundaryForRangeFrame(): Option[String] = {
    +        if (orderSpec.length > 1) {
    +          Some("This Range Window Frame only accepts at most one ORDER BY expression.")
    +        } else if (orderSpec.headOption.exists(o => !o.dataType.isInstanceOf[NumericType])) {
    +          Some("The data type of the expression in the ORDER BY clause should be a numeric type.")
    +        } else {
    +          None
    +        }
    +      }
    +
    +      frame.frameType match {
    +        case RangeFrame if isValueBased(frame.frameStart) || isValueBased(frame.frameEnd) =>
    +          checkValueBasedBoundaryForRangeFrame()
    +        case _ => None
    +      }
    +    }
    +  }
    +
    +  type EvaluatedType = Any
    +
    +  // Only the partitioning and ordering expressions are children; the frame is a WindowFrame,
    +  // not an Expression.
    +  override def children: Seq[Expression] = partitionSpec ++ orderSpec
    +
    +  // Resolved once all children are resolved and the frame has been made concrete.
    +  override lazy val resolved: Boolean =
    +    childrenResolved && frameSpecification.isInstanceOf[SpecifiedWindowFrame]
    +
    +  override def toString: String = simpleString
    +
    +  // A window specification is not evaluated directly: eval and dataType are unsupported.
    +  override def eval(input: Row): EvaluatedType = throw new UnsupportedOperationException
    +  override def nullable: Boolean = true
    +  override def foldable: Boolean = false
    +  override def dataType: DataType = throw new UnsupportedOperationException
    +}
    +
    +/**
    + * A Window specification reference that refers to the [[WindowSpecDefinition]] defined
    + * under the name `name`.
    + */
    +case class WindowSpecReference(name: String) extends WindowSpec
    +
    +/**
    + * The trait used to represent the type of a Window Frame: either [[RowFrame]] (`ROWS`)
    + * or [[RangeFrame]] (`RANGE`).
    + */
    +sealed trait FrameType
    +
    +/**
    + * RowFrame treats rows in a partition individually. When a [[ValuePreceding]]
    + * or a [[ValueFollowing]] is used as its [[FrameBoundary]], the value is considered
    + * as a physical offset.
    + * For example, `ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING` represents a 3-row frame,
    + * from the row that precedes the current row to the row that follows the current row.
    + */
    +case object RowFrame extends FrameType
    +
    +/**
    + * RangeFrame treats rows in a partition as groups of peers.
    + * All rows that have the same `ORDER BY` values are considered peers.
    + * When a [[ValuePreceding]] or a [[ValueFollowing]] is used as its [[FrameBoundary]],
    + * the value is considered as a logical offset.
    + * For example, assuming the value of the current row's `ORDER BY` expression `expr` is `v`,
    + * `RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING` represents a frame containing the rows whose
    + * `expr` values are in the range [v-1, v+1].
    + *
    + * If no `ORDER BY` clause is defined, all rows in the partition are considered peers
    + * of the current row.
    + */
    +case object RangeFrame extends FrameType
    +
    +/**
    + * The trait used to represent the type of a Window Frame Boundary.
    + */
    +sealed trait FrameBoundary {
    +  /**
    +   * Returns true if this boundary does not come after `other`, i.e. a frame that starts at
    +   * this boundary and ends at `other` is correctly ordered.
    +   */
    +  def notFollows(other: FrameBoundary): Boolean
    +}
    +
    +/**
    + * UNBOUNDED PRECEDING boundary. No other boundary comes before it, so it does not follow
    + * any boundary.
    + */
    +case object UnboundedPreceding extends FrameBoundary {
    +  def notFollows(other: FrameBoundary): Boolean = other match {
    +    case UnboundedPreceding => true
    +    case _: ValuePreceding => true
    +    case CurrentRow => true
    +    case _: ValueFollowing => true
    +    case UnboundedFollowing => true
    +  }
    +
    +  override def toString: String = "UNBOUNDED PRECEDING"
    +}
    +
    +/**
    + * `<value> PRECEDING` boundary.
    + *
    + * @param value The offset before the current row: a physical row count for a [[RowFrame]],
    + *              a logical offset on the ORDER BY value for a [[RangeFrame]].
    + *              NOTE(review): negative values are not rejected here; confirm callers
    + *              guarantee `value >= 0`.
    + */
    +case class ValuePreceding(value: Int) extends FrameBoundary {
    +  def notFollows(other: FrameBoundary): Boolean = other match {
    +    case UnboundedPreceding => false
    +    // A larger PRECEDING offset lies further back, so it does not follow a smaller one.
    +    case ValuePreceding(anotherValue) => value >= anotherValue
    +    case CurrentRow => true
    +    case _: ValueFollowing => true
    +    case UnboundedFollowing => true
    +  }
    +
    +  override def toString: String = s"$value PRECEDING"
    +}
    +
    +/**
    + * CURRENT ROW boundary. It follows UNBOUNDED PRECEDING and every `<value> PRECEDING`
    + * boundary, and does not follow any other boundary.
    + */
    +case object CurrentRow extends FrameBoundary {
    +  def notFollows(other: FrameBoundary): Boolean = other match {
    +    case UnboundedPreceding => false
    +    case _: ValuePreceding => false
    +    case CurrentRow => true
    +    case _: ValueFollowing => true
    +    case UnboundedFollowing => true
    +  }
    +
    +  override def toString: String = "CURRENT ROW"
    +}
    +
    +/**
    + * `<value> FOLLOWING` boundary.
    + *
    + * @param value The offset after the current row: a physical row count for a [[RowFrame]],
    + *              a logical offset on the ORDER BY value for a [[RangeFrame]].
    + *              NOTE(review): negative values are not rejected here; confirm callers
    + *              guarantee `value >= 0`.
    + */
    +case class ValueFollowing(value: Int) extends FrameBoundary {
    +  def notFollows(other: FrameBoundary): Boolean = other match {
    +    case UnboundedPreceding => false
    +    case _: ValuePreceding => false
    +    case CurrentRow => false
    +    // A smaller FOLLOWING offset lies closer to the current row, so it does not follow a
    +    // larger one.
    +    case ValueFollowing(anotherValue) => value <= anotherValue
    +    case UnboundedFollowing => true
    +  }
    +
    +  override def toString: String = s"$value FOLLOWING"
    +}
    +
    +/**
    + * UNBOUNDED FOLLOWING boundary. It follows every boundary except itself.
    + */
    +case object UnboundedFollowing extends FrameBoundary {
    +  def notFollows(other: FrameBoundary): Boolean = other match {
    +    case UnboundedPreceding => false
    +    case _: ValuePreceding => false
    +    case CurrentRow => false
    +    case _: ValueFollowing => false
    +    case UnboundedFollowing => true
    +  }
    +
    +  override def toString: String = "UNBOUNDED FOLLOWING"
    +}
    +
    +/**
    + * The trait used to represent a Window Frame: either [[UnspecifiedFrame]] or
    + * [[SpecifiedWindowFrame]].
    + */
    +sealed trait WindowFrame
    +
    +/**
    + * Used as a placeholder when a frame specification is not defined. It is expected to be
    + * replaced by a [[SpecifiedWindowFrame]] during analysis.
    + */
    +case object UnspecifiedFrame extends WindowFrame
    +
    +/**
    + * A concrete Window Frame: a frame type plus a start and an end boundary.
    + *
    + * @param frameType Whether offsets are counted in rows (`ROWS`) or in values (`RANGE`).
    + * @param frameStart The boundary where the frame begins.
    + * @param frameEnd The boundary where the frame ends.
    + */
    +case class SpecifiedWindowFrame(
    +    frameType: FrameType,
    +    frameStart: FrameBoundary,
    +    frameEnd: FrameBoundary) extends WindowFrame {
    +
    +  /**
    +   * Checks whether this frame is well formed.
    +   *
    +   * @return `None` if the frame is valid, otherwise `Some(reason)` describing the problem.
    +   */
    +  def validate: Option[String] = {
    +    // TODO: rules specific to RowFrame or RangeFrame could be added here.
    +    if (frameStart == UnboundedFollowing) {
    +      Some(s"$UnboundedFollowing is not allowed as the start of a Window Frame.")
    +    } else if (frameEnd == UnboundedPreceding) {
    +      Some(s"$UnboundedPreceding is not allowed as the end of a Window Frame.")
    +    } else if (frameStart.notFollows(frameEnd)) {
    +      None
    +    } else {
    +      Some(s"The end of this Window Frame $frameEnd is smaller than the start of " +
    +        s"this Window Frame $frameStart.")
    +    }
    +  }
    +
    +  override def toString: String = {
    +    val keyword = frameType match {
    +      case RowFrame => "ROWS"
    +      case RangeFrame => "RANGE"
    +    }
    +    s"$keyword BETWEEN $frameStart AND $frameEnd"
    +  }
    +}
    +
    +object SpecifiedWindowFrame {
    +  /**
    +   *
    +   * @param hasOrderSpecification If the window spec has order by 
expressions.
    +   * @param acceptWindowFrame If the window function accepts 
user-specified frame.
    +   * @return
    +   */
    +  def defaultWindowFrame(
    +      hasOrderSpecification: Boolean,
    +      acceptWindowFrame: Boolean): SpecifiedWindowFrame = {
    +    if (hasOrderSpecification && acceptWindowFrame) {
    --- End diff --
    
    
    
    There are window functions that do not support setting a window frame (e.g. rank). For
    them, `acceptWindowFrame` is false and the whole partition is the frame.
    
    For functions that do support setting a window frame, the default window frame is
    `RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`. Note that here, all rows that are
    peers of the current row are included in the frame. The `ORDER BY` clause determines
    whether two rows are peers. For example, `ORDER BY c` means that if two rows have the same
    value in column `c`, they are peers. So, without an `ORDER BY` clause, all rows are peers
    of the current row, which means that the frame is effectively the entire partition.
    
    
    Related references:
    - https://docs.microsoft.com/en-us/sql/t-sql/queries/select-over-clause-transact-sql
    - https://www.postgresql.org/docs/9.3/static/sql-expressions.html#SYNTAX-WINDOW-FUNCTIONS
    (search for `default` to find the relevant parts)



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to