GitHub user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7057#discussion_r34616177
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
    @@ -38,443 +68,645 @@ case class Window(
         child: SparkPlan)
       extends UnaryNode {
     
    -  override def output: Seq[Attribute] =
    -    (projectList ++ windowExpression).map(_.toAttribute)
    +  override def output: Seq[Attribute] = projectList ++ windowExpression.map(_.toAttribute)
     
    -  override def requiredChildDistribution: Seq[Distribution] =
    +  override def requiredChildDistribution: Seq[Distribution] = {
         if (windowSpec.partitionSpec.isEmpty) {
    -      // This operator will be very expensive.
    +      // Only show warning when the number of bytes is larger than 100 MB?
    +      logWarning("No Partition Defined for Window operation! Moving all data to a single "
    +        + "partition, this can cause serious performance degradation.")
           AllTuples :: Nil
    -    } else {
    -      ClusteredDistribution(windowSpec.partitionSpec) :: Nil
    -    }
    -
    -  // Since window functions are adding columns to the input rows, the child's outputPartitioning
    -  // is preserved.
    -  override def outputPartitioning: Partitioning = child.outputPartitioning
    -
    -  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
    -    // The required child ordering has two parts.
    -    // The first part is the expressions in the partition specification.
    -    // We add these expressions to the required ordering to make sure input rows are grouped
    -    // based on the partition specification. So, we only need to process a single partition
    -    // at a time.
    -    // The second part is the expressions specified in the ORDER BY clause.
    -    // Basically, we first use sort to group rows based on partition specifications and then sort
    -    // rows in a group based on the order specification.
    -    (windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ windowSpec.orderSpec) :: Nil
    +    } else ClusteredDistribution(windowSpec.partitionSpec) :: Nil
       }
     
    -  // Since window functions basically add columns to input rows, this operator
    -  // will not change the ordering of input rows.
    -  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
    +    Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ windowSpec.orderSpec)
     
    -  case class ComputedWindow(
    -    unbound: WindowExpression,
    -    windowFunction: WindowFunction,
    -    resultAttribute: AttributeReference)
    -
    -  // A list of window functions that need to be computed for each group.
    -  private[this] val computedWindowExpressions = windowExpression.flatMap { window =>
    -    window.collect {
    -      case w: WindowExpression =>
    -        ComputedWindow(
    -          w,
    -          BindReferences.bindReference(w.windowFunction, child.output),
    -          AttributeReference(s"windowResult:$w", w.dataType, w.nullable)())
    -    }
    -  }.toArray
    -
    -  private[this] val windowFrame =
    -    windowSpec.frameSpecification.asInstanceOf[SpecifiedWindowFrame]
    +  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
     
    -  // Create window functions.
    -  private[this] def windowFunctions(): Array[WindowFunction] = {
    -    val functions = new Array[WindowFunction](computedWindowExpressions.length)
    -    var i = 0
    -    while (i < computedWindowExpressions.length) {
    -      functions(i) = computedWindowExpressions(i).windowFunction.newInstance()
    -      functions(i).init()
    -      i += 1
    +  /**
    +   * Create a bound ordering object for a given frame type and offset. A bound ordering object is
    +   * used to determine which input row lies within the frame boundaries of an output row. There
    +   * are two types of boundaries that can be evaluated:
    +   * - Row Based: A row based boundary is based on the position of the row within the partition.
    +   *   The offset indicates the number of rows above or below the current row at which the frame
    +   *   for the current row starts or ends. For instance, given a row based sliding frame with a
    +   *   lower bound offset of -1 and an upper bound offset of +2, the frame for the row with
    +   *   index 5 would range from index 4 to index 7.
    +   * - Range based: A range based boundary is based on the actual value of the ORDER BY
    +   *   expression(s). The offset is used to alter the value of the ORDER BY expression: for
    +   *   instance, if the current ORDER BY expression has a value of 10 and the lower bound offset
    +   *   is -3, the resulting lower bound for the current row will be 10 - 3 = 7. This however puts
    +   *   a number of constraints on the ORDER BY expressions: there can be only one expression, and
    +   *   this expression must have a numerical data type. An exception can be made when the offset
    +   *   is 0, because no value modification is needed; in this case multiple and non-numeric
    +   *   ORDER BY expressions are allowed.
    +   *
    +   * @param frameType to evaluate. This can either be Row or Range based.
    +   * @param offset with respect to the row.
    +   * @return a bound ordering object.
    +   */
    +  private[this] def createBoundOrdering(frameType: FrameType, offset: Int) = {
    +    frameType match {
    +      case RangeFrame =>
    +        // Use the entire order expression when the offset is 0.
    +        val (exprs, current, bound) = if (offset == 0) {
    +          val exprs = windowSpec.orderSpec.map(_.child)
    +          val projection = newMutableProjection(exprs, child.output)
    +          (windowSpec.orderSpec, projection(), projection())
    +        }
    +        // Use only the first order expression when the offset is non-zero.
    +        else if (windowSpec.orderSpec.size == 1) {
    +          val sortExpr = windowSpec.orderSpec.head
    +          val expr = sortExpr.child
    +          val boundExpr = Add(expr, Cast(Literal.create(offset, IntegerType), expr.dataType))
    --- End diff --
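
    As a plain-Scala sketch of the row based boundary arithmetic described in the
    scaladoc above (the helper name `rowFrame` is mine, not from the patch):

        // Row based frame: offsets are positions relative to the current row index.
        def rowFrame(currentIndex: Int, lower: Int, upper: Int): Range.Inclusive =
          (currentIndex + lower) to (currentIndex + upper)

        rowFrame(5, -1, +2)  // 4 to 7, matching the scaladoc example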
    
    In the case of a Range Frame, the offset expresses the amount by which we are
    going to change the current row's ORDER BY value: 1 PRECEDING translates to an
    offset of -1, and 1 FOLLOWING to an offset of +1. In this example, the value 2
    would have a lower bound of 2 - 1 = 1 and an upper bound of 2 + 1 = 3.
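
    A sketch of that arithmetic (the helper `rangeBound` is illustrative, not part
    of the patch):

        // Range based frame: the offset is added to the current ORDER BY value.
        def rangeBound(current: Int, offset: Int): Int = current + offset

        rangeBound(2, -1)   // 1: lower bound, 1 PRECEDING
        rangeBound(2, +1)   // 3: upper bound, 1 FOLLOWING
        rangeBound(10, -3)  // 7: the scaladoc's lower bound example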
    
    The operator will produce a wrong result in this case because the ordering is
    reversed: with a descending ORDER BY, adding the offset moves the bound to the
    wrong side of the current value. I'll have a look into this.
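
    To make the reversed-ordering problem concrete (a hypothetical illustration,
    names are mine):

        // With ORDER BY x DESC, "1 PRECEDING" refers to rows earlier in the
        // ordering, i.e. rows with a *larger* x. For a current value of 3 the
        // bound should therefore be 3 + 1 = 4; naively adding the offset gives 2.
        val current = 3
        val offset = -1   // 1 PRECEDING, as translated above
        current + offset  // 2: correct for an ascending ordering
        current - offset  // 4: what a descending ordering needs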

