spark git commit: [SPARK-16343][SQL] Improve the PushDownPredicate rule to pushdown predicates correctly in non-deterministic condition.

2016-09-29 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 ca8130050 -> 7ffafa3bf


[SPARK-16343][SQL] Improve the PushDownPredicate rule to pushdown predicates 
correctly in non-deterministic condition.

## What changes were proposed in this pull request?

Currently our Optimizer may reorder predicates to run them more efficiently, 
but when the condition is non-deterministic, changing the order between the 
deterministic and the non-deterministic parts may change the number of rows 
each predicate sees. For example:
```SELECT a FROM t WHERE rand() < 0.1 AND a = 1```
and
```SELECT a FROM t WHERE a = 1 AND rand() < 0.1```
may call rand() a different number of times and therefore produce different 
output rows.
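
To make this concrete, a minimal self-contained Scala sketch (plain 
collections rather than Spark; the `det` and `nonDet` helpers are hypothetical 
stand-ins for `a = 1` and `rand() < 0.1`) shows that swapping the conjuncts 
changes how often the non-deterministic predicate is evaluated:

```scala
import scala.util.Random

// Hypothetical stand-ins: det mimics `a = 1`, nonDet mimics `rand() < 0.1`
// while counting how many times it is actually evaluated.
object PredicateOrderDemo extends App {
  val rows = Seq(1, 1, 2, 3, 1, 2)
  var calls = 0
  def det(row: Int): Boolean = row == 1
  def nonDet(row: Int): Boolean = { calls += 1; Random.nextDouble() < 0.1 }

  rows.filter(r => nonDet(r) && det(r))   // non-deterministic first: evaluated for all 6 rows
  println(s"rand() first: $calls calls")  // 6

  calls = 0
  rows.filter(r => det(r) && nonDet(r))   // deterministic first: && short-circuits,
  println(s"a = 1 first: $calls calls")   // so only the 3 rows with a = 1 reach it
}
```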

This PR improves the rule by treating a predicate as a pushdown candidate 
only if it is placed before any non-deterministic predicate in the condition.
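
The core of the check is Scala's `span`: a rough REPL-style sketch with a toy 
`Pred` type (not Catalyst's `Expression`) of how `span(_.deterministic)` keeps 
only the prefix of deterministic conjuncts as pushdown candidates:

```scala
// Toy predicate type; only the deterministic flag and ordering matter here.
case class Pred(sql: String, deterministic: Boolean)

val conjuncts = Seq(
  Pred("a = 1", deterministic = true),
  Pred("rand() < 0.1", deterministic = false),
  Pred("b = 2", deterministic = true))

// span splits at the first conjunct that fails the test, preserving order.
val (candidates, containingNonDeterministic) = conjuncts.span(_.deterministic)
// candidates                 == Seq(Pred("a = 1", true))
// containingNonDeterministic == Seq(Pred("rand() < 0.1", false), Pred("b = 2", true))
// b = 2 is deterministic but still stays up: pushing it past rand() < 0.1
// would change how many rows the non-deterministic predicate sees.
```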

## How was this patch tested?

Expanded the related test cases in FilterPushdownSuite.

Author: 蒋星博 

Closes #14012 from jiangxb1987/ppd.

(cherry picked from commit f376c37268848dbb4b2fb57677e22ef2bf207b49)
Signed-off-by: Josh Rosen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7ffafa3b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7ffafa3b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7ffafa3b

Branch: refs/heads/branch-2.0
Commit: 7ffafa3bfecb8bc92b79eddea1ca18166efd3385
Parents: ca81300
Author: 蒋星博 
Authored: Thu Jul 14 00:21:27 2016 +0800
Committer: Josh Rosen 
Committed: Thu Sep 29 11:44:00 2016 -0700

--
 .../sql/catalyst/optimizer/Optimizer.scala  | 44 +---
 .../optimizer/FilterPushdownSuite.scala |  8 ++--
 2 files changed, 33 insertions(+), 19 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7ffafa3b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index d824c2e..35b122d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1031,19 +1031,23 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
       project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild))
 
     // Push [[Filter]] operators through [[Window]] operators. Parts of the predicate that can be
-    // pushed beneath must satisfy the following two conditions:
+    // pushed beneath must satisfy the following conditions:
     // 1. All the expressions are part of window partitioning key. The expressions can be compound.
-    // 2. Deterministic
+    // 2. Deterministic.
+    // 3. Placed before any non-deterministic predicates.
     case filter @ Filter(condition, w: Window)
         if w.partitionSpec.forall(_.isInstanceOf[AttributeReference]) =>
       val partitionAttrs = AttributeSet(w.partitionSpec.flatMap(_.references))
-      val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond =>
-        cond.references.subsetOf(partitionAttrs) && cond.deterministic &&
-          // This is for ensuring all the partitioning expressions have been converted to alias
-          // in Analyzer. Thus, we do not need to check if the expressions in conditions are
-          // the same as the expressions used in partitioning columns.
-          partitionAttrs.forall(_.isInstanceOf[Attribute])
+
+      val (candidates, containingNonDeterministic) =
+        splitConjunctivePredicates(condition).span(_.deterministic)
+
+      val (pushDown, rest) = candidates.partition { cond =>
+        cond.references.subsetOf(partitionAttrs)
       }
+
+      val stayUp = rest ++ containingNonDeterministic
+
       if (pushDown.nonEmpty) {
         val pushDownPredicate = pushDown.reduce(And)
         val newWindow = w.copy(child = Filter(pushDownPredicate, w.child))
@@ -1062,11 +1066,16 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
 
       // For each filter, expand the alias and check if the filter can be evaluated using
       // attributes produced by the aggregate operator's child operator.
-      val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond =>
+      val (candidates, containingNonDeterministic) =
+        splitConjunctivePredicates(condition).span(_.deterministic)
+
+      val (pushDown, rest) = candidates.partition { cond =>
         val replaced = replaceAlias(cond, aliasMap)
-        replaced.references.subsetOf(aggregate.child.outputSet) && replaced.deterministic

spark git commit: [SPARK-16343][SQL] Improve the PushDownPredicate rule to pushdown predicates correctly in non-deterministic condition.

2016-07-13 Thread lian
Repository: spark
Updated Branches:
  refs/heads/master ea06e4ef3 -> f376c3726


[SPARK-16343][SQL] Improve the PushDownPredicate rule to pushdown predicates 
correctly in non-deterministic condition.

## What changes were proposed in this pull request?

Currently our Optimizer may reorder predicates to run them more efficiently, 
but when the condition is non-deterministic, changing the order between the 
deterministic and the non-deterministic parts may change the number of rows 
each predicate sees. For example:
```SELECT a FROM t WHERE rand() < 0.1 AND a = 1```
and
```SELECT a FROM t WHERE a = 1 AND rand() < 0.1```
may call rand() a different number of times and therefore produce different 
output rows.

This PR improves the rule by treating a predicate as a pushdown candidate 
only if it is placed before any non-deterministic predicate in the condition.
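
A rough REPL-style sketch of the resulting two-step split, using a 
hypothetical `Cond` type and `childOutput` set rather than Catalyst types; the 
`candidates`/`pushDown`/`stayUp` names mirror the patch below:

```scala
// Hypothetical Cond type and childOutput set, not Catalyst API.
case class Cond(sql: String, deterministic: Boolean, references: Set[String])

val childOutput = Set("a", "b")  // attributes the child operator can provide
val conjuncts = Seq(
  Cond("a = 1", deterministic = true, references = Set("a")),
  Cond("w > 0", deterministic = true, references = Set("w")),  // w produced above the child
  Cond("rand() < 0.1", deterministic = false, references = Set.empty),
  Cond("b = 2", deterministic = true, references = Set("b")))

// Step 1: only the deterministic prefix may move at all.
val (candidates, containingNonDeterministic) = conjuncts.span(_.deterministic)
// Step 2: of that prefix, only conjuncts the child can evaluate are pushed.
val (pushDown, rest) = candidates.partition(_.references.subsetOf(childOutput))
val stayUp = rest ++ containingNonDeterministic
// pushDown == a = 1                        -> pushed beneath the operator
// stayUp   == w > 0, rand() < 0.1, b = 2   -> kept in the Filter above it
```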

## How was this patch tested?

Expanded the related test cases in FilterPushdownSuite.

Author: 蒋星博 

Closes #14012 from jiangxb1987/ppd.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f376c372
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f376c372
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f376c372

Branch: refs/heads/master
Commit: f376c37268848dbb4b2fb57677e22ef2bf207b49
Parents: ea06e4e
Author: 蒋星博 
Authored: Thu Jul 14 00:21:27 2016 +0800
Committer: Cheng Lian 
Committed: Thu Jul 14 00:21:27 2016 +0800

--
 .../sql/catalyst/optimizer/Optimizer.scala  | 44 +---
 .../optimizer/FilterPushdownSuite.scala |  8 ++--
 2 files changed, 33 insertions(+), 19 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f376c372/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 368e9a5..08fb019 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1128,19 +1128,23 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
       project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild))
 
     // Push [[Filter]] operators through [[Window]] operators. Parts of the predicate that can be
-    // pushed beneath must satisfy the following two conditions:
+    // pushed beneath must satisfy the following conditions:
     // 1. All the expressions are part of window partitioning key. The expressions can be compound.
-    // 2. Deterministic
+    // 2. Deterministic.
+    // 3. Placed before any non-deterministic predicates.
     case filter @ Filter(condition, w: Window)
         if w.partitionSpec.forall(_.isInstanceOf[AttributeReference]) =>
       val partitionAttrs = AttributeSet(w.partitionSpec.flatMap(_.references))
-      val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond =>
-        cond.references.subsetOf(partitionAttrs) && cond.deterministic &&
-          // This is for ensuring all the partitioning expressions have been converted to alias
-          // in Analyzer. Thus, we do not need to check if the expressions in conditions are
-          // the same as the expressions used in partitioning columns.
-          partitionAttrs.forall(_.isInstanceOf[Attribute])
+
+      val (candidates, containingNonDeterministic) =
+        splitConjunctivePredicates(condition).span(_.deterministic)
+
+      val (pushDown, rest) = candidates.partition { cond =>
+        cond.references.subsetOf(partitionAttrs)
       }
+
+      val stayUp = rest ++ containingNonDeterministic
+
       if (pushDown.nonEmpty) {
         val pushDownPredicate = pushDown.reduce(And)
         val newWindow = w.copy(child = Filter(pushDownPredicate, w.child))
@@ -1159,11 +1163,16 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
 
       // For each filter, expand the alias and check if the filter can be evaluated using
       // attributes produced by the aggregate operator's child operator.
-      val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond =>
+      val (candidates, containingNonDeterministic) =
+        splitConjunctivePredicates(condition).span(_.deterministic)
+
+      val (pushDown, rest) = candidates.partition { cond =>
         val replaced = replaceAlias(cond, aliasMap)
-        replaced.references.subsetOf(aggregate.child.outputSet) && replaced.deterministic
+        replaced.references.subsetOf(aggregate.child.outputSet)
       }
 
+      val stayUp = rest ++ containingNonDeterministic