Repository: spark
Updated Branches:
  refs/heads/master ee3af15fe -> cfbe11e81


[SPARK-22895][SQL] Push down the deterministic predicates that are after the first non-deterministic

## What changes were proposed in this pull request?
Currently, we do not guarantee an evaluation order of conjuncts in either the 
Filter or the Join operator. The same is true of mainstream RDBMS vendors such 
as DB2 and MS SQL Server. Thus, we should also push down the deterministic 
predicates that appear after the first non-deterministic one, if possible.
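
A minimal, plain-Scala sketch of the change in splitting strategy (no Spark dependencies; `Pred` is a hypothetical stand-in for Catalyst's `Expression`, and only its `deterministic` flag matters here):

```scala
// Stand-in for a conjunct in a Filter/Join condition.
case class Pred(sql: String, deterministic: Boolean)

val conjuncts = Seq(
  Pred("a = 5", deterministic = true),
  Pred("rand() > 0.3", deterministic = false),
  Pred("b > 4", deterministic = true))

// Before this patch: only the prefix up to the first non-deterministic
// conjunct was a pushdown candidate, so "b > 4" always stayed up.
val (oldCandidates, oldStayUp) = conjuncts.span(_.deterministic)
// oldCandidates == Seq(a = 5); oldStayUp == Seq(rand() > 0.3, b > 4)

// After this patch: every deterministic conjunct is a candidate, regardless
// of where it appears; non-deterministic conjuncts still stay up.
val (newCandidates, nonDeterministic) = conjuncts.partition(_.deterministic)
// newCandidates == Seq(a = 5, b > 4); nonDeterministic == Seq(rand() > 0.3)
```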

## How was this patch tested?
Updated the existing test cases.

Author: gatorsmile <gatorsm...@gmail.com>

Closes #20069 from gatorsmile/morePushDown.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cfbe11e8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cfbe11e8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cfbe11e8

Branch: refs/heads/master
Commit: cfbe11e8164c04cd7d388e4faeded21a9331dac4
Parents: ee3af15
Author: gatorsmile <gatorsm...@gmail.com>
Authored: Sun Dec 31 15:06:54 2017 +0800
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Sun Dec 31 15:06:54 2017 +0800

----------------------------------------------------------------------
 docs/sql-programming-guide.md                   |  1 +
 .../sql/catalyst/optimizer/Optimizer.scala      | 40 ++++++++------------
 .../optimizer/FilterPushdownSuite.scala         | 33 ++++++++--------
 .../v2/PushDownOperatorsToDataSource.scala      | 10 ++---
 .../execution/python/ExtractPythonUDFs.scala    |  6 +--
 .../StreamingSymmetricHashJoinHelper.scala      |  5 +--
 .../python/BatchEvalPythonExecSuite.scala       | 10 +++--
 .../StreamingSymmetricHashJoinHelperSuite.scala | 14 +++----
 8 files changed, 54 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cfbe11e8/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 4b5f56c..dc3e384 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1636,6 +1636,7 @@ options.
 
   - Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when 
the referenced columns only include the internal corrupt record column (named 
`_corrupt_record` by default). For example, 
`spark.read.schema(schema).json(file).filter($"_corrupt_record".isNotNull).count()`
 and `spark.read.schema(schema).json(file).select("_corrupt_record").show()`. 
Instead, you can cache or save the parsed results and then send the same query. 
For example, `val df = spark.read.schema(schema).json(file).cache()` and then 
`df.filter($"_corrupt_record".isNotNull).count()`.
   - The `percentile_approx` function previously accepted numeric type input 
and output double type results. Now it supports date type, timestamp type and 
numeric types as input types. The result type is also changed to be the same as 
the input type, which is more reasonable for percentiles.
+  - Since Spark 2.3, the Join/Filter's deterministic predicates that are after the first non-deterministic predicate are also pushed down/through the child operators, if possible. In prior Spark versions, these filters were not eligible for predicate pushdown.
   - Partition column inference previously found incorrect common type for 
different inferred types, for example, previously it ended up with double type 
as the common type for double type and date type. Now it finds the correct 
common type for such conflicts. The conflict resolution follows the table below:
 
     <table class="table">

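A hedged, self-contained illustration of the migration note added above (the data and app name are made up; this is a sketch of the user-visible behavior, not part of the patch):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.rand

object PushdownDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("pushdown-demo").getOrCreate()
    import spark.implicits._

    val df = Seq(("Hello", 4), ("World", 9)).toDF("a", "b")
      // "b > 4" is deterministic but appears after the non-deterministic rand() conjunct.
      .where(rand() > 0.3 && $"b" > 4)

    // Since Spark 2.3 the optimized plan shows "b > 4" pushed below the
    // non-deterministic filter; in earlier versions both conjuncts stayed together.
    df.explain(true)
    spark.stop()
  }
}
```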
http://git-wip-us.apache.org/repos/asf/spark/blob/cfbe11e8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index eeb1b13..0d4b02c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -805,15 +805,15 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
 
       // For each filter, expand the alias and check if the filter can be 
evaluated using
       // attributes produced by the aggregate operator's child operator.
-      val (candidates, containingNonDeterministic) =
-        splitConjunctivePredicates(condition).span(_.deterministic)
+      val (candidates, nonDeterministic) =
+        splitConjunctivePredicates(condition).partition(_.deterministic)
 
       val (pushDown, rest) = candidates.partition { cond =>
         val replaced = replaceAlias(cond, aliasMap)
         cond.references.nonEmpty && 
replaced.references.subsetOf(aggregate.child.outputSet)
       }
 
-      val stayUp = rest ++ containingNonDeterministic
+      val stayUp = rest ++ nonDeterministic
 
       if (pushDown.nonEmpty) {
         val pushDownPredicate = pushDown.reduce(And)
@@ -835,14 +835,14 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
       if w.partitionSpec.forall(_.isInstanceOf[AttributeReference]) =>
       val partitionAttrs = AttributeSet(w.partitionSpec.flatMap(_.references))
 
-      val (candidates, containingNonDeterministic) =
-        splitConjunctivePredicates(condition).span(_.deterministic)
+      val (candidates, nonDeterministic) =
+        splitConjunctivePredicates(condition).partition(_.deterministic)
 
       val (pushDown, rest) = candidates.partition { cond =>
         cond.references.subsetOf(partitionAttrs)
       }
 
-      val stayUp = rest ++ containingNonDeterministic
+      val stayUp = rest ++ nonDeterministic
 
       if (pushDown.nonEmpty) {
         val pushDownPredicate = pushDown.reduce(And)
@@ -854,7 +854,7 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
 
     case filter @ Filter(condition, union: Union) =>
       // Union could change the rows, so non-deterministic predicate can't be 
pushed down
-      val (pushDown, stayUp) = splitConjunctivePredicates(condition).span(_.deterministic)
+      val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition(_.deterministic)
 
       if (pushDown.nonEmpty) {
         val pushDownCond = pushDown.reduceLeft(And)
@@ -878,13 +878,9 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
       }
 
     case filter @ Filter(condition, watermark: EventTimeWatermark) =>
-      // We can only push deterministic predicates which don't reference the watermark attribute.
-      // We could in theory span() only on determinism and pull out deterministic predicates
-      // on the watermark separately. But it seems unnecessary and a bit confusing to not simply
-      // use the prefix as we do for nondeterminism in other cases.
-
-      val (pushDown, stayUp) = splitConjunctivePredicates(condition).span(
-        p => p.deterministic && !p.references.contains(watermark.eventTime))
+      val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { p =>
+        p.deterministic && !p.references.contains(watermark.eventTime)
+      }
 
       if (pushDown.nonEmpty) {
         val pushDownPredicate = pushDown.reduceLeft(And)
@@ -925,14 +921,14 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
     // come from grandchild.
     // TODO: non-deterministic predicates could be pushed through some 
operators that do not change
     // the rows.
-    val (candidates, containingNonDeterministic) =
-      splitConjunctivePredicates(filter.condition).span(_.deterministic)
+    val (candidates, nonDeterministic) =
+      splitConjunctivePredicates(filter.condition).partition(_.deterministic)
 
     val (pushDown, rest) = candidates.partition { cond =>
       cond.references.subsetOf(grandchild.outputSet)
     }
 
-    val stayUp = rest ++ containingNonDeterministic
+    val stayUp = rest ++ nonDeterministic
 
     if (pushDown.nonEmpty) {
       val newChild = insertFilter(pushDown.reduceLeft(And))
@@ -975,23 +971,19 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
   /**
    * Splits join condition expressions or filter predicates (on a given join's 
output) into three
    * categories based on the attributes required to evaluate them. Note that 
we explicitly exclude
-   * on-deterministic (i.e., stateful) condition expressions in canEvaluateInLeft or
+   * non-deterministic (i.e., stateful) condition expressions in canEvaluateInLeft or
    * canEvaluateInRight to prevent pushing these predicates on either side of 
the join.
    *
    * @return (canEvaluateInLeft, canEvaluateInRight, haveToEvaluateInBoth)
    */
   private def split(condition: Seq[Expression], left: LogicalPlan, right: 
LogicalPlan) = {
-    // Note: In order to ensure correctness, it's important to not change the relative ordering of
-    // any deterministic expression that follows a non-deterministic expression. To achieve this,
-    // we only consider pushing down those expressions that precede the first non-deterministic
-    // expression in the condition.
-    val (pushDownCandidates, containingNonDeterministic) = condition.span(_.deterministic)
+    val (pushDownCandidates, nonDeterministic) = condition.partition(_.deterministic)
     val (leftEvaluateCondition, rest) =
       pushDownCandidates.partition(_.references.subsetOf(left.outputSet))
     val (rightEvaluateCondition, commonCondition) =
         rest.partition(expr => expr.references.subsetOf(right.outputSet))
 
-    (leftEvaluateCondition, rightEvaluateCondition, commonCondition ++ containingNonDeterministic)
+    (leftEvaluateCondition, rightEvaluateCondition, commonCondition ++ nonDeterministic)
   }
 
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {

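A plain-Scala sketch of how the revised split in PushPredicateThroughJoin (shown above) buckets join conjuncts; `Cond` and the attribute sets are hypothetical stand-ins for `Expression` and the left/right output sets:

```scala
case class Cond(refs: Set[String], deterministic: Boolean)

def split(condition: Seq[Cond], leftOut: Set[String], rightOut: Set[String]) = {
  // All deterministic conjuncts are pushdown candidates, wherever they appear.
  val (pushDownCandidates, nonDeterministic) = condition.partition(_.deterministic)
  val (leftEvaluateCondition, rest) =
    pushDownCandidates.partition(_.refs.subsetOf(leftOut))
  val (rightEvaluateCondition, commonCondition) =
    rest.partition(_.refs.subsetOf(rightOut))
  // Non-deterministic conjuncts always stay with the join condition.
  (leftEvaluateCondition, rightEvaluateCondition, commonCondition ++ nonDeterministic)
}

// rand() comes first, yet "r.c > 4" is still routed to the right side.
val conjuncts = Seq(
  Cond(Set.empty, deterministic = false),        // rand() > 0.3
  Cond(Set("l.a"), deterministic = true),        // l.a = 5
  Cond(Set("r.c"), deterministic = true),        // r.c > 4
  Cond(Set("l.a", "r.c"), deterministic = true)) // l.a = r.c
println(split(conjuncts, Set("l.a", "l.b"), Set("r.c", "r.d")))
```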
http://git-wip-us.apache.org/repos/asf/spark/blob/cfbe11e8/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index a9c2306..85a5e97 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -831,9 +831,9 @@ class FilterPushdownSuite extends PlanTest {
     val optimized = Optimize.execute(originalQuery.analyze)
 
     val correctAnswer = Union(Seq(
-      testRelation.where('a === 2L),
-      testRelation2.where('d === 2L)))
-      .where('b + Rand(10).as("rnd") === 3 && 'c > 5L)
+      testRelation.where('a === 2L && 'c > 5L),
+      testRelation2.where('d === 2L && 'f > 5L)))
+      .where('b + Rand(10).as("rnd") === 3)
       .analyze
 
     comparePlans(optimized, correctAnswer)
@@ -1134,12 +1134,13 @@ class FilterPushdownSuite extends PlanTest {
     val x = testRelation.subquery('x)
     val y = testRelation.subquery('y)
 
-    // Verify that all conditions preceding the first non-deterministic condition are pushed down
+    // Verify that all conditions except the watermark touching condition are pushed down
     // by the optimizer and others are not.
     val originalQuery = x.join(y, condition = Some("x.a".attr === 5 && 
"y.a".attr === 5 &&
       "x.a".attr === Rand(10) && "y.b".attr === 5))
-    val correctAnswer = x.where("x.a".attr === 5).join(y.where("y.a".attr === 5),
-        condition = Some("x.a".attr === Rand(10) && "y.b".attr === 5))
+    val correctAnswer =
+      x.where("x.a".attr === 5).join(y.where("y.a".attr === 5 && "y.b".attr === 5),
+        condition = Some("x.a".attr === Rand(10)))
 
     // CheckAnalysis will ensure nondeterministic expressions not appear in 
join condition.
     // TODO support nondeterministic expressions in join condition.
@@ -1147,16 +1148,16 @@ class FilterPushdownSuite extends PlanTest {
       checkAnalysis = false)
   }
 
-  test("watermark pushdown: no pushdown on watermark attribute") {
+  test("watermark pushdown: no pushdown on watermark attribute #1") {
     val interval = new CalendarInterval(2, 2000L)
 
-    // Verify that all conditions preceding the first watermark touching condition are pushed down
+    // Verify that all conditions except the watermark touching condition are pushed down
     // by the optimizer and others are not.
     val originalQuery = EventTimeWatermark('b, interval, testRelation)
       .where('a === 5 && 'b === 10 && 'c === 5)
     val correctAnswer = EventTimeWatermark(
-      'b, interval, testRelation.where('a === 5))
-      .where('b === 10 && 'c === 5)
+      'b, interval, testRelation.where('a === 5 && 'c === 5))
+      .where('b === 10)
 
     comparePlans(Optimize.execute(originalQuery.analyze), 
correctAnswer.analyze,
       checkAnalysis = false)
@@ -1165,7 +1166,7 @@ class FilterPushdownSuite extends PlanTest {
   test("watermark pushdown: no pushdown for nondeterministic filter") {
     val interval = new CalendarInterval(2, 2000L)
 
-    // Verify that all conditions preceding the first watermark touching condition are pushed down
+    // Verify that all conditions except the watermark touching condition are pushed down
     // by the optimizer and others are not.
     val originalQuery = EventTimeWatermark('c, interval, testRelation)
       .where('a === 5 && 'b === Rand(10) && 'c === 5)
@@ -1180,7 +1181,7 @@ class FilterPushdownSuite extends PlanTest {
   test("watermark pushdown: full pushdown") {
     val interval = new CalendarInterval(2, 2000L)
 
-    // Verify that all conditions preceding the first watermark touching condition are pushed down
+    // Verify that all conditions except the watermark touching condition are pushed down
     // by the optimizer and others are not.
     val originalQuery = EventTimeWatermark('c, interval, testRelation)
       .where('a === 5 && 'b === 10)
@@ -1191,15 +1192,15 @@ class FilterPushdownSuite extends PlanTest {
       checkAnalysis = false)
   }
 
-  test("watermark pushdown: empty pushdown") {
+  test("watermark pushdown: no pushdown on watermark attribute #2") {
     val interval = new CalendarInterval(2, 2000L)
 
-    // Verify that all conditions preceding the first watermark touching condition are pushed down
-    // by the optimizer and others are not.
     val originalQuery = EventTimeWatermark('a, interval, testRelation)
       .where('a === 5 && 'b === 10)
+    val correctAnswer = EventTimeWatermark(
+      'a, interval, testRelation.where('b === 10)).where('a === 5)
 
-    comparePlans(Optimize.execute(originalQuery.analyze), originalQuery.analyze,
+    comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze,
       checkAnalysis = false)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/cfbe11e8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
index 0c17081..df034ad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
@@ -40,12 +40,8 @@ object PushDownOperatorsToDataSource extends Rule[LogicalPlan] with PredicateHel
     // top-down, then we can simplify the logic here and only collect target 
operators.
     val filterPushed = plan transformUp {
       case FilterAndProject(fields, condition, r @ DataSourceV2Relation(_, 
reader)) =>
-        // Non-deterministic expressions are stateful and we must keep the input sequence unchanged
-        // to avoid changing the result. This means, we can't evaluate the filter conditions that
-        // are after the first non-deterministic condition ahead. Here we only try to push down
-        // deterministic conditions that are before the first non-deterministic condition.
-        val (candidates, containingNonDeterministic) =
-          splitConjunctivePredicates(condition).span(_.deterministic)
+        val (candidates, nonDeterministic) =
+          splitConjunctivePredicates(condition).partition(_.deterministic)
 
         val stayUpFilters: Seq[Expression] = reader match {
           case r: SupportsPushDownCatalystFilters =>
@@ -74,7 +70,7 @@ object PushDownOperatorsToDataSource extends Rule[LogicalPlan] with PredicateHel
           case _ => candidates
         }
 
-        val filterCondition = (stayUpFilters ++ containingNonDeterministic).reduceLeftOption(And)
+        val filterCondition = (stayUpFilters ++ nonDeterministic).reduceLeftOption(And)
         val withFilter = filterCondition.map(Filter(_, r)).getOrElse(r)
         if (withFilter.output == fields) {
           withFilter

http://git-wip-us.apache.org/repos/asf/spark/blob/cfbe11e8/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
index f5a4cbc..2f53fe7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
@@ -202,12 +202,12 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
   private def trySplitFilter(plan: SparkPlan): SparkPlan = {
     plan match {
       case filter: FilterExec =>
-        val (candidates, containingNonDeterministic) =
-          splitConjunctivePredicates(filter.condition).span(_.deterministic)
+        val (candidates, nonDeterministic) =
+          splitConjunctivePredicates(filter.condition).partition(_.deterministic)
         val (pushDown, rest) = candidates.partition(!hasPythonUDF(_))
         if (pushDown.nonEmpty) {
           val newChild = FilterExec(pushDown.reduceLeft(And), filter.child)
-          FilterExec((rest ++ containingNonDeterministic).reduceLeft(And), newChild)
+          FilterExec((rest ++ nonDeterministic).reduceLeft(And), newChild)
         } else {
           filter
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/cfbe11e8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala
index 167e991..217e98a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala
@@ -72,8 +72,7 @@ object StreamingSymmetricHashJoinHelper extends Logging {
    * left AND right AND joined is equivalent to full.
    *
    * Note that left and right do not necessarily contain *all* conjuncts which 
satisfy
-   * their condition. Any conjuncts after the first nondeterministic one are treated as
-   * nondeterministic for purposes of the split.
+   * their condition.
    *
    * @param leftSideOnly Deterministic conjuncts which reference only the left 
side of the join.
    * @param rightSideOnly Deterministic conjuncts which reference only the 
right side of the join.
@@ -111,7 +110,7 @@ object StreamingSymmetricHashJoinHelper extends Logging {
           // Span rather than partition, because nondeterministic expressions 
don't commute
           // across AND.
           val (deterministicConjuncts, nonDeterministicConjuncts) =
-            splitConjunctivePredicates(condition.get).span(_.deterministic)
+            splitConjunctivePredicates(condition.get).partition(_.deterministic)
 
           val (leftConjuncts, nonLeftConjuncts) = 
deterministicConjuncts.partition { cond =>
             cond.references.subsetOf(left.outputSet)

http://git-wip-us.apache.org/repos/asf/spark/blob/cfbe11e8/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala
index 9e4a2e8..d456c93 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala
@@ -75,13 +75,17 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext {
     assert(qualifiedPlanNodes.size == 2)
   }
 
-  test("Python UDF: no push down on predicates starting from the first 
non-deterministic") {
+  test("Python UDF: push down on deterministic predicates after the first 
non-deterministic") {
     val df = Seq(("Hello", 4)).toDF("a", "b")
       .where("dummyPythonUDF(a) and rand() > 0.3 and b > 4")
+
     val qualifiedPlanNodes = df.queryExecution.executedPlan.collect {
-      case f @ FilterExec(And(_: And, _: GreaterThan), InputAdapter(_: BatchEvalPythonExec)) => f
+      case f @ FilterExec(
+          And(_: AttributeReference, _: GreaterThan),
+          InputAdapter(_: BatchEvalPythonExec)) => f
+      case b @ BatchEvalPythonExec(_, _, WholeStageCodegenExec(_: FilterExec)) => b
     }
-    assert(qualifiedPlanNodes.size == 1)
+    assert(qualifiedPlanNodes.size == 2)
   }
 
   test("Python UDF refers to the attributes from more than one child") {

http://git-wip-us.apache.org/repos/asf/spark/blob/cfbe11e8/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSymmetricHashJoinHelperSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSymmetricHashJoinHelperSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSymmetricHashJoinHelperSuite.scala
index 2a854e3..69b7154 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSymmetricHashJoinHelperSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSymmetricHashJoinHelperSuite.scala
@@ -18,10 +18,8 @@
 package org.apache.spark.sql.streaming
 
 import org.apache.spark.sql.Column
-import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.execution.{LeafExecNode, LocalTableScanExec, SparkPlan}
-import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
+import org.apache.spark.sql.execution.LocalTableScanExec
 import 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.JoinConditionSplitPredicates
 import org.apache.spark.sql.types._
 
@@ -95,19 +93,17 @@ class StreamingSymmetricHashJoinHelperSuite extends StreamTest {
   }
 
   test("conjuncts after nondeterministic") {
-    // All conjuncts after a nondeterministic conjunct shouldn't be split because they don't
-    // commute across it.
     val predicate =
-      (rand() > lit(0)
+      (rand(9) > lit(0)
         && leftColA > leftColB
         && rightColC > rightColD
         && leftColA === rightColC
         && lit(1) === lit(1)).expr
     val split = JoinConditionSplitPredicates(Some(predicate), left, right)
 
-    assert(split.leftSideOnly.isEmpty)
-    assert(split.rightSideOnly.isEmpty)
-    assert(split.bothSides.contains(predicate))
+    assert(split.leftSideOnly.contains((leftColA > leftColB && lit(1) === lit(1)).expr))
+    assert(split.rightSideOnly.contains((rightColC > rightColD && lit(1) === lit(1)).expr))
+    assert(split.bothSides.contains((leftColA === rightColC && rand(9) > lit(0)).expr))
     assert(split.full.contains(predicate))
   }
 

