Repository: spark
Updated Branches:
  refs/heads/master ec122209f -> 2f00a71a8


[SPARK-22257][SQL] Reserve all non-deterministic expressions in ExpressionSet

## What changes were proposed in this pull request?

Non-deterministic expressions should be considered as not contained 
in the [[ExpressionSet]].
This is consistent with how we define `semanticEquals` between two expressions.
Otherwise, combining expressions will remove non-deterministic expressions 
that should be preserved.
E.g.
Combine filters of
```scala
testRelation.where(Rand(0) > 0.1).where(Rand(0) > 0.1)
```
should result in
```scala
testRelation.where(Rand(0) > 0.1 && Rand(0) > 0.1)
```

## How was this patch tested?

Unit test

Author: Wang Gengliang <ltn...@gmail.com>

Closes #19475 from gengliangwang/non-deterministic-expressionSet.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2f00a71a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2f00a71a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2f00a71a

Branch: refs/heads/master
Commit: 2f00a71a876321af02865d7cd53ada167e1ce2e3
Parents: ec12220
Author: Wang Gengliang <ltn...@gmail.com>
Authored: Thu Oct 12 22:45:19 2017 -0700
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Thu Oct 12 22:45:19 2017 -0700

----------------------------------------------------------------------
 .../catalyst/expressions/ExpressionSet.scala    | 23 ++++++---
 .../expressions/ExpressionSetSuite.scala        | 51 +++++++++++++++-----
 .../optimizer/FilterPushdownSuite.scala         | 15 ++++++
 3 files changed, 72 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2f00a71a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala
index 305ac90..7e8e7b8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala
@@ -30,8 +30,9 @@ object ExpressionSet {
 }
 
 /**
- * A [[Set]] where membership is determined based on a canonical 
representation of an [[Expression]]
- * (i.e. one that attempts to ignore cosmetic differences).  See 
[[Canonicalize]] for more details.
+ * A [[Set]] where membership is determined based on determinacy and a 
canonical representation of
+ * an [[Expression]] (i.e. one that attempts to ignore cosmetic differences).
+ * See [[Canonicalize]] for more details.
  *
  * Internally this set uses the canonical representation, but keeps also track 
of the original
  * expressions to ease debugging.  Since different expressions can share the 
same canonical
@@ -46,6 +47,10 @@ object ExpressionSet {
  *   set.contains(1 + a) => true
  *   set.contains(a + 2) => false
  * }}}
+ *
+ * For non-deterministic expressions, they are always considered as not 
contained in the [[Set]].
+ * On adding a non-deterministic expression, simply append it to the original 
expressions.
+ * This is consistent with how we define `semanticEquals` between two 
expressions.
  */
 class ExpressionSet protected(
     protected val baseSet: mutable.Set[Expression] = new mutable.HashSet,
@@ -53,7 +58,9 @@ class ExpressionSet protected(
   extends Set[Expression] {
 
   protected def add(e: Expression): Unit = {
-    if (!baseSet.contains(e.canonicalized)) {
+    if (!e.deterministic) {
+      originals += e
+    } else if (!baseSet.contains(e.canonicalized) ) {
       baseSet.add(e.canonicalized)
       originals += e
     }
@@ -74,9 +81,13 @@ class ExpressionSet protected(
   }
 
   override def -(elem: Expression): ExpressionSet = {
-    val newBaseSet = baseSet.clone().filterNot(_ == elem.canonicalized)
-    val newOriginals = originals.clone().filterNot(_.canonicalized == 
elem.canonicalized)
-    new ExpressionSet(newBaseSet, newOriginals)
+    if (elem.deterministic) {
+      val newBaseSet = baseSet.clone().filterNot(_ == elem.canonicalized)
+      val newOriginals = originals.clone().filterNot(_.canonicalized == 
elem.canonicalized)
+      new ExpressionSet(newBaseSet, newOriginals)
+    } else {
+      new ExpressionSet(baseSet.clone(), originals.clone())
+    }
   }
 
   override def iterator: Iterator[Expression] = originals.iterator

http://git-wip-us.apache.org/repos/asf/spark/blob/2f00a71a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSetSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSetSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSetSuite.scala
index a1000a0..12eddf5 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSetSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSetSuite.scala
@@ -175,20 +175,14 @@ class ExpressionSetSuite extends SparkFunSuite {
     aUpper > bUpper || aUpper <= Rand(1L) || aUpper <= 10,
     aUpper <= Rand(1L) || aUpper <= 10 || aUpper > bUpper)
 
-  // Partial reorder case: we don't reorder non-deterministic expressions,
-  // but we can reorder sub-expressions in deterministic AND/OR expressions.
-  // There are two predicates:
-  //   (aUpper > bUpper || bUpper > 100) => we can reorder sub-expressions in 
it.
-  //   (aUpper === Rand(1L))
-  setTest(1,
+  // Keep all the non-deterministic expressions even they are semantically 
equal.
+  setTest(2, Rand(1L), Rand(1L))
+
+  setTest(2,
     (aUpper > bUpper || bUpper > 100) && aUpper === Rand(1L),
     (bUpper > 100 || aUpper > bUpper) && aUpper === Rand(1L))
 
-  // There are three predicates:
-  //   (Rand(1L) > aUpper)
-  //   (aUpper <= Rand(1L) && aUpper > bUpper)
-  //   (aUpper > 10 && bUpper > 10) => we can reorder sub-expressions in it.
-  setTest(1,
+  setTest(2,
     Rand(1L) > aUpper || (aUpper <= Rand(1L) && aUpper > bUpper) || (aUpper > 
10 && bUpper > 10),
     Rand(1L) > aUpper || (aUpper <= Rand(1L) && aUpper > bUpper) || (bUpper > 
10 && aUpper > 10))
 
@@ -219,4 +213,39 @@ class ExpressionSetSuite extends SparkFunSuite {
     assert((initialSet ++ setToAddWithSameExpression).size == 2)
     assert((initialSet ++ setToAddWithOutSameExpression).size == 3)
   }
+
+  test("add single element to set with non-deterministic expressions") {
+    val initialSet = ExpressionSet(aUpper + 1 :: Rand(0) :: Nil)
+
+    assert((initialSet + (aUpper + 1)).size == 2)
+    assert((initialSet + Rand(0)).size == 3)
+    assert((initialSet + (aUpper + 2)).size == 3)
+  }
+
+  test("remove single element to set with non-deterministic expressions") {
+    val initialSet = ExpressionSet(aUpper + 1 :: Rand(0) :: Nil)
+
+    assert((initialSet - (aUpper + 1)).size == 1)
+    assert((initialSet - Rand(0)).size == 2)
+    assert((initialSet - (aUpper + 2)).size == 2)
+  }
+
+  test("add multiple elements to set with non-deterministic expressions") {
+    val initialSet = ExpressionSet(aUpper + 1 :: Rand(0) :: Nil)
+    val setToAddWithSameDeterministicExpression = ExpressionSet(aUpper + 1 :: 
Rand(0) :: Nil)
+    val setToAddWithOutSameExpression = ExpressionSet(aUpper + 3 :: aUpper + 4 
:: Nil)
+
+    assert((initialSet ++ setToAddWithSameDeterministicExpression).size == 3)
+    assert((initialSet ++ setToAddWithOutSameExpression).size == 4)
+  }
+
+  test("remove multiple elements to set with non-deterministic expressions") {
+    val initialSet = ExpressionSet(aUpper + 1 :: Rand(0) :: Nil)
+    val setToRemoveWithSameDeterministicExpression = ExpressionSet(aUpper + 1 
:: Rand(0) :: Nil)
+    val setToRemoveWithOutSameExpression = ExpressionSet(aUpper + 3 :: aUpper 
+ 4 :: Nil)
+
+    assert((initialSet -- setToRemoveWithSameDeterministicExpression).size == 
1)
+    assert((initialSet -- setToRemoveWithOutSameExpression).size == 2)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2f00a71a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 582b3ea..de0e7c7 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -94,6 +94,21 @@ class FilterPushdownSuite extends PlanTest {
     comparePlans(optimized, correctAnswer)
   }
 
+  test("combine redundant deterministic filters") {
+    val originalQuery =
+      testRelation
+        .where(Rand(0) > 0.1 && 'a === 1)
+        .where(Rand(0) > 0.1 && 'a === 1)
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val correctAnswer =
+      testRelation
+        .where(Rand(0) > 0.1 && 'a === 1 && Rand(0) > 0.1)
+        .analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
   test("SPARK-16164: Filter pushdown should keep the ordering in the logical 
plan") {
     val originalQuery =
       testRelation


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to