[SPARK-1442] [SQL] Window Function Support for Spark SQL

Adding more information about the implementation...

This PR adds support for window functions to Spark SQL (specifically the 
OVER and WINDOW clauses). For every expression with an OVER clause, we use a 
WindowExpression as the container of a WindowFunction and the corresponding 
WindowSpecDefinition (the definition of a window frame, i.e. the partition 
specification, order specification, and frame specification appearing in an OVER 
clause).

# Implementation #
The high-level workflow of the implementation is as follows.

*       Query parsing: During query parsing, all WindowExpressions are 
initially placed in the projectList of a Project operator or the 
aggregateExpressions of an Aggregate operator. This keeps our changes simple 
and keeps all parsing rules for window functions in a single place 
(nodesToWindowSpecification). For the WINDOW clause in a query, we use a 
WithWindowDefinition as the container for the mapping from the name of a window 
specification to a WindowSpecDefinition. This change is similar to our 
common table expression support.

*       Analysis: The query analysis process has three steps for window 
functions.

 *      Resolve all WindowSpecReferences by replacing them with 
WindowSpecDefinitions according to the mapping table stored in the 
WithWindowDefinition node.
 *      Resolve WindowFunctions in the projectList of a Project operator or the 
aggregateExpressions of an Aggregate operator. For this PR, we use Hive's 
functions for window functions because we will have a major refactoring of our 
internal UDAFs and it is better to switch our UDAFs after that refactoring work.
 *      Once we have resolved all WindowFunctions, we use 
ExtractWindowExpressions to extract WindowExpressions from projectList and 
aggregateExpressions and then create a Window operator for every distinct 
WindowSpecDefinition. With this choice, at execution time, we can rely on 
the Exchange operator to do all the work of reorganizing the table, and we do not 
need to worry about it in the physical Window operator. An example analyzed 
plan is shown below.

```
sql("""
SELECT
  year, country, product, sales,
  avg(sales) over(partition by product) avg_product,
  sum(sales) over(partition by country) sum_country
FROM sales
ORDER BY year, country, product
""").explain(true)

== Analyzed Logical Plan ==
Sort [year#34 ASC,country#35 ASC,product#36 ASC], true
 Project [year#34,country#35,product#36,sales#37,avg_product#27,sum_country#28]
  Window [year#34,country#35,product#36,sales#37,avg_product#27], 
[HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFSum(sales#37)
 WindowSpecDefinition [country#35], [], ROWS BETWEEN UNBOUNDED PRECEDING AND 
UNBOUNDED FOLLOWING AS sum_country#28], WindowSpecDefinition [country#35], [], 
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
   Window [year#34,country#35,product#36,sales#37], 
[HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFAverage(sales#37)
 WindowSpecDefinition [product#36], [], ROWS BETWEEN UNBOUNDED PRECEDING AND 
UNBOUNDED FOLLOWING AS avg_product#27], WindowSpecDefinition [product#36], [], 
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
    Project [year#34,country#35,product#36,sales#37]
     MetastoreRelation default, sales, None
```
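
The analysis steps above can be sketched with simplified stand-in types. This is only a minimal model, not the Catalyst code: `Spec`, `SpecRef`, `WinExpr`, `resolve`, and `planWindows` are hypothetical names, and the real rule rewrites logical plan trees rather than flat lists.

```scala
// Simplified stand-ins for the Catalyst classes (hypothetical, for illustration only).
sealed trait WindowSpec
case class Spec(partitionBy: Seq[String], orderBy: Seq[String]) extends WindowSpec
case class SpecRef(name: String) extends WindowSpec

case class WinExpr(function: String, spec: WindowSpec)

object WindowAnalysisSketch {
  // Step 1: replace named references with the definitions from the WINDOW clause.
  def resolve(exprs: Seq[WinExpr], definitions: Map[String, Spec]): Seq[WinExpr] =
    exprs.map {
      case WinExpr(f, SpecRef(name)) =>
        val spec = definitions.getOrElse(
          name,
          throw new IllegalArgumentException(
            s"Window specification $name is not defined in the WINDOW clause."))
        WinExpr(f, spec)
      case alreadyResolved => alreadyResolved
    }

  // Step 3: group expressions by their spec; each group would become one Window operator.
  def planWindows(exprs: Seq[WinExpr]): Seq[(WindowSpec, Seq[String])] =
    exprs.groupBy(_.spec).toSeq.map { case (spec, es) => (spec, es.map(_.function)) }
}
```

In this model, two functions that share a partition and order specification end up in the same group, which mirrors why the example plan above contains exactly two Window operators.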

*       Query planning: During query planning, we simply generate 
the physical Window operator from the logical Window operator. Then, to 
prepare the executedPlan, the EnsureRequirements rule adds Exchange and 
Sort operators if necessary. The EnsureRequirements rule analyzes the data 
properties and tries to avoid adding unnecessary shuffles and sorts. The physical plan 
for the above example query is shown below.

```
== Physical Plan ==
Sort [year#34 ASC,country#35 ASC,product#36 ASC], true
 Exchange (RangePartitioning [year#34 ASC,country#35 ASC,product#36 ASC], 200), 
[]
  Window [year#34,country#35,product#36,sales#37,avg_product#27], 
[HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFSum(sales#37)
 WindowSpecDefinition [country#35], [], ROWS BETWEEN UNBOUNDED PRECEDING AND 
UNBOUNDED FOLLOWING AS sum_country#28], WindowSpecDefinition [country#35], [], 
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
   Exchange (HashPartitioning [country#35], 200), [country#35 ASC]
    Window [year#34,country#35,product#36,sales#37], 
[HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFAverage(sales#37)
 WindowSpecDefinition [product#36], [], ROWS BETWEEN UNBOUNDED PRECEDING AND 
UNBOUNDED FOLLOWING AS avg_product#27], WindowSpecDefinition [product#36], [], 
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
     Exchange (HashPartitioning [product#36], 200), [product#36 ASC]
      HiveTableScan [year#34,country#35,product#36,sales#37], 
(MetastoreRelation default, sales, None), None
```

*       Execution time: At execution time, a physical Window operator buffers 
all rows in a partition specified in the partition spec of an OVER clause. If 
necessary, it also maintains a sliding window frame. The current implementation 
tries to buffer the input parameters of a window function according to the 
window frame to avoid evaluating a row multiple times.
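
The buffering idea can be illustrated with a small standalone sketch. It assumes a ROWS frame with fixed offsets and uses a running sum as a stand-in for the Hive window functions that the actual operator delegates to; `WindowExecSketch`, `slidingSum`, and `run` are hypothetical names and do not appear in Window.scala.

```scala
object WindowExecSketch {
  // SUM over ROWS BETWEEN `preceding` PRECEDING AND `following` FOLLOWING for one
  // already-sorted, fully buffered partition. The frame sum is updated incrementally,
  // so each input value is added once and removed at most once.
  def slidingSum(partition: IndexedSeq[Long], preceding: Int, following: Int): IndexedSeq[Long] = {
    require(preceding >= 0 && following >= 0)
    val n = partition.length
    val out = new Array[Long](n)
    var sum = 0L
    var lo = 0 // first index inside the current frame
    var hi = 0 // first index after the current frame
    var i = 0
    while (i < n) {
      val frameStart = math.max(0, i - preceding)
      // Exclusive upper bound, computed as Long to avoid Int overflow.
      val frameEnd = math.min(n.toLong, i.toLong + following + 1L).toInt
      while (hi < frameEnd) { sum += partition(hi); hi += 1 }
      while (lo < frameStart) { sum -= partition(lo); lo += 1 }
      out(i) = sum
      i += 1
    }
    out.toIndexedSeq
  }

  // Buffers rows (partitionKey, orderKey, value) per partition key, sorts each
  // partition by its order key, and appends the frame sum to every row.
  def run(
      rows: Seq[(String, Int, Long)],
      preceding: Int,
      following: Int): Seq[(String, Int, Long, Long)] =
    rows.groupBy(_._1).toSeq.sortBy(_._1).flatMap { case (_, part) =>
      val sorted = part.sortBy(_._2).toIndexedSeq
      val sums = slidingSum(sorted.map(_._3), preceding, following)
      sorted.zip(sums).map { case ((k, o, v), s) => (k, o, v, s) }
    }
}
```

In the real plan, the grouping and sorting shown in `run` are handled by the Exchange and Sort operators below the Window operator, so the operator itself only sees one ordered partition at a time.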

# Future work #

Here are three improvements that are not hard to add:
*       Taking advantage of the window frame specification to reduce the number 
of rows buffered in the physical Window operator. For some cases, we only need 
to buffer the rows appearing in the sliding window. But for other cases, we 
will not be able to reduce the number of rows buffered (e.g. ROWS BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING).

*       When a RANGE frame is used, for <value> PRECEDING and <value> FOLLOWING, 
it would be great if the <value> part could be an expression (we can start with 
Literal). Then, when the data type of the ORDER BY expression is a FractionalType, we 
can support FractionalType as the type of <value> (<value> still needs to be 
evaluated as a positive value).

*       When a RANGE frame is used, we need to support DateType and 
TimestampType as the data type of the expression appearing in the order 
specification. Then, the <value> part of <value> PRECEDING and <value> 
FOLLOWING can support interval types (once we support them).
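
To make the RANGE frame items concrete, here is a hedged sketch of the intended semantics for a fractional ORDER BY expression with fractional <value> boundaries. It is not part of this PR; `RangeFrameSketch` and `rangeSum` are hypothetical names, and the quadratic scan is chosen for clarity rather than efficiency.

```scala
object RangeFrameSketch {
  // Rows are (orderKey, value) pairs from a single partition. For each row, in
  // order-key order, sums `value` over all rows whose order key lies in
  // [current - preceding, current + following].
  def rangeSum(
      rows: IndexedSeq[(Double, Long)],
      preceding: Double,
      following: Double): IndexedSeq[Long] = {
    // Mirrors the note above: <value> must evaluate to a positive value.
    require(preceding > 0 && following > 0, "<value> must be positive")
    val sorted = rows.sortWith(_._1 < _._1)
    sorted.map { case (key, _) =>
      sorted.iterator
        .filter { case (k, _) => k >= key - preceding && k <= key + following }
        .map(_._2)
        .sum
    }
  }
}
```

Unlike a ROWS frame, the number of rows in each frame here depends on the order-key values, which is why the data type of the ORDER BY expression determines which <value> types make sense.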

This is joint work with guowei2 and yhuai.
Thanks to hbutani and hvanhovell for their comments.
Thanks to scwf for his comments and unit tests.

Author: Yin Huai <yh...@databricks.com>

Closes #5604 from guowei2/windowImplement and squashes the following commits:

76fe1c8 [Yin Huai] Implementation.
aa2b0ae [Yin Huai] Tests.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f2c47082
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f2c47082
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f2c47082

Branch: refs/heads/master
Commit: f2c47082c3412a4cf8cbabe12585147c5ec3ea40
Parents: c3eb441
Author: Yin Huai <yh...@databricks.com>
Authored: Wed May 6 10:43:00 2015 -0700
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Wed May 6 10:43:00 2015 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala  |  200 +++
 .../sql/catalyst/analysis/CheckAnalysis.scala   |    5 +
 .../sql/catalyst/expressions/Projection.scala   |   94 ++
 .../expressions/windowExpressions.scala         |  340 +++++
 .../catalyst/plans/logical/basicOperators.scala |   30 +-
 .../spark/sql/catalyst/trees/TreeNode.scala     |   23 +
 .../sql/catalyst/trees/TreeNodeSuite.scala      |   44 +-
 .../spark/sql/execution/SparkStrategies.scala   |    2 +
 .../org/apache/spark/sql/execution/Window.scala |  480 +++++++
 .../hive/execution/HiveCompatibilitySuite.scala |    2 +-
 .../org/apache/spark/sql/hive/HiveContext.scala |    1 +
 .../org/apache/spark/sql/hive/HiveQl.scala      |  175 ++-
 .../org/apache/spark/sql/hive/hiveUdfs.scala    |  223 ++-
 ...Windowing-0-327a8cd39fe30255ff492ee86f660522 |   26 +
 ...RankNoGBY-0-fef4bf638d52a9a601845347010602fd |   26 +
 ...FirstLast-0-86bb9c97d92fdcd941bcb5143513e2e6 |   26 +
 ...WithWhere-0-84345a9f685ba63b87caa4bb16b122b5 |    5 +
 ...SumWindow-0-6cfc8840d3a4469b0fe11d63182cb59f |   26 +
 ...ortClause-0-88d96a526d3cae6ed8168c5b228974d1 |   26 +
 ...pressions-0-11f6c13cf2710ce7054654cca136e73e |   26 +
 ...leWindows-0-efd1476255eeb1b1961149144f574b7a |   26 +
 ...CountStar-0-1b1fc185c8fddf68e58e92f29052ab2d |   26 +
 ...testUDAFs-0-6974e5959e41a661e09db18547fef58a |   26 +
 ...FsWithGBY-0-67d15ee5915ac64a738fd4b60d75eb35 |   25 +
 ...titioning-0-cb5618b1e626f3a9d4a030b508b5d251 |   25 +
 ...testSTATs-0-da0e0cca69e42118a96b8609b8fa5838 |   26 +
 ...testDISTs-0-672d4cb385b7ced2e446f132474293ad |   26 +
 ...eralViews-0-dea06072f0a64fe4537fae854944ed5a |   78 ++
 ...QAndAlias-0-b996a664b06e5741c08079d5c38241bc |   25 +
 ...geWindows-0-227e080e337d734dd88ff814b3b412e4 |   26 +
 ...DAFInvoke-0-25912ae7d18c91cc09e17e57968fb5db |   26 +
 ...rInWdwDef-0-88945892370ccbc1125a927a3d55342a |   26 +
 ...ngWithSWQ-0-a5a5339330a6a6660d32ccb0cc5d7100 |   25 +
 ...SpecRules-0-fa80b09c99e3c1487de48ea71a88dada |   26 +
 ...titioning-0-45ccbaf0ee083858f7661c66b11d4768 |   26 +
 ...iousForms-0-3436e50214f9afdec84334e10faa931a |   26 +
 ...ousForms2-0-cba9d84a6b1bb5e36595338d4602377e |   26 +
 ...OrderCols-0-7647562850dd367ef1e6c63117805423 |   26 +
 ...testCount-0-e6e97e884327df86f16b870527ec026c |   26 +
 ...enForRows-0-99007f45b6406869e048b0e4eb9213f1 |   26 +
 ...nForRange-0-d81a591e90950de291d2f133793e9283 |   26 +
 ...ngForRows-0-fb8648e82e4dd56d6bdcfd739dd1edf0 |   26 +
 ...gForRange-0-3cd04e5f2398853c4850f4f86142bb39 |   26 +
 ...Aggregate-0-cb3d2f8c1296044dc2658876bb6103ae |   26 +
 ...owingUDAF-0-3bde93728761b780a745c2ce0398aa0f |   26 +
 ...untInSubQ-0-73d5274a21d4f4fd51d2a0f1d98516ce |   26 +
 ...CaseAlias-0-4b1ad2515fb079012467e987f484a722 |   26 +
 ...wingNoGBY-0-70cdc0555a61ef08534a9ebebb95ebbf |   26 +
 ...tainer_sz-0-d3f50875bd5dff172cf813fdb7d738eb |    0
 ...tainer_sz-1-dda16565b98926fc3587de937b9401c7 |    0
 ...ntainer_sz-2-374e39786feb745cd70f25be58bfa24 |    0
 ...tainer_sz-3-d2b5e23edec42a62e61750b110ecbaac |    1 +
 ...tainer_sz-4-50d0c630159068b5b8ccdeb76493f1f7 |   26 +
 ...tainer_sz-5-3f95cd6f4add7a2d0101fe3dd97e5082 |    1 +
 ...mnPruning-0-d3f50875bd5dff172cf813fdb7d738eb |    0
 ...mnPruning-1-dda16565b98926fc3587de937b9401c7 |    0
 ...umnPruning-2-374e39786feb745cd70f25be58bfa24 |    0
 ...mnPruning-3-9294b4a22bc396ff2accabd53c5da98b |   26 +
 ...umnPruning-4-445cab062581c449ceffcb368cdf133 |   26 +
 ...mnPruning-5-89110070c761eafb992eb9315128b53f |   26 +
 ...nistic) 1-0-12a92d8800e0da8b515ba3eaf6a7fd0f | 1049 ++++++++++++++
 ...nistic) 3-0-455e41d9949a2d22bab634fd8e42f2b1 |    1 +
 ...nistic) 4-0-cfad06ae8eba6b047d32a6a61dd59392 |    1 +
 ...nistic) 5-0-d7ca7a61377cef3a9f721a28afdae012 |    1 +
 ...nistic) 6-0-287bcc7679822bc7b684532b267bf11f |    1 +
 ...ministic)-0-36217f6074daaacddb9fcb50a3f4fb5b | 1049 ++++++++++++++
 ...ministic)-1-9ee79e711248dd6e0a6ce27e439e55f4 | 1049 ++++++++++++++
 ...ministic)-2-1e88e0ba414a00195f7ebf6b8600ac04 | 1049 ++++++++++++++
 ...ministic)-3-34d9ee4120f21d0d0ae914fba0acc60c | 1049 ++++++++++++++
 ...ministic)-4-dfd39236756a3951bc1ec354799d69e4 | 1049 ++++++++++++++
 ...ministic)-5-8d0ee3e1605f38214bfad28a5ce897cc |    1 +
 ...ministic)-0-b7cb25303831392a51cd996e758ac79a | 1049 ++++++++++++++
 ...ministic)-1-a3d352560ac835993001665db6954965 | 1049 ++++++++++++++
 ...ministic)-2-fafa16c0f7697ca28aeb6f2698799562 | 1049 ++++++++++++++
 ...ministic)-3-bda0e7c77d6f4712a03389cb5032bc6d | 1049 ++++++++++++++
 ...nistic) 1-0-2e0cbc2d7c5f16657edacd9e7209e6e7 | 1049 ++++++++++++++
 ...nistic) 1-1-5c5f373e325115d710a7a23fcb1626f1 | 1049 ++++++++++++++
 ...nistic) 1-2-ac487cc1b94130bf9ce00e07c7075f65 | 1049 ++++++++++++++
 ...nistic) 1-3-b82dfa24123047be4b4e3c27c3997d34 | 1049 ++++++++++++++
 ...inistic) 2-0-81bb7f49a55385878637c8aac4d08e5 | 1294 ++++++++++++++++++
 ...nistic) 3-0-58a982694ba2b1e34de82b1de54936a0 |    0
 ...nistic) 4-0-12cc78f3953c3e6b5411ddc729541bf0 |  474 +++++++
 ...ministic)-0-6642a21d87e0401ba1a668ea8b244f0c | 1049 ++++++++++++++
 ...ministic)-1-2bf20f39e6ffef258858f7943a974e7e | 1049 ++++++++++++++
 ...ministic)-2-16239d2b069789ba99fbac50c4f0724f | 1049 ++++++++++++++
 ...ministic)-3-d90b27fca067b0b3c48d873b3ef32af7 | 1049 ++++++++++++++
 ...ministic)-4-f2e4d659b65a833e9281b6786d3d55c1 | 1049 ++++++++++++++
 ...ing_udaf2-0-96659fde37d7a38ea15b367b47f59ce2 |    0
 ...ing_udaf2-1-b4bdee4908b1cb8e240c549ae5cfe4c0 |    1 +
 ...ministic)-0-f498cccf82480be03022d2a36f87651e | 1049 ++++++++++++++
 ...rministic)-1-6378faf36ffd3f61e61cee6c0cb70e6 | 1049 ++++++++++++++
 ...ministic)-2-5f0eab306ea3c22b11ace9b542a7ee56 | 1049 ++++++++++++++
 ...ministic)-3-6f104992e0050576085064815de43194 | 1049 ++++++++++++++
 ...ministic)-4-cd2e3d2344810cb3ba843d4c01c81d7e | 1049 ++++++++++++++
 ...ministic)-5-ee44c5cdc80e1c832b702f9fb76d8145 | 1049 ++++++++++++++
 ...ministic)-6-4d78f7b1d172d20c91f5867bc13a42a0 | 1049 ++++++++++++++
 ...ministic)-7-20fdc99aa046b2c41d9b85ab338c749c | 1049 ++++++++++++++
 ...ministic)-8-45a1d7c2aba45d761e19ff4dfdf5463e | 1049 ++++++++++++++
 .../sql/hive/execution/HiveComparisonTest.scala |    2 +-
 .../HiveWindowFunctionQuerySuite.scala          |  845 ++++++++++++
 .../sql/hive/execution/SQLQuerySuite.scala      |  147 ++
 101 files changed, 34768 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f2c47082/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 5e42b40..7b543b6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.mutable.ArrayBuffer
+
 import org.apache.spark.util.collection.OpenHashSet
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions._
@@ -61,6 +63,7 @@ class Analyzer(
       ResolveGenerate ::
       ImplicitGenerate ::
       ResolveFunctions ::
+      ExtractWindowExpressions ::
       GlobalAggregates ::
       UnresolvedHavingClauseAttributes ::
       TrimGroupingAliases ::
@@ -529,6 +532,203 @@ class Analyzer(
           makeGeneratorOutput(p.generator, p.generatorOutput), p.child)
     }
   }
+
+  /**
+   * Extracts [[WindowExpression]]s from the projectList of a [[Project]] operator and
+   * aggregateExpressions of an [[Aggregate]] operator and creates individual [[Window]]
+   * operators for every distinct [[WindowSpecDefinition]].
+   *
+   * This rule handles three cases:
+   *  - A [[Project]] having [[WindowExpression]]s in its projectList;
+   *  - An [[Aggregate]] having [[WindowExpression]]s in its aggregateExpressions.
+   *  - An [[Filter]]->[[Aggregate]] pattern representing GROUP BY with a HAVING
+   *    clause and the [[Aggregate]] has [[WindowExpression]]s in its aggregateExpressions.
+   * Note: If there is a GROUP BY clause in the query, aggregations and corresponding
+   * filters (expressions in the HAVING clause) should be evaluated before any
+   * [[WindowExpression]]. If a query has SELECT DISTINCT, the DISTINCT part should be
+   * evaluated after all [[WindowExpression]]s.
+   *
+   * For every case, the transformation works as follows:
+   * 1. For a list of [[Expression]]s (a projectList or an aggregateExpressions), partitions
+   *    it two lists of [[Expression]]s, one for all [[WindowExpression]]s and another for
+   *    all regular expressions.
+   * 2. For all [[WindowExpression]]s, groups them based on their [[WindowSpecDefinition]]s.
+   * 3. For every distinct [[WindowSpecDefinition]], creates a [[Window]] operator and inserts
+   *    it into the plan tree.
+   */
+  object ExtractWindowExpressions extends Rule[LogicalPlan] {
+    def hasWindowFunction(projectList: Seq[NamedExpression]): Boolean =
+      projectList.exists(hasWindowFunction)
+
+    def hasWindowFunction(expr: NamedExpression): Boolean = {
+      expr.find {
+        case window: WindowExpression => true
+        case _ => false
+      }.isDefined
+    }
+
+    /**
+     * From a Seq of [[NamedExpression]]s, extract window expressions and
+     * other regular expressions.
+     */
+    def extract(
+        expressions: Seq[NamedExpression]): (Seq[NamedExpression], Seq[NamedExpression]) = {
+      // First, we simple partition the input expressions to two part, one having
+      // WindowExpressions and another one without WindowExpressions.
+      val (windowExpressions, regularExpressions) = expressions.partition(hasWindowFunction)
+
+      // Then, we need to extract those regular expressions used in the WindowExpression.
+      // For example, when we have col1 - Sum(col2 + col3) OVER (PARTITION BY col4 ORDER BY col5),
+      // we need to make sure that col1 to col5 are all projected from the child of the Window
+      // operator.
+      val extractedExprBuffer = new ArrayBuffer[NamedExpression]()
+      def extractExpr(expr: Expression): Expression = expr match {
+        case ne: NamedExpression =>
+          // If a named expression is not in regularExpressions, add extract it and replace it
+          // with an AttributeReference.
+          val missingExpr =
+            AttributeSet(Seq(expr)) -- (regularExpressions ++ extractedExprBuffer)
+          if (missingExpr.nonEmpty) {
+            extractedExprBuffer += ne
+          }
+          ne.toAttribute
+        case e: Expression if e.foldable =>
+          e // No need to create an attribute reference if it will be evaluated as a Literal.
+        case e: Expression =>
+          // For other expressions, we extract it and replace it with an AttributeReference (with
+          // an interal column name, e.g. "_w0").
+          val withName = Alias(e, s"_w${extractedExprBuffer.length}")()
+          extractedExprBuffer += withName
+          withName.toAttribute
+      }
+
+      // Now, we extract expressions from windowExpressions by using extractExpr.
+      val newWindowExpressions = windowExpressions.map {
+        _.transform {
+          // Extracts children expressions of a WindowFunction (input parameters of
+          // a WindowFunction).
+          case wf : WindowFunction =>
+            val newChildren = wf.children.map(extractExpr(_))
+            wf.withNewChildren(newChildren)
+
+          // Extracts expressions from the partition spec and order spec.
+          case wsc @ WindowSpecDefinition(partitionSpec, orderSpec, _) =>
+            val newPartitionSpec = partitionSpec.map(extractExpr(_))
+            val newOrderSpec = orderSpec.map { so =>
+              val newChild = extractExpr(so.child)
+              so.copy(child = newChild)
+            }
+            wsc.copy(partitionSpec = newPartitionSpec, orderSpec = newOrderSpec)
+
+          // Extracts AggregateExpression. For example, for SUM(x) - Sum(y) OVER (...),
+          // we need to extract SUM(x).
+          case agg: AggregateExpression =>
+            val withName = Alias(agg, s"_w${extractedExprBuffer.length}")()
+            extractedExprBuffer += withName
+            withName.toAttribute
+        }.asInstanceOf[NamedExpression]
+      }
+
+      (newWindowExpressions, regularExpressions ++ extractedExprBuffer)
+    }
+
+    /**
+     * Adds operators for Window Expressions. Every Window operator handles a single Window Spec.
+     */
+    def addWindow(windowExpressions: Seq[NamedExpression], child: LogicalPlan): LogicalPlan = {
+      // First, we group window expressions based on their Window Spec.
+      val groupedWindowExpression = windowExpressions.groupBy { expr =>
+        val windowExpression = expr.find {
+          case window: WindowExpression => true
+          case other => false
+        }.map(_.asInstanceOf[WindowExpression].windowSpec)
+        windowExpression.getOrElse(
+          failAnalysis(s"$windowExpressions does not have any WindowExpression."))
+      }.toSeq
+
+      // For every Window Spec, we add a Window operator and set currentChild as the child of it.
+      var currentChild = child
+      var i = 0
+      while (i < groupedWindowExpression.size) {
+        val (windowSpec, windowExpressions) = groupedWindowExpression(i)
+        // Set currentChild to the newly created Window operator.
+        currentChild = Window(currentChild.output, windowExpressions, windowSpec, currentChild)
+
+        // Move to next WindowExpression.
+        i += 1
+      }
+
+      // We return the top operator.
+      currentChild
+    }
+
+    // We have to use transformDown at here to make sure the rule of
+    // "Aggregate with Having clause" will be triggered.
+    def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
+      // Lookup WindowSpecDefinitions. This rule works with unresolved children.
+      case WithWindowDefinition(windowDefinitions, child) =>
+        child.transform {
+          case plan => plan.transformExpressions {
+            case UnresolvedWindowExpression(c, WindowSpecReference(windowName)) =>
+              val errorMessage =
+                s"Window specification $windowName is not defined in the WINDOW clause."
+              val windowSpecDefinition =
+                windowDefinitions
+                  .get(windowName)
+                  .getOrElse(failAnalysis(errorMessage))
+              WindowExpression(c, windowSpecDefinition)
+          }
+        }
+
+      // Aggregate with Having clause. This rule works with an unresolved Aggregate because
+      // a resolved Aggregate will not have Window Functions.
+      case f @ Filter(condition, a @ Aggregate(groupingExprs, aggregateExprs, child))
+        if child.resolved &&
+           hasWindowFunction(aggregateExprs) &&
+           !a.expressions.exists(!_.resolved) =>
+        val (windowExpressions, aggregateExpressions) = extract(aggregateExprs)
+        // Create an Aggregate operator to evaluate aggregation functions.
+        val withAggregate = Aggregate(groupingExprs, aggregateExpressions, child)
+        // Add a Filter operator for conditions in the Having clause.
+        val withFilter = Filter(condition, withAggregate)
+        val withWindow = addWindow(windowExpressions, withFilter)
+
+        // Finally, generate output columns according to the original projectList.
+        val finalProjectList = aggregateExprs.map (_.toAttribute)
+        Project(finalProjectList, withWindow)
+
+      case p: LogicalPlan if !p.childrenResolved => p
+
+      // Aggregate without Having clause.
+      case a @ Aggregate(groupingExprs, aggregateExprs, child)
+        if hasWindowFunction(aggregateExprs) &&
+           !a.expressions.exists(!_.resolved) =>
+        val (windowExpressions, aggregateExpressions) = extract(aggregateExprs)
+        // Create an Aggregate operator to evaluate aggregation functions.
+        val withAggregate = Aggregate(groupingExprs, aggregateExpressions, child)
+        // Add Window operators.
+        val withWindow = addWindow(windowExpressions, withAggregate)
+
+        // Finally, generate output columns according to the original projectList.
+        val finalProjectList = aggregateExprs.map (_.toAttribute)
+        Project(finalProjectList, withWindow)
+
+      // We only extract Window Expressions after all expressions of the Project
+      // have been resolved.
+      case p @ Project(projectList, child)
+        if hasWindowFunction(projectList) && !p.expressions.exists(!_.resolved) =>
+        val (windowExpressions, regularExpressions) = extract(projectList)
+        // We add a project to get all needed expressions for window expressions from the child
+        // of the original Project operator.
+        // of the original Project operator.
+        val withProject = Project(regularExpressions, child)
+        // Add Window operators.
+        val withWindow = addWindow(windowExpressions, withProject)
+
+        // Finally, generate output columns according to the original projectList.
+        val finalProjectList = projectList.map (_.toAttribute)
+        Project(finalProjectList, withWindow)
+    }
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/f2c47082/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 2381689..c8288c6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -70,6 +70,11 @@ trait CheckAnalysis {
             failAnalysis(
               s"invalid expression ${b.prettyString} " +
                 s"between ${b.left.simpleString} and ${b.right.simpleString}")
+
+          case w @ WindowExpression(windowFunction, windowSpec) if windowSpec.validate.nonEmpty =>
+            // The window spec is not valid.
+            val reason = windowSpec.validate.get
+            failAnalysis(s"Window specification $windowSpec is not valid because $reason")
         }
 
         operator match {

http://git-wip-us.apache.org/repos/asf/spark/blob/f2c47082/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
index c2866cd..8cae548 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
@@ -548,3 +548,97 @@ class JoinedRow5 extends Row {
     }
   }
 }
+
+/**
+ * JIT HACK: Replace with macros
+ */
+class JoinedRow6 extends Row {
+  private[this] var row1: Row = _
+  private[this] var row2: Row = _
+
+  def this(left: Row, right: Row) = {
+    this()
+    row1 = left
+    row2 = right
+  }
+
+  /** Updates this JoinedRow to used point at two new base rows.  Returns itself. */
+  def apply(r1: Row, r2: Row): Row = {
+    row1 = r1
+    row2 = r2
+    this
+  }
+
+  /** Updates this JoinedRow by updating its left base row.  Returns itself. */
+  def withLeft(newLeft: Row): Row = {
+    row1 = newLeft
+    this
+  }
+
+  /** Updates this JoinedRow by updating its right base row.  Returns itself. */
+  def withRight(newRight: Row): Row = {
+    row2 = newRight
+    this
+  }
+
+  override def toSeq: Seq[Any] = row1.toSeq ++ row2.toSeq
+
+  override def length: Int = row1.length + row2.length
+
+  override def apply(i: Int): Any =
+    if (i < row1.length) row1(i) else row2(i - row1.length)
+
+  override def isNullAt(i: Int): Boolean =
+    if (i < row1.length) row1.isNullAt(i) else row2.isNullAt(i - row1.length)
+
+  override def getInt(i: Int): Int =
+    if (i < row1.length) row1.getInt(i) else row2.getInt(i - row1.length)
+
+  override def getLong(i: Int): Long =
+    if (i < row1.length) row1.getLong(i) else row2.getLong(i - row1.length)
+
+  override def getDouble(i: Int): Double =
+    if (i < row1.length) row1.getDouble(i) else row2.getDouble(i - row1.length)
+
+  override def getBoolean(i: Int): Boolean =
+    if (i < row1.length) row1.getBoolean(i) else row2.getBoolean(i - row1.length)
+
+  override def getShort(i: Int): Short =
+    if (i < row1.length) row1.getShort(i) else row2.getShort(i - row1.length)
+
+  override def getByte(i: Int): Byte =
+    if (i < row1.length) row1.getByte(i) else row2.getByte(i - row1.length)
+
+  override def getFloat(i: Int): Float =
+    if (i < row1.length) row1.getFloat(i) else row2.getFloat(i - row1.length)
+
+  override def getString(i: Int): String =
+    if (i < row1.length) row1.getString(i) else row2.getString(i - row1.length)
+
+  override def getAs[T](i: Int): T =
+    if (i < row1.length) row1.getAs[T](i) else row2.getAs[T](i - row1.length)
+
+  override def copy(): Row = {
+    val totalSize = row1.length + row2.length
+    val copiedValues = new Array[Any](totalSize)
+    var i = 0
+    while(i < totalSize) {
+      copiedValues(i) = apply(i)
+      i += 1
+    }
+    new GenericRow(copiedValues)
+  }
+
+  override def toString: String = {
+    // Make sure toString never throws NullPointerException.
+    if ((row1 eq null) && (row2 eq null)) {
+      "[ empty row ]"
+    } else if (row1 eq null) {
+      row2.mkString("[", ",", "]")
+    } else if (row2 eq null) {
+      row1.mkString("[", ",", "]")
+    } else {
+      mkString("[", ",", "]")
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f2c47082/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
new file mode 100644
index 0000000..099d67c
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.catalyst.analysis.UnresolvedException
+import org.apache.spark.sql.catalyst.errors.TreeNodeException
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types.{NumericType, DataType}
+
+/**
+ * The trait of the Window Specification (specified in the OVER clause or WINDOW clause) for
+ * Window Functions.
+ */
+sealed trait WindowSpec
+
+/**
+ * The specification for a window function.
+ * @param partitionSpec It defines the way that input rows are partitioned.
+ * @param orderSpec It defines the ordering of rows in a partition.
+ * @param frameSpecification It defines the window frame in a partition.
+ */
+case class WindowSpecDefinition(
+    partitionSpec: Seq[Expression],
+    orderSpec: Seq[SortOrder],
+    frameSpecification: WindowFrame) extends Expression with WindowSpec {
+
+  def validate: Option[String] = frameSpecification match {
+    case UnspecifiedFrame =>
+      Some("Found a UnspecifiedFrame. It should be converted to a 
SpecifiedWindowFrame " +
+        "during analysis. Please file a bug report.")
+    case frame: SpecifiedWindowFrame => frame.validate.orElse {
+      def checkValueBasedBoundaryForRangeFrame(): Option[String] = {
+        if (orderSpec.length > 1) {
+          // A Range Window Frame with a value-based PRECEDING or FOLLOWING boundary
+          // cannot have more than one ORDER BY expression.
+          Some("This Range Window Frame only accepts at most one ORDER BY expression.")
+        } else if (orderSpec.nonEmpty && !orderSpec.head.dataType.isInstanceOf[NumericType]) {
+          Some("The data type of the expression in the ORDER BY clause should be a numeric type.")
+        } else {
+          None
+        }
+      }
+
+      (frame.frameType, frame.frameStart, frame.frameEnd) match {
+        case (RangeFrame, vp: ValuePreceding, _) => checkValueBasedBoundaryForRangeFrame()
+        case (RangeFrame, vf: ValueFollowing, _) => checkValueBasedBoundaryForRangeFrame()
+        case (RangeFrame, _, vp: ValuePreceding) => checkValueBasedBoundaryForRangeFrame()
+        case (RangeFrame, _, vf: ValueFollowing) => checkValueBasedBoundaryForRangeFrame()
+        case (_, _, _) => None
+      }
+    }
+  }
+
+  type EvaluatedType = Any
+
+  override def children: Seq[Expression]  = partitionSpec ++ orderSpec
+
+  override lazy val resolved: Boolean =
+    childrenResolved && frameSpecification.isInstanceOf[SpecifiedWindowFrame]
+
+
+  override def toString: String = simpleString
+
+  override def eval(input: Row): EvaluatedType = throw new UnsupportedOperationException
+  override def nullable: Boolean = true
+  override def foldable: Boolean = false
+  override def dataType: DataType = throw new UnsupportedOperationException
+}
+
+/**
+ * A Window specification reference that refers to the [[WindowSpecDefinition]] defined
+ * under the name `name`.
+ */
+case class WindowSpecReference(name: String) extends WindowSpec
+
+/**
+ * The trait used to represent the type of a Window Frame.
+ */
+sealed trait FrameType
+
+/**
+ * RowFrame treats rows in a partition individually. When a [[ValuePreceding]]
+ * or a [[ValueFollowing]] is used as its [[FrameBoundary]], the value is considered
+ * as a physical offset.
+ * For example, `ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING` represents a 3-row frame,
+ * from the row that precedes the current row to the row that follows the current row.
+ */
+case object RowFrame extends FrameType
+
+/**
+ * RangeFrame treats rows in a partition as groups of peers.
+ * All rows having the same `ORDER BY` ordering are considered as peers.
+ * When a [[ValuePreceding]] or a [[ValueFollowing]] is used as its [[FrameBoundary]],
+ * the value is considered as a logical offset.
+ * For example, assuming the value of the current row's `ORDER BY` expression `expr` is `v`,
+ * `RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING` represents a frame containing rows whose values
+ * of `expr` are in the range of [v-1, v+1].
+ *
+ * If the `ORDER BY` clause is not defined, all rows in the partition are considered as peers
+ * of the current row.
+ */
+case object RangeFrame extends FrameType
+
+/**
+ * The trait used to represent the type of a Window Frame Boundary.
+ */
+sealed trait FrameBoundary {
+  def notFollows(other: FrameBoundary): Boolean
+}
+
+/** UNBOUNDED PRECEDING boundary. */
+case object UnboundedPreceding extends FrameBoundary {
+  def notFollows(other: FrameBoundary): Boolean = other match {
+    case UnboundedPreceding => true
+    case vp: ValuePreceding => true
+    case CurrentRow => true
+    case vf: ValueFollowing => true
+    case UnboundedFollowing => true
+  }
+
+  override def toString: String = "UNBOUNDED PRECEDING"
+}
+
+/** <value> PRECEDING boundary. */
+case class ValuePreceding(value: Int) extends FrameBoundary {
+  def notFollows(other: FrameBoundary): Boolean = other match {
+    case UnboundedPreceding => false
+    case ValuePreceding(anotherValue) => value >= anotherValue
+    case CurrentRow => true
+    case vf: ValueFollowing => true
+    case UnboundedFollowing => true
+  }
+
+  override def toString: String = s"$value PRECEDING"
+}
+
+/** CURRENT ROW boundary. */
+case object CurrentRow extends FrameBoundary {
+  def notFollows(other: FrameBoundary): Boolean = other match {
+    case UnboundedPreceding => false
+    case vp: ValuePreceding => false
+    case CurrentRow => true
+    case vf: ValueFollowing => true
+    case UnboundedFollowing => true
+  }
+
+  override def toString: String = "CURRENT ROW"
+}
+
+/** <value> FOLLOWING boundary. */
+case class ValueFollowing(value: Int) extends FrameBoundary {
+  def notFollows(other: FrameBoundary): Boolean = other match {
+    case UnboundedPreceding => false
+    case vp: ValuePreceding => false
+    case CurrentRow => false
+    case ValueFollowing(anotherValue) => value <= anotherValue
+    case UnboundedFollowing => true
+  }
+
+  override def toString: String = s"$value FOLLOWING"
+}
+
+/** UNBOUNDED FOLLOWING boundary. */
+case object UnboundedFollowing extends FrameBoundary {
+  def notFollows(other: FrameBoundary): Boolean = other match {
+    case UnboundedPreceding => false
+    case vp: ValuePreceding => false
+    case CurrentRow => false
+    case vf: ValueFollowing => false
+    case UnboundedFollowing => true
+  }
+
+  override def toString: String = "UNBOUNDED FOLLOWING"
+}
+
+/**
+ * The trait used to represent a Window Frame.
+ */
+sealed trait WindowFrame
+
+/** Used as a placeholder when a frame specification is not defined. */
+case object UnspecifiedFrame extends WindowFrame
+
+/** A specified Window Frame. */
+case class SpecifiedWindowFrame(
+    frameType: FrameType,
+    frameStart: FrameBoundary,
+    frameEnd: FrameBoundary) extends WindowFrame {
+
+  /** Returns None if this WindowFrame is valid, or Some(reason) describing why it is not. */
+  def validate: Option[String] = (frameType, frameStart, frameEnd) match {
+    case (_, UnboundedFollowing, _) =>
+      Some(s"$UnboundedFollowing is not allowed as the start of a Window Frame.")
+    case (_, _, UnboundedPreceding) =>
+      Some(s"$UnboundedPreceding is not allowed as the end of a Window Frame.")
+    // case (RowFrame, start, end) => ??? RowFrame specific rule
+    // case (RangeFrame, start, end) => ??? RangeFrame specific rule
+    case (_, start, end) =>
+      if (start.notFollows(end)) {
+        None
+      } else {
+        val reason =
+          s"The end of this Window Frame $end is smaller than the start of " +
+          s"this Window Frame $start."
+        Some(reason)
+      }
+  }
+
+  override def toString: String = frameType match {
+    case RowFrame => s"ROWS BETWEEN $frameStart AND $frameEnd"
+    case RangeFrame => s"RANGE BETWEEN $frameStart AND $frameEnd"
+  }
+}
+
+object SpecifiedWindowFrame {
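+  /**
+   * Returns the default window frame used when no frame is specified.
+   * @param hasOrderSpecification If the window spec has order by expressions.
+   * @param acceptWindowFrame If the window function accepts user-specified frame.
+   * @return The default [[SpecifiedWindowFrame]] for the given flags.
+   */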
+  def defaultWindowFrame(
+      hasOrderSpecification: Boolean,
+      acceptWindowFrame: Boolean): SpecifiedWindowFrame = {
+    if (hasOrderSpecification && acceptWindowFrame) {
+      // If order spec is defined and the window function supports user specified window frames,
+      // the default frame is RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW.
+      SpecifiedWindowFrame(RangeFrame, UnboundedPreceding, CurrentRow)
+    } else {
+      // Otherwise, the default frame is
+      // ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING.
+      SpecifiedWindowFrame(RowFrame, UnboundedPreceding, UnboundedFollowing)
+    }
+  }
+}
+
+/**
+ * Every window function needs to maintain an output buffer for its output.
+ * It should expect that for an n-row window frame, it will be called n times
+ * to retrieve the values corresponding to these n rows.
+ */
+trait WindowFunction extends Expression {
+  self: Product =>
+
+  def init(): Unit
+
+  def reset(): Unit
+
+  def prepareInputParameters(input: Row): AnyRef
+
+  def update(input: AnyRef): Unit
+
+  def batchUpdate(inputs: Array[AnyRef]): Unit
+
+  def evaluate(): Unit
+
+  def get(index: Int): Any
+
+  def newInstance(): WindowFunction
+}
+
+case class UnresolvedWindowFunction(
+    name: String,
+    children: Seq[Expression])
+  extends Expression with WindowFunction {
+
+  override def dataType: DataType = throw new UnresolvedException(this, "dataType")
+  override def foldable: Boolean = throw new UnresolvedException(this, "foldable")
+  override def nullable: Boolean = throw new UnresolvedException(this, "nullable")
+  override lazy val resolved = false
+
+  override def init(): Unit =
+    throw new UnresolvedException(this, "init")
+  override def reset(): Unit =
+    throw new UnresolvedException(this, "reset")
+  override def prepareInputParameters(input: Row): AnyRef =
+    throw new UnresolvedException(this, "prepareInputParameters")
+  override def update(input: AnyRef): Unit =
+    throw new UnresolvedException(this, "update")
+  override def batchUpdate(inputs: Array[AnyRef]): Unit =
+    throw new UnresolvedException(this, "batchUpdate")
+  override def evaluate(): Unit =
+    throw new UnresolvedException(this, "evaluate")
+  override def get(index: Int): Any =
+    throw new UnresolvedException(this, "get")
+  // Unresolved functions are transient at compile time and don't get evaluated during execution.
+  override def eval(input: Row = null): EvaluatedType =
+    throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
+
+  override def toString: String = s"'$name(${children.mkString(",")})"
+
+  override def newInstance(): WindowFunction =
+    throw new UnresolvedException(this, "newInstance")
+}
+
+case class UnresolvedWindowExpression(
+    child: UnresolvedWindowFunction,
+    windowSpec: WindowSpecReference) extends UnaryExpression {
+  override def dataType: DataType = throw new UnresolvedException(this, "dataType")
+  override def foldable: Boolean = throw new UnresolvedException(this, "foldable")
+  override def nullable: Boolean = throw new UnresolvedException(this, "nullable")
+  override lazy val resolved = false
+
+  // Unresolved functions are transient at compile time and don't get evaluated during execution.
+  override def eval(input: Row = null): EvaluatedType =
+    throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
+}
+
+case class WindowExpression(
+    windowFunction: WindowFunction,
+    windowSpec: WindowSpecDefinition) extends Expression {
+  override type EvaluatedType = Any
+
+  override def children: Seq[Expression] =
+    windowFunction :: windowSpec :: Nil
+
+  override def eval(input: Row): EvaluatedType =
+    throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
+
+  override def dataType: DataType = windowFunction.dataType
+  override def foldable: Boolean = windowFunction.foldable
+  override def nullable: Boolean = windowFunction.nullable
+
+  override def toString: String = s"$windowFunction $windowSpec"
+}
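
A note on the RowFrame and RangeFrame comments above: the difference between a physical offset (ROWS) and a logical offset (RANGE) is easiest to see on a small sorted partition. The following is a minimal sketch and not code from this patch. `FrameSketch`, `rowsFrame`, and `rangeFrame` are hypothetical names, and the sketch assumes a single ascending integer ORDER BY column and non-negative offsets.

```scala
// Hypothetical, simplified model; these are not the classes added by this patch.
object FrameSketch {
  // ROWS BETWEEN `preceding` PRECEDING AND `following` FOLLOWING:
  // physical offsets from the current position, clamped to the partition bounds.
  def rowsFrame(size: Int, pos: Int, preceding: Int, following: Int): (Int, Int) =
    (math.max(pos - preceding, 0), math.min(pos + following, size - 1))

  // RANGE BETWEEN `preceding` PRECEDING AND `following` FOLLOWING:
  // logical offsets over the ascending ORDER BY values of one partition.
  // The frame covers every row whose value lies in [v - preceding, v + following].
  def rangeFrame(
      sorted: IndexedSeq[Int],
      pos: Int,
      preceding: Int,
      following: Int): (Int, Int) = {
    val v = sorted(pos)
    val start = sorted.indexWhere(_ >= v - preceding)
    val end = sorted.lastIndexWhere(_ <= v + following)
    (start, end)
  }
}
```

With ORDER BY values 1, 2, 2, 5, 6 and the current row at index 3 (value 5), the ROWS frame spans indices 2 to 4 while the RANGE frame spans only indices 3 to 4, because the value 2 falls outside [4, 6].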

http://git-wip-us.apache.org/repos/asf/spark/blob/f2c47082/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 21208c8..ba0abb2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -25,13 +25,14 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extend
   override def output: Seq[Attribute] = projectList.map(_.toAttribute)
 
   override lazy val resolved: Boolean = {
-    val containsAggregatesOrGenerators = projectList.exists ( _.collect {
+    val hasSpecialExpressions = projectList.exists ( _.collect {
         case agg: AggregateExpression => agg
         case generator: Generator => generator
+        case window: WindowExpression => window
       }.nonEmpty
     )
 
-    !expressions.exists(!_.resolved) && childrenResolved && !containsAggregatesOrGenerators
+    !expressions.exists(!_.resolved) && childrenResolved && !hasSpecialExpressions
   }
 }
 
@@ -170,6 +171,12 @@ case class With(child: LogicalPlan, cteRelations: Map[String, Subquery]) extends
   override def output: Seq[Attribute] = child.output
 }
 
+case class WithWindowDefinition(
+    windowDefinitions: Map[String, WindowSpecDefinition],
+    child: LogicalPlan) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+}
+
 case class WriteToFile(
     path: String,
     child: LogicalPlan) extends UnaryNode {
@@ -195,9 +202,28 @@ case class Aggregate(
     child: LogicalPlan)
   extends UnaryNode {
 
+  override lazy val resolved: Boolean = {
+    val hasWindowExpressions = aggregateExpressions.exists ( _.collect {
+        case window: WindowExpression => window
+      }.nonEmpty
+    )
+
+    !expressions.exists(!_.resolved) && childrenResolved && !hasWindowExpressions
+  }
+
   override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute)
 }
 
+case class Window(
+    projectList: Seq[Attribute],
+    windowExpressions: Seq[NamedExpression],
+    windowSpec: WindowSpecDefinition,
+    child: LogicalPlan) extends UnaryNode {
+
+  override def output: Seq[Attribute] =
+    (projectList ++ windowExpressions).map(_.toAttribute)
+}
+
 /**
  * Apply the all of the GroupExpressions to every input row, hence we will get
  * multiple output rows for a input row.

http://git-wip-us.apache.org/repos/asf/spark/blob/f2c47082/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 97502ed..4b93f7d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -72,6 +72,15 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] {
   }
 
   /**
+   * Find the first [[TreeNode]] that satisfies the condition specified by `f`.
+   * The condition is recursively applied to this node and all of its children (pre-order).
+   */
+  def find(f: BaseType => Boolean): Option[BaseType] = f(this) match {
+    case true => Some(this)
+    case false => children.foldLeft(None: Option[BaseType]) { (l, r) => l.orElse(r.find(f)) }
+  }
+
+  /**
    * Runs the given function on this node and then recursively on [[children]].
    * @param f the function to be applied to each node in the tree.
    */
@@ -151,6 +160,20 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] {
     val remainingNewChildren = newChildren.toBuffer
     val remainingOldChildren = children.toBuffer
     val newArgs = productIterator.map {
+      // This rule handles the case where the children are passed as a Seq input argument.
+      case s: Seq[_] => s.map {
+        case arg: TreeNode[_] if children contains arg =>
+          val newChild = remainingNewChildren.remove(0)
+          val oldChild = remainingOldChildren.remove(0)
+          if (newChild fastEquals oldChild) {
+            oldChild
+          } else {
+            changed = true
+            newChild
+          }
+        case nonChild: AnyRef => nonChild
+        case null => null
+      }
       case arg: TreeNode[_] if children contains arg =>
         val newChild = remainingNewChildren.remove(0)
         val oldChild = remainingOldChildren.remove(0)

http://git-wip-us.apache.org/repos/asf/spark/blob/f2c47082/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
index 6b39332..786ddba 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer
 import org.scalatest.FunSuite
 
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.{StringType, NullType}
+import org.apache.spark.sql.types.{IntegerType, StringType, NullType}
 
 case class Dummy(optKey: Option[Expression]) extends Expression {
   def children: Seq[Expression] = optKey.toSeq
@@ -129,5 +129,47 @@ class TreeNodeSuite extends FunSuite {
     assert(expected === actual)
   }
 
+  test("find") {
+    val expression = Add(Literal(1), Multiply(Literal(2), Subtract(Literal(3), Literal(4))))
+    // Find the top node.
+    var actual: Option[Expression] = expression.find {
+      case add: Add => true
+      case other => false
+    }
+    var expected: Option[Expression] =
+      Some(Add(Literal(1), Multiply(Literal(2), Subtract(Literal(3), Literal(4)))))
+    assert(expected === actual)
+
+    // Find the first child.
+    actual = expression.find {
+      case Literal(1, IntegerType) => true
+      case other => false
+    }
+    expected = Some(Literal(1))
+    assert(expected === actual)
 
+    // Find an internal node (Subtract).
+    actual = expression.find {
+      case sub: Subtract => true
+      case other => false
+    }
+    expected = Some(Subtract(Literal(3), Literal(4)))
+    assert(expected === actual)
+
+    // Find a leaf node.
+    actual = expression.find {
+      case Literal(3, IntegerType) => true
+      case other => false
+    }
+    expected = Some(Literal(3))
+    assert(expected === actual)
+
+    // Find nothing.
+    actual = expression.find {
+      case Literal(100, IntegerType) => true
+      case other => false
+    }
+    expected = None
+    assert(expected === actual)
+  }
 }
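
For readers who want to try the pre-order search outside Catalyst, the traversal order exercised by the "find" test can be reproduced on a toy tree. This is a hedged sketch only: `Node` is a hypothetical stand-in for an expression tree and does not use the real `TreeNode` class. It assumes the same strategy as the patch, namely test the node itself, then fold over the children from left to right and keep the first hit.

```scala
// Hypothetical toy tree; Catalyst's TreeNode is more general than this.
final case class Node(label: String, children: List[Node] = Nil) {
  // Pre-order: test this node first, then each subtree from left to right.
  // Option.orElse takes its argument by name, so later subtrees are not
  // searched once a match has been found.
  def find(f: Node => Boolean): Option[Node] =
    if (f(this)) Some(this)
    else children.foldLeft(Option.empty[Node]) { (found, child) =>
      found.orElse(child.find(f))
    }
}

object FindSketch {
  // Mirrors the shape of Add(1, Multiply(2, Subtract(3, 4))) from the test.
  val tree: Node =
    Node("add", List(
      Node("1"),
      Node("mul", List(
        Node("2"),
        Node("sub", List(Node("3"), Node("4")))))))
}
```

Searching `FindSketch.tree` for the first leaf returns the node labeled "1", since it is the first childless node visited in pre-order.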

http://git-wip-us.apache.org/repos/asf/spark/blob/f2c47082/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 326e8ce..56a4689 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -303,6 +303,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         execution.Expand(projections, output, planLater(child)) :: Nil
       case logical.Aggregate(group, agg, child) =>
         execution.Aggregate(partial = false, group, agg, planLater(child)) :: Nil
+      case logical.Window(projectList, windowExpressions, spec, child) =>
+        execution.Window(projectList, windowExpressions, spec, planLater(child)) :: Nil
       case logical.Sample(lb, ub, withReplacement, seed, child) =>
         execution.Sample(lb, ub, withReplacement, seed, planLater(child)) :: Nil
       case logical.LocalRelation(output, data) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/f2c47082/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
new file mode 100644
index 0000000..217b559
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, Distribution, ClusteredDistribution, Partitioning}
+import org.apache.spark.util.collection.CompactBuffer
+
+/**
+ * :: DeveloperApi ::
+ * For every row, evaluates `windowExpression` containing Window Functions and attaches
+ * the results to other regular expressions (represented by `projectList`).
+ * Every operator handles a single Window Specification, `windowSpec`.
+ */
+case class Window(
+    projectList: Seq[Attribute],
+    windowExpression: Seq[NamedExpression],
+    windowSpec: WindowSpecDefinition,
+    child: SparkPlan)
+  extends UnaryNode {
+
+  override def output: Seq[Attribute] =
+    (projectList ++ windowExpression).map(_.toAttribute)
+
+  override def requiredChildDistribution: Seq[Distribution] =
+    if (windowSpec.partitionSpec.isEmpty) {
+      // This operator will be very expensive.
+      AllTuples :: Nil
+    } else {
+      ClusteredDistribution(windowSpec.partitionSpec) :: Nil
+    }
+
+  // Since window functions are adding columns to the input rows, the child's outputPartitioning
+  // is preserved.
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
+    // The required child ordering has two parts.
+    // The first part is the expressions in the partition specification.
+    // We add these expressions to the required ordering to make sure input rows are grouped
+    // based on the partition specification. So, we only need to process a single partition
+    // at a time.
+    // The second part is the expressions specified in the ORDER BY clause.
+    // Basically, we first use sort to group rows based on partition specifications and then sort
+    // rows in a group based on the order specification.
+    (windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ windowSpec.orderSpec) :: Nil
+  }
+
+  // Since window functions basically add columns to input rows, this operator
+  // will not change the ordering of input rows.
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  case class ComputedWindow(
+    unbound: WindowExpression,
+    windowFunction: WindowFunction,
+    resultAttribute: AttributeReference)
+
+  // A list of window functions that need to be computed for each group.
+  private[this] val computedWindowExpressions = windowExpression.flatMap { window =>
+    window.collect {
+      case w: WindowExpression =>
+        ComputedWindow(
+          w,
+          BindReferences.bindReference(w.windowFunction, child.output),
+          AttributeReference(s"windowResult:$w", w.dataType, w.nullable)())
+    }
+  }.toArray
+
+  private[this] val windowFrame =
+    windowSpec.frameSpecification.asInstanceOf[SpecifiedWindowFrame]
+
+  // Create window functions.
+  private[this] def windowFunctions(): Array[WindowFunction] = {
+    val functions = new Array[WindowFunction](computedWindowExpressions.length)
+    var i = 0
+    while (i < computedWindowExpressions.length) {
+      functions(i) = computedWindowExpressions(i).windowFunction.newInstance()
+      functions(i).init()
+      i += 1
+    }
+    functions
+  }
+
+  // The schema of the result of all window function evaluations
+  private[this] val computedSchema = computedWindowExpressions.map(_.resultAttribute)
+
+  private[this] val computedResultMap =
+    computedWindowExpressions.map { w => w.unbound -> w.resultAttribute }.toMap
+
+  private[this] val windowExpressionResult = windowExpression.map { window =>
+    window.transform {
+      case w: WindowExpression if computedResultMap.contains(w) => computedResultMap(w)
+    }
+  }
+
+  def execute(): RDD[Row] = {
+    child.execute().mapPartitions { iter =>
+      new Iterator[Row] {
+
+        // Although input rows are grouped based on windowSpec.partitionSpec, we need to
+        // know when we have a new partition.
+        // This is to manually construct an ordering that can be used to compare rows.
+        // TODO: We may want to have a newOrdering that takes BoundReferences.
+        // So, we can take advantage of code gen.
+        private val partitionOrdering: Ordering[Row] =
+          RowOrdering.forSchema(windowSpec.partitionSpec.map(_.dataType))
+
+        // This is used to project expressions for the partition specification.
+        protected val partitionGenerator =
+          newMutableProjection(windowSpec.partitionSpec, child.output)()
+
+        // This is used to project expressions for the order specification.
+        protected val rowOrderGenerator =
+          newMutableProjection(windowSpec.orderSpec.map(_.child), child.output)()
+
+        // The position of next output row in the inputRowBuffer.
+        var rowPosition: Int = 0
+        // The number of buffered rows in the inputRowBuffer (the size of the current partition).
+        var partitionSize: Int = 0
+        // The buffer used to buffer rows in a partition.
+        var inputRowBuffer: CompactBuffer[Row] = _
+        // The partition key of the current partition.
+        var currentPartitionKey: Row = _
+        // The partition key of next partition.
+        var nextPartitionKey: Row = _
+        // The first row of next partition.
+        var firstRowInNextPartition: Row = _
+        // Indicates if this partition is the last one in the iter.
+        var lastPartition: Boolean = false
+
+        def createBoundaryEvaluator(): () => Unit = {
+          def findPhysicalBoundary(
+              boundary: FrameBoundary): () => Int = boundary match {
+            case UnboundedPreceding => () => 0
+            case UnboundedFollowing => () => partitionSize - 1
+            case CurrentRow => () => rowPosition
+            case ValuePreceding(value) =>
+              () =>
+                val newPosition = rowPosition - value
+                if (newPosition > 0) newPosition else 0
+            case ValueFollowing(value) =>
+              () =>
+                val newPosition = rowPosition + value
+                if (newPosition < partitionSize) newPosition else partitionSize - 1
+          }
+
+          def findLogicalBoundary(
+              boundary: FrameBoundary,
+              searchDirection: Int,
+              evaluator: Expression,
+              joinedRow: JoinedRow): () => Int = boundary match {
+            case UnboundedPreceding => () => 0
+            case UnboundedFollowing => () => partitionSize - 1
+            case other =>
+              () => {
+                // CurrentRow, ValuePreceding, or ValueFollowing.
+                var newPosition = rowPosition + searchDirection
+                var stopSearch = false
+                // rowOrderGenerator is a mutable projection.
+                // We need to make a copy of the row returned by rowOrderGenerator since we will
+                // compare searched row with this currentOrderByValue.
+                val currentOrderByValue = rowOrderGenerator(inputRowBuffer(rowPosition)).copy()
+                while (newPosition >= 0 && newPosition < partitionSize && !stopSearch) {
+                  val r = rowOrderGenerator(inputRowBuffer(newPosition))
+                  stopSearch =
+                    !(evaluator.eval(joinedRow(currentOrderByValue, r)).asInstanceOf[Boolean])
+                  if (!stopSearch) {
+                    newPosition += searchDirection
+                  }
+                }
+                newPosition -= searchDirection
+
+                if (newPosition < 0) {
+                  0
+                } else if (newPosition >= partitionSize) {
+                  partitionSize - 1
+                } else {
+                  newPosition
+                }
+              }
+          }
+
+          windowFrame.frameType match {
+            case RowFrame =>
+              val findStart = findPhysicalBoundary(windowFrame.frameStart)
+              val findEnd = findPhysicalBoundary(windowFrame.frameEnd)
+              () => {
+                frameStart = findStart()
+                frameEnd = findEnd()
+              }
+            case RangeFrame =>
+              val joinedRowForBoundaryEvaluation: JoinedRow = new JoinedRow()
+              val orderByExpr = windowSpec.orderSpec.head
+              val currentRowExpr =
+                BoundReference(0, orderByExpr.dataType, orderByExpr.nullable)
+              val examedRowExpr =
+                BoundReference(1, orderByExpr.dataType, orderByExpr.nullable)
+              val differenceExpr = Abs(Subtract(currentRowExpr, examedRowExpr))
+
+              val frameStartEvaluator = windowFrame.frameStart match {
+                case CurrentRow => EqualTo(currentRowExpr, examedRowExpr)
+                case ValuePreceding(value) =>
+                  LessThanOrEqual(differenceExpr, Cast(Literal(value), orderByExpr.dataType))
+                case ValueFollowing(value) =>
+                  GreaterThanOrEqual(differenceExpr, Cast(Literal(value), orderByExpr.dataType))
+                case o => Literal(true) // This is just a dummy expression, we will not use it.
+              }
+
+              val frameEndEvaluator = windowFrame.frameEnd match {
+                case CurrentRow => EqualTo(currentRowExpr, examedRowExpr)
+                case ValuePreceding(value) =>
+                  GreaterThanOrEqual(differenceExpr, Cast(Literal(value), orderByExpr.dataType))
+                case ValueFollowing(value) =>
+                  LessThanOrEqual(differenceExpr, Cast(Literal(value), orderByExpr.dataType))
+                case o => Literal(true) // This is just a dummy expression, we will not use it.
+              }
+
+              val findStart =
+                findLogicalBoundary(
+                  boundary = windowFrame.frameStart,
+                  searchDirection = -1,
+                  evaluator = frameStartEvaluator,
+                  joinedRow = joinedRowForBoundaryEvaluation)
+              val findEnd =
+                findLogicalBoundary(
+                  boundary = windowFrame.frameEnd,
+                  searchDirection = 1,
+                  evaluator = frameEndEvaluator,
+                  joinedRow = joinedRowForBoundaryEvaluation)
+              () => {
+                frameStart = findStart()
+                frameEnd = findEnd()
+              }
+          }
+        }
+
+        val boundaryEvaluator = createBoundaryEvaluator()
+        // Indicates if the specified window frame requires us to maintain a sliding frame
+        // (e.g. RANGE BETWEEN 1 PRECEDING AND CURRENT ROW) or the window frame
+        // is the entire partition (e.g. ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING).
+        val requireUpdateFrame: Boolean = {
+          def requireUpdateBoundary(boundary: FrameBoundary): Boolean = boundary match {
+            case UnboundedPreceding => false
+            case UnboundedFollowing => false
+            case _ => true
+          }
+
+          requireUpdateBoundary(windowFrame.frameStart) ||
+            requireUpdateBoundary(windowFrame.frameEnd)
+        }
+        // The start position of the current frame in the partition.
+        var frameStart: Int = 0
+        // The end position of the current frame in the partition.
+        var frameEnd: Int = -1
+        // Window functions.
+        val functions: Array[WindowFunction] = windowFunctions()
+        // Buffers used to store input parameters for window functions. Because we may need
+        // to maintain a sliding frame, we use these buffers to avoid evaluating the
+        // parameters from the same row multiple times.
+        val windowFunctionParameterBuffers: Array[util.LinkedList[AnyRef]] =
+          functions.map(_ => new util.LinkedList[AnyRef]())
+
+        // The projection used to generate the final result rows of this 
operator.
+        private[this] val resultProjection =
+          newMutableProjection(
+            projectList ++ windowExpressionResult,
+            projectList ++ computedSchema)()
+
+        // The row used to hold results of window functions.
+        private[this] val windowExpressionResultRow =
+          new GenericMutableRow(computedSchema.length)
+
+        private[this] val joinedRow = new JoinedRow6
+
+        // Initialize this iterator.
+        initialize()
+
+        private def initialize(): Unit = {
+          if (iter.hasNext) {
+            val currentRow = iter.next().copy()
+            // partitionGenerator is a mutable projection. Since we need to track
+            // nextPartitionKey, we make a copy of the returned partitionKey here.
+            nextPartitionKey = partitionGenerator(currentRow).copy()
+            firstRowInNextPartition = currentRow
+            fetchNextPartition()
+          } else {
+            // The iter is empty, so we set the following variables
+            // to make sure hasNext returns false.
+            lastPartition = true
+            rowPosition = 0
+            partitionSize = 0
+          }
+        }
+
+        // Indicates whether there is another output row.
+        override final def hasNext: Boolean = {
+          !lastPartition || (rowPosition < partitionSize)
+        }
+
+        override final def next(): Row = {
+          if (hasNext) {
+            if (rowPosition == partitionSize) {
+              // All rows of this buffer have been consumed.
+              // We will move to next partition.
+              fetchNextPartition()
+            }
+            // Get the input row for the current output row.
+            val inputRow = inputRowBuffer(rowPosition)
+            // Get all results of the window functions for this output row.
+            var i = 0
+            while (i < functions.length) {
+              windowExpressionResultRow.update(i, 
functions(i).get(rowPosition))
+              i += 1
+            }
+
+            // Construct the output row.
+            val outputRow = resultProjection(joinedRow(inputRow, 
windowExpressionResultRow))
+            // We will move to the next one.
+            rowPosition += 1
+            if (requireUpdateFrame && rowPosition < partitionSize) {
+              // If we need to maintain a sliding frame and we will still be working
+              // on this partition the next time next() is called, do the update.
+              updateFrame()
+            }
+
+            // Return the output row.
+            outputRow
+          } else {
+            // no more result
+            throw new NoSuchElementException
+          }
+        }
+
+        // Fetch the next partition.
+        private def fetchNextPartition(): Unit = {
+          // Create a new buffer for input rows.
+          inputRowBuffer = new CompactBuffer[Row]()
+          // We already have the first row for this partition
+          // (recorded in firstRowInNextPartition). Add it back.
+          inputRowBuffer += firstRowInNextPartition
+          // Set the current partition key.
+          currentPartitionKey = nextPartitionKey
+          // Now, we will start to find all rows belonging to this partition.
+          // Create a variable to track if we see the next partition.
+          var findNextPartition = false
+          // The search will stop when we see the next partition or there is no
+          // input row left in the iter.
+          while (iter.hasNext && !findNextPartition) {
+            // Make a copy of the input row since we will put it in the buffer.
+            val currentRow = iter.next().copy()
+            // Get the partition key based on the partition specification.
+            // For the comparison below, we do not need to make a copy of partitionKey.
+            val partitionKey = partitionGenerator(currentRow)
+            // Check if the current row belongs to the current partition.
+            val comparing = partitionOrdering.compare(currentPartitionKey, partitionKey)
+            if (comparing == 0) {
+              // This row is still in the current partition.
+              inputRowBuffer += currentRow
+            } else {
+              // The current input row is in a different partition.
+              findNextPartition = true
+              // partitionGenerator is a mutable projection. Since we need to track
+              // nextPartitionKey and it should be set to partitionKey, we make a copy
+              // of partitionKey here.
+              nextPartitionKey = partitionKey.copy()
+              firstRowInNextPartition = currentRow
+            }
+          }
+
+          // We have not seen a new partition, which means there are no more rows in
+          // the iter. The current partition is the last partition of the iter.
+          if (!findNextPartition) {
+            lastPartition = true
+          }
+
+          // We have got all rows for the current partition.
+          // Set rowPosition to 0 (the next output row will be based on the 
first
+          // input row of this partition).
+          rowPosition = 0
+          // The size of this partition.
+          partitionSize = inputRowBuffer.size
+          // Reset all parameter buffers of window functions.
+          var i = 0
+          while (i < windowFunctionParameterBuffers.length) {
+            windowFunctionParameterBuffers(i).clear()
+            i += 1
+          }
+          frameStart = 0
+          frameEnd = -1
+          // Create the first window frame for this partition.
+          // If we do not need to maintain a sliding frame, this frame will
+          // have the entire partition.
+          updateFrame()
+        }
+
+        /** The function used to maintain the sliding frame. */
+        private def updateFrame(): Unit = {
+          // Based on the difference between the new frame and old frame,
+          // updates the buffers holding input parameters of window functions.
+          // We will start to prepare input parameters starting from the row
+          // indicated by offset in the input row buffer.
+          def updateWindowFunctionParameterBuffers(
+              numToRemove: Int,
+              numToAdd: Int,
+              offset: Int): Unit = {
+            // First, remove unneeded entries from the head of every buffer.
+            var i = 0
+            while (i < numToRemove) {
+              var j = 0
+              while (j < windowFunctionParameterBuffers.length) {
+                windowFunctionParameterBuffers(j).remove()
+                j += 1
+              }
+              i += 1
+            }
+            // Then, add needed entries to the tail of every buffer.
+            i = 0
+            while (i < numToAdd) {
+              var j = 0
+              while (j < windowFunctionParameterBuffers.length) {
+                // Ask the function to prepare the input parameters.
+                val parameters = 
functions(j).prepareInputParameters(inputRowBuffer(i + offset))
+                windowFunctionParameterBuffers(j).add(parameters)
+                j += 1
+              }
+              i += 1
+            }
+          }
+
+          // Record the current frame start point and end point before
+          // we update them.
+          val previousFrameStart = frameStart
+          val previousFrameEnd = frameEnd
+          boundaryEvaluator()
+          updateWindowFunctionParameterBuffers(
+            frameStart - previousFrameStart,
+            frameEnd - previousFrameEnd,
+            previousFrameEnd + 1)
+          // Evaluate the current frame.
+          evaluateCurrentFrame()
+        }
+
+        /** Evaluate the current window frame. */
+        private def evaluateCurrentFrame(): Unit = {
+          var i = 0
+          while (i < functions.length) {
+            // Reset the state of the window function.
+            functions(i).reset()
+            // Get all buffered input parameters based on rows of this window 
frame.
+            val inputParameters = windowFunctionParameterBuffers(i).toArray()
+            // Send these input parameters to the window function.
+            functions(i).batchUpdate(inputParameters)
+            // Ask the function to evaluate based on this window frame.
+            functions(i).evaluate()
+            i += 1
+          }
+        }
+      }
+    }
+  }
+}
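
The partition-buffering loop in fetchNextPartition above can be illustrated in isolation. The following is only a minimal sketch, assuming the input is already sorted by partition key and using plain (key, value) tuples in place of Spark rows. PartitionBufferSketch and partitions are hypothetical names, and the sketch uses a BufferedIterator for look-ahead rather than the firstRowInNextPartition / nextPartitionKey variables used in the diff.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper, not part of the commit: groups a key-sorted
// iterator into consecutive partitions, one buffered partition at a time.
object PartitionBufferSketch {
  def partitions[K, V](iter: Iterator[(K, V)]): Iterator[Seq[V]] =
    new Iterator[Seq[V]] {
      // BufferedIterator lets us peek at the next row without consuming it.
      private val buffered = iter.buffered

      override def hasNext: Boolean = buffered.hasNext

      override def next(): Seq[V] = {
        if (!buffered.hasNext) throw new NoSuchElementException
        val key = buffered.head._1
        val rows = ArrayBuffer.empty[V]
        // Consume rows until the key changes or the input is exhausted.
        while (buffered.hasNext && buffered.head._1 == key) {
          rows += buffered.next()._2
        }
        rows.toList
      }
    }
}
```

As in the operator above, only one partition is held in memory at a time, which is why the input must arrive clustered by partition key.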

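The sliding-frame bookkeeping in updateFrame and evaluateCurrentFrame above follows a remove-from-head, append-to-tail pattern. Below is a rough sketch of that pattern under simplifying assumptions: a ROWS-style frame with fixed, non-negative preceding and following offsets, integer inputs, and SUM as the only function. SlidingFrameSketch and slidingSums are hypothetical names and do not appear in the commit.

```scala
import java.util.{LinkedList => JLinkedList}

// Hypothetical illustration of the buffer maintenance, not the commit's code.
object SlidingFrameSketch {
  /** For each row i, sums values(i - preceding .. i + following), clipped to the partition. */
  def slidingSums(values: IndexedSeq[Int], preceding: Int, following: Int): Seq[Long] = {
    val buffer = new JLinkedList[Int]()
    var frameStart = 0
    var frameEnd = -1 // An empty frame, mirroring the initial state in the diff.
    values.indices.map { i =>
      val newStart = math.max(0, i - preceding)
      val newEnd = math.min(values.length - 1, i + following)
      // Remove entries that have left the frame from the head of the buffer.
      var k = frameStart
      while (k < newStart) { buffer.remove(); k += 1 }
      // Append entries that have entered the frame to the tail of the buffer.
      k = frameEnd + 1
      while (k <= newEnd) { buffer.add(values(k)); k += 1 }
      frameStart = newStart
      frameEnd = newEnd
      // Re-evaluate the function over the buffered entries of the current frame.
      var sum = 0L
      val it = buffer.iterator()
      while (it.hasNext) sum += it.next()
      sum
    }
  }
}
```

For example, `SlidingFrameSketch.slidingSums(Vector(1, 2, 3, 4), preceding = 1, following = 0)` yields 1, 3, 5, 7. Each input value enters the buffer once, so its parameters are prepared only once even though it contributes to several frames.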
http://git-wip-us.apache.org/repos/asf/spark/blob/f2c47082/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
 
b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 81ee48e..5e411c2 100644
--- 
a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ 
b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -185,7 +185,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with 
BeforeAndAfter {
     // Hive does not support buckets.
     ".*bucket.*",
 
-    // No window support yet
+    // We have our own tests based on these query files.
     ".*window.*",
 
     // Fails in hive with authorization errors.

http://git-wip-us.apache.org/repos/asf/spark/blob/f2c47082/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 1d8d0b5..f25723e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -252,6 +252,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
         catalog.CreateTables ::
         catalog.PreInsertionCasts ::
         ExtractPythonUdfs ::
+        ResolveHiveWindowFunction ::
         sources.PreInsertCastAndRename ::
         Nil
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/f2c47082/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 63a8c05..8a0686a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -421,16 +421,16 @@ private[hive] object HiveQl {
   }
 
   /**
-   * SELECT MAX(value) FROM src GROUP BY k1, k2, k3 GROUPING SETS((k1, k2), 
(k2)) 
-   * is equivalent to 
+   * SELECT MAX(value) FROM src GROUP BY k1, k2, k3 GROUPING SETS((k1, k2), 
(k2))
+   * is equivalent to
    * SELECT MAX(value) FROM src GROUP BY k1, k2 UNION SELECT MAX(value) FROM 
src GROUP BY k2
    * Check the following link for details.
-   * 
+   *
 
https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C+Grouping+and+Rollup
    *
    * The bitmask denotes the grouping expressions validity for a grouping set,
    * the bitmask also be called as grouping id (`GROUPING__ID`, the virtual 
column in Hive)
-   * e.g. In superset (k1, k2, k3), (bit 0: k1, bit 1: k2, and bit 2: k3), the 
grouping id of 
+   * e.g. In superset (k1, k2, k3), (bit 0: k1, bit 1: k2, and bit 2: k3), the 
grouping id of
    * GROUPING SETS (k1, k2) and (k2) should be 3 and 2 respectively.
    */
   protected def extractGroupingSet(children: Seq[ASTNode]): (Seq[Expression], 
Seq[Int]) = {
@@ -444,7 +444,7 @@ 
https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
 
     val bitmasks: Seq[Int] = setASTs.map(set => set match {
       case Token("TOK_GROUPING_SETS_EXPRESSION", null) => 0
-      case Token("TOK_GROUPING_SETS_EXPRESSION", children) => 
+      case Token("TOK_GROUPING_SETS_EXPRESSION", children) =>
         children.foldLeft(0)((bitmap, col) => {
           val colString = col.asInstanceOf[ASTNode].toStringTree()
           require(keyMap.contains(colString), s"$colString doens't show up in 
the GROUP BY list")
@@ -613,7 +613,8 @@ 
https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
             clusterByClause ::
             distributeByClause ::
             limitClause ::
-            lateralViewClause :: Nil) = {
+            lateralViewClause ::
+            windowClause :: Nil) = {
           getClauses(
             Seq(
               "TOK_INSERT_INTO",
@@ -631,15 +632,16 @@ 
https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
               "TOK_CLUSTERBY",
               "TOK_DISTRIBUTEBY",
               "TOK_LIMIT",
-              "TOK_LATERAL_VIEW"),
+              "TOK_LATERAL_VIEW",
+              "WINDOW"),
             singleInsert)
         }
- 
+
         val relations = fromClause match {
           case Some(f) => nodeToRelation(f)
           case None => OneRowRelation
         }
- 
+
         val withWhere = whereClause.map { whereNode =>
           val Seq(whereExpr) = whereNode.getChildren.toSeq
           Filter(nodeToExpr(whereExpr), relations)
@@ -691,7 +693,7 @@ 
https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
                 val serdeProps = propsClause.map {
                   case Token("TOK_TABLEPROPERTY", Token(name, Nil) :: 
Token(value, Nil) :: Nil) =>
                     (name, value)
-                } 
+                }
                 (Nil, serdeClass, serdeProps)
 
               case Nil => (Nil, "", Nil)
@@ -736,7 +738,7 @@ 
https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
         // The projection of the query can either be a normal projection, an 
aggregation
         // (if there is a group by) or a script transformation.
         val withProject: LogicalPlan = transformation.getOrElse {
-          val selectExpressions = 
+          val selectExpressions =
             
nameExpressions(select.getChildren.flatMap(selExprNodeToExpr).toSeq)
           Seq(
             groupByClause.map(e => e match {
@@ -764,31 +766,34 @@ 
https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
             Some(Project(selectExpressions, withLateralView))).flatten.head
         }
 
-        val withDistinct =
-          if (selectDistinctClause.isDefined) Distinct(withProject) else 
withProject
-
+        // Handle HAVING clause.
         val withHaving = havingClause.map { h =>
           val havingExpr = h.getChildren.toSeq match { case Seq(hexpr) => 
nodeToExpr(hexpr) }
           // Note that we added a cast to boolean. If the expression itself is 
already boolean,
           // the optimizer will get rid of the unnecessary cast.
-          Filter(Cast(havingExpr, BooleanType), withDistinct)
-        }.getOrElse(withDistinct)
+          Filter(Cast(havingExpr, BooleanType), withProject)
+        }.getOrElse(withProject)
+
+        // Handle SELECT DISTINCT
+        val withDistinct =
+          if (selectDistinctClause.isDefined) Distinct(withHaving) else 
withHaving
 
+        // Handle ORDER BY, SORT BY, DISTRIBUTE BY, and CLUSTER BY clauses.
         val withSort =
           (orderByClause, sortByClause, distributeByClause, clusterByClause) 
match {
             case (Some(totalOrdering), None, None, None) =>
-              Sort(totalOrdering.getChildren.map(nodeToSortOrder), true, 
withHaving)
+              Sort(totalOrdering.getChildren.map(nodeToSortOrder), true, 
withDistinct)
             case (None, Some(perPartitionOrdering), None, None) =>
-              Sort(perPartitionOrdering.getChildren.map(nodeToSortOrder), 
false, withHaving)
+              Sort(perPartitionOrdering.getChildren.map(nodeToSortOrder), 
false, withDistinct)
             case (None, None, Some(partitionExprs), None) =>
-              
RepartitionByExpression(partitionExprs.getChildren.map(nodeToExpr), withHaving)
+              
RepartitionByExpression(partitionExprs.getChildren.map(nodeToExpr), 
withDistinct)
             case (None, Some(perPartitionOrdering), Some(partitionExprs), 
None) =>
               Sort(perPartitionOrdering.getChildren.map(nodeToSortOrder), 
false,
-                
RepartitionByExpression(partitionExprs.getChildren.map(nodeToExpr), withHaving))
+                
RepartitionByExpression(partitionExprs.getChildren.map(nodeToExpr), 
withDistinct))
             case (None, None, None, Some(clusterExprs)) =>
               Sort(clusterExprs.getChildren.map(nodeToExpr).map(SortOrder(_, 
Ascending)), false,
-                
RepartitionByExpression(clusterExprs.getChildren.map(nodeToExpr), withHaving))
-            case (None, None, None, None) => withHaving
+                
RepartitionByExpression(clusterExprs.getChildren.map(nodeToExpr), withDistinct))
+            case (None, None, None, None) => withDistinct
             case _ => sys.error("Unsupported set of ordering / distribution 
clauses.")
           }
 
@@ -797,6 +802,27 @@ 
https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
             .map(Limit(_, withSort))
             .getOrElse(withSort)
 
+        // Collect all window specifications defined in the WINDOW clause.
+        val windowDefinitions = windowClause.map(_.getChildren.toSeq.collect {
+          case Token("TOK_WINDOWDEF",
+          Token(windowName, Nil) :: Token("TOK_WINDOWSPEC", spec) :: Nil) =>
+            windowName -> nodesToWindowSpecification(spec)
+        }.toMap)
+        // Handle cases like
+        // window w1 as (partition by p_mfgr order by p_name
+        //               range between 2 preceding and 2 following),
+        //        w2 as w1
+        val resolvedCrossReference = windowDefinitions.map {
+          windowDefMap => windowDefMap.map {
+            case (windowName, WindowSpecReference(other)) =>
+              (windowName, 
windowDefMap(other).asInstanceOf[WindowSpecDefinition])
+            case o => o.asInstanceOf[(String, WindowSpecDefinition)]
+          }
+        }
+
+        val withWindowDefinitions =
+          resolvedCrossReference.map(WithWindowDefinition(_, 
withLimit)).getOrElse(withLimit)
+
         // TOK_INSERT_INTO means to add files to the table.
         // TOK_DESTINATION means to overwrite the table.
         val resultDestination =
@@ -804,7 +830,7 @@ 
https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
         val overwrite = intoClause.isEmpty
         nodeToDest(
           resultDestination,
-          withLimit,
+          withWindowDefinitions,
           overwrite)
       }
 
@@ -1053,7 +1079,6 @@ 
https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
       throw new NotImplementedError(s"No parse rules for:\n 
${dumpTree(a).toString} ")
   }
 
-
   protected val escapedIdentifier = "`([^`]+)`".r
   /** Strips backticks from ident if present */
   protected def cleanIdentifier(ident: String): String = ident match {
@@ -1250,6 +1275,25 @@ 
https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
       Substring(nodeToExpr(string), nodeToExpr(pos), nodeToExpr(length))
     case Token("TOK_FUNCTION", Token(COALESCE(), Nil) :: list) => 
Coalesce(list.map(nodeToExpr))
 
+    /* Window Functions */
+    case Token("TOK_FUNCTION", Token(name, Nil) +: args :+ 
Token("TOK_WINDOWSPEC", spec)) =>
+      val function = UnresolvedWindowFunction(name, args.map(nodeToExpr))
+      nodesToWindowSpecification(spec) match {
+        case reference: WindowSpecReference =>
+          UnresolvedWindowExpression(function, reference)
+        case definition: WindowSpecDefinition =>
+          WindowExpression(function, definition)
+      }
+    case Token("TOK_FUNCTIONSTAR", Token(name, Nil) :: Token("TOK_WINDOWSPEC", 
spec) :: Nil) =>
+      // Safe to use Literal(1)?
+      val function = UnresolvedWindowFunction(name, Literal(1) :: Nil)
+      nodesToWindowSpecification(spec) match {
+        case reference: WindowSpecReference =>
+          UnresolvedWindowExpression(function, reference)
+        case definition: WindowSpecDefinition =>
+          WindowExpression(function, definition)
+      }
+
     /* UDFs - Must be last otherwise will preempt built in functions */
     case Token("TOK_FUNCTION", Token(name, Nil) :: args) =>
       UnresolvedFunction(name, args.map(nodeToExpr))
@@ -1312,6 +1356,89 @@ 
https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
          """.stripMargin)
   }
 
+  def nodesToWindowSpecification(nodes: Seq[ASTNode]): WindowSpec = nodes 
match {
+    case Token(windowName, Nil) :: Nil =>
+      // Refer to a window spec defined in the window clause.
+      WindowSpecReference(windowName)
+    case Nil =>
+      // OVER()
+      WindowSpecDefinition(
+        partitionSpec = Nil,
+        orderSpec = Nil,
+        frameSpecification = UnspecifiedFrame)
+    case spec =>
+      val (partitionClause :: rowFrame :: rangeFrame :: Nil) =
+        getClauses(
+          Seq(
+            "TOK_PARTITIONINGSPEC",
+            "TOK_WINDOWRANGE",
+            "TOK_WINDOWVALUES"),
+          spec)
+
+      // Handle Partition By and Order By.
+      val (partitionSpec, orderSpec) = partitionClause.map { 
partitionAndOrdering =>
+        val (partitionByClause :: orderByClause :: sortByClause :: 
clusterByClause :: Nil) =
+          getClauses(
+            Seq("TOK_DISTRIBUTEBY", "TOK_ORDERBY", "TOK_SORTBY", 
"TOK_CLUSTERBY"),
+            partitionAndOrdering.getChildren.toSeq.asInstanceOf[Seq[ASTNode]])
+
+        (partitionByClause, orderByClause.orElse(sortByClause), 
clusterByClause) match {
+          case (Some(partitionByExpr), Some(orderByExpr), None) =>
+            (partitionByExpr.getChildren.map(nodeToExpr),
+              orderByExpr.getChildren.map(nodeToSortOrder))
+          case (Some(partitionByExpr), None, None) =>
+            (partitionByExpr.getChildren.map(nodeToExpr), Nil)
+          case (None, Some(orderByExpr), None) =>
+            (Nil, orderByExpr.getChildren.map(nodeToSortOrder))
+          case (None, None, Some(clusterByExpr)) =>
+            val expressions = clusterByExpr.getChildren.map(nodeToExpr)
+            (expressions, expressions.map(SortOrder(_, Ascending)))
+          case _ =>
+            throw new NotImplementedError(
+              s"""No parse rules for Node ${partitionAndOrdering.getName}
+              """.stripMargin)
+        }
+      }.getOrElse {
+        (Nil, Nil)
+      }
+
+      // Handle Window Frame
+      val windowFrame =
+        if (rowFrame.isEmpty && rangeFrame.isEmpty) {
+          UnspecifiedFrame
+        } else {
+          val frameType = rowFrame.map(_ => RowFrame).getOrElse(RangeFrame)
+          def nodeToBoundary(node: Node): FrameBoundary = node match {
+            case Token("preceding", Token(count, Nil) :: Nil) =>
+              if (count == "unbounded") UnboundedPreceding else 
ValuePreceding(count.toInt)
+            case Token("following", Token(count, Nil) :: Nil) =>
+              if (count == "unbounded") UnboundedFollowing else 
ValueFollowing(count.toInt)
+            case Token("current", Nil) => CurrentRow
+            case _ =>
+              throw new NotImplementedError(
+                s"""No parse rules for the Window Frame Boundary based on Node 
${node.getName}
+              """.stripMargin)
+          }
+
+          rowFrame.orElse(rangeFrame).map { frame =>
+            frame.getChildren.toList match {
+              case precedingNode :: followingNode :: Nil =>
+                SpecifiedWindowFrame(
+                  frameType,
+                  nodeToBoundary(precedingNode),
+                  nodeToBoundary(followingNode))
+              case precedingNode :: Nil =>
+                SpecifiedWindowFrame(frameType, nodeToBoundary(precedingNode), 
CurrentRow)
+              case _ =>
+                throw new NotImplementedError(
+                  s"""No parse rules for the Window Frame based on Node 
${frame.getName}
+                  """.stripMargin)
+            }
+          }.getOrElse(sys.error(s"If you see this, please file a bug report 
with your query."))
+        }
+
+      WindowSpecDefinition(partitionSpec, orderSpec, windowFrame)
+  }
 
   val explode = "(?i)explode".r
   def nodesToGenerator(nodes: Seq[Node]): (Generator, Seq[String]) = {
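
The cross-reference handling for the WINDOW clause (w2 as w1) in the HiveQl.scala hunk above can be sketched on its own. This is only an illustration: the case classes below are simplified stand-ins with hypothetical fields, not the Catalyst classes from this commit. It also goes one step further than the diff by following chains of references and reporting undefined or cyclic names, which is an assumption about desirable behavior rather than something the commit does.

```scala
// Hypothetical, simplified model of named window specifications.
object WindowReferenceSketch {
  sealed trait WindowSpec
  final case class WindowSpecReference(name: String) extends WindowSpec
  final case class WindowSpecDefinition(
      partitionBy: Seq[String],
      orderBy: Seq[String]) extends WindowSpec

  /** Replaces every reference with the definition it ultimately points to. */
  def resolve(defs: Map[String, WindowSpec]): Map[String, WindowSpecDefinition] = {
    def lookup(name: String, seen: Set[String]): WindowSpecDefinition = {
      if (seen.contains(name)) sys.error(s"Cyclic window reference involving $name")
      defs.get(name) match {
        case Some(definition: WindowSpecDefinition) => definition
        case Some(WindowSpecReference(other)) => lookup(other, seen + name)
        case None => sys.error(s"Window $name is not defined")
      }
    }
    defs.map { case (name, _) => name -> lookup(name, Set.empty) }
  }
}
```

With `w1` defined directly and `w2` declared as a reference to `w1`, `resolve` maps both names to the same definition, which matches the intent of resolvedCrossReference in the diff.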

