Repository: spark Updated Branches: refs/heads/branch-2.4 22bec3c6d -> 5cc2987db
[SPARK-25767][SQL] Fix lazily evaluated stream of expressions in code generation ## What changes were proposed in this pull request? Code generation is incorrect if the `outputVars` parameter of the `consume` method in `CodegenSupport` contains a lazily evaluated stream of expressions. This PR fixes the issue by forcing the evaluation of `inputVars` before generating the code for UnsafeRow. ## How was this patch tested? Tested with the sample program provided in https://issues.apache.org/jira/browse/SPARK-25767 Closes #22789 from peter-toth/SPARK-25767. Authored-by: Peter Toth <peter.t...@gmail.com> Signed-off-by: Herman van Hovell <hvanhov...@databricks.com> (cherry picked from commit 7fe5cff0581ca9d8221533215098f40f69362018) Signed-off-by: Herman van Hovell <hvanhov...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5cc2987d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5cc2987d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5cc2987d Branch: refs/heads/branch-2.4 Commit: 5cc2987dbba609d99df0b367abe25238c9498cba Parents: 22bec3c Author: Peter Toth <peter.t...@gmail.com> Authored: Mon Oct 29 16:47:50 2018 +0100 Committer: Herman van Hovell <hvanhov...@databricks.com> Committed: Mon Oct 29 16:48:06 2018 +0100 ---------------------------------------------------------------------- .../spark/sql/execution/WholeStageCodegenExec.scala | 5 ++++- .../spark/sql/execution/WholeStageCodegenSuite.scala | 11 +++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/5cc2987d/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 1fc4de9..ded8dd3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -146,7 +146,10 @@ trait CodegenSupport extends SparkPlan { if (outputVars != null) { assert(outputVars.length == output.length) // outputVars will be used to generate the code for UnsafeRow, so we should copy them - outputVars.map(_.copy()) + outputVars.map(_.copy()) match { + case stream: Stream[ExprCode] => stream.force + case other => other + } } else { assert(row != null, "outputVars and row cannot both be null.") ctx.currentVars = null http://git-wip-us.apache.org/repos/asf/spark/blob/5cc2987d/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index b714dcd..09ad0fd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -319,4 +319,15 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext { assert(df.limit(1).collect() === Array(Row("bat", 8.0))) } } + + test("SPARK-25767: Lazy evaluated stream of expressions handled correctly") { + val a = Seq(1).toDF("key") + val b = Seq((1, "a")).toDF("key", "value") + val c = Seq(1).toDF("key") + + val ab = a.join(b, Stream("key"), "left") + val abc = ab.join(c, Seq("key"), "left") + + checkAnswer(abc, Row(1, "a")) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: 
commits-h...@spark.apache.org