Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18540#discussion_r128894791
  
    --- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
 ---
    @@ -106,173 +101,167 @@ case class WindowSpecReference(name: String) 
extends WindowSpec
     /**
      * The trait used to represent the type of a Window Frame.
      */
    -sealed trait FrameType
    +sealed trait FrameType {
    +  def inputType: AbstractDataType
    +  def sql: String
    +}
     
     /**
    - * RowFrame treats rows in a partition individually. When a 
[[ValuePreceding]]
    - * or a [[ValueFollowing]] is used as its [[FrameBoundary]], the value is 
considered
    - * as a physical offset.
    + * RowFrame treats rows in a partition individually. Values used in a row 
frame are considered
    + * to be physical offsets.
      * For example, `ROW BETWEEN 1 PRECEDING AND 1 FOLLOWING` represents a 
3-row frame,
      * from the row that precedes the current row to the row that follows the 
current row.
      */
    -case object RowFrame extends FrameType
    +case object RowFrame extends FrameType {
    +  override def inputType: AbstractDataType = IntegerType
    +  override def sql: String = "ROWS"
    +}
     
     /**
    - * RangeFrame treats rows in a partition as groups of peers.
    - * All rows having the same `ORDER BY` ordering are considered as peers.
    - * When a [[ValuePreceding]] or a [[ValueFollowing]] is used as its 
[[FrameBoundary]],
    - * the value is considered as a logical offset.
    + * RangeFrame treats rows in a partition as groups of peers. All rows 
having the same `ORDER BY`
    + * ordering are considered as peers. Values used in a range frame are 
considered to be logical
    + * offsets.
      * For example, assuming the value of the current row's `ORDER BY` 
expression `expr` is `v`,
      * `RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING` represents a frame 
containing rows whose values
      * `expr` are in the range of [v-1, v+1].
      *
      * If `ORDER BY` clause is not defined, all rows in the partition are 
considered as peers
      * of the current row.
      */
    -case object RangeFrame extends FrameType
    +case object RangeFrame extends FrameType {
    +  override def inputType: AbstractDataType = 
TypeCollection.NumericAndInterval
    +  override def sql: String = "RANGE"
    +}
     
     /**
    - * The trait used to represent the type of a Window Frame Boundary.
    + * The trait used to represent special boundaries used in a window frame.
      */
    -sealed trait FrameBoundary {
    -  def notFollows(other: FrameBoundary): Boolean
    +sealed trait SpecialFrameBoundary extends Expression with Unevaluable {
    +  override lazy val children: Seq[Expression] = Nil
    +  override def dataType: DataType = NullType
    +  override def foldable: Boolean = false
    +  override def nullable: Boolean = false
     }
     
    +/** UNBOUNDED boundary. */
    +case object Unbounded extends SpecialFrameBoundary
    +
    +/** CURRENT ROW boundary. */
    +case object CurrentRow extends SpecialFrameBoundary
    +
     /**
    - * Extractor for making working with frame boundaries easier.
    + * Represents a window frame.
      */
    -object FrameBoundary {
    -  def apply(boundary: FrameBoundary): Option[Int] = unapply(boundary)
    -  def unapply(boundary: FrameBoundary): Option[Int] = boundary match {
    -    case CurrentRow => Some(0)
    -    case ValuePreceding(offset) => Some(-offset)
    -    case ValueFollowing(offset) => Some(offset)
    -    case _ => None
    -  }
    +sealed trait WindowFrame extends Expression with Unevaluable {
    +  override lazy val children: Seq[Expression] = Nil
    +  override def dataType: DataType = throw new 
UnsupportedOperationException("dataType")
    +  override def foldable: Boolean = false
    +  override def nullable: Boolean = false
     }
     
    -/** UNBOUNDED PRECEDING boundary. */
    -case object UnboundedPreceding extends FrameBoundary {
    -  def notFollows(other: FrameBoundary): Boolean = other match {
    -    case UnboundedPreceding => true
    -    case vp: ValuePreceding => true
    -    case CurrentRow => true
    -    case vf: ValueFollowing => true
    -    case UnboundedFollowing => true
    -  }
    +/** Used as a placeholder when a frame specification is not defined. */
    +case object UnspecifiedFrame extends WindowFrame
     
    -  override def toString: String = "UNBOUNDED PRECEDING"
    -}
    +/**
    + * A specified Window Frame. The val lower/uppper can be either a foldable 
[[Expression]] or a
    + * [[SpecialFrameBoundary]].
    + */
    +case class SpecifiedWindowFrame(
    +    frameType: FrameType,
    +    lower: Expression,
    +    upper: Expression)
    +  extends WindowFrame {
     
    -/** <value> PRECEDING boundary. */
    -case class ValuePreceding(value: Int) extends FrameBoundary {
    -  def notFollows(other: FrameBoundary): Boolean = other match {
    -    case UnboundedPreceding => false
    -    case ValuePreceding(anotherValue) => value >= anotherValue
    -    case CurrentRow => true
    -    case vf: ValueFollowing => true
    -    case UnboundedFollowing => true
    -  }
    +  override lazy val children: Seq[Expression] = lower :: upper :: Nil
     
    -  override def toString: String = s"$value PRECEDING"
    -}
    +  lazy val valueBoundary: Seq[Expression] =
    +    children.filterNot(_.isInstanceOf[SpecialFrameBoundary])
     
    -/** CURRENT ROW boundary. */
    -case object CurrentRow extends FrameBoundary {
    -  def notFollows(other: FrameBoundary): Boolean = other match {
    -    case UnboundedPreceding => false
    -    case vp: ValuePreceding => false
    -    case CurrentRow => true
    -    case vf: ValueFollowing => true
    -    case UnboundedFollowing => true
    -  }
    +  override def checkInputDataTypes(): TypeCheckResult = {
    +    // Check lower value.
    +    val lowerCheck = checkBoundary(lower, "lower")
    +    if (lowerCheck.isFailure) {
    +      return lowerCheck
    +    }
     
    -  override def toString: String = "CURRENT ROW"
    -}
    +    // Check upper value.
    +    val upperCheck = checkBoundary(upper, "upper")
    +    if (upperCheck.isFailure) {
    +      return upperCheck
    +    }
     
    -/** <value> FOLLOWING boundary. */
    -case class ValueFollowing(value: Int) extends FrameBoundary {
    -  def notFollows(other: FrameBoundary): Boolean = other match {
    -    case UnboundedPreceding => false
    -    case vp: ValuePreceding => false
    -    case CurrentRow => false
    -    case ValueFollowing(anotherValue) => value <= anotherValue
    -    case UnboundedFollowing => true
    +    // Check combination (of expressions).
    +    (lower, upper) match {
    +      case (l: SpecialFrameBoundary, _) => TypeCheckSuccess
    +      case (_, u: SpecialFrameBoundary) => TypeCheckSuccess
    +      case (l: Expression, u: Expression) if l.dataType != u.dataType =>
    +        TypeCheckFailure(
    +          s"Window frame bounds '$lower' and '$upper' do no not have the 
same data type: " +
    +            s"'${l.dataType.catalogString}' <> 
'${u.dataType.catalogString}'")
    +      case (l: Expression, u: Expression) if isGreaterThan(l, u) =>
    +        TypeCheckFailure(
    +          "The lower bound of a window frame must less than or equal to 
the upper bound")
    +      case _ => TypeCheckSuccess
    +    }
       }
     
    -  override def toString: String = s"$value FOLLOWING"
    -}
    -
    -/** UNBOUNDED FOLLOWING boundary. */
    -case object UnboundedFollowing extends FrameBoundary {
    -  def notFollows(other: FrameBoundary): Boolean = other match {
    -    case UnboundedPreceding => false
    -    case vp: ValuePreceding => false
    -    case CurrentRow => false
    -    case vf: ValueFollowing => false
    -    case UnboundedFollowing => true
    +  override def sql: String = {
    +    val lowerSql = boundarySql(lower, "PRECEDING")
    +    val upperSql = boundarySql(upper, "FOLLOWING")
    +    s"${frameType.sql} BETWEEN $lowerSql AND $upperSql"
       }
     
    -  override def toString: String = "UNBOUNDED FOLLOWING"
    -}
    +  def isUnbounded: Boolean = lower == Unbounded && upper == Unbounded
     
    -/**
    - * Represents a window frame.
    - */
    -sealed trait WindowFrame
    +  def isValueBound: Boolean = valueBoundary.nonEmpty
    --- End diff --
    
    oh, is it possible that both `lower` and `upper` are current row?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to