Repository: spark
Updated Branches:
  refs/heads/branch-1.3 b318858b9 -> bc7518956

[SPARK-6550][SQL] Use analyzed plan in DataFrame

This is based on the bug and test case proposed by viirya. See #5203 for an excellent description of the problem.

TLDR; The problem occurs because the function `groupBy(String)` calls `resolve`, which returns an `AttributeReference`. However, this `AttributeReference` is based on an analyzed plan which is thrown away. At execution time, we once again analyze the plan. However, in the case of self-joins, each call to analyze will produce a new tree for the left side of the join, rendering the previously returned `AttributeReference` invalid.

As a fix, I propose we keep the analyzed plan instead of the unresolved plan inside of a `DataFrame`.

Author: Michael Armbrust <mich...@databricks.com>

Closes #5217 from marmbrus/preanalyzer and squashes the following commits:

1f98e2d [Michael Armbrust] revert change
dd4dec1 [Michael Armbrust] Use the analyzed plan in DataFrame
089c52e [Michael Armbrust] WIP

(cherry picked from commit 5d9c37c23d1edd91e6c5561780006b762cde5f66)
Signed-off-by: Michael Armbrust <mich...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bc751895
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bc751895
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bc751895

Branch: refs/heads/branch-1.3
Commit: bc75189562b138b0d600370157916bc17d03891a
Parents: b318858
Author: Michael Armbrust <mich...@databricks.com>
Authored: Fri Mar 27 11:40:00 2015 -0700
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Fri Mar 27 11:40:09 2015 -0700

----------------------------------------------------------------------
 sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala     | 2 +-
 .../src/test/scala/org/apache/spark/sql/DataFrameSuite.scala     | 4 ++++
 2 files changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

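
The failure mode described in the commit message can be sketched without Spark. The snippet below is a toy model only: `Attr`, `ToyAnalyzer`, and `resolve` are hypothetical stand-ins for Catalyst's `AttributeReference`, the analyzer, and `DataFrame.resolve`, and the assumption that every analysis hands out fresh ids is a simplification of the self-join behaviour described above, not the actual Catalyst implementation.

```scala
object SelfJoinSketch {
  // Hypothetical stand-in for an AttributeReference: a column name plus a unique id.
  final case class Attr(name: String, id: Long)

  // Hypothetical analyzer: each call assigns fresh ids, mimicking how every
  // analysis of a self-join produces a new tree for one side of the join.
  final class ToyAnalyzer {
    private var nextId = 0L
    def analyze(columns: Seq[String]): Seq[Attr] =
      columns.map { c =>
        nextId += 1
        Attr(c, nextId)
      }
  }

  // Look a column up by name in an already analyzed plan.
  def resolve(plan: Seq[Attr], name: String): Option[Attr] =
    plan.find(_.name == name)

  // Old behaviour: resolve against one analysis, then analyze again at execution time.
  def resolvesAfterReanalysis(): Boolean = {
    val analyzer = new ToyAnalyzer
    val unresolved = Seq("x.str", "y.str")
    val ref = resolve(analyzer.analyze(unresolved), "y.str").get
    val executedPlan = analyzer.analyze(unresolved) // second analysis, new ids
    executedPlan.contains(ref)
  }

  // Proposed behaviour: keep the analyzed plan and reuse it at execution time.
  def resolvesWhenPlanKept(): Boolean = {
    val analyzer = new ToyAnalyzer
    val analyzed = analyzer.analyze(Seq("x.str", "y.str"))
    val ref = resolve(analyzed, "y.str").get
    analyzed.contains(ref)
  }

  def main(args: Array[String]): Unit = {
    println(resolvesAfterReanalysis()) // false: the reference is stale
    println(resolvesWhenPlanKept())    // true: same plan, same ids
  }
}
```

In this model the reference only stays valid when the plan it was resolved against is the plan that gets executed, which is the motivation for holding on to the analyzed plan.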
http://git-wip-us.apache.org/repos/asf/spark/blob/bc751895/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 4c80359..423ef39 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -146,7 +146,7 @@ class DataFrame private[sql](
            _: WriteToFile =>
         LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sqlContext)
       case _ =>
-        queryExecution.logical
+        queryExecution.analyzed
     }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/bc751895/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index fbc4065..5f03805 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -113,6 +113,10 @@ class DataFrameSuite extends QueryTest {
     checkAnswer(
       df.as('x).join(df.as('y), $"x.str" === $"y.str").groupBy("x.str").count(),
       Row("1", 1) :: Row("2", 1) :: Row("3", 1) :: Nil)
+
+    checkAnswer(
+      df.as('x).join(df.as('y), $"x.str" === $"y.str").groupBy("y.str").count(),
+      Row("1", 1) :: Row("2", 1) :: Row("3", 1) :: Nil)
   }
 
   test("explode") {