[GitHub] spark pull request #5604: [SPARK-1442][SQL] Window Function Support for Spar...

2017-12-19 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/5604#discussion_r157945816
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
 ---
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.catalyst.analysis.UnresolvedException
+import org.apache.spark.sql.catalyst.errors.TreeNodeException
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types.{NumericType, DataType}
+
+/**
+ * The trait of the Window Specification (specified in the OVER clause or 
WINDOW clause) for
+ * Window Functions.
+ */
+sealed trait WindowSpec
+
+/**
+ * The specification for a window function.
+ * @param partitionSpec It defines the way that input rows are partitioned.
+ * @param orderSpec It defines the ordering of rows in a partition.
+ * @param frameSpecification It defines the window frame in a partition.
+ */
+case class WindowSpecDefinition(
+partitionSpec: Seq[Expression],
+orderSpec: Seq[SortOrder],
+frameSpecification: WindowFrame) extends Expression with WindowSpec {
+
+  def validate: Option[String] = frameSpecification match {
+case UnspecifiedFrame =>
+  Some("Found a UnspecifiedFrame. It should be converted to a 
SpecifiedWindowFrame " +
+"during analysis. Please file a bug report.")
+case frame: SpecifiedWindowFrame => frame.validate.orElse {
+  def checkValueBasedBoundaryForRangeFrame(): Option[String] = {
+if (orderSpec.length > 1)  {
+  // It is not allowed to have a value-based PRECEDING and 
FOLLOWING
+  // as the boundary of a Range Window Frame.
+  Some("This Range Window Frame only accepts at most one ORDER BY 
expression.")
+} else if (orderSpec.nonEmpty && 
!orderSpec.head.dataType.isInstanceOf[NumericType]) {
+  Some("The data type of the expression in the ORDER BY clause 
should be a numeric type.")
+} else {
+  None
+}
+  }
+
+  (frame.frameType, frame.frameStart, frame.frameEnd) match {
+case (RangeFrame, vp: ValuePreceding, _) => 
checkValueBasedBoundaryForRangeFrame()
+case (RangeFrame, vf: ValueFollowing, _) => 
checkValueBasedBoundaryForRangeFrame()
+case (RangeFrame, _, vp: ValuePreceding) => 
checkValueBasedBoundaryForRangeFrame()
+case (RangeFrame, _, vf: ValueFollowing) => 
checkValueBasedBoundaryForRangeFrame()
+case (_, _, _) => None
+  }
+}
+  }
+
+  type EvaluatedType = Any
+
+  override def children: Seq[Expression]  = partitionSpec ++ orderSpec
+
+  override lazy val resolved: Boolean =
+childrenResolved && 
frameSpecification.isInstanceOf[SpecifiedWindowFrame]
+
+
+  override def toString: String = simpleString
+
+  override def eval(input: Row): EvaluatedType = throw new 
UnsupportedOperationException
+  override def nullable: Boolean = true
+  override def foldable: Boolean = false
+  override def dataType: DataType = throw new UnsupportedOperationException
+}
+
+/**
+ * A Window specification reference that refers to the 
[[WindowSpecDefinition]] defined
+ * under the name `name`.
+ */
+case class WindowSpecReference(name: String) extends WindowSpec
+
+/**
+ * The trait used to represent the type of a Window Frame.
+ */
+sealed trait FrameType
+
+/**
+ * RowFrame treats rows in a partition individually. When a 
[[ValuePreceding]]
+ * or a [[ValueFollowing]] is used as its [[FrameBoundary]], the value is 
considered
+ * as a physical offset.
+ * For example, `ROW BETWEEN 1 PRECEDING AND 1 FOLLOWING` represents a 
3-row frame,
+ * from the row precedes the current row to the row follows the current 
row.
+ */
+case 

[GitHub] spark pull request #5604: [SPARK-1442][SQL] Window Function Support for Spar...

2017-12-19 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/5604#discussion_r157933488
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
 ---
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.catalyst.analysis.UnresolvedException
+import org.apache.spark.sql.catalyst.errors.TreeNodeException
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types.{NumericType, DataType}
+
+/**
+ * The trait of the Window Specification (specified in the OVER clause or 
WINDOW clause) for
+ * Window Functions.
+ */
+sealed trait WindowSpec
+
+/**
+ * The specification for a window function.
+ * @param partitionSpec It defines the way that input rows are partitioned.
+ * @param orderSpec It defines the ordering of rows in a partition.
+ * @param frameSpecification It defines the window frame in a partition.
+ */
+case class WindowSpecDefinition(
+partitionSpec: Seq[Expression],
+orderSpec: Seq[SortOrder],
+frameSpecification: WindowFrame) extends Expression with WindowSpec {
+
+  def validate: Option[String] = frameSpecification match {
+case UnspecifiedFrame =>
+  Some("Found a UnspecifiedFrame. It should be converted to a 
SpecifiedWindowFrame " +
+"during analysis. Please file a bug report.")
+case frame: SpecifiedWindowFrame => frame.validate.orElse {
+  def checkValueBasedBoundaryForRangeFrame(): Option[String] = {
+if (orderSpec.length > 1)  {
+  // It is not allowed to have a value-based PRECEDING and 
FOLLOWING
+  // as the boundary of a Range Window Frame.
+  Some("This Range Window Frame only accepts at most one ORDER BY 
expression.")
+} else if (orderSpec.nonEmpty && 
!orderSpec.head.dataType.isInstanceOf[NumericType]) {
+  Some("The data type of the expression in the ORDER BY clause 
should be a numeric type.")
+} else {
+  None
+}
+  }
+
+  (frame.frameType, frame.frameStart, frame.frameEnd) match {
+case (RangeFrame, vp: ValuePreceding, _) => 
checkValueBasedBoundaryForRangeFrame()
+case (RangeFrame, vf: ValueFollowing, _) => 
checkValueBasedBoundaryForRangeFrame()
+case (RangeFrame, _, vp: ValuePreceding) => 
checkValueBasedBoundaryForRangeFrame()
+case (RangeFrame, _, vf: ValueFollowing) => 
checkValueBasedBoundaryForRangeFrame()
+case (_, _, _) => None
+  }
+}
+  }
+
+  type EvaluatedType = Any
+
+  override def children: Seq[Expression]  = partitionSpec ++ orderSpec
+
+  override lazy val resolved: Boolean =
+childrenResolved && 
frameSpecification.isInstanceOf[SpecifiedWindowFrame]
+
+
+  override def toString: String = simpleString
+
+  override def eval(input: Row): EvaluatedType = throw new 
UnsupportedOperationException
+  override def nullable: Boolean = true
+  override def foldable: Boolean = false
+  override def dataType: DataType = throw new UnsupportedOperationException
+}
+
+/**
+ * A Window specification reference that refers to the 
[[WindowSpecDefinition]] defined
+ * under the name `name`.
+ */
+case class WindowSpecReference(name: String) extends WindowSpec
+
+/**
+ * The trait used to represent the type of a Window Frame.
+ */
+sealed trait FrameType
+
+/**
+ * RowFrame treats rows in a partition individually. When a 
[[ValuePreceding]]
+ * or a [[ValueFollowing]] is used as its [[FrameBoundary]], the value is 
considered
+ * as a physical offset.
+ * For example, `ROW BETWEEN 1 PRECEDING AND 1 FOLLOWING` represents a 
3-row frame,
+ * from the row precedes the current row to the row follows the current 
row.
+ */
+case object 

[GitHub] spark pull request #5604: [SPARK-1442][SQL] Window Function Support for Spar...

2017-12-19 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/5604#discussion_r157931911
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
 ---
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.catalyst.analysis.UnresolvedException
+import org.apache.spark.sql.catalyst.errors.TreeNodeException
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types.{NumericType, DataType}
+
+/**
+ * The trait of the Window Specification (specified in the OVER clause or 
WINDOW clause) for
+ * Window Functions.
+ */
+sealed trait WindowSpec
+
+/**
+ * The specification for a window function.
+ * @param partitionSpec It defines the way that input rows are partitioned.
+ * @param orderSpec It defines the ordering of rows in a partition.
+ * @param frameSpecification It defines the window frame in a partition.
+ */
+case class WindowSpecDefinition(
+partitionSpec: Seq[Expression],
+orderSpec: Seq[SortOrder],
+frameSpecification: WindowFrame) extends Expression with WindowSpec {
+
+  def validate: Option[String] = frameSpecification match {
+case UnspecifiedFrame =>
+  Some("Found a UnspecifiedFrame. It should be converted to a 
SpecifiedWindowFrame " +
+"during analysis. Please file a bug report.")
+case frame: SpecifiedWindowFrame => frame.validate.orElse {
+  def checkValueBasedBoundaryForRangeFrame(): Option[String] = {
+if (orderSpec.length > 1)  {
+  // It is not allowed to have a value-based PRECEDING and 
FOLLOWING
+  // as the boundary of a Range Window Frame.
+  Some("This Range Window Frame only accepts at most one ORDER BY 
expression.")
+} else if (orderSpec.nonEmpty && 
!orderSpec.head.dataType.isInstanceOf[NumericType]) {
+  Some("The data type of the expression in the ORDER BY clause 
should be a numeric type.")
+} else {
+  None
+}
+  }
+
+  (frame.frameType, frame.frameStart, frame.frameEnd) match {
+case (RangeFrame, vp: ValuePreceding, _) => 
checkValueBasedBoundaryForRangeFrame()
+case (RangeFrame, vf: ValueFollowing, _) => 
checkValueBasedBoundaryForRangeFrame()
+case (RangeFrame, _, vp: ValuePreceding) => 
checkValueBasedBoundaryForRangeFrame()
+case (RangeFrame, _, vf: ValueFollowing) => 
checkValueBasedBoundaryForRangeFrame()
+case (_, _, _) => None
+  }
+}
+  }
+
+  type EvaluatedType = Any
+
+  override def children: Seq[Expression]  = partitionSpec ++ orderSpec
+
+  override lazy val resolved: Boolean =
+childrenResolved && 
frameSpecification.isInstanceOf[SpecifiedWindowFrame]
+
+
+  override def toString: String = simpleString
+
+  override def eval(input: Row): EvaluatedType = throw new 
UnsupportedOperationException
+  override def nullable: Boolean = true
+  override def foldable: Boolean = false
+  override def dataType: DataType = throw new UnsupportedOperationException
+}
+
+/**
+ * A Window specification reference that refers to the 
[[WindowSpecDefinition]] defined
+ * under the name `name`.
+ */
+case class WindowSpecReference(name: String) extends WindowSpec
+
+/**
+ * The trait used to represent the type of a Window Frame.
+ */
+sealed trait FrameType
+
+/**
+ * RowFrame treats rows in a partition individually. When a 
[[ValuePreceding]]
+ * or a [[ValueFollowing]] is used as its [[FrameBoundary]], the value is 
considered
+ * as a physical offset.
+ * For example, `ROW BETWEEN 1 PRECEDING AND 1 FOLLOWING` represents a 
3-row frame,
+ * from the row precedes the current row to the row follows the current 
row.
+ */
+case