GitHub user HyukjinKwon opened a pull request:

    https://github.com/apache/spark/pull/21815

    [SPARK-23731][SQL] Make FileSourceScanExec canonicalizable in executor side

    ## What changes were proposed in this pull request?
    
    ### What's the problem?
    
    In some cases, a scalar subquery can throw an NPE, which is raised on the executor side:
    
    ```
    java.lang.NullPointerException
        at org.apache.spark.sql.execution.FileSourceScanExec.<init>(DataSourceScanExec.scala:169)
        at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:526)
        at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:159)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:211)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:210)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$3.apply(QueryPlan.scala:225)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$3.apply(QueryPlan.scala:225)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.immutable.List.map(List.scala:296)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:225)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:211)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:210)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.sameResult(QueryPlan.scala:258)
        at org.apache.spark.sql.execution.ScalarSubquery.semanticEquals(subquery.scala:58)
        at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$Expr.equals(EquivalentExpressions.scala:36)
        at scala.collection.mutable.HashTable$class.elemEquals(HashTable.scala:364)
        at scala.collection.mutable.HashMap.elemEquals(HashMap.scala:40)
        at scala.collection.mutable.HashTable$class.scala$collection$mutable$HashTable$$findEntry0(HashTable.scala:139)
        at scala.collection.mutable.HashTable$class.findEntry(HashTable.scala:135)
        at scala.collection.mutable.HashMap.findEntry(HashMap.scala:40)
        at scala.collection.mutable.HashMap.get(HashMap.scala:70)
        at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExpr(EquivalentExpressions.scala:56)
        at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:97)
        at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$$anonfun$addExprTree$1.apply(EquivalentExpressions.scala:98)
        at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$$anonfun$addExprTree$1.apply(EquivalentExpressions.scala:98)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:98)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$subexpressionElimination$1.apply(CodeGenerator.scala:1102)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$subexpressionElimination$1.apply(CodeGenerator.scala:1102)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.subexpressionElimination(CodeGenerator.scala:1102)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.generateExpressions(CodeGenerator.scala:1154)
        at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.createCode(GenerateUnsafeProjection.scala:270)
        at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:319)
        at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.generate(GenerateUnsafeProjection.scala:308)
        at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:181)
        at org.apache.spark.sql.execution.ProjectExec$$anonfun$9.apply(basicPhysicalOperators.scala:71)
        at org.apache.spark.sql.execution.ProjectExec$$anonfun$9.apply(basicPhysicalOperators.scala:70)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:818)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:818)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:367)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    ```
    
    ### How does this happen?
    
    Here is what happens:
    
    1. A scalar subquery is created (for instance, `SELECT (SELECT id FROM foo)`).
    
    2. Codegen tries to extract common expressions (via `CodeGenerator.subexpressionElimination`) so that common code can be generated and reused.
    
    3. During this, it extracts expressions that can be reused (via `EquivalentExpressions.addExprTree`):
    
      https://github.com/apache/spark/blob/b2deef64f604ddd9502a31105ed47cb63470ec85/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala#L1102
    
    4. During this, if the hashes (`EquivalentExpressions.Expr.hashCode`) happen to collide in `EquivalentExpressions.addExpr`, `EquivalentExpressions.Expr.equals` is called to identify the object within the same hash bucket, which eventually calls `semanticEquals` on `ScalarSubquery` (a toy sketch of this collision path follows the step list below):
    
      https://github.com/apache/spark/blob/087879a77acb37b790c36f8da67355b90719c2dc/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala#L54
    
      https://github.com/apache/spark/blob/087879a77acb37b790c36f8da67355b90719c2dc/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala#L36
    
    5. `ScalarSubquery`'s `semanticEquals` needs `SubqueryExec`'s `sameResult`:
    
      https://github.com/apache/spark/blob/77a2fc5b521788b406bb32bcc3c637c1d7406e58/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala#L58
    
    6. `SubqueryExec`'s `sameResult` requires a canonicalized plan, which calls `FileSourceScanExec`'s `doCanonicalize`:
    
      https://github.com/apache/spark/blob/e008ad175256a3192fdcbd2c4793044d52f46d57/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala#L258
    
    7. In `FileSourceScanExec`'s `doCanonicalize`, `FileSourceScanExec`'s `relation` is required, but it is `@transient`, so it becomes `null` on the executor (a second toy sketch, just before the reproduction section, demonstrates this):
    
      https://github.com/apache/spark/blob/e76b0124fbe463def00b1dffcfd8fd47e04772fe/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L527
    
      https://github.com/apache/spark/blob/e76b0124fbe463def00b1dffcfd8fd47e04772fe/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L160
    
    8. An NPE is thrown.
    
    Step 1 happens on the driver side; steps 2 through 8 happen on the executor side.
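    
    To make steps 2 to 4 concrete, here is a self-contained toy sketch (stand-in classes of my own, not Spark's actual `Expression`/`EquivalentExpressions`) of how a hash collision in the expression map ends up invoking `equals`, and therefore `semanticEquals`:
    
    ```scala
    import scala.collection.mutable
    
    case class ToyExpr(name: String) {
      def semanticEquals(other: ToyExpr): Boolean = {
        println(s"semanticEquals: $name vs ${other.name}")  // steps 4 and 5 above
        name == other.name
      }
    }
    
    class ToyEquivalentExpressions {
      // Wrapper mirroring the role of EquivalentExpressions.Expr: equality
      // delegates to semanticEquals; the constant hash simulates the unlucky
      // collision case, so every key lands in the same bucket.
      case class Expr(e: ToyExpr) {
        override def equals(o: Any): Boolean = o match {
          case Expr(other) => e.semanticEquals(other)
          case _ => false
        }
        override def hashCode: Int = 1
      }
    
      private val map = mutable.HashMap.empty[Expr, mutable.ArrayBuffer[ToyExpr]]
    
      // HashMap.get buckets by hashCode first, then calls equals() on each
      // entry in the bucket -- that is where semanticEquals gets triggered.
      def addExpr(e: ToyExpr): Boolean = map.get(Expr(e)) match {
        case Some(seen) => seen += e; true
        case None => map(Expr(e)) = mutable.ArrayBuffer(e); false
      }
    }
    
    object CollisionDemo extends App {
      val ee = new ToyEquivalentExpressions
      ee.addExpr(ToyExpr("subqueryA"))
      ee.addExpr(ToyExpr("subqueryB"))  // same bucket, so semanticEquals runs
    }
    ```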
    
    Note that in most cases this is fine, because we usually call:
    
    https://github.com/apache/spark/blob/087879a77acb37b790c36f8da67355b90719c2dc/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala#L40
    
    which makes a canonicalized plan via:
    
    https://github.com/apache/spark/blob/b045315e5d87b7ea3588436053aaa4d5a7bd103f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala#L192
    
    https://github.com/apache/spark/blob/77a2fc5b521788b406bb32bcc3c637c1d7406e58/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala#L52
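    
    Similarly, here is a toy sketch of steps 5 to 7 (again my own classes, not the real `QueryPlan`/`FileSourceScanExec`), showing why the `@transient` `relation` only blows up when the canonicalized form is first built on an executor:
    
    ```scala
    import java.io._
    
    class ToyRelation(val path: String) extends Serializable
    
    class ToyScan(@transient val relation: ToyRelation) extends Serializable {
      // Mirrors QueryPlan.canonicalized: built lazily, on whichever JVM first
      // asks for it; the @transient relation is dereferenced right here.
      @transient lazy val canonicalized: String = relation.path.toLowerCase
    
      def sameResult(other: ToyScan): Boolean = canonicalized == other.canonicalized
    }
    
    object TransientDemo extends App {
      // Simulate shipping a plan from the driver to an executor.
      def roundTrip(s: ToyScan): ToyScan = {
        val buf = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(buf)
        out.writeObject(s)
        out.close()
        new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
          .readObject().asInstanceOf[ToyScan]
      }
    
      val driverSide = new ToyScan(new ToyRelation("/tmp/Foo"))
      driverSide.sameResult(driverSide)  // fine: relation is non-null here
    
      val executorSide = roundTrip(driverSide)
      // relation came back null (@transient), so building the canonicalized
      // form on the "executor" throws the NullPointerException from step 8.
      executorSide.sameResult(executorSide)
    }
    ```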
    
    
    ### How to reproduce?
    
    I can reproduce this in a bit of a hacky way: force hash collisions by patching `EquivalentExpressions.Expr.hashCode` to a constant, then run a query with two equivalent scalar subqueries:
    
    ```diff
    diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
    index 8d06804ce1e..d25fc9a7ba9 100644
    --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
    +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
    @@ -37,7 +37,9 @@ class EquivalentExpressions {
           case _ => false
         }
     
    -    override def hashCode: Int = e.semanticHash()
    +    override def hashCode: Int = {
    +      1
    +    }
       }
    ```
    
    ```scala
    // Register a tiny Parquet table as a view for the subqueries.
    spark.range(1).write.mode("overwrite").parquet("/tmp/foo")
    spark.read.parquet("/tmp/foo").createOrReplaceTempView("foo")
    // Disable whole-stage codegen so projection codegen (and subexpression
    // elimination) happens inside the executor-side task.
    spark.conf.set("spark.sql.codegen.wholeStage", false)
    // Two equivalent scalar subqueries collide under the constant hashCode,
    // triggering semanticEquals -> sameResult -> doCanonicalize on executors.
    sql("SELECT (SELECT id FROM foo) == (SELECT id FROM foo)").collect()
    ```
    
    ### How does this PR fix it?
    
    - Make all variables that access `FileSourceScanExec`'s `relation` `lazy val`s so that the NPE is avoided. This is a temporary fix (see the sketch after the stack trace below).
    
    - Allow `makeCopy` in `SparkPlan` without a Spark session too, since it can still be reached on the executor side. For instance:
    
      ```
        at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:70)
        at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:47)
        at org.apache.spark.sql.catalyst.trees.TreeNode.withNewChildren(TreeNode.scala:233)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:243)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:211)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:210)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.sameResult(QueryPlan.scala:258)
        at org.apache.spark.sql.execution.ScalarSubquery.semanticEquals(subquery.scala:58)
        at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$Expr.equals(EquivalentExpressions.scala:36)
        at scala.collection.mutable.HashTable$class.elemEquals(HashTable.scala:364)
        at scala.collection.mutable.HashMap.elemEquals(HashMap.scala:40)
        at scala.collection.mutable.HashTable$class.scala$collection$mutable$HashTable$$findEntry0(HashTable.scala:139)
        at scala.collection.mutable.HashTable$class.findEntry(HashTable.scala:135)
        at scala.collection.mutable.HashMap.findEntry(HashMap.scala:40)
        at scala.collection.mutable.HashMap.get(HashMap.scala:70)
        at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExpr(EquivalentExpressions.scala:54)
        at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:95)
        at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$$anonfun$addExprTree$1.apply(EquivalentExpressions.scala:96)
        at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$$anonfun$addExprTree$1.apply(EquivalentExpressions.scala:96)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:96)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$subexpressionElimination$1.apply(CodeGenerator.scala:1102)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$subexpressionElimination$1.apply(CodeGenerator.scala:1102)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.subexpressionElimination(CodeGenerator.scala:1102)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.generateExpressions(CodeGenerator.scala:1154)
        at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.createCode(GenerateUnsafeProjection.scala:270)
        at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:319)
        at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.generate(GenerateUnsafeProjection.scala:308)
        at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:181)
        at org.apache.spark.sql.execution.ProjectExec$$anonfun$9.apply(basicPhysicalOperators.scala:71)
        at org.apache.spark.sql.execution.ProjectExec$$anonfun$9.apply(basicPhysicalOperators.scala:70)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:818)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:818)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:367)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
      ```
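    
    For the first bullet above, here is a rough before/after sketch of the idea with toy classes (`scanInfo` is an illustrative member name, not an actual `FileSourceScanExec` field):
    
    ```scala
    class ToyRelation(val path: String) extends Serializable
    
    // Before: an eager val dereferences the @transient relation while the
    // object is being constructed, so re-creating the node during
    // executor-side canonicalization (where relation == null) NPEs immediately.
    class ScanBefore(@transient val relation: ToyRelation) extends Serializable {
      val scanInfo: String = relation.path  // evaluated in the constructor
    }
    
    // After: the same member becomes a lazy val, so relation is only touched
    // if someone actually reads scanInfo -- and the canonicalized copy, used
    // purely for equality checks, never does.
    class ScanAfter(@transient val relation: ToyRelation) extends Serializable {
      lazy val scanInfo: String = relation.path  // deferred until first use
    }
    
    // new ScanBefore(null) throws; new ScanAfter(null) is safe until scanInfo
    // is read, which is all this temporary fix needs.
    ```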
    
    This PR takes over https://github.com/apache/spark/pull/20856.
    
    ## How was this patch tested?
    
    Manually tested, and a unit test was added.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/HyukjinKwon/spark SPARK-23731

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21815.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21815
    
----
commit b5b99aa8ea5f69fae04ab8eaccf50da0974112cb
Author: hyukjinkwon <gurwls223@...>
Date:   2018-07-19T07:42:03Z

    Make FileSourceScanExec canonicalizable in executor side

----

