Repository: spark Updated Branches: refs/heads/master bc02d0112 -> c5b735581
[SPARK-15915][SQL] Logical plans should use canonicalized plan when override sameResult. ## What changes were proposed in this pull request? A `DataFrame` whose plan overrides `sameResult` without comparing against the canonicalized plan cannot be cached with `cacheTable`. For example: ``` val localRelation = Seq(1, 2, 3).toDF() localRelation.createOrReplaceTempView("localRelation") spark.catalog.cacheTable("localRelation") assert( localRelation.queryExecution.withCachedData.collect { case i: InMemoryRelation => i }.size == 1) ``` fails with: ``` ArrayBuffer() had size 0 instead of expected size 1 ``` The reason is that `spark.catalog.cacheTable("localRelation")` makes `CacheManager` cache the plan wrapped by `SubqueryAlias`, but when planning the DataFrame `localRelation`, `CacheManager` looks up the cached table using the unwrapped plan, because the plan for the DataFrame `localRelation` is not wrapped. Some plans, such as `LocalRelation`, `LogicalRDD`, etc., override the `sameResult` method but do not use the canonicalized plan for the comparison, so `CacheManager` cannot detect that the plans are the same. This PR modifies them to use the canonicalized plan when overriding the `sameResult` method. ## How was this patch tested? Added a test to check that a DataFrame whose plan overrides sameResult without using the canonicalized plan for comparison can be cached with cacheTable. Author: Takuya UESHIN <ues...@happy-camper.st> Closes #13638 from ueshin/issues/SPARK-15915. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c5b73558 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c5b73558 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c5b73558 Branch: refs/heads/master Commit: c5b735581922c52a1b1cc6cd8c7b5878d3cf8f20 Parents: bc02d01 Author: Takuya UESHIN <ues...@happy-camper.st> Authored: Tue Jun 14 10:52:13 2016 -0700 Committer: Michael Armbrust <mich...@databricks.com> Committed: Tue Jun 14 10:52:13 2016 -0700 ---------------------------------------------------------------------- .../spark/sql/catalyst/plans/logical/LocalRelation.scala | 10 ++++++---- .../org/apache/spark/sql/execution/ExistingRDD.scala | 8 +++++--- .../sql/execution/datasources/LogicalRelation.scala | 8 +++++--- .../scala/org/apache/spark/sql/CachedTableSuite.scala | 11 +++++++++++ .../org/apache/spark/sql/hive/MetastoreRelation.scala | 2 +- 5 files changed, 28 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/c5b73558/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala index 87b8647..9d64f35 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala @@ -65,10 +65,12 @@ case class LocalRelation(output: Seq[Attribute], data: Seq[InternalRow] = Nil) } } - override def sameResult(plan: LogicalPlan): Boolean = plan match { - case LocalRelation(otherOutput, otherData) => - otherOutput.map(_.dataType) == output.map(_.dataType) && otherData 
== data - case _ => false + override def sameResult(plan: LogicalPlan): Boolean = { + plan.canonicalized match { + case LocalRelation(otherOutput, otherData) => + otherOutput.map(_.dataType) == output.map(_.dataType) && otherData == data + case _ => false + } } override lazy val statistics = http://git-wip-us.apache.org/repos/asf/spark/blob/c5b73558/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index ee72a70..e2c23a4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -87,9 +87,11 @@ private[sql] case class LogicalRDD( override def newInstance(): LogicalRDD.this.type = LogicalRDD(output.map(_.newInstance()), rdd)(session).asInstanceOf[this.type] - override def sameResult(plan: LogicalPlan): Boolean = plan match { - case LogicalRDD(_, otherRDD) => rdd.id == otherRDD.id - case _ => false + override def sameResult(plan: LogicalPlan): Boolean = { + plan.canonicalized match { + case LogicalRDD(_, otherRDD) => rdd.id == otherRDD.id + case _ => false + } } override protected def stringArgs: Iterator[Any] = Iterator(output) http://git-wip-us.apache.org/repos/asf/spark/blob/c5b73558/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala index a418d02..39c8606 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala @@ -60,9 +60,11 @@ case class LogicalRelation( com.google.common.base.Objects.hashCode(relation, output) } - override def sameResult(otherPlan: LogicalPlan): Boolean = otherPlan match { - case LogicalRelation(otherRelation, _, _) => relation == otherRelation - case _ => false + override def sameResult(otherPlan: LogicalPlan): Boolean = { + otherPlan.canonicalized match { + case LogicalRelation(otherRelation, _, _) => relation == otherRelation + case _ => false + } } // When comparing two LogicalRelations from within LogicalPlan.sameResult, we only need http://git-wip-us.apache.org/repos/asf/spark/blob/c5b73558/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index 44bafa5..3306ac4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -552,4 +552,15 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext selectStar, Seq(Row(1, "1"))) } + + test("SPARK-15915 Logical plans should use canonicalized plan when override sameResult") { + val localRelation = Seq(1, 2, 3).toDF() + localRelation.createOrReplaceTempView("localRelation") + + spark.catalog.cacheTable("localRelation") + assert( + localRelation.queryExecution.withCachedData.collect { + case i: InMemoryRelation => i + }.size == 1) + } } http://git-wip-us.apache.org/repos/asf/spark/blob/c5b73558/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala index 5596a44..58bca20 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala @@ -185,7 +185,7 @@ private[hive] case class MetastoreRelation( /** Only compare database and tablename, not alias. */ override def sameResult(plan: LogicalPlan): Boolean = { - plan match { + plan.canonicalized match { case mr: MetastoreRelation => mr.databaseName == databaseName && mr.tableName == tableName case _ => false --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org