Repository: spark
Updated Branches:
  refs/heads/master 0a1d2ca42 -> 39ab199a3


[SPARK-8640] [SQL] Enable Processing of Multiple Window Frames in a Single 
Window Operator

This PR enables the processing of multiple window frames in a single window 
operator. This should improve the performance of processing multiple window 
expressions which share partition by/order by clauses, because it will be more 
efficient with respect to memory use and group processing.

Author: Herman van Hovell <hvanhov...@questtec.nl>

Closes #7515 from hvanhovell/SPARK-8640 and squashes the following commits:

f0e1c21 [Herman van Hovell] Changed Window Logical/Physical plans to use 
partition by/order by specs directly instead of using WindowSpec.
e1711c2 [Herman van Hovell] Enabled the processing of multiple window frames in 
a single Window operator.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/39ab199a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/39ab199a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/39ab199a

Branch: refs/heads/master
Commit: 39ab199a3f735b7658ab3331d3e2fb03441aec13
Parents: 0a1d2ca
Author: Herman van Hovell <hvanhov...@questtec.nl>
Authored: Fri Jul 31 12:07:18 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Fri Jul 31 12:08:25 2015 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala   | 12 +++++++-----
 .../catalyst/plans/logical/basicOperators.scala  |  3 ++-
 .../spark/sql/execution/SparkStrategies.scala    |  5 +++--
 .../org/apache/spark/sql/execution/Window.scala  | 19 ++++++++++---------
 .../spark/sql/hive/execution/HivePlanTest.scala  | 18 ++++++++++++++++++
 5 files changed, 40 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/39ab199a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 265f3d1..51d910b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -347,7 +347,7 @@ class Analyzer(
             val newOutput = oldVersion.generatorOutput.map(_.newInstance())
             (oldVersion, oldVersion.copy(generatorOutput = newOutput))
 
-          case oldVersion @ Window(_, windowExpressions, _, child)
+          case oldVersion @ Window(_, windowExpressions, _, _, child)
               if 
AttributeSet(windowExpressions.map(_.toAttribute)).intersect(conflictingAttributes)
                 .nonEmpty =>
             (oldVersion, oldVersion.copy(windowExpressions = 
newAliases(windowExpressions)))
@@ -825,7 +825,7 @@ class Analyzer(
         }.asInstanceOf[NamedExpression]
       }
 
-      // Second, we group extractedWindowExprBuffer based on their Window Spec.
+      // Second, we group extractedWindowExprBuffer based on their Partition 
and Order Specs.
       val groupedWindowExpressions = extractedWindowExprBuffer.groupBy { expr 
=>
         val distinctWindowSpec = expr.collect {
           case window: WindowExpression => window.windowSpec
@@ -841,7 +841,8 @@ class Analyzer(
           failAnalysis(s"$expr has multiple Window Specifications 
($distinctWindowSpec)." +
             s"Please file a bug report with this error message, stack trace, 
and the query.")
         } else {
-          distinctWindowSpec.head
+          val spec = distinctWindowSpec.head
+          (spec.partitionSpec, spec.orderSpec)
         }
       }.toSeq
 
@@ -850,9 +851,10 @@ class Analyzer(
       var currentChild = child
       var i = 0
       while (i < groupedWindowExpressions.size) {
-        val (windowSpec, windowExpressions) = groupedWindowExpressions(i)
+        val ((partitionSpec, orderSpec), windowExpressions) = 
groupedWindowExpressions(i)
         // Set currentChild to the newly created Window operator.
-        currentChild = Window(currentChild.output, windowExpressions, 
windowSpec, currentChild)
+        currentChild = Window(currentChild.output, windowExpressions,
+          partitionSpec, orderSpec, currentChild)
 
         // Move to next Window Spec.
         i += 1

http://git-wip-us.apache.org/repos/asf/spark/blob/39ab199a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index a67f8de..aacfc86 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -228,7 +228,8 @@ case class Aggregate(
 case class Window(
     projectList: Seq[Attribute],
     windowExpressions: Seq[NamedExpression],
-    windowSpec: WindowSpecDefinition,
+    partitionSpec: Seq[Expression],
+    orderSpec: Seq[SortOrder],
     child: LogicalPlan) extends UnaryNode {
 
   override def output: Seq[Attribute] =

http://git-wip-us.apache.org/repos/asf/spark/blob/39ab199a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 03d24a8..4aff52d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -389,8 +389,9 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
           execution.Aggregate(partial = false, group, agg, planLater(child)) 
:: Nil
         }
       }
-      case logical.Window(projectList, windowExpressions, spec, child) =>
-        execution.Window(projectList, windowExpressions, spec, 
planLater(child)) :: Nil
+      case logical.Window(projectList, windowExprs, partitionSpec, orderSpec, 
child) =>
+        execution.Window(
+          projectList, windowExprs, partitionSpec, orderSpec, 
planLater(child)) :: Nil
       case logical.Sample(lb, ub, withReplacement, seed, child) =>
         execution.Sample(lb, ub, withReplacement, seed, planLater(child)) :: 
Nil
       case logical.LocalRelation(output, data) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/39ab199a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
index 91c8a02..fe9f2c7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
@@ -80,23 +80,24 @@ import scala.collection.mutable
 case class Window(
     projectList: Seq[Attribute],
     windowExpression: Seq[NamedExpression],
-    windowSpec: WindowSpecDefinition,
+    partitionSpec: Seq[Expression],
+    orderSpec: Seq[SortOrder],
     child: SparkPlan)
   extends UnaryNode {
 
   override def output: Seq[Attribute] = projectList ++ 
windowExpression.map(_.toAttribute)
 
   override def requiredChildDistribution: Seq[Distribution] = {
-    if (windowSpec.partitionSpec.isEmpty) {
+    if (partitionSpec.isEmpty) {
       // Only show warning when the number of bytes is larger than 100 MB?
       logWarning("No Partition Defined for Window operation! Moving all data 
to a single "
         + "partition, this can cause serious performance degradation.")
       AllTuples :: Nil
-    } else ClusteredDistribution(windowSpec.partitionSpec) :: Nil
+    } else ClusteredDistribution(partitionSpec) :: Nil
   }
 
   override def requiredChildOrdering: Seq[Seq[SortOrder]] =
-    Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec)
+    Seq(partitionSpec.map(SortOrder(_, Ascending)) ++ orderSpec)
 
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
@@ -115,12 +116,12 @@ case class Window(
       case RangeFrame =>
         val (exprs, current, bound) = if (offset == 0) {
           // Use the entire order expression when the offset is 0.
-          val exprs = windowSpec.orderSpec.map(_.child)
+          val exprs = orderSpec.map(_.child)
           val projection = newMutableProjection(exprs, child.output)
-          (windowSpec.orderSpec, projection(), projection())
-        } else if (windowSpec.orderSpec.size == 1) {
+          (orderSpec, projection(), projection())
+        } else if (orderSpec.size == 1) {
           // Use only the first order expression when the offset is non-null.
-          val sortExpr = windowSpec.orderSpec.head
+          val sortExpr = orderSpec.head
           val expr = sortExpr.child
           // Create the projection which returns the current 'value'.
           val current = newMutableProjection(expr :: Nil, child.output)()
@@ -250,7 +251,7 @@ case class Window(
 
         // Get all relevant projections.
         val result = createResultProjection(unboundExpressions)
-        val grouping = newProjection(windowSpec.partitionSpec, child.output)
+        val grouping = newProjection(partitionSpec, child.output)
 
         // Manage the stream and the grouping.
         var nextRow: InternalRow = EmptyRow

http://git-wip-us.apache.org/repos/asf/spark/blob/39ab199a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala
index bdb53dd..ba56a8a 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala
@@ -17,7 +17,10 @@
 
 package org.apache.spark.sql.hive.execution
 
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.hive.test.TestHive
 
 class HivePlanTest extends QueryTest {
@@ -31,4 +34,19 @@ class HivePlanTest extends QueryTest {
 
     comparePlans(optimized, correctAnswer)
   }
+
+  test("window expressions sharing the same partition by and order by clause") 
{
+    val df = Seq.empty[(Int, String, Int, Int)].toDF("id", "grp", "seq", "val")
+    val window = Window.
+      partitionBy($"grp").
+      orderBy($"val")
+    val query = df.select(
+      $"id",
+      sum($"val").over(window.rowsBetween(-1, 1)),
+      sum($"val").over(window.rangeBetween(-1, 1))
+    )
+    val plan = query.queryExecution.analyzed
+    assert(plan.collect{ case w: logical.Window => w }.size === 1,
+      "Should have only 1 Window operator.")
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to