Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7057#discussion_r34616177

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
    @@ -38,443 +68,645 @@ case class Window(
         child: SparkPlan)
       extends UnaryNode {
     
    -  override def output: Seq[Attribute] =
    -    (projectList ++ windowExpression).map(_.toAttribute)
    +  override def output: Seq[Attribute] = projectList ++ windowExpression.map(_.toAttribute)
     
    -  override def requiredChildDistribution: Seq[Distribution] =
    +  override def requiredChildDistribution: Seq[Distribution] = {
         if (windowSpec.partitionSpec.isEmpty) {
    -      // This operator will be very expensive.
    +      // Only show warning when the number of bytes is larger than 100 MB?
    +      logWarning("No Partition Defined for Window operation! Moving all data to a single " +
    +        "partition, this can cause serious performance degradation.")
           AllTuples :: Nil
    -    } else {
    -      ClusteredDistribution(windowSpec.partitionSpec) :: Nil
    -    }
    -
    -  // Since window functions are adding columns to the input rows, the child's outputPartitioning
    -  // is preserved.
    -  override def outputPartitioning: Partitioning = child.outputPartitioning
    -
    -  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
    -    // The required child ordering has two parts.
    -    // The first part is the expressions in the partition specification.
    -    // We add these expressions to the required ordering to make sure input rows are grouped
    -    // based on the partition specification. So, we only need to process a single partition
    -    // at a time.
    -    // The second part is the expressions specified in the ORDER BY cluase.
    -    // Basically, we first use sort to group rows based on partition specifications and then sort
    -    // Rows in a group based on the order specification.
    -    (windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ windowSpec.orderSpec) :: Nil
    +    } else ClusteredDistribution(windowSpec.partitionSpec) :: Nil
       }
     
    -  // Since window functions basically add columns to input rows, this operator
    -  // will not change the ordering of input rows.
    -  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
    +    Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ windowSpec.orderSpec)
     
    -  case class ComputedWindow(
    -    unbound: WindowExpression,
    -    windowFunction: WindowFunction,
    -    resultAttribute: AttributeReference)
    -
    -  // A list of window functions that need to be computed for each group.
    -  private[this] val computedWindowExpressions = windowExpression.flatMap { window =>
    -    window.collect {
    -      case w: WindowExpression =>
    -        ComputedWindow(
    -          w,
    -          BindReferences.bindReference(w.windowFunction, child.output),
    -          AttributeReference(s"windowResult:$w", w.dataType, w.nullable)())
    -    }
    -  }.toArray
    -
    -  private[this] val windowFrame =
    -    windowSpec.frameSpecification.asInstanceOf[SpecifiedWindowFrame]
    +  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
     
    -  // Create window functions.
    -  private[this] def windowFunctions(): Array[WindowFunction] = {
    -    val functions = new Array[WindowFunction](computedWindowExpressions.length)
    -    var i = 0
    -    while (i < computedWindowExpressions.length) {
    -      functions(i) = computedWindowExpressions(i).windowFunction.newInstance()
    -      functions(i).init()
    -      i += 1
    +  /**
    +   * Create a bound ordering object for a given frame type and offset. A bound ordering object is
    +   * used to determine which input row lies within the frame boundaries of an output row. There
There + * are two types of boundaries that can be evaluated: + * - Row Based: A row based boundary is based on the position of the row within the partition. + * The offset indicates the number of rows above or below the current row, the frame for the + * current row starts or ends. For instance, given a row based sliding frame with a lower bound + * offset of -1 and a upper bound offset of +2. The frame for row with index 5 would range from + * index 4 to index 6. + * - Range based: A range based boundary is based on the actual value of the ORDER BY + * expression(s). The offset is used to alter the value of the ORDER BY expression, for + * instance if the current order by expression has a value of 10 and the lower bound offset + * is -3, the resulting lower bound for the current row will be 10 - 3 = 7. This however puts a + * number of constraints on the ORDER BY expressions: there can be only one expression and this + * expression must have a numerical data type. An exception can be made when the offset is 0, + * because no value modification is needed, in this case multiple and non-numeric ORDER BY + * expression are allowed. + * + * @param frameType to evaluate. This can either be Row or Range based. + * @param offset with respect to the row. + * @return a bound ordering object. + */ + private[this] def createBoundOrdering(frameType: FrameType, offset: Int) = { + frameType match { + case RangeFrame => + // Use the entire order expression when the offset is 0. + val (exprs, current, bound) = if (offset == 0) { + val exprs = windowSpec.orderSpec.map(_.child) + val projection = newMutableProjection(exprs, child.output) + (windowSpec.orderSpec, projection(), projection()) + } + // Use only the first order expression when the offset is non-null. + else if (windowSpec.orderSpec.size == 1) { + val sortExpr = windowSpec.orderSpec.head + val expr = sortExpr.child + val boundExpr = Add(expr, Cast(Literal.create(offset, IntegerType), expr.dataType)) --- End diff -- In case of a Range Frame the offset expresses by what amount we are going to change the current Row value. 1 PRECEDING translates to an offset of -1, and 1 FOLLOWING to an offset of 1. In this case the value 2 would have lower bound 1 and upper bound 3. The operator will produce a wrong result in this case, because the ordering is reversed. I'll have a look into this.