Repository: spark Updated Branches: refs/heads/master 52b24a602 -> d637a666d
[SPARK-10327] [SQL] Cache Table is not working while subquery has alias in its project list ```scala import org.apache.spark.sql.hive.execution.HiveTableScan sql("select key, value, key + 1 from src").registerTempTable("abc") cacheTable("abc") val sparkPlan = sql( """select a.key, b.key, c.key from |abc a join abc b on a.key=b.key |join abc c on a.key=c.key""".stripMargin).queryExecution.sparkPlan assert(sparkPlan.collect { case e: InMemoryColumnarTableScan => e }.size === 3) // failed assert(sparkPlan.collect { case e: HiveTableScan => e }.size === 0) // failed ``` The actual plan is: ``` == Parsed Logical Plan == 'Project [unresolvedalias('a.key),unresolvedalias('b.key),unresolvedalias('c.key)] 'Join Inner, Some(('a.key = 'c.key)) 'Join Inner, Some(('a.key = 'b.key)) 'UnresolvedRelation [abc], Some(a) 'UnresolvedRelation [abc], Some(b) 'UnresolvedRelation [abc], Some(c) == Analyzed Logical Plan == key: int, key: int, key: int Project [key#14,key#61,key#66] Join Inner, Some((key#14 = key#66)) Join Inner, Some((key#14 = key#61)) Subquery a Subquery abc Project [key#14,value#15,(key#14 + 1) AS _c2#16] MetastoreRelation default, src, None Subquery b Subquery abc Project [key#61,value#62,(key#61 + 1) AS _c2#58] MetastoreRelation default, src, None Subquery c Subquery abc Project [key#66,value#67,(key#66 + 1) AS _c2#63] MetastoreRelation default, src, None == Optimized Logical Plan == Project [key#14,key#61,key#66] Join Inner, Some((key#14 = key#66)) Project [key#14,key#61] Join Inner, Some((key#14 = key#61)) Project [key#14] InMemoryRelation [key#14,value#15,_c2#16], true, 10000, StorageLevel(true, true, false, true, 1), (Project [key#14,value#15,(key#14 + 1) AS _c2#16]), Some(abc) Project [key#61] MetastoreRelation default, src, None Project [key#66] MetastoreRelation default, src, None == Physical Plan == TungstenProject [key#14,key#61,key#66] BroadcastHashJoin [key#14], [key#66], BuildRight TungstenProject [key#14,key#61] BroadcastHashJoin [key#14], [key#61], BuildRight ConvertToUnsafe InMemoryColumnarTableScan [key#14], (InMemoryRelation [key#14,value#15,_c2#16], true, 10000, StorageLevel(true, true, false, true, 1), (Project [key#14,value#15,(key#14 + 1) AS _c2#16]), Some(abc)) ConvertToUnsafe HiveTableScan [key#61], (MetastoreRelation default, src, None) ConvertToUnsafe HiveTableScan [key#66], (MetastoreRelation default, src, None) ``` Author: Cheng Hao <hao.ch...@intel.com> Closes #8494 from chenghao-intel/weird_cache. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d637a666 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d637a666 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d637a666 Branch: refs/heads/master Commit: d637a666d5932002c8ce0bd23c06064fbfdc1c97 Parents: 52b24a6 Author: Cheng Hao <hao.ch...@intel.com> Authored: Tue Sep 8 16:16:50 2015 -0700 Committer: Michael Armbrust <mich...@databricks.com> Committed: Tue Sep 8 16:16:50 2015 -0700 ---------------------------------------------------------------------- .../sql/catalyst/plans/logical/LogicalPlan.scala | 15 ++++++++++++--- .../org/apache/spark/sql/CachedTableSuite.scala | 16 ++++++++++++++++ 2 files changed, 28 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/d637a666/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 9bb466a..8f8747e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -135,16 +135,25 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { /** Args that have cleaned such that differences in expression id should not affect equality */ protected lazy val cleanArgs: Seq[Any] = { val input = children.flatMap(_.output) + def cleanExpression(e: Expression) = e match { + case a: Alias => + // As the root of the expression, Alias will always take an arbitrary exprId, we need + // to erase that for equality testing. + val cleanedExprId = Alias(a.child, a.name)(ExprId(-1), a.qualifiers) + BindReferences.bindReference(cleanedExprId, input, allowFailures = true) + case other => BindReferences.bindReference(other, input, allowFailures = true) + } + productIterator.map { // Children are checked using sameResult above. case tn: TreeNode[_] if containsChild(tn) => null - case e: Expression => BindReferences.bindReference(e, input, allowFailures = true) + case e: Expression => cleanExpression(e) case s: Option[_] => s.map { - case e: Expression => BindReferences.bindReference(e, input, allowFailures = true) + case e: Expression => cleanExpression(e) case other => other } case s: Seq[_] => s.map { - case e: Expression => BindReferences.bindReference(e, input, allowFailures = true) + case e: Expression => cleanExpression(e) case other => other } case other => other http://git-wip-us.apache.org/repos/asf/spark/blob/d637a666/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index 3a3541a..84e66b5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql +import org.apache.spark.sql.execution.PhysicalRDD + import scala.concurrent.duration._ import scala.language.postfixOps @@ -338,4 +340,18 @@ class CachedTableSuite extends QueryTest with SharedSQLContext { assert((accsSize - 2) == Accumulators.originals.size) } } + + test("SPARK-10327 Cache Table is not working while subquery has alias in its project list") { + ctx.sparkContext.parallelize((1, 1) :: (2, 2) :: Nil) + .toDF("key", "value").selectExpr("key", "value", "key+1").registerTempTable("abc") + ctx.cacheTable("abc") + + val sparkPlan = sql( + """select a.key, b.key, c.key from + |abc a join abc b on a.key=b.key + |join abc c on a.key=c.key""".stripMargin).queryExecution.sparkPlan + + assert(sparkPlan.collect { case e: InMemoryColumnarTableScan => e }.size === 3) + assert(sparkPlan.collect { case e: PhysicalRDD => e }.size === 0) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org