[GitHub] spark pull request #5604: [SPARK-1442][SQL] Window Function Support for Spar...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/5604#discussion_r157945816 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala --- @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.catalyst.analysis.UnresolvedException +import org.apache.spark.sql.catalyst.errors.TreeNodeException +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types.{NumericType, DataType} + +/** + * The trait of the Window Specification (specified in the OVER clause or WINDOW clause) for + * Window Functions. + */ +sealed trait WindowSpec + +/** + * The specification for a window function. + * @param partitionSpec It defines the way that input rows are partitioned. + * @param orderSpec It defines the ordering of rows in a partition. + * @param frameSpecification It defines the window frame in a partition. + */ +case class WindowSpecDefinition( +partitionSpec: Seq[Expression], +orderSpec: Seq[SortOrder], +frameSpecification: WindowFrame) extends Expression with WindowSpec { + + def validate: Option[String] = frameSpecification match { +case UnspecifiedFrame => + Some("Found a UnspecifiedFrame. It should be converted to a SpecifiedWindowFrame " + +"during analysis. Please file a bug report.") +case frame: SpecifiedWindowFrame => frame.validate.orElse { + def checkValueBasedBoundaryForRangeFrame(): Option[String] = { +if (orderSpec.length > 1) { + // It is not allowed to have a value-based PRECEDING and FOLLOWING + // as the boundary of a Range Window Frame. + Some("This Range Window Frame only accepts at most one ORDER BY expression.") +} else if (orderSpec.nonEmpty && !orderSpec.head.dataType.isInstanceOf[NumericType]) { + Some("The data type of the expression in the ORDER BY clause should be a numeric type.") +} else { + None +} + } + + (frame.frameType, frame.frameStart, frame.frameEnd) match { +case (RangeFrame, vp: ValuePreceding, _) => checkValueBasedBoundaryForRangeFrame() +case (RangeFrame, vf: ValueFollowing, _) => checkValueBasedBoundaryForRangeFrame() +case (RangeFrame, _, vp: ValuePreceding) => checkValueBasedBoundaryForRangeFrame() +case (RangeFrame, _, vf: ValueFollowing) => checkValueBasedBoundaryForRangeFrame() +case (_, _, _) => None + } +} + } + + type EvaluatedType = Any + + override def children: Seq[Expression] = partitionSpec ++ orderSpec + + override lazy val resolved: Boolean = +childrenResolved && frameSpecification.isInstanceOf[SpecifiedWindowFrame] + + + override def toString: String = simpleString + + override def eval(input: Row): EvaluatedType = throw new UnsupportedOperationException + override def nullable: Boolean = true + override def foldable: Boolean = false + override def dataType: DataType = throw new UnsupportedOperationException +} + +/** + * A Window specification reference that refers to the [[WindowSpecDefinition]] defined + * under the name `name`. + */ +case class WindowSpecReference(name: String) extends WindowSpec + +/** + * The trait used to represent the type of a Window Frame. + */ +sealed trait FrameType + +/** + * RowFrame treats rows in a partition individually. When a [[ValuePreceding]] + * or a [[ValueFollowing]] is used as its [[FrameBoundary]], the value is considered + * as a physical offset. + * For example, `ROW BETWEEN 1 PRECEDING AND 1 FOLLOWING` represents a 3-row frame, + * from the row precedes the current row to the row follows the current row. + */ +case object
[GitHub] spark pull request #5604: [SPARK-1442][SQL] Window Function Support for Spar...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/5604#discussion_r157933488 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala --- @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.catalyst.analysis.UnresolvedException +import org.apache.spark.sql.catalyst.errors.TreeNodeException +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types.{NumericType, DataType} + +/** + * The trait of the Window Specification (specified in the OVER clause or WINDOW clause) for + * Window Functions. + */ +sealed trait WindowSpec + +/** + * The specification for a window function. + * @param partitionSpec It defines the way that input rows are partitioned. + * @param orderSpec It defines the ordering of rows in a partition. + * @param frameSpecification It defines the window frame in a partition. + */ +case class WindowSpecDefinition( +partitionSpec: Seq[Expression], +orderSpec: Seq[SortOrder], +frameSpecification: WindowFrame) extends Expression with WindowSpec { + + def validate: Option[String] = frameSpecification match { +case UnspecifiedFrame => + Some("Found a UnspecifiedFrame. It should be converted to a SpecifiedWindowFrame " + +"during analysis. Please file a bug report.") +case frame: SpecifiedWindowFrame => frame.validate.orElse { + def checkValueBasedBoundaryForRangeFrame(): Option[String] = { +if (orderSpec.length > 1) { + // It is not allowed to have a value-based PRECEDING and FOLLOWING + // as the boundary of a Range Window Frame. + Some("This Range Window Frame only accepts at most one ORDER BY expression.") +} else if (orderSpec.nonEmpty && !orderSpec.head.dataType.isInstanceOf[NumericType]) { + Some("The data type of the expression in the ORDER BY clause should be a numeric type.") +} else { + None +} + } + + (frame.frameType, frame.frameStart, frame.frameEnd) match { +case (RangeFrame, vp: ValuePreceding, _) => checkValueBasedBoundaryForRangeFrame() +case (RangeFrame, vf: ValueFollowing, _) => checkValueBasedBoundaryForRangeFrame() +case (RangeFrame, _, vp: ValuePreceding) => checkValueBasedBoundaryForRangeFrame() +case (RangeFrame, _, vf: ValueFollowing) => checkValueBasedBoundaryForRangeFrame() +case (_, _, _) => None + } +} + } + + type EvaluatedType = Any + + override def children: Seq[Expression] = partitionSpec ++ orderSpec + + override lazy val resolved: Boolean = +childrenResolved && frameSpecification.isInstanceOf[SpecifiedWindowFrame] + + + override def toString: String = simpleString + + override def eval(input: Row): EvaluatedType = throw new UnsupportedOperationException + override def nullable: Boolean = true + override def foldable: Boolean = false + override def dataType: DataType = throw new UnsupportedOperationException +} + +/** + * A Window specification reference that refers to the [[WindowSpecDefinition]] defined + * under the name `name`. + */ +case class WindowSpecReference(name: String) extends WindowSpec + +/** + * The trait used to represent the type of a Window Frame. + */ +sealed trait FrameType + +/** + * RowFrame treats rows in a partition individually. When a [[ValuePreceding]] + * or a [[ValueFollowing]] is used as its [[FrameBoundary]], the value is considered + * as a physical offset. + * For example, `ROW BETWEEN 1 PRECEDING AND 1 FOLLOWING` represents a 3-row frame, + * from the row precedes the current row to the row follows the current row. + */ +case object RowFr
[GitHub] spark pull request #5604: [SPARK-1442][SQL] Window Function Support for Spar...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/5604#discussion_r157931911 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala --- @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.catalyst.analysis.UnresolvedException +import org.apache.spark.sql.catalyst.errors.TreeNodeException +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types.{NumericType, DataType} + +/** + * The trait of the Window Specification (specified in the OVER clause or WINDOW clause) for + * Window Functions. + */ +sealed trait WindowSpec + +/** + * The specification for a window function. + * @param partitionSpec It defines the way that input rows are partitioned. + * @param orderSpec It defines the ordering of rows in a partition. + * @param frameSpecification It defines the window frame in a partition. + */ +case class WindowSpecDefinition( +partitionSpec: Seq[Expression], +orderSpec: Seq[SortOrder], +frameSpecification: WindowFrame) extends Expression with WindowSpec { + + def validate: Option[String] = frameSpecification match { +case UnspecifiedFrame => + Some("Found a UnspecifiedFrame. It should be converted to a SpecifiedWindowFrame " + +"during analysis. Please file a bug report.") +case frame: SpecifiedWindowFrame => frame.validate.orElse { + def checkValueBasedBoundaryForRangeFrame(): Option[String] = { +if (orderSpec.length > 1) { + // It is not allowed to have a value-based PRECEDING and FOLLOWING + // as the boundary of a Range Window Frame. + Some("This Range Window Frame only accepts at most one ORDER BY expression.") +} else if (orderSpec.nonEmpty && !orderSpec.head.dataType.isInstanceOf[NumericType]) { + Some("The data type of the expression in the ORDER BY clause should be a numeric type.") +} else { + None +} + } + + (frame.frameType, frame.frameStart, frame.frameEnd) match { +case (RangeFrame, vp: ValuePreceding, _) => checkValueBasedBoundaryForRangeFrame() +case (RangeFrame, vf: ValueFollowing, _) => checkValueBasedBoundaryForRangeFrame() +case (RangeFrame, _, vp: ValuePreceding) => checkValueBasedBoundaryForRangeFrame() +case (RangeFrame, _, vf: ValueFollowing) => checkValueBasedBoundaryForRangeFrame() +case (_, _, _) => None + } +} + } + + type EvaluatedType = Any + + override def children: Seq[Expression] = partitionSpec ++ orderSpec + + override lazy val resolved: Boolean = +childrenResolved && frameSpecification.isInstanceOf[SpecifiedWindowFrame] + + + override def toString: String = simpleString + + override def eval(input: Row): EvaluatedType = throw new UnsupportedOperationException + override def nullable: Boolean = true + override def foldable: Boolean = false + override def dataType: DataType = throw new UnsupportedOperationException +} + +/** + * A Window specification reference that refers to the [[WindowSpecDefinition]] defined + * under the name `name`. + */ +case class WindowSpecReference(name: String) extends WindowSpec + +/** + * The trait used to represent the type of a Window Frame. + */ +sealed trait FrameType + +/** + * RowFrame treats rows in a partition individually. When a [[ValuePreceding]] + * or a [[ValueFollowing]] is used as its [[FrameBoundary]], the value is considered + * as a physical offset. + * For example, `ROW BETWEEN 1 PRECEDING AND 1 FOLLOWING` represents a 3-row frame, + * from the row precedes the current row to the row follows the current row. + */ +case object