[GitHub] spark pull request #21139: [SPARK-24066][SQL]Add a window exchange rule to e...

2018-11-05 Thread heary-cao
Github user heary-cao closed the pull request at:

https://github.com/apache/spark/pull/21139


---




[GitHub] spark pull request #21139: [SPARK-24066][SQL]Add a window exchange rule to e...

2018-04-24 Thread heary-cao
Github user heary-cao commented on a diff in the pull request:

https://github.com/apache/spark/pull/21139#discussion_r183929216
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -618,6 +619,18 @@ object CollapseRepartition extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Exchanged the adjacent logical window operator according to the order field of window.
+ */
+object ExchangeWindowWithOrderField extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    case w1 @ Window(_, _, orderSpec1, w2 @ Window(_, _, orderSpec2, child2))
+      if orderSpec1.size > orderSpec2.size =>
--- End diff --

@hvanhovell, sorry, I fixed it. Thanks.


---




[GitHub] spark pull request #21139: [SPARK-24066][SQL]Add a window exchange rule to e...

2018-04-24 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/21139#discussion_r183893505
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -618,6 +619,18 @@ object CollapseRepartition extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Exchanged the adjacent logical window operator according to the order field of window.
+ */
+object ExchangeWindowWithOrderField extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    case w1 @ Window(_, _, orderSpec1, w2 @ Window(_, _, orderSpec2, child2))
+      if orderSpec1.size > orderSpec2.size =>
--- End diff --

I am not sure if this is a good idea, because it does not consider the partitioning expressions or the actual ordering. You might actively make things worse if you do this. For example, when you have three subsequent window operators like this:
```
// Before rule
window[partition by A order by B, C]
:- window[partition by B order by C]
   :- window[partition by B order by D]

// After rule (notice how we add another exchange here)
window[partition by B order by C]
:- window[partition by A order by B, C]
   :- window[partition by B order by D]
```
I think this only has merit if the partitioning expression sets match and the smaller ordering clause is a prefix of the larger ordering clause.

Moreover, you are messing with the output order, so you need to make sure there is a project (or something else) on top that retains the original order (this might not be a problem due to the way we currently plan Window operators).
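A minimal sketch of the stricter variant described above, assuming Catalyst's `Window(windowExpressions, partitionSpec, orderSpec, child)` node; the object name and helper methods are illustrative, not code from this PR:
```
import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Window}
import org.apache.spark.sql.catalyst.rules.Rule

// Illustrative only: swap adjacent Window operators when the partitioning
// expressions are pairwise semantically equal and the shorter ordering is a
// prefix of the longer one, keeping a Project on top to preserve the
// original output order.
object ExchangeAdjacentWindows extends Rule[LogicalPlan] {

  private def sameExprs(a: Seq[Expression], b: Seq[Expression]): Boolean =
    a.length == b.length && a.zip(b).forall { case (x, y) => x.semanticEquals(y) }

  private def isPrefixOf(shorter: Seq[SortOrder], longer: Seq[SortOrder]): Boolean =
    shorter.length < longer.length &&
      shorter.zip(longer).forall { case (s, l) => s.semanticEquals(l) }

  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    case w1 @ Window(we1, ps1, os1, w2 @ Window(we2, ps2, os2, grandChild))
        // w1 must not reference the window columns produced by w2,
        // otherwise swapping the two operators would be invalid.
        if w1.references.intersect(AttributeSet(we2.map(_.toAttribute))).isEmpty &&
          sameExprs(ps1, ps2) &&
          isPrefixOf(os2, os1) =>
      // Evaluate the window with the longer ordering first; the shorter
      // ordering is then already satisfied, so no extra SortExec is planned.
      // The Project restores w1's original output order.
      Project(w1.output, Window(we2, ps2, os2, Window(we1, ps1, os1, grandChild)))
  }
}
```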


---




[GitHub] spark pull request #21139: [SPARK-24066][SQL]Add a window exchange rule to e...

2018-04-24 Thread heary-cao
GitHub user heary-cao opened a pull request:

https://github.com/apache/spark/pull/21139

[SPARK-24066][SQL] Add a window exchange rule to eliminate redundant physical plan SortExec

## What changes were proposed in this pull request?
Currently, when the ordering fields of adjacent window functions have a subset relationship, Spark SQL may non-deterministically generate different physical plans.
For example:

```
import spark.implicits._  // needed for .toDF() on an RDD of case-class rows

case class DistinctAgg(a: Int, b: Float, c: Double, d: Int, e: String)
val df = spark.sparkContext.parallelize(
  DistinctAgg(8, 2, 3, 4, "a") ::
  DistinctAgg(9, 3, 4, 5, "b") ::
  DistinctAgg(3, 4, 5, 6, "c") ::
  DistinctAgg(3, 4, 5, 7, "c") ::
  DistinctAgg(3, 4, 5, 8, "c") ::
  DistinctAgg(3, 6, 6, 9, "d") ::
  DistinctAgg(30, 40, 50, 60, "e") ::
  DistinctAgg(41, 51, 61, 71, null) ::
  DistinctAgg(42, 52, 62, 72, null) ::
  DistinctAgg(43, 53, 63, 73, "k") :: Nil).toDF()
df.createOrReplaceTempView("distinctAgg")

select a, b, c, 
avg(b) over(partition by a order by b) as sumIb, 
sum(d) over(partition by a order by b, c) as sumId, d 
from distinctAgg

```
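For reference, the plan Spark chooses can be inspected by calling `explain()` on the query; a minimal sketch, reusing the session and the temporary view registered above:
```
// Print the physical plan Spark picks for the query above; running it
// several times may show either of the two plans below.
spark.sql(
  """select a, b, c,
    |       avg(b) over(partition by a order by b) as sumIb,
    |       sum(d) over(partition by a order by b, c) as sumId, d
    |from distinctAgg""".stripMargin).explain()
```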
The physical plan is generated non-deterministically; it can take either of the two forms below.
**One**: a physical plan containing only one Sort
```
== Physical Plan ==
*(3) Project [a#181, b#182, c#183, sumId#210L, sumIb#209L, d#184]
+- Window [sum(cast(b#182 as bigint)) windowspecdefinition(a#181, b#182 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS sumIb#209L], [a#181], [b#182 ASC NULLS FIRST]
   +- Window [sum(cast(d#184 as bigint)) windowspecdefinition(a#181, b#182 ASC NULLS FIRST, c#183 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS sumId#210L], [a#181], [b#182 ASC NULLS FIRST, c#183 ASC NULLS FIRST]
      +- *(2) Sort [a#181 ASC NULLS FIRST, b#182 ASC NULLS FIRST, c#183 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(a#181, 5)
            +- *(1) Project [a#181, b#182, c#183, d#184]
               +- *(1) SerializeFromObject [assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$DistinctAgg, true]).a AS a#181, assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$DistinctAgg, true]).b AS b#182, assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$DistinctAgg, true]).c AS c#183, assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$DistinctAgg, true]).d AS d#184, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$DistinctAgg, true]).e, true, false) AS e#185]
                  +- Scan ExternalRDDScan[obj#180]

```
**The other**: a physical plan containing two Sorts
```
== Physical Plan ==
*(4) Project [a#181, b#182, c#183, sumId#210L, sumIb#209L, d#184]
+- Window [sum(cast(d#184 as bigint)) windowspecdefinition(a#181, b#182 ASC NULLS FIRST, c#183 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS sumId#210L], [a#181], [b#182 ASC NULLS FIRST, c#183 ASC NULLS FIRST]
   +- *(3) Sort [a#181 ASC NULLS FIRST, b#182 ASC NULLS FIRST, c#183 ASC NULLS FIRST], false, 0
      +- Window [sum(cast(b#182 as bigint)) windowspecdefinition(a#181, b#182 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS sumIb#209L], [a#181], [b#182 ASC NULLS FIRST]
         +- *(2) Sort [a#181 ASC NULLS FIRST, b#182 ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(a#181, 5)
               +- *(1) Project [a#181, b#182, c#183, d#184]
                  +- *(1) SerializeFromObject [assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$DistinctAgg, true]).a AS a#181, assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$DistinctAgg, true]).b AS b#182, assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$DistinctAgg, true]).c AS c#183, assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$DistinctAgg, true]).d AS d#184, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$DistinctAgg, true]).e, true, false) AS e#185]
                     +- Scan ExternalRDDScan[obj#180]

```
This PR adds a window exchange rule to ensure that no redundant SortExec is generated in the physical plan.
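A minimal, self-contained sketch of such a rule, following the snippet from the diff quoted in the review comments above; the guard shown here only compares the number of ordering fields:
```
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Window}
import org.apache.spark.sql.catalyst.rules.Rule

// Sketch of the proposed rule: when the upper of two adjacent Window
// operators orders by more fields than the lower one, evaluate the
// longer-ordered window first so the shorter ordering can reuse its sort.
object ExchangeWindowWithOrderField extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    case w1 @ Window(_, _, orderSpec1, w2 @ Window(_, _, orderSpec2, grandChild))
        if orderSpec1.size > orderSpec2.size =>
      // Swap the two operators: w1 (longer ordering) moves below w2.
      w2.copy(child = w1.copy(child = grandChild))
  }
}
```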

## How was this patch tested?

Added new unit tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/heary-cao/spark ExchangeWindow

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21139.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message: