Repository: spark
Updated Branches:
  refs/heads/master 16df133d7 -> 6c5fd977f


[SPARK-15791] Fix NPE in ScalarSubquery

## What changes were proposed in this pull request?

The fix is simple: don't make `executedPlan` transient in `ScalarSubquery`, 
since it is referenced at execution time.
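
For context, a minimal sketch (hypothetical types, not the Spark code itself) 
of why a `@transient` field turns into an NPE once the containing object 
round-trips through Java serialization on its way to an executor:

```scala
import java.io._

// Hypothetical stand-in for a plan-carrying expression: `id` survives
// serialization, while the @transient field comes back null.
case class Expr(@transient plan: String, id: Long)

object TransientNpeDemo {
  def roundTrip[T](value: T): T = {
    val buf = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buf)
    out.writeObject(value)
    out.close()
    new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
      .readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val e = roundTrip(Expr("Project [1 AS b]", 42L))
    println(e.id)          // 42: regular fields survive the round trip
    println(e.plan.length) // NullPointerException: `plan` was not serialized
  }
}
```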

## How was this patch tested?

I verified the fix manually in non-local mode. It's not clear to me why the 
problem did not manifest in local mode; any suggestions?
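
For reference, a reproduction sketch against a non-local master, mirroring the 
regression test added below (the master URL and session setup are illustrative):

```scala
import org.apache.spark.sql.SparkSession

object Spark15791Repro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local-cluster[2, 1, 1024]") // any non-local master should do
      .appName("SPARK-15791 repro")
      .getOrCreate()
    // Before this fix: NPE on the executors, because the deserialized
    // ScalarSubquery came back with a null executedPlan.
    spark.sql("select (select 1 as b) as b").rdd.count()
    spark.stop()
  }
}
```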

cc davies

Author: Eric Liang <e...@databricks.com>

Closes #13569 from ericl/fix-scalar-npe.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6c5fd977
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6c5fd977
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6c5fd977

Branch: refs/heads/master
Commit: 6c5fd977fbcb821a57cb4a13bc3d413a695fbc32
Parents: 16df133
Author: Eric Liang <e...@databricks.com>
Authored: Thu Jun 9 22:28:31 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Thu Jun 9 22:28:31 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/execution/subquery.scala   |  2 +-
 .../src/test/scala/org/apache/spark/sql/QueryTest.scala   | 10 ++++++++--
 .../test/scala/org/apache/spark/sql/SQLQuerySuite.scala   |  3 ++-
 .../test/scala/org/apache/spark/sql/SubquerySuite.scala   |  4 ++++
 4 files changed, 15 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6c5fd977/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
index 4a1f12d..461d301 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.types.DataType
  * This is the physical copy of ScalarSubquery to be used inside SparkPlan.
  */
 case class ScalarSubquery(
-    @transient executedPlan: SparkPlan,
+    executedPlan: SparkPlan,
     exprId: ExprId)
   extends SubqueryExpression {
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6c5fd977/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index 9c044f4..acb59d4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -341,10 +341,16 @@ object QueryTest {
    *
    * @param df the [[DataFrame]] to be executed
    * @param expectedAnswer the expected result in a [[Seq]] of [[Row]]s.
+   * @param checkToRDD whether to verify deserialization to an RDD. This runs the query twice.
    */
-  def checkAnswer(df: DataFrame, expectedAnswer: Seq[Row]): Option[String] = {
+  def checkAnswer(
+      df: DataFrame,
+      expectedAnswer: Seq[Row],
+      checkToRDD: Boolean = true): Option[String] = {
     val isSorted = df.logicalPlan.collect { case s: logical.Sort => s }.nonEmpty
-
+    if (checkToRDD) {
+      df.rdd.count()  // Also attempt to deserialize as an RDD [SPARK-15791]
+    }
 
     val sparkAnswer = try df.collect().toSeq catch {
       case e: Exception =>

http://git-wip-us.apache.org/repos/asf/spark/blob/6c5fd977/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 8284e8d..90465b6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2118,7 +2118,8 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       // is correct.
       def verifyCallCount(df: DataFrame, expectedResult: Row, expectedCount: Int): Unit = {
         countAcc.setValue(0)
-        checkAnswer(df, expectedResult)
+        QueryTest.checkAnswer(
+          df, Seq(expectedResult), checkToRDD = false /* avoid duplicate exec */)
         assert(countAcc.value == expectedCount)
       }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6c5fd977/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index a932125..05491a4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -54,6 +54,10 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
     t.createOrReplaceTempView("t")
   }
 
+  test("rdd deserialization does not crash [SPARK-15791]") {
+    sql("select (select 1 as b) as b").rdd.count()
+  }
+
   test("simple uncorrelated scalar subquery") {
     checkAnswer(
       sql("select (select 1 as b) as b"),

