[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-19 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/7513





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-19 Thread yhuai
Github user yhuai commented on the pull request:

https://github.com/apache/spark/pull/7513#issuecomment-122717523
  
I am merging it to master.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7513#issuecomment-122716747
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-19 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7513#issuecomment-122716729
  
  [Test build #37782 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37782/console) for PR 7513 at commit [`4e69d08`](https://github.com/apache/spark/commit/4e69d08ab0292220a3cf67d3011947f80f931ed7).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
   * `case class DayOfYear(child: Expression) extends UnaryExpression with ImplicitCastInputTypes`
   * `case class DayOfMonth(child: Expression) extends UnaryExpression with ImplicitCastInputTypes`






[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-19 Thread yhuai
Github user yhuai commented on the pull request:

https://github.com/apache/spark/pull/7513#issuecomment-122709323
  
LGTM. Will merge it to master once it passes jenkins.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-19 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7513#issuecomment-122708899
  
  [Test build #37782 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37782/consoleFull) for PR 7513 at commit [`4e69d08`](https://github.com/apache/spark/commit/4e69d08ab0292220a3cf67d3011947f80f931ed7).





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7513#issuecomment-122708823
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7513#issuecomment-122708829
  
Merged build started.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-19 Thread yhuai
Github user yhuai commented on the pull request:

https://github.com/apache/spark/pull/7513#issuecomment-122708770
  
ah, I guess it is because I replied through email. Let's see if jenkins 
gets it this time.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-19 Thread yhuai
Github user yhuai commented on the pull request:

https://github.com/apache/spark/pull/7513#issuecomment-122708749
  
ok to test





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-19 Thread hvanhovell
Github user hvanhovell commented on the pull request:

https://github.com/apache/spark/pull/7513#issuecomment-122707719
  
@yhuai Don't think jenkins picked up your OK.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-19 Thread yhuai
Github user yhuai commented on the pull request:

https://github.com/apache/spark/pull/7513#issuecomment-122689642
  
ok to test



[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7513#issuecomment-122674562
  
Can one of the admins verify this patch?





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7513#issuecomment-122674518
  
Can one of the admins verify this patch?





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-19 Thread hvanhovell
Github user hvanhovell commented on the pull request:

https://github.com/apache/spark/pull/7513#issuecomment-122674419
  
cc @yhuai 





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-19 Thread hvanhovell
GitHub user hvanhovell opened a pull request:

https://github.com/apache/spark/pull/7513

[SPARK-8638] [SQL] Window Function Performance Improvements - Cleanup

This PR contains a few clean-ups that are part of SPARK-8638: a few style issues were fixed, and a few tests were moved.

The Git commit message is wrong, BTW :(...

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hvanhovell/spark SPARK-8638-cleanup

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/7513.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #7513


commit 4e69d08ab0292220a3cf67d3011947f80f931ed7
Author: Herman van Hovell 
Date:   2015-07-19T15:52:28Z

Fixed Perfomance Regression for Shrinking Window Frames (+Rebase)







[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-18 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r3498
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -38,443 +84,667 @@ case class Window(
 child: SparkPlan)
   extends UnaryNode {
 
-  override def output: Seq[Attribute] =
-(projectList ++ windowExpression).map(_.toAttribute)
+  override def output: Seq[Attribute] = projectList ++ 
windowExpression.map(_.toAttribute)
 
-  override def requiredChildDistribution: Seq[Distribution] =
+  override def requiredChildDistribution: Seq[Distribution] = {
 if (windowSpec.partitionSpec.isEmpty) {
-  // This operator will be very expensive.
+  // Only show warning when the number of bytes is larger than 100 MB?
+  logWarning("No Partition Defined for Window operation! Moving all 
data to a single "
++ "partition, this can cause serious performance degradation.")
   AllTuples :: Nil
-} else {
-  ClusteredDistribution(windowSpec.partitionSpec) :: Nil
-}
-
-  // Since window functions are adding columns to the input rows, the 
child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-
-  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
-// The required child ordering has two parts.
-// The first part is the expressions in the partition specification.
-// We add these expressions to the required ordering to make sure 
input rows are grouped
-// based on the partition specification. So, we only need to process a 
single partition
-// at a time.
-// The second part is the expressions specified in the ORDER BY cluase.
-// Basically, we first use sort to group rows based on partition 
specifications and then sort
-// Rows in a group based on the order specification.
-(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec) :: Nil
+} else ClusteredDistribution(windowSpec.partitionSpec) :: Nil
   }
 
-  // Since window functions basically add columns to input rows, this 
operator
-  // will not change the ordering of input rows.
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec)
+
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  case class ComputedWindow(
-unbound: WindowExpression,
-windowFunction: WindowFunction,
-resultAttribute: AttributeReference)
-
-  // A list of window functions that need to be computed for each group.
-  private[this] val computedWindowExpressions = windowExpression.flatMap { 
window =>
-window.collect {
-  case w: WindowExpression =>
-ComputedWindow(
-  w,
-  BindReferences.bindReference(w.windowFunction, child.output),
-  AttributeReference(s"windowResult:$w", w.dataType, w.nullable)())
+  /**
+   * Create a bound ordering object for a given frame type and offset. A 
bound ordering object is
+   * used to determine which input row lies within the frame boundaries of 
an output row.
+   *
+   * This method uses Code Generation. It can only be used on the executor 
side.
+   *
+   * @param frameType to evaluate. This can either be Row or Range based.
+   * @param offset with respect to the row.
+   * @return a bound ordering object.
+   */
+  private[this] def createBoundOrdering(frameType: FrameType, offset: 
Int): BoundOrdering = {
+frameType match {
+  case RangeFrame =>
+val (exprs, current, bound) = if (offset == 0) {
+  // Use the entire order expression when the offset is 0.
+  val exprs = windowSpec.orderSpec.map(_.child)
+  val projection = newMutableProjection(exprs, child.output)
+  (windowSpec.orderSpec, projection(), projection())
+}
+else if (windowSpec.orderSpec.size == 1) {
+  // Use only the first order expression when the offset is 
non-null.
+  val sortExpr = windowSpec.orderSpec.head
+  val expr = sortExpr.child
+  // Create the projection which returns the current 'value'.
+  val current = newMutableProjection(expr :: Nil, child.output)()
+  // Flip the sign of the offset when processing the order is 
descending
+  val boundOffset = if (sortExpr.direction == Descending) -offset
+  else offset
+  // Create the projection which returns the current 'value' 
modified by adding the offset.
+  val boundExpr = Add(expr, Cast(Literal.create(boundOffset, 
IntegerType), expr.dataType

[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-18 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r3496
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -38,443 +84,667 @@ case class Window(
 child: SparkPlan)
   extends UnaryNode {
 
-  override def output: Seq[Attribute] =
-(projectList ++ windowExpression).map(_.toAttribute)
+  override def output: Seq[Attribute] = projectList ++ 
windowExpression.map(_.toAttribute)
 
-  override def requiredChildDistribution: Seq[Distribution] =
+  override def requiredChildDistribution: Seq[Distribution] = {
 if (windowSpec.partitionSpec.isEmpty) {
-  // This operator will be very expensive.
+  // Only show warning when the number of bytes is larger than 100 MB?
+  logWarning("No Partition Defined for Window operation! Moving all 
data to a single "
++ "partition, this can cause serious performance degradation.")
   AllTuples :: Nil
-} else {
-  ClusteredDistribution(windowSpec.partitionSpec) :: Nil
-}
-
-  // Since window functions are adding columns to the input rows, the 
child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-
-  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
-// The required child ordering has two parts.
-// The first part is the expressions in the partition specification.
-// We add these expressions to the required ordering to make sure 
input rows are grouped
-// based on the partition specification. So, we only need to process a 
single partition
-// at a time.
-// The second part is the expressions specified in the ORDER BY cluase.
-// Basically, we first use sort to group rows based on partition 
specifications and then sort
-// Rows in a group based on the order specification.
-(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec) :: Nil
+} else ClusteredDistribution(windowSpec.partitionSpec) :: Nil
   }
 
-  // Since window functions basically add columns to input rows, this 
operator
-  // will not change the ordering of input rows.
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec)
+
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  case class ComputedWindow(
-unbound: WindowExpression,
-windowFunction: WindowFunction,
-resultAttribute: AttributeReference)
-
-  // A list of window functions that need to be computed for each group.
-  private[this] val computedWindowExpressions = windowExpression.flatMap { 
window =>
-window.collect {
-  case w: WindowExpression =>
-ComputedWindow(
-  w,
-  BindReferences.bindReference(w.windowFunction, child.output),
-  AttributeReference(s"windowResult:$w", w.dataType, w.nullable)())
+  /**
+   * Create a bound ordering object for a given frame type and offset. A 
bound ordering object is
+   * used to determine which input row lies within the frame boundaries of 
an output row.
+   *
+   * This method uses Code Generation. It can only be used on the executor 
side.
+   *
+   * @param frameType to evaluate. This can either be Row or Range based.
+   * @param offset with respect to the row.
+   * @return a bound ordering object.
+   */
+  private[this] def createBoundOrdering(frameType: FrameType, offset: 
Int): BoundOrdering = {
+frameType match {
+  case RangeFrame =>
+val (exprs, current, bound) = if (offset == 0) {
+  // Use the entire order expression when the offset is 0.
+  val exprs = windowSpec.orderSpec.map(_.child)
+  val projection = newMutableProjection(exprs, child.output)
+  (windowSpec.orderSpec, projection(), projection())
+}
+else if (windowSpec.orderSpec.size == 1) {
+  // Use only the first order expression when the offset is 
non-null.
+  val sortExpr = windowSpec.orderSpec.head
+  val expr = sortExpr.child
+  // Create the projection which returns the current 'value'.
+  val current = newMutableProjection(expr :: Nil, child.output)()
+  // Flip the sign of the offset when processing the order is 
descending
+  val boundOffset = if (sortExpr.direction == Descending) -offset
+  else offset
--- End diff --

```
val boundOffset =
  if (sortExpr.direction == Descending)
    -offset
  else
    offset
```



[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-18 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r3497
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -38,443 +84,667 @@ case class Window(
 child: SparkPlan)
   extends UnaryNode {
 
-  override def output: Seq[Attribute] =
-(projectList ++ windowExpression).map(_.toAttribute)
+  override def output: Seq[Attribute] = projectList ++ 
windowExpression.map(_.toAttribute)
 
-  override def requiredChildDistribution: Seq[Distribution] =
+  override def requiredChildDistribution: Seq[Distribution] = {
 if (windowSpec.partitionSpec.isEmpty) {
-  // This operator will be very expensive.
+  // Only show warning when the number of bytes is larger than 100 MB?
+  logWarning("No Partition Defined for Window operation! Moving all 
data to a single "
++ "partition, this can cause serious performance degradation.")
   AllTuples :: Nil
-} else {
-  ClusteredDistribution(windowSpec.partitionSpec) :: Nil
-}
-
-  // Since window functions are adding columns to the input rows, the 
child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-
-  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
-// The required child ordering has two parts.
-// The first part is the expressions in the partition specification.
-// We add these expressions to the required ordering to make sure 
input rows are grouped
-// based on the partition specification. So, we only need to process a 
single partition
-// at a time.
-// The second part is the expressions specified in the ORDER BY cluase.
-// Basically, we first use sort to group rows based on partition 
specifications and then sort
-// Rows in a group based on the order specification.
-(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec) :: Nil
+} else ClusteredDistribution(windowSpec.partitionSpec) :: Nil
   }
 
-  // Since window functions basically add columns to input rows, this 
operator
-  // will not change the ordering of input rows.
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec)
+
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  case class ComputedWindow(
-unbound: WindowExpression,
-windowFunction: WindowFunction,
-resultAttribute: AttributeReference)
-
-  // A list of window functions that need to be computed for each group.
-  private[this] val computedWindowExpressions = windowExpression.flatMap { 
window =>
-window.collect {
-  case w: WindowExpression =>
-ComputedWindow(
-  w,
-  BindReferences.bindReference(w.windowFunction, child.output),
-  AttributeReference(s"windowResult:$w", w.dataType, w.nullable)())
+  /**
+   * Create a bound ordering object for a given frame type and offset. A 
bound ordering object is
+   * used to determine which input row lies within the frame boundaries of 
an output row.
+   *
+   * This method uses Code Generation. It can only be used on the executor 
side.
+   *
+   * @param frameType to evaluate. This can either be Row or Range based.
+   * @param offset with respect to the row.
+   * @return a bound ordering object.
+   */
+  private[this] def createBoundOrdering(frameType: FrameType, offset: 
Int): BoundOrdering = {
+frameType match {
+  case RangeFrame =>
+val (exprs, current, bound) = if (offset == 0) {
+  // Use the entire order expression when the offset is 0.
+  val exprs = windowSpec.orderSpec.map(_.child)
+  val projection = newMutableProjection(exprs, child.output)
+  (windowSpec.orderSpec, projection(), projection())
+}
+else if (windowSpec.orderSpec.size == 1) {
--- End diff --

`} else if`





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-18 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r34955549
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowSuite.scala 
---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.{Row, QueryTest}
+import org.apache.spark.sql.expressions.Window
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.hive.test.TestHive.implicits._
+
+/**
+ * Window expressions are tested extensively by the following test suites:
+ * [[org.apache.spark.sql.hive.HiveDataFrameWindowSuite]]
+ * 
[[org.apache.spark.sql.hive.execution.HiveWindowFunctionQueryWithoutCodeGenSuite]]
+ * 
[[org.apache.spark.sql.hive.execution.HiveWindowFunctionQueryFileWithoutCodeGenSuite]]
+ * However these suites do not cover all possible (i.e. more exotic) 
settings. This suite fill
+ * this gap.
+ *
+ * TODO Move this class to the sql/core project when we move to Native 
Spark UDAFs.
+ */
+class WindowSuite extends QueryTest {
--- End diff --

Seems we do not need to create a new suite, right? We can just use 
`HiveDataFrameWindowSuite`.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/7057





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-18 Thread yhuai
Github user yhuai commented on the pull request:

https://github.com/apache/spark/pull/7057#issuecomment-122633626
  
@hvanhovell Overall looks good. I am merging it to master. I will leave a 
few comments for minor changes. Can you submit a follow-up PR to address them?





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7057#issuecomment-122611251
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-18 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7057#issuecomment-122611245
  
  [Test build #37743 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37743/console) for PR 7057 at commit [`3bfdc49`](https://github.com/apache/spark/commit/3bfdc491a3c1cad3cf07585388cc448b04bacd98).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-18 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7057#issuecomment-122607711
  
  [Test build #37743 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37743/consoleFull) for PR 7057 at commit [`3bfdc49`](https://github.com/apache/spark/commit/3bfdc491a3c1cad3cf07585388cc448b04bacd98).





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7057#issuecomment-122607604
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7057#issuecomment-122607610
  
Merged build started.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-16 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7057#issuecomment-122161494
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-16 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7057#issuecomment-122161458
  
  [Test build #37567 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37567/console) for PR 7057 at commit [`7207ef5`](https://github.com/apache/spark/commit/7207ef58ded5feb9fcd8651650153e1e82416971).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-16 Thread hvanhovell
Github user hvanhovell commented on the pull request:

https://github.com/apache/spark/pull/7057#issuecomment-122158569
  
@yhuai the benchmarking results are attached. It might be interesting to 
see how the operator performs on different datasets.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-16 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7057#issuecomment-122151112
  
  [Test build #37567 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37567/consoleFull) for PR 7057 at commit [`7207ef5`](https://github.com/apache/spark/commit/7207ef58ded5feb9fcd8651650153e1e82416971).





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-16 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r34857571
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowSuite.scala 
---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.{Row, QueryTest}
+import org.apache.spark.sql.expressions.Window
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.hive.test.TestHive.implicits._
+
+/**
+ * Window expressions are tested extensively by the following test suites:
+ * [[org.apache.spark.sql.hive.HiveDataFrameWindowSuite]]
+ * 
[[org.apache.spark.sql.hive.execution.HiveWindowFunctionQueryWithoutCodeGenSuite]]
+ * 
[[org.apache.spark.sql.hive.execution.HiveWindowFunctionQueryFileWithoutCodeGenSuite]]
+ * However these suites do not cover all possible (i.e. more exotic) 
settings. This suite fill
+ * this gap.
+ *
+ * TODO Move this class to the sql/core project when we move to Native 
Spark UDAFs.
+ */
+class WindowSuite extends QueryTest {
+
+  test("reverse sliding range frame") {
+val df = Seq(
+  (1, "Thin", "Cell Phone", 6000),
+  (2, "Normal", "Tablet", 1500),
+  (3, "Mini", "Tablet", 5500),
+  (4, "Ultra thin", "Cell Phone", 5500),
+  (5, "Very thin", "Cell Phone", 6000),
+  (6, "Big", "Tablet", 2500),
+  (7, "Bendable", "Cell Phone", 3000),
+  (8, "Foldable", "Cell Phone", 3000),
+  (9, "Pro", "Tablet", 4500),
+  (10, "Pro2", "Tablet", 6500)).
+  toDF("id", "product", "category", "revenue")
+checkAnswer(
+  df.select(
+$"id",
+avg($"revenue").over(Window.
+  partitionBy($"category").
+  orderBy($"revenue".desc).
+  rangeBetween(-2000L, 1000L)).
+  cast("int")),
+Row(1, 5833) :: Row(2, 2000) :: Row(3, 5500) ::
+  Row(4, 5833) :: Row(5, 5833) :: Row(6, 2000) ::
+  Row(7, 3000) :: Row(8, 3000) :: Row(9, 4166) ::
+  Row(10, 5500) :: Nil)
--- End diff --

Yeah, that was a mistake on my behalf. I didn't realise that the offsets should be flipped when the order is descending (I swapped the high and low offsets).

I pushed a fix.
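
For reference, a minimal standalone sketch of that sign flip (plain Scala, no Spark; `frameBound` is a made-up helper, and the arithmetic mirrors the `boundOffset` logic in the diff above):
```
// Bound a RANGE frame offset against a single integer sort key.
// With a descending ORDER BY, "preceding" rows have *larger* sort keys,
// so the offset's sign is flipped before adding it to the current key.
def frameBound(sortKey: Int, offset: Int, descending: Boolean): Int = {
  val boundOffset = if (descending) -offset else offset
  sortKey + boundOffset
}

// Revenue 6500, ORDER BY revenue DESC,
// RANGE BETWEEN 2000 PRECEDING AND 1000 FOLLOWING -> frame [5500, 8500]:
frameBound(6500, -2000, descending = true)  // 8500 (preceding bound)
frameBound(6500, 1000, descending = true)   // 5500 (following bound)
```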





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-16 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7057#issuecomment-122150655
  
Merged build started.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-16 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7057#issuecomment-122150637
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-16 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7057#issuecomment-122132838
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-16 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7057#issuecomment-122132788
  
  [Test build #37545 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37545/console) for PR 7057 at commit [`c3e4287`](https://github.com/apache/spark/commit/c3e428719140a21a84ad88294b94778d48f7d768).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-16 Thread yhuai
Github user yhuai commented on the pull request:

https://github.com/apache/spark/pull/7057#issuecomment-122130906
  
@hvanhovell I remember you have some benchmarking results. Can you add them to the description? Also, does your benchmark cover all four kinds of frames (entire partition, growing frame, shrinking frame, and moving frame)? It would be good to have results for all of these frame kinds so we can make sure there is no performance regression (I think it is unlikely that we introduced one, but it is still good to have benchmarking results for the different cases).
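
For illustration, a sketch of how the four frame kinds map onto the public `Window` API (my reading of the terms, not taken from this thread; assumes the SQLContext implicits are in scope for `$`, that `Long.MinValue`/`Long.MaxValue` denote UNBOUNDED PRECEDING/FOLLOWING as in Spark 1.4, and that the column names are hypothetical):
```
import org.apache.spark.sql.expressions.Window

val w = Window.partitionBy($"category").orderBy($"revenue")

// Entire partition: UNBOUNDED PRECEDING to UNBOUNDED FOLLOWING.
val entire    = w.rangeBetween(Long.MinValue, Long.MaxValue)
// Growing frame: UNBOUNDED PRECEDING to CURRENT ROW; only the upper bound moves.
val growing   = w.rangeBetween(Long.MinValue, 0L)
// Shrinking frame: CURRENT ROW to UNBOUNDED FOLLOWING; only the lower bound moves.
val shrinking = w.rangeBetween(0L, Long.MaxValue)
// Moving frame: both bounds slide with the current row.
val moving    = w.rangeBetween(-2000L, 1000L)
```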





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-16 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r34848738
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowSuite.scala 
---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.{Row, QueryTest}
+import org.apache.spark.sql.expressions.Window
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.hive.test.TestHive.implicits._
+
+/**
+ * Window expressions are tested extensively by the following test suites:
+ * [[org.apache.spark.sql.hive.HiveDataFrameWindowSuite]]
+ * 
[[org.apache.spark.sql.hive.execution.HiveWindowFunctionQueryWithoutCodeGenSuite]]
+ * 
[[org.apache.spark.sql.hive.execution.HiveWindowFunctionQueryFileWithoutCodeGenSuite]]
+ * However these suites do not cover all possible (i.e. more exotic) 
settings. This suite fill
+ * this gap.
+ *
+ * TODO Move this class to the sql/core project when we move to Native 
Spark UDAFs.
+ */
+class WindowSuite extends QueryTest {
+
+  test("reverse sliding range frame") {
+val df = Seq(
+  (1, "Thin", "Cell Phone", 6000),
+  (2, "Normal", "Tablet", 1500),
+  (3, "Mini", "Tablet", 5500),
+  (4, "Ultra thin", "Cell Phone", 5500),
+  (5, "Very thin", "Cell Phone", 6000),
+  (6, "Big", "Tablet", 2500),
+  (7, "Bendable", "Cell Phone", 3000),
+  (8, "Foldable", "Cell Phone", 3000),
+  (9, "Pro", "Tablet", 4500),
+  (10, "Pro2", "Tablet", 6500)).
+  toDF("id", "product", "category", "revenue")
+checkAnswer(
+  df.select(
+$"id",
+avg($"revenue").over(Window.
+  partitionBy($"category").
+  orderBy($"revenue".desc).
+  rangeBetween(-2000L, 1000L)).
+  cast("int")),
+Row(1, 5833) :: Row(2, 2000) :: Row(3, 5500) ::
+  Row(4, 5833) :: Row(5, 5833) :: Row(6, 2000) ::
+  Row(7, 3000) :: Row(8, 3000) :: Row(9, 4166) ::
+  Row(10, 5500) :: Nil)
--- End diff --

Actually, this result seems incorrect. For example, when we process `(10, "Pro2", "Tablet", 6500)`, the range of revenue for the frame will be [5500, 8500]: since the ordering direction is `desc`, `2000 preceding` means `8500` and `1000 following` means `5500`. So the avg value should be `6000`.

The result I got with Spark 1.4 and Hive is:
```
+--+----------+----------+-------+----+
|id|   product|  category|revenue| avg|
+--+----------+----------+-------+----+
|10|      Pro2|    Tablet|   6500|6000|
| 3|      Mini|    Tablet|   5500|5500|
| 9|       Pro|    Tablet|   4500|5500|
| 6|       Big|    Tablet|   2500|2833|
| 2|    Normal|    Tablet|   1500|2000|
| 1|      Thin|Cell Phone|   6000|5833|
| 5| Very thin|Cell Phone|   6000|5833|
| 4|Ultra thin|Cell Phone|   5500|5833|
| 7|  Bendable|Cell Phone|   3000|3000|
| 8|  Foldable|Cell Phone|   3000|3000|
+--+----------+----------+-------+----+
```
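
As a cross-check, a one-row recomputation of the `Pro2` case in plain Scala (no Spark; the Tablet revenues are taken from the test data quoted above):
```
// Tablet revenues from the test data quoted above.
val tabletRevenues = Seq(1500, 2500, 4500, 5500, 6500)

// Current row: revenue 6500, ORDER BY revenue DESC,
// RANGE BETWEEN 2000 PRECEDING AND 1000 FOLLOWING.
// Descending order flips the bounds, so the frame covers revenues
// in [6500 - 1000, 6500 + 2000] = [5500, 8500].
val frame = tabletRevenues.filter(r => 5500 <= r && r <= 8500)  // Seq(5500, 6500)
val avg   = frame.sum.toDouble / frame.size                     // 6000.0, matching the table
```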





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-16 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r34847515
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowSuite.scala 
---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.{Row, QueryTest}
+import org.apache.spark.sql.expressions.Window
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.hive.test.TestHive.implicits._
+
+/**
+ * Window expressions are tested extensively by the following test suites:
+ * [[org.apache.spark.sql.hive.HiveDataFrameWindowSuite]]
+ * 
[[org.apache.spark.sql.hive.execution.HiveWindowFunctionQueryWithoutCodeGenSuite]]
+ * 
[[org.apache.spark.sql.hive.execution.HiveWindowFunctionQueryFileWithoutCodeGenSuite]]
+ * However these suites do not cover all possible (i.e. more exotic) 
settings. This suite fill
+ * this gap.
+ *
+ * TODO Move this class to the sql/core project when we move to Native 
Spark UDAFs.
+ */
+class WindowSuite extends QueryTest {
+
+  test("reverse sliding range frame") {
+val df = Seq(
+  (1, "Thin", "Cell Phone", 6000),
+  (2, "Normal", "Tablet", 1500),
+  (3, "Mini", "Tablet", 5500),
+  (4, "Ultra thin", "Cell Phone", 5500),
+  (5, "Very thin", "Cell Phone", 6000),
+  (6, "Big", "Tablet", 2500),
+  (7, "Bendable", "Cell Phone", 3000),
+  (8, "Foldable", "Cell Phone", 3000),
+  (9, "Pro", "Tablet", 4500),
+  (10, "Pro2", "Tablet", 6500)).
+  toDF("id", "product", "category", "revenue")
+checkAnswer(
+  df.select(
+$"id",
+avg($"revenue").over(Window.
+  partitionBy($"category").
+  orderBy($"revenue".desc).
+  rangeBetween(-2000L, 1000L)).
+  cast("int")),
--- End diff --

How about we format it like
```
val window =
  Window
    .partitionBy($"category")
    .orderBy($"revenue".desc)
    .rangeBetween(-2000L, 1000L)
checkAnswer(
  df.select($"id", avg($"revenue").over(window).cast("int")),
  ...
```





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-16 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r34847128
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -38,443 +84,661 @@ case class Window(
 child: SparkPlan)
   extends UnaryNode {
 
-  override def output: Seq[Attribute] =
-(projectList ++ windowExpression).map(_.toAttribute)
+  override def output: Seq[Attribute] = projectList ++ 
windowExpression.map(_.toAttribute)
 
-  override def requiredChildDistribution: Seq[Distribution] =
+  override def requiredChildDistribution: Seq[Distribution] = {
 if (windowSpec.partitionSpec.isEmpty) {
-  // This operator will be very expensive.
+  // Only show warning when the number of bytes is larger than 100 MB?
+  logWarning("No Partition Defined for Window operation! Moving all 
data to a single "
++ "partition, this can cause serious performance degradation.")
   AllTuples :: Nil
-} else {
-  ClusteredDistribution(windowSpec.partitionSpec) :: Nil
-}
-
-  // Since window functions are adding columns to the input rows, the 
child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-
-  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
-// The required child ordering has two parts.
-// The first part is the expressions in the partition specification.
-// We add these expressions to the required ordering to make sure 
input rows are grouped
-// based on the partition specification. So, we only need to process a 
single partition
-// at a time.
-// The second part is the expressions specified in the ORDER BY cluase.
-// Basically, we first use sort to group rows based on partition 
specifications and then sort
-// Rows in a group based on the order specification.
-(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec) :: Nil
+} else ClusteredDistribution(windowSpec.partitionSpec) :: Nil
   }
 
-  // Since window functions basically add columns to input rows, this 
operator
-  // will not change the ordering of input rows.
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec)
+
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  case class ComputedWindow(
-unbound: WindowExpression,
-windowFunction: WindowFunction,
-resultAttribute: AttributeReference)
-
-  // A list of window functions that need to be computed for each group.
-  private[this] val computedWindowExpressions = windowExpression.flatMap { 
window =>
-window.collect {
-  case w: WindowExpression =>
-ComputedWindow(
-  w,
-  BindReferences.bindReference(w.windowFunction, child.output),
-  AttributeReference(s"windowResult:$w", w.dataType, w.nullable)())
+  /**
+   * Create a bound ordering object for a given frame type and offset. A 
bound ordering object is
+   * used to determine which input row lies within the frame boundaries of 
an output row.
+   *
+   * This method uses Code Generation. It can only be used on the executor 
side.
+   *
+   * @param frameType to evaluate. This can either be Row or Range based.
+   * @param offset with respect to the row.
+   * @return a bound ordering object.
+   */
+  private[this] def createBoundOrdering(frameType: FrameType, offset: 
Int): BoundOrdering = {
+frameType match {
+  case RangeFrame =>
+val (exprs, current, bound) = if (offset == 0) {
+  // Use the entire order expression when the offset is 0.
+  val exprs = windowSpec.orderSpec.map(_.child)
+  val projection = newMutableProjection(exprs, child.output)
+  (windowSpec.orderSpec, projection(), projection())
+}
+else if (windowSpec.orderSpec.size == 1) {
+  // Use only the first order expression when the offset is 
non-zero.
+  val sortExpr = windowSpec.orderSpec.head
+  val expr = sortExpr.child
+  // Create the projection which returns the current 'value'.
+  val current = newMutableProjection(expr :: Nil, child.output)()
+  // Create the projection which returns the current 'value' 
modified by adding the offset.
+  val boundExpr = Add(expr, Cast(Literal.create(offset, 
IntegerType), expr.dataType))
+  val bound = newMutableProjection(boundExpr :: Nil, 
child.output)()
+  (sortExpr :: Nil, current, bound)
+}
+else {
+  sys.error("No
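
For readers following the diff: the RangeFrame branch above builds two projections, one
returning the output row's ORDER BY value ('current') and one returning that value shifted
by the frame offset ('bound'); frame membership is then decided by comparing input rows
against the shifted value. A minimal plain-Scala sketch of that comparison, with
illustrative names and Long values rather than the PR's Catalyst projections:

```
// Plain-Scala model of the range-bound check: `reachedBound` plays the role of
// comparing an input row's projected ORDER BY value with the output row's
// offset-adjusted value. Illustrative only, not the PR's classes.
object RangeBoundSketch {
  def reachedBound(inputValue: Long, outputValue: Long, offset: Long): Boolean =
    inputValue >= outputValue + offset

  def main(args: Array[String]): Unit = {
    val partition = Array(1L, 4L, 7L, 10L, 12L) // sorted ORDER BY values
    val outputValue = 10L                       // current row's ORDER BY value
    val lowerOffset = -3L                       // RANGE ... 3 PRECEDING
    // The frame starts at the first value >= 10 - 3 = 7, i.e. index 2.
    val start = partition.indexWhere(reachedBound(_, outputValue, lowerOffset))
    println(s"lower bound index = $start")      // prints: lower bound index = 2
  }
}
```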

[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-16 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7057#issuecomment-122120479
  
  [Test build #37545 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37545/consoleFull)
 for   PR 7057 at commit 
[`c3e4287`](https://github.com/apache/spark/commit/c3e428719140a21a84ad88294b94778d48f7d768).





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-16 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r34844502
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -38,443 +68,645 @@ case class Window(
 child: SparkPlan)
   extends UnaryNode {
 
-  override def output: Seq[Attribute] =
-(projectList ++ windowExpression).map(_.toAttribute)
+  override def output: Seq[Attribute] = projectList ++ 
windowExpression.map(_.toAttribute)
 
-  override def requiredChildDistribution: Seq[Distribution] =
+  override def requiredChildDistribution: Seq[Distribution] = {
 if (windowSpec.partitionSpec.isEmpty) {
-  // This operator will be very expensive.
+  // Only show warning when the number of bytes is larger than 100 MB?
+  logWarning("No Partition Defined for Window operation! Moving all 
data to a single "
++ "partition, this can cause serious performance degradation.")
   AllTuples :: Nil
-} else {
-  ClusteredDistribution(windowSpec.partitionSpec) :: Nil
-}
-
-  // Since window functions are adding columns to the input rows, the 
child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-
-  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
-// The required child ordering has two parts.
-// The first part is the expressions in the partition specification.
-// We add these expressions to the required ordering to make sure 
input rows are grouped
-// based on the partition specification. So, we only need to process a 
single partition
-// at a time.
-// The second part is the expressions specified in the ORDER BY clause.
-// Basically, we first use sort to group rows based on partition 
specifications and then sort
-// Rows in a group based on the order specification.
-(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec) :: Nil
+} else ClusteredDistribution(windowSpec.partitionSpec) :: Nil
   }
 
-  // Since window functions basically add columns to input rows, this 
operator
-  // will not change the ordering of input rows.
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec)
 
-  case class ComputedWindow(
-unbound: WindowExpression,
-windowFunction: WindowFunction,
-resultAttribute: AttributeReference)
-
-  // A list of window functions that need to be computed for each group.
-  private[this] val computedWindowExpressions = windowExpression.flatMap { 
window =>
-window.collect {
-  case w: WindowExpression =>
-ComputedWindow(
-  w,
-  BindReferences.bindReference(w.windowFunction, child.output),
-  AttributeReference(s"windowResult:$w", w.dataType, w.nullable)())
-}
-  }.toArray
-
-  private[this] val windowFrame =
-windowSpec.frameSpecification.asInstanceOf[SpecifiedWindowFrame]
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  // Create window functions.
-  private[this] def windowFunctions(): Array[WindowFunction] = {
-val functions = new 
Array[WindowFunction](computedWindowExpressions.length)
-var i = 0
-while (i < computedWindowExpressions.length) {
-  functions(i) = 
computedWindowExpressions(i).windowFunction.newInstance()
-  functions(i).init()
-  i += 1
+  /**
+   * Create a bound ordering object for a given frame type and offset. A 
bound ordering object is
+   * used to determine which input row lies within the frame boundaries of 
an output row. There
+   * are two types of boundaries that can be evaluated:
+   * - Row Based: A row based boundary is based on the position of the row 
within the partition.
+   *   The offset indicates the number of rows above or below the current 
row at which the frame
+   *   for the current row starts or ends. For instance, given a row based 
sliding frame with a
+   *   lower bound offset of -1 and an upper bound offset of +2, the frame 
for the row with index 5
+   *   would range from index 4 to index 7.
+   * - Range based: A range based boundary is based on the actual value of 
the ORDER BY
+   *   expression(s). The offset is used to alter the value of the ORDER 
BY expression, for
+   *   instance if the current order by expression has a value of 10 and 
the lower bound offset
+   *   is -3, the resulting lower bound for the current row will be 10 - 3 
= 7. This however puts a
+   *   number of constraints on the ORDE
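
The row-based arithmetic described above is plain index offsetting. A small sketch
(plain Scala, hypothetical names, not the PR's implementation) reproducing the example,
where offsets -1/+2 around index 5 give the inclusive range [4, 7]:

```
// The frame of the row at `index` is [index + lowerOffset, index + upperOffset],
// clipped to the partition boundaries.
object RowFrameSketch {
  def frameRange(index: Int, lowerOffset: Int, upperOffset: Int, partitionSize: Int): (Int, Int) =
    (math.max(0, index + lowerOffset), math.min(partitionSize - 1, index + upperOffset))

  def main(args: Array[String]): Unit = {
    // ROWS BETWEEN 1 PRECEDING AND 2 FOLLOWING for index 5 in a 10-row partition.
    println(frameRange(index = 5, lowerOffset = -1, upperOffset = 2, partitionSize = 10)) // (4,7)
  }
}
```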

[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-16 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r34844383
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -38,443 +84,661 @@ case class Window(
 child: SparkPlan)
   extends UnaryNode {
 
-  override def output: Seq[Attribute] =
-(projectList ++ windowExpression).map(_.toAttribute)
+  override def output: Seq[Attribute] = projectList ++ 
windowExpression.map(_.toAttribute)
 
-  override def requiredChildDistribution: Seq[Distribution] =
+  override def requiredChildDistribution: Seq[Distribution] = {
 if (windowSpec.partitionSpec.isEmpty) {
-  // This operator will be very expensive.
+  // Only show warning when the number of bytes is larger than 100 MB?
+  logWarning("No Partition Defined for Window operation! Moving all 
data to a single "
++ "partition, this can cause serious performance degradation.")
   AllTuples :: Nil
-} else {
-  ClusteredDistribution(windowSpec.partitionSpec) :: Nil
-}
-
-  // Since window functions are adding columns to the input rows, the 
child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-
-  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
-// The required child ordering has two parts.
-// The first part is the expressions in the partition specification.
-// We add these expressions to the required ordering to make sure 
input rows are grouped
-// based on the partition specification. So, we only need to process a 
single partition
-// at a time.
-// The second part is the expressions specified in the ORDER BY clause.
-// Basically, we first use sort to group rows based on partition 
specifications and then sort
-// Rows in a group based on the order specification.
-(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec) :: Nil
+} else ClusteredDistribution(windowSpec.partitionSpec) :: Nil
   }
 
-  // Since window functions basically add columns to input rows, this 
operator
-  // will not change the ordering of input rows.
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec)
+
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  case class ComputedWindow(
-unbound: WindowExpression,
-windowFunction: WindowFunction,
-resultAttribute: AttributeReference)
-
-  // A list of window functions that need to be computed for each group.
-  private[this] val computedWindowExpressions = windowExpression.flatMap { 
window =>
-window.collect {
-  case w: WindowExpression =>
-ComputedWindow(
-  w,
-  BindReferences.bindReference(w.windowFunction, child.output),
-  AttributeReference(s"windowResult:$w", w.dataType, w.nullable)())
+  /**
+   * Create a bound ordering object for a given frame type and offset. A 
bound ordering object is
+   * used to determine which input row lies within the frame boundaries of 
an output row.
+   *
+   * This method uses Code Generation. It can only be used on the executor 
side.
+   *
+   * @param frameType to evaluate. This can either be Row or Range based.
+   * @param offset with respect to the row.
+   * @return a bound ordering object.
+   */
+  private[this] def createBoundOrdering(frameType: FrameType, offset: 
Int): BoundOrdering = {
+frameType match {
+  case RangeFrame =>
+val (exprs, current, bound) = if (offset == 0) {
+  // Use the entire order expression when the offset is 0.
+  val exprs = windowSpec.orderSpec.map(_.child)
+  val projection = newMutableProjection(exprs, child.output)
+  (windowSpec.orderSpec, projection(), projection())
+}
+else if (windowSpec.orderSpec.size == 1) {
+  // Use only the first order expression when the offset is 
non-zero.
+  val sortExpr = windowSpec.orderSpec.head
+  val expr = sortExpr.child
+  // Create the projection which returns the current 'value'.
+  val current = newMutableProjection(expr :: Nil, child.output)()
+  // Create the projection which returns the current 'value' 
modified by adding the offset.
+  val boundExpr = Add(expr, Cast(Literal.create(offset, 
IntegerType), expr.dataType))
+  val bound = newMutableProjection(boundExpr :: Nil, 
child.output)()
+  (sortExpr :: Nil, current, bound)
+}
+else {
+  sys.erro
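
The warning in the requiredChildDistribution branch quoted above fires whenever the
window specification carries no partitioning. A hedged usage sketch against the public
DataFrame window API, as entered in a spark-shell session with an existing DataFrame
`df` that has `depName` and `salary` columns (names are assumptions for illustration):

```
// Sketch only: `df` is assumed to exist; only the window-spec construction is shown.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.rank

val unpartitioned = Window.orderBy("salary")  // empty partitionSpec: AllTuples,
                                              // logs the warning, one partition
val perDepartment = Window.partitionBy("depName").orderBy("salary")

// df.withColumn("r", rank().over(unpartitioned))  // correct, but does not scale
// df.withColumn("r", rank().over(perDepartment))  // clustered by depName
```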

[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-16 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7057#issuecomment-122119926
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-16 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7057#issuecomment-122119941
  
Merged build started.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-14 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r34618909
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -38,443 +68,645 @@ case class Window(
 child: SparkPlan)
   extends UnaryNode {
 
-  override def output: Seq[Attribute] =
-(projectList ++ windowExpression).map(_.toAttribute)
+  override def output: Seq[Attribute] = projectList ++ 
windowExpression.map(_.toAttribute)
 
-  override def requiredChildDistribution: Seq[Distribution] =
+  override def requiredChildDistribution: Seq[Distribution] = {
 if (windowSpec.partitionSpec.isEmpty) {
-  // This operator will be very expensive.
+  // Only show warning when the number of bytes is larger than 100 MB?
+  logWarning("No Partition Defined for Window operation! Moving all 
data to a single "
++ "partition, this can cause serious performance degradation.")
   AllTuples :: Nil
-} else {
-  ClusteredDistribution(windowSpec.partitionSpec) :: Nil
-}
-
-  // Since window functions are adding columns to the input rows, the 
child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-
-  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
-// The required child ordering has two parts.
-// The first part is the expressions in the partition specification.
-// We add these expressions to the required ordering to make sure 
input rows are grouped
-// based on the partition specification. So, we only need to process a 
single partition
-// at a time.
-// The second part is the expressions specified in the ORDER BY clause.
-// Basically, we first use sort to group rows based on partition 
specifications and then sort
-// Rows in a group based on the order specification.
-(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec) :: Nil
+} else ClusteredDistribution(windowSpec.partitionSpec) :: Nil
   }
 
-  // Since window functions basically add columns to input rows, this 
operator
-  // will not change the ordering of input rows.
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec)
 
-  case class ComputedWindow(
-unbound: WindowExpression,
-windowFunction: WindowFunction,
-resultAttribute: AttributeReference)
-
-  // A list of window functions that need to be computed for each group.
-  private[this] val computedWindowExpressions = windowExpression.flatMap { 
window =>
-window.collect {
-  case w: WindowExpression =>
-ComputedWindow(
-  w,
-  BindReferences.bindReference(w.windowFunction, child.output),
-  AttributeReference(s"windowResult:$w", w.dataType, w.nullable)())
-}
-  }.toArray
-
-  private[this] val windowFrame =
-windowSpec.frameSpecification.asInstanceOf[SpecifiedWindowFrame]
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  // Create window functions.
-  private[this] def windowFunctions(): Array[WindowFunction] = {
-val functions = new 
Array[WindowFunction](computedWindowExpressions.length)
-var i = 0
-while (i < computedWindowExpressions.length) {
-  functions(i) = 
computedWindowExpressions(i).windowFunction.newInstance()
-  functions(i).init()
-  i += 1
+  /**
+   * Create a bound ordering object for a given frame type and offset. A 
bound ordering object is
+   * used to determine which input row lies within the frame boundaries of 
an output row. There
+   * are two types of boundaries that can be evaluated:
+   * - Row Based: A row based boundary is based on the position of the row 
within the partition.
+   *   The offset indicates the number of rows above or below the current 
row at which the frame
+   *   for the current row starts or ends. For instance, given a row based 
sliding frame with a
+   *   lower bound offset of -1 and an upper bound offset of +2, the frame 
for the row with index 5
+   *   would range from index 4 to index 7.
+   * - Range based: A range based boundary is based on the actual value of 
the ORDER BY
+   *   expression(s). The offset is used to alter the value of the ORDER 
BY expression, for
+   *   instance if the current order by expression has a value of 10 and 
the lower bound offset
+   *   is -3, the resulting lower bound for the current row will be 10 - 3 
= 7. This however puts a
+   *   number of constraints on the ORDE
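
The requiredChildOrdering override quoted above requests one combined sort from the
child: the partition expressions ascending, followed by the window's ORDER BY
expressions, so a single sort both groups the partitions and orders the rows inside
each one. A toy model of the combined ordering, using an illustrative case class
rather than Catalyst's SortOrder:

```
// One combined ordering: grouping columns first (ascending), then ORDER BY columns.
case class Sort(column: String, ascending: Boolean = true)

object RequiredOrderingSketch extends App {
  val partitionSpec = Seq("depName")                      // PARTITION BY depName
  val orderSpec = Seq(Sort("salary", ascending = false))  // ORDER BY salary DESC
  val required = partitionSpec.map(c => Sort(c)) ++ orderSpec
  println(required) // List(Sort(depName,true), Sort(salary,false))
}
```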

[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-14 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r34618831
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -38,443 +68,645 @@ case class Window(
 child: SparkPlan)
   extends UnaryNode {
 
-  override def output: Seq[Attribute] =
-(projectList ++ windowExpression).map(_.toAttribute)
+  override def output: Seq[Attribute] = projectList ++ 
windowExpression.map(_.toAttribute)
 
-  override def requiredChildDistribution: Seq[Distribution] =
+  override def requiredChildDistribution: Seq[Distribution] = {
 if (windowSpec.partitionSpec.isEmpty) {
-  // This operator will be very expensive.
+  // Only show warning when the number of bytes is larger than 100 MB?
+  logWarning("No Partition Defined for Window operation! Moving all 
data to a single "
++ "partition, this can cause serious performance degradation.")
   AllTuples :: Nil
-} else {
-  ClusteredDistribution(windowSpec.partitionSpec) :: Nil
-}
-
-  // Since window functions are adding columns to the input rows, the 
child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-
-  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
-// The required child ordering has two parts.
-// The first part is the expressions in the partition specification.
-// We add these expressions to the required ordering to make sure 
input rows are grouped
-// based on the partition specification. So, we only need to process a 
single partition
-// at a time.
-// The second part is the expressions specified in the ORDER BY clause.
-// Basically, we first use sort to group rows based on partition 
specifications and then sort
-// Rows in a group based on the order specification.
-(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec) :: Nil
+} else ClusteredDistribution(windowSpec.partitionSpec) :: Nil
   }
 
-  // Since window functions basically add columns to input rows, this 
operator
-  // will not change the ordering of input rows.
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec)
 
-  case class ComputedWindow(
-unbound: WindowExpression,
-windowFunction: WindowFunction,
-resultAttribute: AttributeReference)
-
-  // A list of window functions that need to be computed for each group.
-  private[this] val computedWindowExpressions = windowExpression.flatMap { 
window =>
-window.collect {
-  case w: WindowExpression =>
-ComputedWindow(
-  w,
-  BindReferences.bindReference(w.windowFunction, child.output),
-  AttributeReference(s"windowResult:$w", w.dataType, w.nullable)())
-}
-  }.toArray
-
-  private[this] val windowFrame =
-windowSpec.frameSpecification.asInstanceOf[SpecifiedWindowFrame]
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  // Create window functions.
-  private[this] def windowFunctions(): Array[WindowFunction] = {
-val functions = new 
Array[WindowFunction](computedWindowExpressions.length)
-var i = 0
-while (i < computedWindowExpressions.length) {
-  functions(i) = 
computedWindowExpressions(i).windowFunction.newInstance()
-  functions(i).init()
-  i += 1
+  /**
+   * Create a bound ordering object for a given frame type and offset. A 
bound ordering object is
+   * used to determine which input row lies within the frame boundaries of 
an output row. There
+   * are two types of boundaries that can be evaluated:
+   * - Row Based: A row based boundary is based on the position of the row 
within the partition.
+   *   The offset indicates the number of rows above or below the current 
row at which the frame
+   *   for the current row starts or ends. For instance, given a row based 
sliding frame with a
+   *   lower bound offset of -1 and an upper bound offset of +2, the frame 
for the row with index 5
+   *   would range from index 4 to index 7.
+   * - Range based: A range based boundary is based on the actual value of 
the ORDER BY
+   *   expression(s). The offset is used to alter the value of the ORDER 
BY expression, for
+   *   instance if the current order by expression has a value of 10 and 
the lower bound offset
+   *   is -3, the resulting lower bound for the current row will be 10 - 3 
= 7. This however puts a
+   *   number of constraints on the ORDER BY 

[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-14 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r34617761
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -38,443 +68,645 @@ case class Window(
 child: SparkPlan)
   extends UnaryNode {
 
-  override def output: Seq[Attribute] =
-(projectList ++ windowExpression).map(_.toAttribute)
+  override def output: Seq[Attribute] = projectList ++ 
windowExpression.map(_.toAttribute)
 
-  override def requiredChildDistribution: Seq[Distribution] =
+  override def requiredChildDistribution: Seq[Distribution] = {
 if (windowSpec.partitionSpec.isEmpty) {
-  // This operator will be very expensive.
+  // Only show warning when the number of bytes is larger than 100 MB?
+  logWarning("No Partition Defined for Window operation! Moving all 
data to a single "
++ "partition, this can cause serious performance degradation.")
   AllTuples :: Nil
-} else {
-  ClusteredDistribution(windowSpec.partitionSpec) :: Nil
-}
-
-  // Since window functions are adding columns to the input rows, the 
child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-
-  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
-// The required child ordering has two parts.
-// The first part is the expressions in the partition specification.
-// We add these expressions to the required ordering to make sure 
input rows are grouped
-// based on the partition specification. So, we only need to process a 
single partition
-// at a time.
-// The second part is the expressions specified in the ORDER BY clause.
-// Basically, we first use sort to group rows based on partition 
specifications and then sort
-// Rows in a group based on the order specification.
-(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec) :: Nil
+} else ClusteredDistribution(windowSpec.partitionSpec) :: Nil
   }
 
-  // Since window functions basically add columns to input rows, this 
operator
-  // will not change the ordering of input rows.
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec)
 
-  case class ComputedWindow(
-unbound: WindowExpression,
-windowFunction: WindowFunction,
-resultAttribute: AttributeReference)
-
-  // A list of window functions that need to be computed for each group.
-  private[this] val computedWindowExpressions = windowExpression.flatMap { 
window =>
-window.collect {
-  case w: WindowExpression =>
-ComputedWindow(
-  w,
-  BindReferences.bindReference(w.windowFunction, child.output),
-  AttributeReference(s"windowResult:$w", w.dataType, w.nullable)())
-}
-  }.toArray
-
-  private[this] val windowFrame =
-windowSpec.frameSpecification.asInstanceOf[SpecifiedWindowFrame]
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  // Create window functions.
-  private[this] def windowFunctions(): Array[WindowFunction] = {
-val functions = new 
Array[WindowFunction](computedWindowExpressions.length)
-var i = 0
-while (i < computedWindowExpressions.length) {
-  functions(i) = 
computedWindowExpressions(i).windowFunction.newInstance()
-  functions(i).init()
-  i += 1
+  /**
+   * Create a bound ordering object for a given frame type and offset. A 
bound ordering object is
+   * used to determine which input row lies within the frame boundaries of 
an output row. There
+   * are two types of boundaries that can be evaluated:
+   * - Row Based: A row based boundary is based on the position of the row 
within the partition.
+   *   The offset indicates the number of rows above or below the current 
row at which the frame
+   *   for the current row starts or ends. For instance, given a row based 
sliding frame with a
+   *   lower bound offset of -1 and an upper bound offset of +2, the frame 
for the row with index 5
+   *   would range from index 4 to index 7.
+   * - Range based: A range based boundary is based on the actual value of 
the ORDER BY
+   *   expression(s). The offset is used to alter the value of the ORDER 
BY expression, for
+   *   instance if the current order by expression has a value of 10 and 
the lower bound offset
+   *   is -3, the resulting lower bound for the current row will be 10 - 3 
= 7. This however puts a
+   *   number of constraints on the ORDE

[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-14 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r34616373
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -38,443 +68,645 @@ case class Window(
 child: SparkPlan)
   extends UnaryNode {
 
-  override def output: Seq[Attribute] =
-(projectList ++ windowExpression).map(_.toAttribute)
+  override def output: Seq[Attribute] = projectList ++ 
windowExpression.map(_.toAttribute)
 
-  override def requiredChildDistribution: Seq[Distribution] =
+  override def requiredChildDistribution: Seq[Distribution] = {
 if (windowSpec.partitionSpec.isEmpty) {
-  // This operator will be very expensive.
+  // Only show warning when the number of bytes is larger than 100 MB?
+  logWarning("No Partition Defined for Window operation! Moving all 
data to a single "
++ "partition, this can cause serious performance degradation.")
   AllTuples :: Nil
-} else {
-  ClusteredDistribution(windowSpec.partitionSpec) :: Nil
-}
-
-  // Since window functions are adding columns to the input rows, the 
child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-
-  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
-// The required child ordering has two parts.
-// The first part is the expressions in the partition specification.
-// We add these expressions to the required ordering to make sure 
input rows are grouped
-// based on the partition specification. So, we only need to process a 
single partition
-// at a time.
-// The second part is the expressions specified in the ORDER BY clause.
-// Basically, we first use sort to group rows based on partition 
specifications and then sort
-// Rows in a group based on the order specification.
-(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec) :: Nil
+} else ClusteredDistribution(windowSpec.partitionSpec) :: Nil
   }
 
-  // Since window functions basically add columns to input rows, this 
operator
-  // will not change the ordering of input rows.
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec)
 
-  case class ComputedWindow(
-unbound: WindowExpression,
-windowFunction: WindowFunction,
-resultAttribute: AttributeReference)
-
-  // A list of window functions that need to be computed for each group.
-  private[this] val computedWindowExpressions = windowExpression.flatMap { 
window =>
-window.collect {
-  case w: WindowExpression =>
-ComputedWindow(
-  w,
-  BindReferences.bindReference(w.windowFunction, child.output),
-  AttributeReference(s"windowResult:$w", w.dataType, w.nullable)())
-}
-  }.toArray
-
-  private[this] val windowFrame =
-windowSpec.frameSpecification.asInstanceOf[SpecifiedWindowFrame]
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  // Create window functions.
-  private[this] def windowFunctions(): Array[WindowFunction] = {
-val functions = new 
Array[WindowFunction](computedWindowExpressions.length)
-var i = 0
-while (i < computedWindowExpressions.length) {
-  functions(i) = 
computedWindowExpressions(i).windowFunction.newInstance()
-  functions(i).init()
-  i += 1
+  /**
+   * Create a bound ordering object for a given frame type and offset. A 
bound ordering object is
+   * used to determine which input row lies within the frame boundaries of 
an output row. There
+   * are two types of boundaries that can be evaluated:
+   * - Row Based: A row based boundary is based on the position of the row 
within the partition.
+   *   The offset indicates the number of rows above or below the current 
row at which the frame
+   *   for the current row starts or ends. For instance, given a row based 
sliding frame with a
+   *   lower bound offset of -1 and an upper bound offset of +2, the frame 
for the row with index 5
+   *   would range from index 4 to index 7.
+   * - Range based: A range based boundary is based on the actual value of 
the ORDER BY
+   *   expression(s). The offset is used to alter the value of the ORDER 
BY expression, for
+   *   instance if the current order by expression has a value of 10 and 
the lower bound offset
+   *   is -3, the resulting lower bound for the current row will be 10 - 3 
= 7. This however puts a
+   *   number of constraints on the ORDER BY 

[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-14 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r34616177
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -38,443 +68,645 @@ case class Window(
 child: SparkPlan)
   extends UnaryNode {
 
-  override def output: Seq[Attribute] =
-(projectList ++ windowExpression).map(_.toAttribute)
+  override def output: Seq[Attribute] = projectList ++ 
windowExpression.map(_.toAttribute)
 
-  override def requiredChildDistribution: Seq[Distribution] =
+  override def requiredChildDistribution: Seq[Distribution] = {
 if (windowSpec.partitionSpec.isEmpty) {
-  // This operator will be very expensive.
+  // Only show warning when the number of bytes is larger than 100 MB?
+  logWarning("No Partition Defined for Window operation! Moving all 
data to a single "
++ "partition, this can cause serious performance degradation.")
   AllTuples :: Nil
-} else {
-  ClusteredDistribution(windowSpec.partitionSpec) :: Nil
-}
-
-  // Since window functions are adding columns to the input rows, the 
child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-
-  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
-// The required child ordering has two parts.
-// The first part is the expressions in the partition specification.
-// We add these expressions to the required ordering to make sure 
input rows are grouped
-// based on the partition specification. So, we only need to process a 
single partition
-// at a time.
-// The second part is the expressions specified in the ORDER BY clause.
-// Basically, we first use sort to group rows based on partition 
specifications and then sort
-// Rows in a group based on the order specification.
-(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec) :: Nil
+} else ClusteredDistribution(windowSpec.partitionSpec) :: Nil
   }
 
-  // Since window functions basically add columns to input rows, this 
operator
-  // will not change the ordering of input rows.
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec)
 
-  case class ComputedWindow(
-unbound: WindowExpression,
-windowFunction: WindowFunction,
-resultAttribute: AttributeReference)
-
-  // A list of window functions that need to be computed for each group.
-  private[this] val computedWindowExpressions = windowExpression.flatMap { 
window =>
-window.collect {
-  case w: WindowExpression =>
-ComputedWindow(
-  w,
-  BindReferences.bindReference(w.windowFunction, child.output),
-  AttributeReference(s"windowResult:$w", w.dataType, w.nullable)())
-}
-  }.toArray
-
-  private[this] val windowFrame =
-windowSpec.frameSpecification.asInstanceOf[SpecifiedWindowFrame]
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  // Create window functions.
-  private[this] def windowFunctions(): Array[WindowFunction] = {
-val functions = new 
Array[WindowFunction](computedWindowExpressions.length)
-var i = 0
-while (i < computedWindowExpressions.length) {
-  functions(i) = 
computedWindowExpressions(i).windowFunction.newInstance()
-  functions(i).init()
-  i += 1
+  /**
+   * Create a bound ordering object for a given frame type and offset. A 
bound ordering object is
+   * used to determine which input row lies within the frame boundaries of 
an output row. There
+   * are two types of boundaries that can be evaluated:
+   * - Row Based: A row based boundary is based on the position of the row 
within the partition.
+   *   The offset indicates the number of rows above or below the current 
row at which the frame
+   *   for the current row starts or ends. For instance, given a row based 
sliding frame with a
+   *   lower bound offset of -1 and an upper bound offset of +2, the frame 
for the row with index 5
+   *   would range from index 4 to index 7.
+   * - Range based: A range based boundary is based on the actual value of 
the ORDER BY
+   *   expression(s). The offset is used to alter the value of the ORDER 
BY expression, for
+   *   instance if the current order by expression has a value of 10 and 
the lower bound offset
+   *   is -3, the resulting lower bound for the current row will be 10 - 3 
= 7. This however puts a
+   *   number of constraints on the ORDE

[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-14 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r34613697
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -38,443 +68,645 @@ case class Window(
 child: SparkPlan)
   extends UnaryNode {
 
-  override def output: Seq[Attribute] =
-(projectList ++ windowExpression).map(_.toAttribute)
+  override def output: Seq[Attribute] = projectList ++ 
windowExpression.map(_.toAttribute)
 
-  override def requiredChildDistribution: Seq[Distribution] =
+  override def requiredChildDistribution: Seq[Distribution] = {
 if (windowSpec.partitionSpec.isEmpty) {
-  // This operator will be very expensive.
+  // Only show warning when the number of bytes is larger than 100 MB?
+  logWarning("No Partition Defined for Window operation! Moving all 
data to a single "
++ "partition, this can cause serious performance degradation.")
   AllTuples :: Nil
-} else {
-  ClusteredDistribution(windowSpec.partitionSpec) :: Nil
-}
-
-  // Since window functions are adding columns to the input rows, the 
child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-
-  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
-// The required child ordering has two parts.
-// The first part is the expressions in the partition specification.
-// We add these expressions to the required ordering to make sure 
input rows are grouped
-// based on the partition specification. So, we only need to process a 
single partition
-// at a time.
-// The second part is the expressions specified in the ORDER BY clause.
-// Basically, we first use sort to group rows based on partition 
specifications and then sort
-// Rows in a group based on the order specification.
-(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec) :: Nil
+} else ClusteredDistribution(windowSpec.partitionSpec) :: Nil
   }
 
-  // Since window functions basically add columns to input rows, this 
operator
-  // will not change the ordering of input rows.
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec)
 
-  case class ComputedWindow(
-unbound: WindowExpression,
-windowFunction: WindowFunction,
-resultAttribute: AttributeReference)
-
-  // A list of window functions that need to be computed for each group.
-  private[this] val computedWindowExpressions = windowExpression.flatMap { 
window =>
-window.collect {
-  case w: WindowExpression =>
-ComputedWindow(
-  w,
-  BindReferences.bindReference(w.windowFunction, child.output),
-  AttributeReference(s"windowResult:$w", w.dataType, w.nullable)())
-}
-  }.toArray
-
-  private[this] val windowFrame =
-windowSpec.frameSpecification.asInstanceOf[SpecifiedWindowFrame]
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  // Create window functions.
-  private[this] def windowFunctions(): Array[WindowFunction] = {
-val functions = new 
Array[WindowFunction](computedWindowExpressions.length)
-var i = 0
-while (i < computedWindowExpressions.length) {
-  functions(i) = 
computedWindowExpressions(i).windowFunction.newInstance()
-  functions(i).init()
-  i += 1
+  /**
+   * Create a bound ordering object for a given frame type and offset. A 
bound ordering object is
+   * used to determine which input row lies within the frame boundaries of 
an output row. There
+   * are two types of boundaries that can be evaluated:
+   * - Row Based: A row based boundary is based on the position of the row 
within the partition.
+   *   The offset indicates the number of rows above or below the current 
row at which the frame
+   *   for the current row starts or ends. For instance, given a row based 
sliding frame with a
+   *   lower bound offset of -1 and an upper bound offset of +2, the frame 
for the row with index 5
+   *   would range from index 4 to index 7.
+   * - Range based: A range based boundary is based on the actual value of 
the ORDER BY
+   *   expression(s). The offset is used to alter the value of the ORDER 
BY expression, for
+   *   instance if the current order by expression has a value of 10 and 
the lower bound offset
+   *   is -3, the resulting lower bound for the current row will be 10 - 3 
= 7. This however puts a
+   *   number of constraints on the ORDER BY 

[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-14 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r34613255
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -38,443 +68,645 @@ case class Window(
 child: SparkPlan)
   extends UnaryNode {
 
-  override def output: Seq[Attribute] =
-(projectList ++ windowExpression).map(_.toAttribute)
+  override def output: Seq[Attribute] = projectList ++ 
windowExpression.map(_.toAttribute)
 
-  override def requiredChildDistribution: Seq[Distribution] =
+  override def requiredChildDistribution: Seq[Distribution] = {
 if (windowSpec.partitionSpec.isEmpty) {
-  // This operator will be very expensive.
+  // Only show warning when the number of bytes is larger than 100 MB?
+  logWarning("No Partition Defined for Window operation! Moving all 
data to a single "
++ "partition, this can cause serious performance degradation.")
   AllTuples :: Nil
-} else {
-  ClusteredDistribution(windowSpec.partitionSpec) :: Nil
-}
-
-  // Since window functions are adding columns to the input rows, the 
child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-
-  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
-// The required child ordering has two parts.
-// The first part is the expressions in the partition specification.
-// We add these expressions to the required ordering to make sure 
input rows are grouped
-// based on the partition specification. So, we only need to process a 
single partition
-// at a time.
-// The second part is the expressions specified in the ORDER BY clause.
-// Basically, we first use sort to group rows based on partition 
specifications and then sort
-// Rows in a group based on the order specification.
-(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec) :: Nil
+} else ClusteredDistribution(windowSpec.partitionSpec) :: Nil
   }
 
-  // Since window functions basically add columns to input rows, this 
operator
-  // will not change the ordering of input rows.
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec)
 
-  case class ComputedWindow(
-unbound: WindowExpression,
-windowFunction: WindowFunction,
-resultAttribute: AttributeReference)
-
-  // A list of window functions that need to be computed for each group.
-  private[this] val computedWindowExpressions = windowExpression.flatMap { 
window =>
-window.collect {
-  case w: WindowExpression =>
-ComputedWindow(
-  w,
-  BindReferences.bindReference(w.windowFunction, child.output),
-  AttributeReference(s"windowResult:$w", w.dataType, w.nullable)())
-}
-  }.toArray
-
-  private[this] val windowFrame =
-windowSpec.frameSpecification.asInstanceOf[SpecifiedWindowFrame]
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  // Create window functions.
-  private[this] def windowFunctions(): Array[WindowFunction] = {
-val functions = new 
Array[WindowFunction](computedWindowExpressions.length)
-var i = 0
-while (i < computedWindowExpressions.length) {
-  functions(i) = 
computedWindowExpressions(i).windowFunction.newInstance()
-  functions(i).init()
-  i += 1
+  /**
+   * Create a bound ordering object for a given frame type and offset. A 
bound ordering object is
+   * used to determine which input row lies within the frame boundaries of 
an output row. There
+   * are two types of boundaries that can be evaluated:
+   * - Row Based: A row based boundary is based on the position of the row 
within the partition.
+   *   The offset indicates the number of rows above or below the current 
row at which the frame
+   *   for the current row starts or ends. For instance, given a row based 
sliding frame with a
+   *   lower bound offset of -1 and an upper bound offset of +2, the frame 
for the row with index 5
+   *   would range from index 4 to index 7.
+   * - Range based: A range based boundary is based on the actual value of 
the ORDER BY
+   *   expression(s). The offset is used to alter the value of the ORDER 
BY expression, for
+   *   instance if the current order by expression has a value of 10 and 
the lower bound offset
+   *   is -3, the resulting lower bound for the current row will be 10 - 3 
= 7. This however puts a
+   *   number of constraints on the ORDER BY 

[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-14 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r34612767
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -38,443 +68,645 @@ case class Window(
 child: SparkPlan)
   extends UnaryNode {
 
-  override def output: Seq[Attribute] =
-(projectList ++ windowExpression).map(_.toAttribute)
+  override def output: Seq[Attribute] = projectList ++ 
windowExpression.map(_.toAttribute)
 
-  override def requiredChildDistribution: Seq[Distribution] =
+  override def requiredChildDistribution: Seq[Distribution] = {
 if (windowSpec.partitionSpec.isEmpty) {
-  // This operator will be very expensive.
+  // Only show warning when the number of bytes is larger than 100 MB?
+  logWarning("No Partition Defined for Window operation! Moving all 
data to a single "
++ "partition, this can cause serious performance degradation.")
   AllTuples :: Nil
-} else {
-  ClusteredDistribution(windowSpec.partitionSpec) :: Nil
-}
-
-  // Since window functions are adding columns to the input rows, the 
child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-
-  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
-// The required child ordering has two parts.
-// The first part is the expressions in the partition specification.
-// We add these expressions to the required ordering to make sure 
input rows are grouped
-// based on the partition specification. So, we only need to process a 
single partition
-// at a time.
-// The second part is the expressions specified in the ORDER BY clause.
-// Basically, we first use sort to group rows based on partition 
specifications and then sort
-// Rows in a group based on the order specification.
-(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec) :: Nil
+} else ClusteredDistribution(windowSpec.partitionSpec) :: Nil
   }
 
-  // Since window functions basically add columns to input rows, this 
operator
-  // will not change the ordering of input rows.
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec)
 
-  case class ComputedWindow(
-unbound: WindowExpression,
-windowFunction: WindowFunction,
-resultAttribute: AttributeReference)
-
-  // A list of window functions that need to be computed for each group.
-  private[this] val computedWindowExpressions = windowExpression.flatMap { 
window =>
-window.collect {
-  case w: WindowExpression =>
-ComputedWindow(
-  w,
-  BindReferences.bindReference(w.windowFunction, child.output),
-  AttributeReference(s"windowResult:$w", w.dataType, w.nullable)())
-}
-  }.toArray
-
-  private[this] val windowFrame =
-windowSpec.frameSpecification.asInstanceOf[SpecifiedWindowFrame]
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  // Create window functions.
-  private[this] def windowFunctions(): Array[WindowFunction] = {
-val functions = new 
Array[WindowFunction](computedWindowExpressions.length)
-var i = 0
-while (i < computedWindowExpressions.length) {
-  functions(i) = 
computedWindowExpressions(i).windowFunction.newInstance()
-  functions(i).init()
-  i += 1
+  /**
+   * Create a bound ordering object for a given frame type and offset. A 
bound ordering object is
+   * used to determine which input row lies within the frame boundaries of 
an output row. There
+   * are two types of boundaries that can be evaluated:
+   * - Row Based: A row based boundary is based on the position of the row 
within the partition.
+   *   The offset indicates the number of rows above or below the current 
row at which the frame
+   *   for the current row starts or ends. For instance, given a row based 
sliding frame with a
+   *   lower bound offset of -1 and an upper bound offset of +2, the frame 
for the row with index 5
+   *   would range from index 4 to index 7.
+   * - Range based: A range based boundary is based on the actual value of 
the ORDER BY
+   *   expression(s). The offset is used to alter the value of the ORDER 
BY expression, for
+   *   instance if the current order by expression has a value of 10 and 
the lower bound offset
+   *   is -3, the resulting lower bound for the current row will be 10 - 3 
= 7. This however puts a
+   *   number of constraints on the ORDER BY 

[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-14 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r34612470
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -38,443 +68,645 @@ case class Window(
 child: SparkPlan)
   extends UnaryNode {
 
-  override def output: Seq[Attribute] =
-(projectList ++ windowExpression).map(_.toAttribute)
+  override def output: Seq[Attribute] = projectList ++ 
windowExpression.map(_.toAttribute)
 
-  override def requiredChildDistribution: Seq[Distribution] =
+  override def requiredChildDistribution: Seq[Distribution] = {
 if (windowSpec.partitionSpec.isEmpty) {
-  // This operator will be very expensive.
+  // Only show warning when the number of bytes is larger than 100 MB?
+  logWarning("No Partition Defined for Window operation! Moving all 
data to a single "
++ "partition, this can cause serious performance degradation.")
   AllTuples :: Nil
-} else {
-  ClusteredDistribution(windowSpec.partitionSpec) :: Nil
-}
-
-  // Since window functions are adding columns to the input rows, the 
child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-
-  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
-// The required child ordering has two parts.
-// The first part is the expressions in the partition specification.
-// We add these expressions to the required ordering to make sure 
input rows are grouped
-// based on the partition specification. So, we only need to process a 
single partition
-// at a time.
-// The second part is the expressions specified in the ORDER BY clause.
-// Basically, we first use sort to group rows based on partition 
specifications and then sort
-// Rows in a group based on the order specification.
-(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec) :: Nil
+} else ClusteredDistribution(windowSpec.partitionSpec) :: Nil
   }
 
-  // Since window functions basically add columns to input rows, this 
operator
-  // will not change the ordering of input rows.
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec)
 
-  case class ComputedWindow(
-unbound: WindowExpression,
-windowFunction: WindowFunction,
-resultAttribute: AttributeReference)
-
-  // A list of window functions that need to be computed for each group.
-  private[this] val computedWindowExpressions = windowExpression.flatMap { 
window =>
-window.collect {
-  case w: WindowExpression =>
-ComputedWindow(
-  w,
-  BindReferences.bindReference(w.windowFunction, child.output),
-  AttributeReference(s"windowResult:$w", w.dataType, w.nullable)())
-}
-  }.toArray
-
-  private[this] val windowFrame =
-windowSpec.frameSpecification.asInstanceOf[SpecifiedWindowFrame]
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  // Create window functions.
-  private[this] def windowFunctions(): Array[WindowFunction] = {
-val functions = new 
Array[WindowFunction](computedWindowExpressions.length)
-var i = 0
-while (i < computedWindowExpressions.length) {
-  functions(i) = 
computedWindowExpressions(i).windowFunction.newInstance()
-  functions(i).init()
-  i += 1
+  /**
+   * Create a bound ordering object for a given frame type and offset. A 
bound ordering object is
+   * used to determine which input row lies within the frame boundaries of 
an output row. There
+   * are two types of boundaries that can be evaluated:
+   * - Row Based: A row based boundary is based on the position of the row 
within the partition.
+   *   The offset indicates the number of rows above or below the current 
row at which the frame
+   *   for the current row starts or ends. For instance, given a row based 
sliding frame with a
+   *   lower bound offset of -1 and an upper bound offset of +2, the frame 
for the row with index 5
+   *   would range from index 4 to index 7.
+   * - Range based: A range based boundary is based on the actual value of 
the ORDER BY
+   *   expression(s). The offset is used to alter the value of the ORDER 
BY expression, for
+   *   instance if the current order by expression has a value of 10 and 
the lower bound offset
+   *   is -3, the resulting lower bound for the current row will be 10 - 3 
= 7. This however puts a
+   *   number of constraints on the ORDER BY 

[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7057#issuecomment-121348737
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-14 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7057#issuecomment-121348609
  
  [Test build #37244 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37244/console)
 for   PR 7057 at commit 
[`480bb05`](https://github.com/apache/spark/commit/480bb0521be005f937e2098333c4ac42ff65bd71).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-14 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r34604350
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -37,443 +59,622 @@ case class Window(
 child: SparkPlan)
   extends UnaryNode {
 
-  override def output: Seq[Attribute] =
-(projectList ++ windowExpression).map(_.toAttribute)
+  override def output: Seq[Attribute] = projectList ++ 
windowExpression.map(_.toAttribute)
 
-  override def requiredChildDistribution: Seq[Distribution] =
+  override def requiredChildDistribution: Seq[Distribution] = {
 if (windowSpec.partitionSpec.isEmpty) {
-  // This operator will be very expensive.
+  // Only show warning when the number of bytes is larger than 100 MB?
+  logWarning("No Partition Defined for Window operation! Moving all 
data to a single "
++ "partition, this can cause serious performance degradation.")
   AllTuples :: Nil
-} else {
-  ClusteredDistribution(windowSpec.partitionSpec) :: Nil
-}
-
-  // Since window functions are adding columns to the input rows, the 
child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
--- End diff --

ah, I see. Sorry for missing it. No, we do not need to add this to the 
Window operator.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-14 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r34603151
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -37,443 +59,622 @@ case class Window(
 child: SparkPlan)
   extends UnaryNode {
 
-  override def output: Seq[Attribute] =
-(projectList ++ windowExpression).map(_.toAttribute)
+  override def output: Seq[Attribute] = projectList ++ 
windowExpression.map(_.toAttribute)
 
-  override def requiredChildDistribution: Seq[Distribution] =
+  override def requiredChildDistribution: Seq[Distribution] = {
 if (windowSpec.partitionSpec.isEmpty) {
-  // This operator will be very expensive.
+  // Only show warning when the number of bytes is larger than 100 MB?
+  logWarning("No Partition Defined for Window operation! Moving all 
data to a single "
++ "partition, this can cause serious performance degradation.")
   AllTuples :: Nil
-} else {
-  ClusteredDistribution(windowSpec.partitionSpec) :: Nil
-}
-
-  // Since window functions are adding columns to the input rows, the 
child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
--- End diff --

Tiny follow-up. Now I know why I removed it: 
```org.apache.spark.sql.execution.UnaryNode```, the parent class of 
```Window```, already implements this in exactly the same way:
```
private[sql] trait UnaryNode extends SparkPlan with 
trees.UnaryNode[SparkPlan] {
  self: Product =>
  override def outputPartitioning: Partitioning = child.outputPartitioning
}
```
So do I still need to add this to the ```Window``` class?
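
For reference, explicitly declaring the pass-through in ```Window``` would sit 
next to the ordering pass-through that was kept; a minimal sketch, covering 
nothing beyond what the diff above already shows:
```
// Window only appends columns to its input rows, so both physical
// properties can be forwarded from the child unchanged:
override def outputPartitioning: Partitioning = child.outputPartitioning
override def outputOrdering: Seq[SortOrder] = child.outputOrdering
```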





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-14 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r34602077
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -37,443 +59,622 @@ case class Window(
 child: SparkPlan)
   extends UnaryNode {
 
-  override def output: Seq[Attribute] =
-(projectList ++ windowExpression).map(_.toAttribute)
+  override def output: Seq[Attribute] = projectList ++ 
windowExpression.map(_.toAttribute)
 
-  override def requiredChildDistribution: Seq[Distribution] =
+  override def requiredChildDistribution: Seq[Distribution] = {
 if (windowSpec.partitionSpec.isEmpty) {
-  // This operator will be very expensive.
+  // Only show warning when the number of bytes is larger than 100 MB?
+  logWarning("No Partition Defined for Window operation! Moving all 
data to a single "
++ "partition, this can cause serious performance degradation.")
   AllTuples :: Nil
-} else {
-  ClusteredDistribution(windowSpec.partitionSpec) :: Nil
-}
-
-  // Since window functions are adding columns to the input rows, the 
child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
--- End diff --

Ahhh... Now I understand. I got confused with ```outputOrdering```, sorry 
about that. I'll add it.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-14 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r34599617
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -37,443 +59,622 @@ case class Window(
 child: SparkPlan)
   extends UnaryNode {
 
-  override def output: Seq[Attribute] =
-(projectList ++ windowExpression).map(_.toAttribute)
+  override def output: Seq[Attribute] = projectList ++ 
windowExpression.map(_.toAttribute)
 
-  override def requiredChildDistribution: Seq[Distribution] =
+  override def requiredChildDistribution: Seq[Distribution] = {
 if (windowSpec.partitionSpec.isEmpty) {
-  // This operator will be very expensive.
+  // Only show warning when the number of bytes is larger than 100 MB?
+  logWarning("No Partition Defined for Window operation! Moving all 
data to a single "
++ "partition, this can cause serious performance degradation.")
   AllTuples :: Nil
-} else {
-  ClusteredDistribution(windowSpec.partitionSpec) :: Nil
-}
-
-  // Since window functions are adding columns to the input rows, the 
child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
--- End diff --

Seems we are still missing `override def outputPartitioning: Partitioning = 
child.outputPartitioning`?





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-14 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r34599594
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala
 ---
@@ -189,7 +189,7 @@ class HiveDataFrameWindowSuite extends QueryTest {
   df.select(
 $"key",
 last("value").over(
-  Window.partitionBy($"value").orderBy($"key").rangeBetween(1, 
Long.MaxValue))
+  Window.partitionBy($"value").orderBy($"key").rangeBetween(-1, 0))
--- End diff --

I think the test dataset is not a useful one. How about we change it to
`val df = Seq((5, "1"), (5, "2"), (4, "2"), (6, "2"), (3, "1"), (2, 
"2")).toDF("key", "value1")` and use `rangeBetween(-2, -1)` for last_value's 
frame? Although `last_value` is not a deterministic function in general, if we 
apply it to the order-by expression, the result will be deterministic. 
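
A sketch of what the revised test could select, assuming the suite's implicits 
and the DataFrame window API used above (no expected rows shown, since those 
would need to be verified against a real run):
```
val df = Seq((5, "1"), (5, "2"), (4, "2"), (6, "2"), (3, "1"), (2, "2"))
  .toDF("key", "value1")
df.select(
  $"key",
  // last_value over the order-by column itself: deterministic, because the
  // frame is anchored to the current row's key value.
  last("key").over(
    Window.partitionBy($"value1").orderBy($"key").rangeBetween(-2, -1)))
```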





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-14 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7057#issuecomment-121323095
  
  [Test build #37244 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37244/consoleFull)
 for   PR 7057 at commit 
[`480bb05`](https://github.com/apache/spark/commit/480bb0521be005f937e2098333c4ac42ff65bd71).





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7057#issuecomment-121322206
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7057#issuecomment-121322231
  
Merged build started.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-14 Thread yhuai
Github user yhuai commented on the pull request:

https://github.com/apache/spark/pull/7057#issuecomment-121321874
  
ok to test





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-13 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7057#issuecomment-121071373
  
Can one of the admins verify this patch?





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7057#issuecomment-119763356
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-08 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7057#issuecomment-119763236
  
  [Test build #36850 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/36850/console)
 for   PR 7057 at commit 
[`480bb05`](https://github.com/apache/spark/commit/480bb0521be005f937e2098333c4ac42ff65bd71).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-08 Thread hvanhovell
Github user hvanhovell commented on the pull request:

https://github.com/apache/spark/pull/7057#issuecomment-119752352
  
@yhuai I have updated the PR. 

As for the documentation: I will add another section to the general class 
documentation that explains the inner workings of the operator. Let me know 
what else needs more documentation.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-08 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7057#issuecomment-119750074
  
  [Test build #36850 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/36850/consoleFull)
 for   PR 7057 at commit 
[`480bb05`](https://github.com/apache/spark/commit/480bb0521be005f937e2098333c4ac42ff65bd71).





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7057#issuecomment-119749448
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7057#issuecomment-119749475
  
Merged build started.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-08 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r34205943
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala
 ---
@@ -189,7 +189,7 @@ class HiveDataFrameWindowSuite extends QueryTest {
   df.select(
 $"key",
 last("value").over(
-  Window.partitionBy($"value").orderBy($"key").rangeBetween(1, 
Long.MaxValue))
+  Window.partitionBy($"value").orderBy($"key").rangeBetween(-1, 0))
--- End diff --

So I have been looking at this, and I am not too sure what is being tested 
here. It is currently trivial because we are trying to get the LAST_VALUE of a 
column that is used in the partitioning clause, so the result is a constant 
within a partition. 

@chenghao-intel I think you wrote this test; can you perhaps tell me what 
is supposed to be tested here?
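
To make the triviality concrete: `value` is both the partitioning key and the 
function argument, so the frame bounds cannot matter. E.g. in the current test:
```
// Partitioned by $"value", every row in any frame carries the same value
// column, so last("value") is a per-partition constant regardless of bounds:
last("value").over(
  Window.partitionBy($"value").orderBy($"key").rangeBetween(1, Long.MaxValue))
```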





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-08 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r34182441
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -37,443 +67,615 @@ case class Window(
 child: SparkPlan)
   extends UnaryNode {
 
-  override def output: Seq[Attribute] =
-(projectList ++ windowExpression).map(_.toAttribute)
+  override def output: Seq[Attribute] = projectList ++ 
windowExpression.map(_.toAttribute)
 
-  override def requiredChildDistribution: Seq[Distribution] =
+  override def requiredChildDistribution: Seq[Distribution] = {
 if (windowSpec.partitionSpec.isEmpty) {
-  // This operator will be very expensive.
+  // Only show warning when the number of bytes is larger than 100 MB?
+  logWarning("No Partition Defined for Window operation! Moving all 
data to a single "
++ "partition, this can cause serious performance degradation.")
   AllTuples :: Nil
-} else {
-  ClusteredDistribution(windowSpec.partitionSpec) :: Nil
-}
-
-  // Since window functions are adding columns to the input rows, the 
child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-
-  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
-// The required child ordering has two parts.
-// The first part is the expressions in the partition specification.
-// We add these expressions to the required ordering to make sure 
input rows are grouped
-// based on the partition specification. So, we only need to process a 
single partition
-// at a time.
-// The second part is the expressions specified in the ORDER BY cluase.
-// Basically, we first use sort to group rows based on partition 
specifications and then sort
-// Rows in a group based on the order specification.
-(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec) :: Nil
+} else ClusteredDistribution(windowSpec.partitionSpec) :: Nil
   }
 
-  // Since window functions basically add columns to input rows, this 
operator
-  // will not change the ordering of input rows.
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec)
+
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  case class ComputedWindow(
-unbound: WindowExpression,
-windowFunction: WindowFunction,
-resultAttribute: AttributeReference)
-
-  // A list of window functions that need to be computed for each group.
-  private[this] val computedWindowExpressions = windowExpression.flatMap { 
window =>
-window.collect {
-  case w: WindowExpression =>
-ComputedWindow(
-  w,
-  BindReferences.bindReference(w.windowFunction, child.output),
-  AttributeReference(s"windowResult:$w", w.dataType, w.nullable)())
+  @transient
+  private[this] lazy val (factories, projection, columns) = {
+// Helper method for creating bound ordering objects.
+def createBoundOrdering(frameType: FrameType, offset: Int) = frameType 
match {
--- End diff --

I'll look into it.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-08 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r34182012
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -37,443 +67,615 @@ case class Window(
 child: SparkPlan)
   extends UnaryNode {
 
-  override def output: Seq[Attribute] =
-(projectList ++ windowExpression).map(_.toAttribute)
+  override def output: Seq[Attribute] = projectList ++ 
windowExpression.map(_.toAttribute)
 
-  override def requiredChildDistribution: Seq[Distribution] =
+  override def requiredChildDistribution: Seq[Distribution] = {
 if (windowSpec.partitionSpec.isEmpty) {
-  // This operator will be very expensive.
+  // Only show warning when the number of bytes is larger than 100 MB?
+  logWarning("No Partition Defined for Window operation! Moving all 
data to a single "
++ "partition, this can cause serious performance degradation.")
   AllTuples :: Nil
-} else {
-  ClusteredDistribution(windowSpec.partitionSpec) :: Nil
-}
-
-  // Since window functions are adding columns to the input rows, the 
child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-
-  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
-// The required child ordering has two parts.
-// The first part is the expressions in the partition specification.
-// We add these expressions to the required ordering to make sure 
input rows are grouped
-// based on the partition specification. So, we only need to process a 
single partition
-// at a time.
-// The second part is the expressions specified in the ORDER BY cluase.
-// Basically, we first use sort to group rows based on partition 
specifications and then sort
-// Rows in a group based on the order specification.
-(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec) :: Nil
+} else ClusteredDistribution(windowSpec.partitionSpec) :: Nil
   }
 
-  // Since window functions basically add columns to input rows, this 
operator
-  // will not change the ordering of input rows.
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec)
+
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  case class ComputedWindow(
-unbound: WindowExpression,
-windowFunction: WindowFunction,
-resultAttribute: AttributeReference)
-
-  // A list of window functions that need to be computed for each group.
-  private[this] val computedWindowExpressions = windowExpression.flatMap { 
window =>
-window.collect {
-  case w: WindowExpression =>
-ComputedWindow(
-  w,
-  BindReferences.bindReference(w.windowFunction, child.output),
-  AttributeReference(s"windowResult:$w", w.dataType, w.nullable)())
+  @transient
+  private[this] lazy val (factories, projection, columns) = {
--- End diff --

Let's move createBoundOrdering into a separate method. This is actually what 
scalac does already.
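
Roughly the shape being suggested, as a sketch (the `createRangeBoundOrdering` 
helper is hypothetical and would hold the range-frame logic from the diff 
above; `BoundOrdering` is assumed to be the common supertype of the two result 
classes):
```
private[this] def createBoundOrdering(frameType: FrameType, offset: Int): BoundOrdering =
  frameType match {
    case RowFrame   => RowBoundOrdering(offset)
    // Hypothetical helper: the RangeFrame branch from the diff above,
    // extracted so the long lazy val only wires the pieces together.
    case RangeFrame => createRangeBoundOrdering(offset)
  }
```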





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-08 Thread yhuai
Github user yhuai commented on the pull request:

https://github.com/apache/spark/pull/7057#issuecomment-119689504
  
@hvanhovell I have finished my first round. Sorry for taking a long time. I 
think I understand the new workflow of the operator and it looks pretty good. 
It would be great if we can have more comments explaining how it works 
(especially for some important methods like `createBoundOrdering`). I will 
focus more on the readability of the code in my next round.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-08 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r34180871
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -37,443 +67,615 @@ case class Window(
 child: SparkPlan)
   extends UnaryNode {
 
-  override def output: Seq[Attribute] =
-(projectList ++ windowExpression).map(_.toAttribute)
+  override def output: Seq[Attribute] = projectList ++ 
windowExpression.map(_.toAttribute)
 
-  override def requiredChildDistribution: Seq[Distribution] =
+  override def requiredChildDistribution: Seq[Distribution] = {
 if (windowSpec.partitionSpec.isEmpty) {
-  // This operator will be very expensive.
+  // Only show warning when the number of bytes is larger than 100 MB?
+  logWarning("No Partition Defined for Window operation! Moving all 
data to a single "
++ "partition, this can cause serious performance degradation.")
   AllTuples :: Nil
-} else {
-  ClusteredDistribution(windowSpec.partitionSpec) :: Nil
-}
-
-  // Since window functions are adding columns to the input rows, the 
child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-
-  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
-// The required child ordering has two parts.
-// The first part is the expressions in the partition specification.
-// We add these expressions to the required ordering to make sure 
input rows are grouped
-// based on the partition specification. So, we only need to process a 
single partition
-// at a time.
-// The second part is the expressions specified in the ORDER BY cluase.
-// Basically, we first use sort to group rows based on partition 
specifications and then sort
-// Rows in a group based on the order specification.
-(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec) :: Nil
+} else ClusteredDistribution(windowSpec.partitionSpec) :: Nil
   }
 
-  // Since window functions basically add columns to input rows, this 
operator
-  // will not change the ordering of input rows.
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec)
+
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  case class ComputedWindow(
-unbound: WindowExpression,
-windowFunction: WindowFunction,
-resultAttribute: AttributeReference)
-
-  // A list of window functions that need to be computed for each group.
-  private[this] val computedWindowExpressions = windowExpression.flatMap { 
window =>
-window.collect {
-  case w: WindowExpression =>
-ComputedWindow(
-  w,
-  BindReferences.bindReference(w.windowFunction, child.output),
-  AttributeReference(s"windowResult:$w", w.dataType, w.nullable)())
+  @transient
+  private[this] lazy val (factories, projection, columns) = {
+// Helper method for creating bound ordering objects.
+def createBoundOrdering(frameType: FrameType, offset: Int) = frameType 
match {
--- End diff --

Let's add some examples to demonstrate how it works for different types of 
frames, different frame boundaries, and different ordering directions.
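
For instance, comment examples along these lines could cover the row/range 
distinction (a sketch using the DataFrame window API; `$"p"` and `$"k"` are 
placeholder columns):
```
// ROW frame: offsets count physical rows around the current row;
// the frame is the previous row through the next row.
Window.partitionBy($"p").orderBy($"k").rowsBetween(-1, 1)

// RANGE frame: offsets are applied to the order-by value itself;
// the frame holds every row whose k lies within 1 of the current row's k.
Window.partitionBy($"p").orderBy($"k").rangeBetween(-1, 1)
```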





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-08 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r34180730
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -37,443 +67,615 @@ case class Window(
 child: SparkPlan)
   extends UnaryNode {
 
-  override def output: Seq[Attribute] =
-(projectList ++ windowExpression).map(_.toAttribute)
+  override def output: Seq[Attribute] = projectList ++ 
windowExpression.map(_.toAttribute)
 
-  override def requiredChildDistribution: Seq[Distribution] =
+  override def requiredChildDistribution: Seq[Distribution] = {
 if (windowSpec.partitionSpec.isEmpty) {
-  // This operator will be very expensive.
+  // Only show warning when the number of bytes is larger than 100 MB?
+  logWarning("No Partition Defined for Window operation! Moving all 
data to a single "
++ "partition, this can cause serious performance degradation.")
   AllTuples :: Nil
-} else {
-  ClusteredDistribution(windowSpec.partitionSpec) :: Nil
-}
-
-  // Since window functions are adding columns to the input rows, the 
child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-
-  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
-// The required child ordering has two parts.
-// The first part is the expressions in the partition specification.
-// We add these expressions to the required ordering to make sure 
input rows are grouped
-// based on the partition specification. So, we only need to process a 
single partition
-// at a time.
-// The second part is the expressions specified in the ORDER BY cluase.
-// Basically, we first use sort to group rows based on partition 
specifications and then sort
-// Rows in a group based on the order specification.
-(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec) :: Nil
+} else ClusteredDistribution(windowSpec.partitionSpec) :: Nil
   }
 
-  // Since window functions basically add columns to input rows, this 
operator
-  // will not change the ordering of input rows.
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec)
+
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  case class ComputedWindow(
-unbound: WindowExpression,
-windowFunction: WindowFunction,
-resultAttribute: AttributeReference)
-
-  // A list of window functions that need to be computed for each group.
-  private[this] val computedWindowExpressions = windowExpression.flatMap { 
window =>
-window.collect {
-  case w: WindowExpression =>
-ComputedWindow(
-  w,
-  BindReferences.bindReference(w.windowFunction, child.output),
-  AttributeReference(s"windowResult:$w", w.dataType, w.nullable)())
+  @transient
+  private[this] lazy val (factories, projection, columns) = {
--- End diff --

This code block is pretty long; can we break it into multiple parts? Or can 
we pull `createBoundOrdering` out into a separate method?





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-08 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r34180206
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -37,443 +67,615 @@ case class Window(
 child: SparkPlan)
   extends UnaryNode {
 
-  override def output: Seq[Attribute] =
-(projectList ++ windowExpression).map(_.toAttribute)
+  override def output: Seq[Attribute] = projectList ++ 
windowExpression.map(_.toAttribute)
 
-  override def requiredChildDistribution: Seq[Distribution] =
+  override def requiredChildDistribution: Seq[Distribution] = {
 if (windowSpec.partitionSpec.isEmpty) {
-  // This operator will be very expensive.
+  // Only show warning when the number of bytes is larger than 100 MB?
+  logWarning("No Partition Defined for Window operation! Moving all 
data to a single "
++ "partition, this can cause serious performance degradation.")
   AllTuples :: Nil
-} else {
-  ClusteredDistribution(windowSpec.partitionSpec) :: Nil
-}
-
-  // Since window functions are adding columns to the input rows, the 
child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-
-  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
-// The required child ordering has two parts.
-// The first part is the expressions in the partition specification.
-// We add these expressions to the required ordering to make sure 
input rows are grouped
-// based on the partition specification. So, we only need to process a 
single partition
-// at a time.
-// The second part is the expressions specified in the ORDER BY cluase.
-// Basically, we first use sort to group rows based on partition 
specifications and then sort
-// Rows in a group based on the order specification.
-(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec) :: Nil
+} else ClusteredDistribution(windowSpec.partitionSpec) :: Nil
   }
 
-  // Since window functions basically add columns to input rows, this 
operator
-  // will not change the ordering of input rows.
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec)
+
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  case class ComputedWindow(
-unbound: WindowExpression,
-windowFunction: WindowFunction,
-resultAttribute: AttributeReference)
-
-  // A list of window functions that need to be computed for each group.
-  private[this] val computedWindowExpressions = windowExpression.flatMap { 
window =>
-window.collect {
-  case w: WindowExpression =>
-ComputedWindow(
-  w,
-  BindReferences.bindReference(w.windowFunction, child.output),
-  AttributeReference(s"windowResult:$w", w.dataType, w.nullable)())
+  @transient
+  private[this] lazy val (factories, projection, columns) = {
+// Helper method for creating bound ordering objects.
+def createBoundOrdering(frameType: FrameType, offset: Int) = frameType 
match {
+  case RangeFrame =>
+// Use the entire order expression when the offset is 0.
+val (exprs, current, bound) = if (offset == 0) {
+  val exprs = windowSpec.orderSpec.map(_.child)
+  val projection = newMutableProjection(exprs, child.output)
+  (windowSpec.orderSpec, projection(), projection())
+}
+// Use only the first order expression when the offset is non-null.
+else {
+  val sortExpr = windowSpec.orderSpec.head
+  val expr = sortExpr.child
+  val boundExpr = Add(expr, Cast(Literal.create(offset, 
IntegerType), expr.dataType))
+  val current = newMutableProjection(expr :: Nil, child.output)()
+  val bound = newMutableProjection(boundExpr :: Nil, 
child.output)()
+  (sortExpr :: Nil, current, bound)
+}
+// Construct the ordering.
+val (sortExprs, schema) = exprs.zipWithIndex.map { case (e, i) =>
+  val ref = AttributeReference(s"c_$i", e.dataType, e.nullable)()
+  (SortOrder(ref, e.direction), ref)
+}.unzip
+val ordering = newOrdering(sortExprs, schema)
+RangeBoundOrdering(ordering, current, bound)
+  case RowFrame => RowBoundOrdering(offset)
 }
-  }.toArray
-
-  private[this] val windowFrame =
-windowSpec.frameSpecification.asInstanceOf[SpecifiedWindowFrame]
 
-  // Create window functions.
-  private[this] def win

[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-08 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r34180117
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -37,443 +67,615 @@ case class Window(
 child: SparkPlan)
   extends UnaryNode {
 
-  override def output: Seq[Attribute] =
-(projectList ++ windowExpression).map(_.toAttribute)
+  override def output: Seq[Attribute] = projectList ++ 
windowExpression.map(_.toAttribute)
 
-  override def requiredChildDistribution: Seq[Distribution] =
+  override def requiredChildDistribution: Seq[Distribution] = {
 if (windowSpec.partitionSpec.isEmpty) {
-  // This operator will be very expensive.
+  // Only show warning when the number of bytes is larger than 100 MB?
+  logWarning("No Partition Defined for Window operation! Moving all 
data to a single "
++ "partition, this can cause serious performance degradation.")
   AllTuples :: Nil
-} else {
-  ClusteredDistribution(windowSpec.partitionSpec) :: Nil
-}
-
-  // Since window functions are adding columns to the input rows, the 
child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-
-  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
-// The required child ordering has two parts.
-// The first part is the expressions in the partition specification.
-// We add these expressions to the required ordering to make sure 
input rows are grouped
-// based on the partition specification. So, we only need to process a 
single partition
-// at a time.
-// The second part is the expressions specified in the ORDER BY cluase.
-// Basically, we first use sort to group rows based on partition 
specifications and then sort
-// Rows in a group based on the order specification.
-(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec) :: Nil
+} else ClusteredDistribution(windowSpec.partitionSpec) :: Nil
   }
 
-  // Since window functions basically add columns to input rows, this 
operator
-  // will not change the ordering of input rows.
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec)
+
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  case class ComputedWindow(
-unbound: WindowExpression,
-windowFunction: WindowFunction,
-resultAttribute: AttributeReference)
-
-  // A list of window functions that need to be computed for each group.
-  private[this] val computedWindowExpressions = windowExpression.flatMap { 
window =>
-window.collect {
-  case w: WindowExpression =>
-ComputedWindow(
-  w,
-  BindReferences.bindReference(w.windowFunction, child.output),
-  AttributeReference(s"windowResult:$w", w.dataType, w.nullable)())
+  @transient
+  private[this] lazy val (factories, projection, columns) = {
+// Helper method for creating bound ordering objects.
+def createBoundOrdering(frameType: FrameType, offset: Int) = frameType 
match {
+  case RangeFrame =>
+// Use the entire order expression when the offset is 0.
+val (exprs, current, bound) = if (offset == 0) {
+  val exprs = windowSpec.orderSpec.map(_.child)
+  val projection = newMutableProjection(exprs, child.output)
+  (windowSpec.orderSpec, projection(), projection())
+}
+// Use only the first order expression when the offset is non-null.
+else {
+  val sortExpr = windowSpec.orderSpec.head
+  val expr = sortExpr.child
+  val boundExpr = Add(expr, Cast(Literal.create(offset, 
IntegerType), expr.dataType))
+  val current = newMutableProjection(expr :: Nil, child.output)()
+  val bound = newMutableProjection(boundExpr :: Nil, 
child.output)()
+  (sortExpr :: Nil, current, bound)
+}
+// Construct the ordering.
+val (sortExprs, schema) = exprs.zipWithIndex.map { case (e, i) =>
+  val ref = AttributeReference(s"c_$i", e.dataType, e.nullable)()
+  (SortOrder(ref, e.direction), ref)
+}.unzip
+val ordering = newOrdering(sortExprs, schema)
+RangeBoundOrdering(ordering, current, bound)
+  case RowFrame => RowBoundOrdering(offset)
 }
-  }.toArray
-
-  private[this] val windowFrame =
-windowSpec.frameSpecification.asInstanceOf[SpecifiedWindowFrame]
 
-  // Create window functions.
-  private[this] def win

[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-08 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r34179282
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -37,443 +67,615 @@ case class Window(
 child: SparkPlan)
   extends UnaryNode {
 
-  override def output: Seq[Attribute] =
-(projectList ++ windowExpression).map(_.toAttribute)
+  override def output: Seq[Attribute] = projectList ++ 
windowExpression.map(_.toAttribute)
 
-  override def requiredChildDistribution: Seq[Distribution] =
+  override def requiredChildDistribution: Seq[Distribution] = {
 if (windowSpec.partitionSpec.isEmpty) {
-  // This operator will be very expensive.
+  // Only show warning when the number of bytes is larger than 100 MB?
+  logWarning("No Partition Defined for Window operation! Moving all 
data to a single "
++ "partition, this can cause serious performance degradation.")
   AllTuples :: Nil
-} else {
-  ClusteredDistribution(windowSpec.partitionSpec) :: Nil
-}
-
-  // Since window functions are adding columns to the input rows, the 
child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-
-  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
-// The required child ordering has two parts.
-// The first part is the expressions in the partition specification.
-// We add these expressions to the required ordering to make sure 
input rows are grouped
-// based on the partition specification. So, we only need to process a 
single partition
-// at a time.
-// The second part is the expressions specified in the ORDER BY cluase.
-// Basically, we first use sort to group rows based on partition 
specifications and then sort
-// Rows in a group based on the order specification.
-(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec) :: Nil
+} else ClusteredDistribution(windowSpec.partitionSpec) :: Nil
   }
 
-  // Since window functions basically add columns to input rows, this 
operator
-  // will not change the ordering of input rows.
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec)
+
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  case class ComputedWindow(
-unbound: WindowExpression,
-windowFunction: WindowFunction,
-resultAttribute: AttributeReference)
-
-  // A list of window functions that need to be computed for each group.
-  private[this] val computedWindowExpressions = windowExpression.flatMap { 
window =>
-window.collect {
-  case w: WindowExpression =>
-ComputedWindow(
-  w,
-  BindReferences.bindReference(w.windowFunction, child.output),
-  AttributeReference(s"windowResult:$w", w.dataType, w.nullable)())
+  @transient
+  private[this] lazy val (factories, projection, columns) = {
+// Helper method for creating bound ordering objects.
+def createBoundOrdering(frameType: FrameType, offset: Int) = frameType 
match {
+  case RangeFrame =>
+// Use the entire order expression when the offset is 0.
+val (exprs, current, bound) = if (offset == 0) {
+  val exprs = windowSpec.orderSpec.map(_.child)
+  val projection = newMutableProjection(exprs, child.output)
+  (windowSpec.orderSpec, projection(), projection())
+}
+// Use only the first order expression when the offset is non-null.
+else {
+  val sortExpr = windowSpec.orderSpec.head
+  val expr = sortExpr.child
+  val boundExpr = Add(expr, Cast(Literal.create(offset, 
IntegerType), expr.dataType))
+  val current = newMutableProjection(expr :: Nil, child.output)()
+  val bound = newMutableProjection(boundExpr :: Nil, 
child.output)()
+  (sortExpr :: Nil, current, bound)
+}
+// Construct the ordering.
+val (sortExprs, schema) = exprs.zipWithIndex.map { case (e, i) =>
+  val ref = AttributeReference(s"c_$i", e.dataType, e.nullable)()
+  (SortOrder(ref, e.direction), ref)
+}.unzip
+val ordering = newOrdering(sortExprs, schema)
+RangeBoundOrdering(ordering, current, bound)
+  case RowFrame => RowBoundOrdering(offset)
 }
-  }.toArray
-
-  private[this] val windowFrame =
-windowSpec.frameSpecification.asInstanceOf[SpecifiedWindowFrame]
 
-  // Create window functions.
-  private[this] def windowFu

[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-08 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r34178681
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -37,443 +67,615 @@ case class Window(
 child: SparkPlan)
   extends UnaryNode {
 
-  override def output: Seq[Attribute] =
-(projectList ++ windowExpression).map(_.toAttribute)
+  override def output: Seq[Attribute] = projectList ++ 
windowExpression.map(_.toAttribute)
 
-  override def requiredChildDistribution: Seq[Distribution] =
+  override def requiredChildDistribution: Seq[Distribution] = {
 if (windowSpec.partitionSpec.isEmpty) {
-  // This operator will be very expensive.
+  // Only show warning when the number of bytes is larger than 100 MB?
+  logWarning("No Partition Defined for Window operation! Moving all 
data to a single "
++ "partition, this can cause serious performance degradation.")
   AllTuples :: Nil
-} else {
-  ClusteredDistribution(windowSpec.partitionSpec) :: Nil
-}
-
-  // Since window functions are adding columns to the input rows, the 
child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-
-  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
-// The required child ordering has two parts.
-// The first part is the expressions in the partition specification.
-// We add these expressions to the required ordering to make sure 
input rows are grouped
-// based on the partition specification. So, we only need to process a 
single partition
-// at a time.
-// The second part is the expressions specified in the ORDER BY cluase.
-// Basically, we first use sort to group rows based on partition 
specifications and then sort
-// Rows in a group based on the order specification.
-(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec) :: Nil
+} else ClusteredDistribution(windowSpec.partitionSpec) :: Nil
   }
 
-  // Since window functions basically add columns to input rows, this 
operator
-  // will not change the ordering of input rows.
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec)
+
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  case class ComputedWindow(
-unbound: WindowExpression,
-windowFunction: WindowFunction,
-resultAttribute: AttributeReference)
-
-  // A list of window functions that need to be computed for each group.
-  private[this] val computedWindowExpressions = windowExpression.flatMap { 
window =>
-window.collect {
-  case w: WindowExpression =>
-ComputedWindow(
-  w,
-  BindReferences.bindReference(w.windowFunction, child.output),
-  AttributeReference(s"windowResult:$w", w.dataType, w.nullable)())
+  @transient
+  private[this] lazy val (factories, projection, columns) = {
+// Helper method for creating bound ordering objects.
+def createBoundOrdering(frameType: FrameType, offset: Int) = frameType 
match {
+  case RangeFrame =>
+// Use the entire order expression when the offset is 0.
+val (exprs, current, bound) = if (offset == 0) {
+  val exprs = windowSpec.orderSpec.map(_.child)
+  val projection = newMutableProjection(exprs, child.output)
+  (windowSpec.orderSpec, projection(), projection())
+}
+// Use only the first order expression when the offset is non-null.
+else {
+  val sortExpr = windowSpec.orderSpec.head
+  val expr = sortExpr.child
+  val boundExpr = Add(expr, Cast(Literal.create(offset, 
IntegerType), expr.dataType))
+  val current = newMutableProjection(expr :: Nil, child.output)()
+  val bound = newMutableProjection(boundExpr :: Nil, 
child.output)()
+  (sortExpr :: Nil, current, bound)
+}
+// Construct the ordering.
+val (sortExprs, schema) = exprs.zipWithIndex.map { case (e, i) =>
+  val ref = AttributeReference(s"c_$i", e.dataType, e.nullable)()
+  (SortOrder(ref, e.direction), ref)
+}.unzip
+val ordering = newOrdering(sortExprs, schema)
+RangeBoundOrdering(ordering, current, bound)
+  case RowFrame => RowBoundOrdering(offset)
 }
-  }.toArray
-
-  private[this] val windowFrame =
-windowSpec.frameSpecification.asInstanceOf[SpecifiedWindowFrame]
 
-  // Create window functions.
-  private[this] def windowFu

[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-08 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r34175570
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -37,443 +67,615 @@ case class Window(
 child: SparkPlan)
   extends UnaryNode {
 
-  override def output: Seq[Attribute] =
-(projectList ++ windowExpression).map(_.toAttribute)
+  override def output: Seq[Attribute] = projectList ++ 
windowExpression.map(_.toAttribute)
 
-  override def requiredChildDistribution: Seq[Distribution] =
+  override def requiredChildDistribution: Seq[Distribution] = {
 if (windowSpec.partitionSpec.isEmpty) {
-  // This operator will be very expensive.
+  // Only show warning when the number of bytes is larger than 100 MB?
+  logWarning("No Partition Defined for Window operation! Moving all 
data to a single "
++ "partition, this can cause serious performance degradation.")
   AllTuples :: Nil
-} else {
-  ClusteredDistribution(windowSpec.partitionSpec) :: Nil
-}
-
-  // Since window functions are adding columns to the input rows, the 
child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-
-  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
-// The required child ordering has two parts.
-// The first part is the expressions in the partition specification.
-// We add these expressions to the required ordering to make sure 
input rows are grouped
-// based on the partition specification. So, we only need to process a 
single partition
-// at a time.
-// The second part is the expressions specified in the ORDER BY cluase.
-// Basically, we first use sort to group rows based on partition 
specifications and then sort
-// Rows in a group based on the order specification.
-(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec) :: Nil
+} else ClusteredDistribution(windowSpec.partitionSpec) :: Nil
   }
 
-  // Since window functions basically add columns to input rows, this 
operator
-  // will not change the ordering of input rows.
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec)
+
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  case class ComputedWindow(
-unbound: WindowExpression,
-windowFunction: WindowFunction,
-resultAttribute: AttributeReference)
-
-  // A list of window functions that need to be computed for each group.
-  private[this] val computedWindowExpressions = windowExpression.flatMap { 
window =>
-window.collect {
-  case w: WindowExpression =>
-ComputedWindow(
-  w,
-  BindReferences.bindReference(w.windowFunction, child.output),
-  AttributeReference(s"windowResult:$w", w.dataType, w.nullable)())
+  @transient
+  private[this] lazy val (factories, projection, columns) = {
+// Helper method for creating bound ordering objects.
+def createBoundOrdering(frameType: FrameType, offset: Int) = frameType 
match {
+  case RangeFrame =>
+// Use the entire order expression when the offset is 0.
+val (exprs, current, bound) = if (offset == 0) {
+  val exprs = windowSpec.orderSpec.map(_.child)
+  val projection = newMutableProjection(exprs, child.output)
+  (windowSpec.orderSpec, projection(), projection())
+}
+// Use only the first order expression when the offset is non-null.
+else {
+  val sortExpr = windowSpec.orderSpec.head
+  val expr = sortExpr.child
+  val boundExpr = Add(expr, Cast(Literal.create(offset, 
IntegerType), expr.dataType))
+  val current = newMutableProjection(expr :: Nil, child.output)()
+  val bound = newMutableProjection(boundExpr :: Nil, 
child.output)()
+  (sortExpr :: Nil, current, bound)
+}
+// Construct the ordering.
+val (sortExprs, schema) = exprs.zipWithIndex.map { case (e, i) =>
+  val ref = AttributeReference(s"c_$i", e.dataType, e.nullable)()
+  (SortOrder(ref, e.direction), ref)
+}.unzip
+val ordering = newOrdering(sortExprs, schema)
+RangeBoundOrdering(ordering, current, bound)
+  case RowFrame => RowBoundOrdering(offset)
 }
-  }.toArray
-
-  private[this] val windowFrame =
-windowSpec.frameSpecification.asInstanceOf[SpecifiedWindowFrame]
 
-  // Create window functions.
-  private[this] def win

[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-08 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r34174597
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -37,443 +67,615 @@ case class Window(
     child: SparkPlan)
   extends UnaryNode {
 
-  override def output: Seq[Attribute] =
-    (projectList ++ windowExpression).map(_.toAttribute)
+  override def output: Seq[Attribute] = projectList ++ windowExpression.map(_.toAttribute)
 
-  override def requiredChildDistribution: Seq[Distribution] =
+  override def requiredChildDistribution: Seq[Distribution] = {
     if (windowSpec.partitionSpec.isEmpty) {
-      // This operator will be very expensive.
+      // Only show warning when the number of bytes is larger than 100 MB?
+      logWarning("No Partition Defined for Window operation! Moving all data to a single "
+        + "partition, this can cause serious performance degradation.")
       AllTuples :: Nil
-    } else {
-      ClusteredDistribution(windowSpec.partitionSpec) :: Nil
-    }
-
-  // Since window functions are adding columns to the input rows, the child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-
-  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
-    // The required child ordering has two parts.
-    // The first part is the expressions in the partition specification.
-    // We add these expressions to the required ordering to make sure input rows are grouped
-    // based on the partition specification. So, we only need to process a single partition
-    // at a time.
-    // The second part is the expressions specified in the ORDER BY cluase.
-    // Basically, we first use sort to group rows based on partition specifications and then sort
-    // Rows in a group based on the order specification.
-    (windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ windowSpec.orderSpec) :: Nil
+    } else ClusteredDistribution(windowSpec.partitionSpec) :: Nil
   }
 
-  // Since window functions basically add columns to input rows, this operator
-  // will not change the ordering of input rows.
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+    Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ windowSpec.orderSpec)
+
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  case class ComputedWindow(
-    unbound: WindowExpression,
-    windowFunction: WindowFunction,
-    resultAttribute: AttributeReference)
-
-  // A list of window functions that need to be computed for each group.
-  private[this] val computedWindowExpressions = windowExpression.flatMap { window =>
-    window.collect {
-      case w: WindowExpression =>
-        ComputedWindow(
-          w,
-          BindReferences.bindReference(w.windowFunction, child.output),
-          AttributeReference(s"windowResult:$w", w.dataType, w.nullable)())
+  @transient
+  private[this] lazy val (factories, projection, columns) = {
+    // Helper method for creating bound ordering objects.
+    def createBoundOrdering(frameType: FrameType, offset: Int) = frameType match {
+      case RangeFrame =>
+        // Use the entire order expression when the offset is 0.
+        val (exprs, current, bound) = if (offset == 0) {
+          val exprs = windowSpec.orderSpec.map(_.child)
+          val projection = newMutableProjection(exprs, child.output)
+          (windowSpec.orderSpec, projection(), projection())
+        }
+        // Use only the first order expression when the offset is non-null.
+        else {
+          val sortExpr = windowSpec.orderSpec.head
+          val expr = sortExpr.child
+          val boundExpr = Add(expr, Cast(Literal.create(offset, IntegerType), expr.dataType))
+          val current = newMutableProjection(expr :: Nil, child.output)()
+          val bound = newMutableProjection(boundExpr :: Nil, child.output)()
+          (sortExpr :: Nil, current, bound)
+        }
+        // Construct the ordering.
+        val (sortExprs, schema) = exprs.zipWithIndex.map { case (e, i) =>
+          val ref = AttributeReference(s"c_$i", e.dataType, e.nullable)()
+          (SortOrder(ref, e.direction), ref)
+        }.unzip
+        val ordering = newOrdering(sortExprs, schema)
+        RangeBoundOrdering(ordering, current, bound)
+      case RowFrame => RowBoundOrdering(offset)
     }
-  }.toArray
-
-  private[this] val windowFrame =
-    windowSpec.frameSpecification.asInstanceOf[SpecifiedWindowFrame]
 
-  // Create window functions.
-  private[this] def windowFu
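
The quoted helper builds, for a RANGE frame, a projection of the order expression and a projection of the order expression plus the offset, then compares the two under the sort ordering. As a rough standalone illustration of the value-based bound this produces, here is a sketch over plain Longs (`rangeFrame` is a hypothetical helper, not the PR's `RangeBoundOrdering`):

```scala
// A row j lies in row i's RANGE frame iff
//   keys(i) + low <= keys(j) <= keys(i) + high,
// where keys is the ascending order-by column of one partition.
def rangeFrame(keys: Array[Long], low: Long, high: Long): Array[(Int, Int)] =
  keys.map { k =>
    val from = keys.indexWhere(_ >= k + low)           // first row in frame
    val until = keys.lastIndexWhere(_ <= k + high) + 1 // one past the last
    (from, until)
  }

// RANGE BETWEEN 1 PRECEDING AND CURRENT ROW over keys 1, 2, 4
// yields the index ranges [0,1), [0,2), [2,3).
rangeFrame(Array(1L, 2L, 4L), low = -1L, high = 0L)
```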

[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-08 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r34169654
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -37,443 +59,622 @@ case class Window(
     child: SparkPlan)
   extends UnaryNode {
 
-  override def output: Seq[Attribute] =
-    (projectList ++ windowExpression).map(_.toAttribute)
+  override def output: Seq[Attribute] = projectList ++ windowExpression.map(_.toAttribute)
 
-  override def requiredChildDistribution: Seq[Distribution] =
+  override def requiredChildDistribution: Seq[Distribution] = {
     if (windowSpec.partitionSpec.isEmpty) {
-      // This operator will be very expensive.
+      // Only show warning when the number of bytes is larger than 100 MB?
+      logWarning("No Partition Defined for Window operation! Moving all data to a single "
+        + "partition, this can cause serious performance degradation.")
       AllTuples :: Nil
-    } else {
-      ClusteredDistribution(windowSpec.partitionSpec) :: Nil
-    }
-
-  // Since window functions are adding columns to the input rows, the child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
--- End diff --

The line has moved after the documentation updates :(... it is now 84... My 
bad.

This hasn't changed wrt the current Window implementation. The method only 
got moved, and comments got removed.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-08 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r34168611
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -37,443 +59,622 @@ case class Window(
     child: SparkPlan)
   extends UnaryNode {
 
-  override def output: Seq[Attribute] =
-    (projectList ++ windowExpression).map(_.toAttribute)
+  override def output: Seq[Attribute] = projectList ++ windowExpression.map(_.toAttribute)
 
-  override def requiredChildDistribution: Seq[Distribution] =
+  override def requiredChildDistribution: Seq[Distribution] = {
     if (windowSpec.partitionSpec.isEmpty) {
-      // This operator will be very expensive.
+      // Only show warning when the number of bytes is larger than 100 MB?
+      logWarning("No Partition Defined for Window operation! Moving all data to a single "
+        + "partition, this can cause serious performance degradation.")
       AllTuples :: Nil
-    } else {
-      ClusteredDistribution(windowSpec.partitionSpec) :: Nil
-    }
-
-  // Since window functions are adding columns to the input rows, the child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
--- End diff --

Seems line 77 is for `requiredChildDistribution`? Here we have two concepts.
`requiredChildDistribution` specifies the required distribution of the input
rows (e.g. input rows need to be grouped by column `a`).
`outputPartitioning` indicates the actual partitioning scheme of the output
rows of this physical operator (e.g. output rows of this operator are grouped
by column `a` through a hash partitioning scheme with 20 partitions).
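
For illustration, a minimal sketch of the distinction, using simplified stand-in types rather than Spark's actual physical-plan classes (`PhysicalOperator` and `WindowLike` are hypothetical):

```scala
// Simplified stand-ins; not Spark's real Distribution/Partitioning classes.
sealed trait Distribution
case object AllTuples extends Distribution
case class ClusteredDistribution(columns: Seq[String]) extends Distribution

sealed trait Partitioning
case class HashPartitioning(columns: Seq[String], numPartitions: Int) extends Partitioning

trait PhysicalOperator {
  // A constraint on the input: what this operator needs, not how it is laid out.
  def requiredChildDistribution: Seq[Distribution]
  // The concrete layout of this operator's output across partitions.
  def outputPartitioning: Partitioning
}

// A window-like operator: requires input clustered by the partition columns
// and, since it only appends columns, preserves the child's partitioning.
class WindowLike(partitionColumns: Seq[String], childPartitioning: Partitioning)
  extends PhysicalOperator {
  override def requiredChildDistribution: Seq[Distribution] =
    if (partitionColumns.isEmpty) AllTuples :: Nil
    else ClusteredDistribution(partitionColumns) :: Nil
  override def outputPartitioning: Partitioning = childPartitioning
}
```

E.g. `new WindowLike(Seq("a"), HashPartitioning(Seq("a"), 20))` requires input clustered by `a` (the constraint) and reports output hash-partitioned by `a` into 20 partitions (the concrete scheme).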





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-08 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r34168240
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -19,17 +19,47 @@ package org.apache.spark.sql.execution
 
 import java.util
 
-import org.apache.spark.rdd.RDD
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.types.IntegerType
+import org.apache.spark.rdd.RDD
 import org.apache.spark.util.collection.CompactBuffer
+import scala.collection.mutable
 
 /**
  * :: DeveloperApi ::
- * For every row, evaluates `windowExpression` containing Window Functions and attaches
- * the results with other regular expressions (presented by `projectList`).
- * Evert operator handles a single Window Specification, `windowSpec`.
+ * This class calculates and outputs (windowed) aggregates over the rows in a single (sorted)
+ * partition. The aggregates are calculated for each row in the group. Special processing
+ * instructions, frames, are used to calculate these aggregates. Frames are processed in the order
+ * specified in the window specification (the ORDER BY ... clause). There are four different frame
+ * types:
+ * - Entire partition: The frame is the entire partition, i.e.
+ *   UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING. For this case, window function will take all
+ *   rows as inputs and be evaluated once.
+ * - Growing frame: We only add new rows into the frame, i.e. UNBOUNDED PRECEDING AND ...
+ *   Every time we move to a new row to process, we add some rows to the frame. We do not remove
+ *   rows from this frame.
+ * - Shrinking frame: We only remove rows from the frame, i.e. ... AND UNBOUNDED FOLLOWING.
+ *   Every time we move to a new row to process, we remove some rows from the frame. We do not add
+ *   rows to this frame. The frame will originally contain all rows of the partition.
--- End diff --

I made a mistake here. When we have `n FOLLOWING AND UNBOUNDED FOLLOWING`,
the frame will not contain all rows of the partition at the beginning. So, we
can remove `The frame will originally contain all rows of the
partition.`.
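
For illustration, a growing and a shrinking frame expressed through the DataFrame window API (a sketch: the data and column names are made up, and `Long.MinValue` / `Long.MaxValue` are the unbounded-boundary encodings accepted by `rowsBetween` at the time of this PR):

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
// Assumes an active SQLContext with `import sqlContext.implicits._` in scope.

// Hypothetical data, made up for illustration.
val df = Seq(("a", 1), ("a", 2), ("a", 3), ("b", 4)).toDF("key", "value")

// Growing frame: UNBOUNDED PRECEDING AND CURRENT ROW (running sum).
// Rows are only ever added to the frame as processing advances.
val growing = Window.partitionBy($"key").orderBy($"value")
  .rowsBetween(Long.MinValue, 0)

// Shrinking frame: CURRENT ROW AND UNBOUNDED FOLLOWING (suffix sum).
// Rows are only ever removed; at the first row of a partition this frame
// covers the whole partition, but with `n FOLLOWING` as the lower bound it
// would not -- hence the doc fix above.
val shrinking = Window.partitionBy($"key").orderBy($"value")
  .rowsBetween(0, Long.MaxValue)

df.select(
  $"key", $"value",
  sum($"value").over(growing).as("running_sum"),
  sum($"value").over(shrinking).as("remaining_sum")
).show()
```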





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-08 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r34163185
  
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala ---
@@ -189,7 +189,7 @@ class HiveDataFrameWindowSuite extends QueryTest {
       df.select(
         $"key",
         last("value").over(
-          Window.partitionBy($"value").orderBy($"key").rangeBetween(1, Long.MaxValue))
+          Window.partitionBy($"value").orderBy($"key").rangeBetween(-1, 0))
--- End diff --

Yea, the test is wrong. The test data here is a bad choice (`val df =
Seq((1, "1"), (2, "2"), (2, "2"), (2, "2"), (1, "1"), (2, "2")).toDF("key",
"value")`): since we partition by `value`, all rows within a partition are
identical, so every frame produces the same result and this dataset cannot
test `last_value`. Can you change the dataset?
Thank you for pointing out the problem!
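
A dataset along the following lines would actually exercise `last` over a range frame, since keys differ within each partition (a sketch only, with made-up rows; the review left the exact replacement data open; same imports as the suite assumed):

```scala
// Hypothetical replacement data: within each "value" partition the keys
// differ, so the result of last("key") now depends on the frame bounds.
val df = Seq(
  (1, "a"), (2, "a"), (3, "a"), // partition "a": keys 1, 2, 3
  (4, "b"), (5, "b")            // partition "b": keys 4, 5
).toDF("key", "value")

df.select(
  $"key",
  last("key").over(
    Window.partitionBy($"value").orderBy($"key").rangeBetween(-1, 0)))
```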





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7057#issuecomment-117739502
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-01 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7057#issuecomment-117739311
  
  [Test build #36267 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/36267/console)
 for   PR 7057 at commit 
[`34f63f8`](https://github.com/apache/spark/commit/34f63f8b3a9f96bd0eb32dee1ec6ee328c63ab5d).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-01 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7057#issuecomment-117703601
  
  [Test build #36267 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/36267/consoleFull)
 for   PR 7057 at commit 
[`34f63f8`](https://github.com/apache/spark/commit/34f63f8b3a9f96bd0eb32dee1ec6ee328c63ab5d).





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7057#issuecomment-117702439
  
Merged build started.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-07-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7057#issuecomment-117702352
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-06-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7057#issuecomment-117461905
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-06-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7057#issuecomment-117461635
  
**[Test build #36225 timed 
out](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/36225/console)**
 for PR 7057 at commit 
[`27b0329`](https://github.com/apache/spark/commit/27b03293c51ab20b572958fa207071360ed2418c)
 after a configured wait of `175m`.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-06-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7057#issuecomment-117426446
  
  [Test build #36225 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/36225/consoleFull)
 for   PR 7057 at commit 
[`27b0329`](https://github.com/apache/spark/commit/27b03293c51ab20b572958fa207071360ed2418c).





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-06-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7057#issuecomment-117426229
  
Merged build started.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-06-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7057#issuecomment-117426218
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-06-30 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r33642464
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -19,17 +19,39 @@ package org.apache.spark.sql.execution
 
 import java.util
 
-import org.apache.spark.rdd.RDD
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.types.IntegerType
+import org.apache.spark.rdd.RDD
 import org.apache.spark.util.collection.CompactBuffer
+import scala.collection.mutable
 
 /**
  * :: DeveloperApi ::
- * For every row, evaluates `windowExpression` containing Window Functions and attaches
- * the results with other regular expressions (presented by `projectList`).
- * Evert operator handles a single Window Specification, `windowSpec`.
+ * This class calculates and outputs (windowed) aggregates over the rows in a single sorted group.
+ * The aggregates are calculated for each row in the group. An aggregate can take a few forms:
+ * - Global: The aggregate is calculated for the entire group. Every row has the same value.
+ * - Rows: The aggregate is calculated based on a subset of the window, and is unique for each
+ * row and depends on the position of the given row within the window. The group must be sorted
+ * for this to produce sensible output. Examples are moving averages, running sums and row
+ * numbers.
+ * - Range: The aggregate is calculated based on a subset of the window, and is unique for each
+ * value of the order by clause and depends on its ordering. The group must be sorted for this to
+ * produce sensible output.
+ * - Shifted: The aggregate is a displaced value relative to the position of the given row.
+ * Examples are Lead and Lag.
--- End diff --

The PR also optimizes the processing of Moving and Shrinking frames:
* For moving frame processing, the number of comparisons is reduced. This
didn't look like the most rewarding improvement, but I was surprised to find
it improved performance by quite a margin.
* Shrinking frames are indeed processed in reverse order, which makes building
them as fast as the growing case (it uses more memory though; see the sketch
below). I share your concerns, and solving this at the root (the function
itself) would indeed be best. I'll revert this for now, and file a JIRA ticket
for future reference.
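
A standalone sketch of the reverse-order idea, as a simplified model over plain Longs rather than the PR's actual implementation: one backward pass makes the shrinking case as cheap as the growing case, at the cost of buffering all results before the first can be emitted.

```scala
// Growing frame (UNBOUNDED PRECEDING AND CURRENT ROW): one forward pass.
def runningSums(rows: Array[Long]): Array[Long] =
  rows.scanLeft(0L)(_ + _).tail

// Shrinking frame (CURRENT ROW AND UNBOUNDED FOLLOWING): naively O(n^2),
// since the remaining rows would be re-aggregated for every position. A
// single backward pass is O(n): each step only ever *adds* a row to the
// accumulator, exactly like the growing case run in reverse.
def suffixSums(rows: Array[Long]): Array[Long] = {
  val out = new Array[Long](rows.length)
  var acc = 0L
  var i = rows.length - 1
  while (i >= 0) {
    acc += rows(i) // add the current row; nothing is ever removed
    out(i) = acc
    i -= 1
  }
  out // all n results are buffered before any can be emitted
}
```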





[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...

2015-06-30 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/7057#discussion_r33642234
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -37,443 +59,622 @@ case class Window(
     child: SparkPlan)
   extends UnaryNode {
 
-  override def output: Seq[Attribute] =
-    (projectList ++ windowExpression).map(_.toAttribute)
+  override def output: Seq[Attribute] = projectList ++ windowExpression.map(_.toAttribute)
 
-  override def requiredChildDistribution: Seq[Distribution] =
+  override def requiredChildDistribution: Seq[Distribution] = {
     if (windowSpec.partitionSpec.isEmpty) {
-      // This operator will be very expensive.
+      // Only show warning when the number of bytes is larger than 100 MB?
+      logWarning("No Partition Defined for Window operation! Moving all data to a single "
+        + "partition, this can cause serious performance degradation.")
       AllTuples :: Nil
-    } else {
-      ClusteredDistribution(windowSpec.partitionSpec) :: Nil
-    }
-
-  // Since window functions are adding columns to the input rows, the child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
--- End diff --

It is on line 77 in the new version.




