GitHub user ioana-delaney opened a pull request:

    https://github.com/apache/spark/pull/13570

    [SPARK-15832][SQL] Embedded IN/EXISTS predicate subquery throws TreeNodeException

    ## What changes were proposed in this pull request?
    Queries with embedded existential sub-query predicates throw an exception
    when building the physical plan.
    
    Example failing query:
    ```SQL
    scala> Seq((1, 1), (2, 2)).toDF("c1", "c2").createOrReplaceTempView("t1")
    scala> Seq((1, 1), (2, 2)).toDF("c1", "c2").createOrReplaceTempView("t2")
    scala> sql("select c1 from t1 where (case when c2 in (select c2 from t2) then 2 else 3 end) IN (select c2 from t1)").show()
    
    Binding attribute, tree: c2#239
    org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: c2#239
      at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:50)
      at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:88)
    
      ...
      at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:87)
      at org.apache.spark.sql.execution.joins.HashJoin$$anonfun$4.apply(HashJoin.scala:66)
      at org.apache.spark.sql.execution.joins.HashJoin$$anonfun$4.apply(HashJoin.scala:66)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      at scala.collection.immutable.List.foreach(List.scala:381)
      at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
      at scala.collection.immutable.List.map(List.scala:285)
      at org.apache.spark.sql.execution.joins.HashJoin$class.org$apache$spark$sql$execution$joins$HashJoin$$x$8(HashJoin.scala:66)
      at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.org$apache$spark$sql$execution$joins$HashJoin$$x$8$lzycompute(BroadcastHashJoinExec.scala:38)
      at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.org$apache$spark$sql$execution$joins$HashJoin$$x$8(BroadcastHashJoinExec.scala:38)
      at org.apache.spark.sql.execution.joins.HashJoin$class.buildKeys(HashJoin.scala:63)
      at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.buildKeys$lzycompute(BroadcastHashJoinExec.scala:38)
      at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.buildKeys(BroadcastHashJoinExec.scala:38)
      at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.requiredChildDistribution(BroadcastHashJoinExec.scala:52)
    ```
    
    **Problem description:**
    When the left-hand side expression of an existential sub-query predicate
    contains another embedded sub-query predicate, the RewritePredicateSubquery
    optimizer rule does not rewrite the embedded sub-query expressions into
    existential joins. For example, the above query has the following optimized
    plan, which fails when the physical plan is built.
    
    ```SQL
    == Optimized Logical Plan ==
    Project [_1#224 AS c1#227]
    +- Join LeftSemi, (CASE WHEN predicate-subquery#255 [(_2#225 = c2#239)] THEN 2 ELSE 3 END = c2#228#262)
       :  +- SubqueryAlias predicate-subquery#255 [(_2#225 = c2#239)]
       :     +- LocalRelation [c2#239]
       :- LocalRelation [_1#224, _2#225]
       +- LocalRelation [c2#228#262]
    
    == Physical Plan ==
    org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: c2#239
    ```
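    
    The failure can be pictured with a toy model of attribute binding. The
    sketch below is an illustration only, not Spark's implementation: `Attr`,
    `BindingSketch`, and `bind` are hypothetical names. The only facts taken
    from the plan above are the left child's output `[_1#224, _2#225]` and the
    reference `c2#239`, which the join condition still mentions but which the
    left child does not produce, so looking it up by id finds nothing.
    
    ```scala
    // Toy model of attribute binding; all names are hypothetical, not Spark's API.
    final case class Attr(name: String, id: Int)

    object BindingSketch {
      // Resolve an attribute to its position in the child's output, matching on id.
      def bind(attr: Attr, childOutput: Seq[Attr]): Int = {
        val ordinal = childOutput.indexWhere(_.id == attr.id)
        if (ordinal < 0) {
          // Stands in for the TreeNodeException seen in the trace above.
          throw new IllegalStateException(
            s"Binding attribute, tree: ${attr.name}#${attr.id}")
        }
        ordinal
      }

      // Output of the left child of the LeftSemi join in the plan above.
      val leftOutput: Seq[Attr] = Seq(Attr("_1", 224), Attr("_2", 225))
    }
    ```
    
    In this model, `bind(Attr("_2", 225), leftOutput)` succeeds, while
    `bind(Attr("c2", 239), leftOutput)` throws, because no child output carries
    id 239 until the embedded sub-query is turned into a join of its own.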
    
    **Solution:**
    In RewritePredicateSubquery, before rewriting the outermost predicate
    sub-query, resolve any embedded existential sub-queries. The optimized plan
    for the above query after the changes is shown below.
    
    ```SQL
    == Optimized Logical Plan ==
    Project [_1#224 AS c1#227]
    +- Join LeftSemi, (CASE WHEN exists#285 THEN 2 ELSE 3 END = c2#228#284)
       :- Join ExistenceJoin(exists#285), (_2#225 = c2#239)
       :  :- LocalRelation [_1#224, _2#225]
       :  +- LocalRelation [c2#239]
       +- LocalRelation [c2#228#284]
    
    == Physical Plan ==
    *Project [_1#224 AS c1#227]
    +- *BroadcastHashJoin [CASE WHEN exists#285 THEN 2 ELSE 3 END], [c2#228#284], LeftSemi, BuildRight
       :- *BroadcastHashJoin [_2#225], [c2#239], ExistenceJoin(exists#285), BuildRight
       :  :- LocalTableScan [_1#224, _2#225]
       :  +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
       :     +- LocalTableScan [c2#239]
       +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
          +- LocalTableScan [c2#228#284]
    ```
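    
    The ordering described in the solution (rewrite embedded sub-queries
    first, then the outermost one) can be sketched on a toy expression tree.
    This is a simplified model under stated assumptions, not the code in this
    patch: `Expr`, `Plan`, `RewriteSketch`, `hoistEmbedded`, and
    `rewriteTopLevel` are hypothetical names, and only IN predicates nested
    inside a CASE WHEN are handled.
    
    ```scala
    // Toy expressions; hypothetical types, not Catalyst classes.
    sealed trait Expr
    final case class Col(name: String) extends Expr
    final case class Lit(value: Int) extends Expr
    // Models `value IN (select ... from table)`.
    final case class InSubquery(value: Expr, table: String) extends Expr
    final case class CaseWhen(cond: Expr, thenE: Expr, elseE: Expr) extends Expr

    // Toy logical plans.
    sealed trait Plan
    final case class Relation(name: String) extends Plan
    // Keeps every left row and exposes a boolean flag column named existsFlag.
    final case class ExistenceJoin(left: Plan, right: Plan, probe: Expr, existsFlag: String) extends Plan
    final case class LeftSemiJoin(left: Plan, right: Plan, lhsKey: Expr) extends Plan

    object RewriteSketch {
      // Replace each embedded InSubquery in `e` with a flag column and stack the
      // matching existence join on top of `plan`. Returns the rewritten
      // expression, the new plan, and the next free flag number.
      def hoistEmbedded(e: Expr, plan: Plan, counter: Int): (Expr, Plan, Int) = e match {
        case InSubquery(v, t) =>
          val (v2, p2, n2) = hoistEmbedded(v, plan, counter)
          val flag = s"exists#$n2"
          (Col(flag), ExistenceJoin(p2, Relation(t), v2, flag), n2 + 1)
        case CaseWhen(c, t, f) =>
          val (c2, p1, n1) = hoistEmbedded(c, plan, counter)
          val (t2, p2, n2) = hoistEmbedded(t, p1, n1)
          val (f2, p3, n3) = hoistEmbedded(f, p2, n2)
          (CaseWhen(c2, t2, f2), p3, n3)
        case other => (other, plan, counter)
      }

      // Rewrite a top-level `lhs IN (select ... from table)` filter over `child`:
      // embedded sub-queries in `lhs` are hoisted first, then the semi join is built.
      def rewriteTopLevel(pred: InSubquery, child: Plan): Plan = {
        val (lhs, newChild, _) = hoistEmbedded(pred.value, child, 0)
        LeftSemiJoin(newChild, Relation(pred.table), lhs)
      }
    }
    ```
    
    Applied to the failing query's predicate, this sketch yields a left semi
    join whose left child is an existence join over `t1` and `t2`, and whose
    key is `CASE WHEN exists#0 THEN 2 ELSE 3 END`. That is the same shape as
    the optimized plan above; the flag numbering is local to the sketch.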
    
    ## How was this patch tested?
    Added new test cases in SubquerySuite.scala
    
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ioana-delaney/spark fixEmbedSubPredV1

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/13570.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #13570
    
----
commit eea703aa673aab5f56d6a97ad86860422cd563a3
Author: Ioana Delaney <ioanamdela...@gmail.com>
Date:   2016-06-08T22:49:14Z

    [SPARK-15832] Embedded IN/EXISTS predicate subquery throws TreeNodeException.

----

