Repository: spark
Updated Branches:
  refs/heads/branch-2.4 22bec3c6d -> 5cc2987db


[SPARK-25767][SQL] Fix lazily evaluated stream of expressions in code generation

## What changes were proposed in this pull request?

Code generation is incorrect if the `outputVars` parameter of the `consume` method in
`CodegenSupport` contains a lazily evaluated stream of expressions.
This PR fixes the issue by forcing the evaluation of `inputVars` (the result of
`outputVars.map(_.copy())`, which is forced when it is a `Stream`) before
generating the code for UnsafeRow.

## How was this patch tested?

Tested with the sample program provided in
https://issues.apache.org/jira/browse/SPARK-25767, and with a new regression
test in `WholeStageCodegenSuite`.

Closes #22789 from peter-toth/SPARK-25767.

Authored-by: Peter Toth <peter.t...@gmail.com>
Signed-off-by: Herman van Hovell <hvanhov...@databricks.com>
(cherry picked from commit 7fe5cff0581ca9d8221533215098f40f69362018)
Signed-off-by: Herman van Hovell <hvanhov...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5cc2987d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5cc2987d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5cc2987d

Branch: refs/heads/branch-2.4
Commit: 5cc2987dbba609d99df0b367abe25238c9498cba
Parents: 22bec3c
Author: Peter Toth <peter.t...@gmail.com>
Authored: Mon Oct 29 16:47:50 2018 +0100
Committer: Herman van Hovell <hvanhov...@databricks.com>
Committed: Mon Oct 29 16:48:06 2018 +0100

----------------------------------------------------------------------
 .../spark/sql/execution/WholeStageCodegenExec.scala      |  5 ++++-
 .../spark/sql/execution/WholeStageCodegenSuite.scala     | 11 +++++++++++
 2 files changed, 15 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5cc2987d/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index 1fc4de9..ded8dd3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -146,7 +146,10 @@ trait CodegenSupport extends SparkPlan {
       if (outputVars != null) {
         assert(outputVars.length == output.length)
         // outputVars will be used to generate the code for UnsafeRow, so we 
should copy them
-        outputVars.map(_.copy())
+        outputVars.map(_.copy()) match {
+          case stream: Stream[ExprCode] => stream.force
+          case other => other
+        }
       } else {
         assert(row != null, "outputVars and row cannot both be null.")
         ctx.currentVars = null

http://git-wip-us.apache.org/repos/asf/spark/blob/5cc2987d/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index b714dcd..09ad0fd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -319,4 +319,15 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext {
       assert(df.limit(1).collect() === Array(Row("bat", 8.0)))
     }
   }
+
+  test("SPARK-25767: Lazy evaluated stream of expressions handled correctly") {
+    val a = Seq(1).toDF("key")
+    val b = Seq((1, "a")).toDF("key", "value")
+    val c = Seq(1).toDF("key")
+
+    val ab = a.join(b, Stream("key"), "left")
+    val abc = ab.join(c, Seq("key"), "left")
+
+    checkAnswer(abc, Row(1, "a"))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to