This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 195f1aa  [SPARK-37779][SQL] Make ColumnarToRowExec plan canonicalizable after (de)serialization
195f1aa is described below

commit 195f1aaf4361fb8f5f31ef7f5c63464767ad88bd
Author: Hyukjin Kwon <gurwls...@apache.org>
AuthorDate: Thu Dec 30 12:38:37 2021 +0900

    [SPARK-37779][SQL] Make ColumnarToRowExec plan canonicalizable after (de)serialization
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to make the `supportsColumnar` sanity check in `ColumnarToRowExec` a driver-side-only check.
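
    For illustration, here is a minimal sketch (a hypothetical helper, not Spark's API) of the guard's logic: `TaskContext.get` returns `null` on the driver and a `TaskContext` inside a running task, so the `||` short-circuits on executors and the lazy `supportsColumnar` is never forced there.

    ```scala
    import org.apache.spark.TaskContext

    // Hypothetical helper mirroring the assertion this patch adds.
    // `condition` is by-name, so it is evaluated only on the driver, where
    // TaskContext.get returns null; inside a task the `||` short-circuits first.
    def driverOnlyAssert(condition: => Boolean): Unit =
      assert(TaskContext.get != null || condition)

    // Usage mirroring the patch: the NPE-prone lazy check is never forced
    // on executors.
    // driverOnlyAssert(child.supportsColumnar)
    ```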
    
    ### Why are the changes needed?
    
    SPARK-23731 made the plans serializable by leveraging lazy vals, but SPARK-28213 happened to refer to the lazy variable at: https://github.com/apache/spark/blob/77b164aac9764049a4820064421ef82ec0bc14fb/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala#L68
    
    This can fail during canonicalization, for example, while eliminating common subexpressions (on the executor side):
    
    ```
    java.lang.NullPointerException
        at org.apache.spark.sql.execution.FileSourceScanExec.supportsColumnar$lzycompute(DataSourceScanExec.scala:280)
        at org.apache.spark.sql.execution.FileSourceScanExec.supportsColumnar(DataSourceScanExec.scala:279)
        at org.apache.spark.sql.execution.InputAdapter.supportsColumnar(WholeStageCodegenExec.scala:509)
        at org.apache.spark.sql.execution.ColumnarToRowExec.<init>(Columnar.scala:67)
        ...
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:581)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:580)
        at org.apache.spark.sql.execution.ScalarSubquery.canonicalized$lzycompute(subquery.scala:110)
        ...
        at org.apache.spark.sql.catalyst.expressions.ExpressionEquals.hashCode(EquivalentExpressions.scala:275)
        ...
        at scala.collection.mutable.HashTable.findEntry$(HashTable.scala:135)
        at scala.collection.mutable.HashMap.findEntry(HashMap.scala:44)
        at scala.collection.mutable.HashMap.get(HashMap.scala:74)
        at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExpr(EquivalentExpressions.scala:46)
        at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTreeHelper$1(EquivalentExpressions.scala:147)
        at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:170)
        at org.apache.spark.sql.catalyst.expressions.SubExprEvaluationRuntime.$anonfun$proxyExpressions$1(SubExprEvaluationRuntime.scala:89)
        at org.apache.spark.sql.catalyst.expressions.SubExprEvaluationRuntime.$anonfun$proxyExpressions$1$adapted(SubExprEvaluationRuntime.scala:89)
        at scala.collection.immutable.List.foreach(List.scala:392)
    ```
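
    The root cause can be reproduced outside Spark. Below is a minimal, self-contained sketch (the `Plan` class is hypothetical, standing in for `FileSourceScanExec`) of how a lazy val that reads a `@transient` field throws a `NullPointerException` once the object has been serialized and deserialized, as happens when a plan is shipped to an executor:

    ```scala
    import java.io._

    // Hypothetical stand-in for a physical plan: `session` is @transient
    // (not shipped to executors), and the lazy val depends on it.
    class Plan(@transient val session: Object) extends Serializable {
      lazy val supportsColumnar: Boolean = session.hashCode % 2 == 0
    }

    def roundTrip[T](obj: T): T = {
      val bytes = new ByteArrayOutputStream()
      new ObjectOutputStream(bytes).writeObject(obj)
      new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
        .readObject().asInstanceOf[T]
    }

    val driverSide = new Plan(new Object)
    val executorSide = roundTrip(driverSide)
    driverSide.supportsColumnar    // fine: session is non-null
    executorSide.supportsColumnar  // NullPointerException: session is null
    ```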
    
    This is still a band-aid fix, but it at least addresses the issue with a minimal change. This fix should ideally be backported too.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Pretty unlikely; it only matters when `ColumnarToRowExec` has to be canonicalized on the executor side (see the stack trace above). But yes, it fixes a bug.
    
    ### How was this patch tested?
    
    A unit test was added.
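
    The test (see the diff below) forces the failure path by collecting a `ColumnarToRowExec` from an executed plan and canonicalizing it inside a `foreach` closure, which ships the captured plan to an executor where `TaskContext.get` is non-null. A minimal sketch of that trick:

    ```scala
    // The closure below runs inside a task, so anything it captures (such as
    // a physical plan) is serialized and deserialized before it is used.
    spark.range(1).foreach { _ =>
      assert(org.apache.spark.TaskContext.get != null) // we are inside a task
    }
    ```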
    
    Closes #35058 from HyukjinKwon/SPARK-37779.
    
    Authored-by: Hyukjin Kwon <gurwls...@apache.org>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../org/apache/spark/sql/execution/Columnar.scala     |  3 ++-
 .../apache/spark/sql/execution/SparkPlanSuite.scala   | 19 +++++++++++++++++++
 2 files changed, 21 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
index 8b340b2..628d4a3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
@@ -65,7 +65,8 @@ trait ColumnarToRowTransition extends UnaryExecNode
  * [[MapPartitionsInRWithArrowExec]]. Eventually this should replace those implementations.
  */
 case class ColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransition with CodegenSupport {
-  assert(child.supportsColumnar)
+  // `supportsColumnar` should only be called on the driver side; see also SPARK-37779.
+  assert(TaskContext.get != null || child.supportsColumnar)
 
   override def output: Seq[Attribute] = child.output
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
index bc4dfcb..12d311d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
@@ -124,6 +124,25 @@ class SparkPlanSuite extends QueryTest with SharedSparkSession {
     val nonEmpty = ColumnarOp(relation).toRowBased.executeCollect()
     assert(nonEmpty === relation.executeCollect())
   }
+
+  test("SPARK-37779: ColumnarToRowExec should be canonicalizable after being 
(de)serialized") {
+    withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
+      withTempPath { path =>
+        spark.range(1).write.parquet(path.getAbsolutePath)
+        val df = spark.read.parquet(path.getAbsolutePath)
+        val columnarToRowExec =
+          df.queryExecution.executedPlan.collectFirst { case p: ColumnarToRowExec => p }.get
+        try {
+          spark.range(1).foreach { _ =>
+            columnarToRowExec.canonicalized
+            ()
+          }
+        } catch {
+          case e: Throwable => fail("ColumnarToRowExec was not canonicalizable", e)
+        }
+      }
+    }
+  }
 }
 
 case class ColumnarOp(child: SparkPlan) extends UnaryExecNode {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
