Repository: spark
Updated Branches:
  refs/heads/branch-2.3 db538b25a -> 6e1f5e018
[SPARK-24613][SQL] Cache with UDF could not be matched with subsequent dependent caches

Wrap the logical plan with an `AnalysisBarrier` for execution plan compilation in CacheManager, to avoid the plan being analyzed again.

Add one test in `DatasetCacheSuite`.

Author: Maryann Xue <maryann...@apache.org>

Closes #21602 from maryannxue/cache-mismatch.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6e1f5e01
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6e1f5e01
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6e1f5e01

Branch: refs/heads/branch-2.3
Commit: 6e1f5e0182e1d111f7252c24c576674d1d2c7b91
Parents: db538b2
Author: Maryann Xue <maryann...@apache.org>
Authored: Thu Jun 21 11:45:30 2018 -0700
Committer: Xiao Li <gatorsm...@gmail.com>
Committed: Wed Jun 27 13:20:57 2018 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/execution/CacheManager.scala |  6 +++---
 .../org/apache/spark/sql/DatasetCacheSuite.scala  | 16 ++++++++++++++++
 2 files changed, 19 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6e1f5e01/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index d68aeb2..dbdda27 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ResolvedHint}
+import org.apache.spark.sql.catalyst.plans.logical.{AnalysisBarrier, LogicalPlan, ResolvedHint}
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 import org.apache.spark.storage.StorageLevel
@@ -97,7 +97,7 @@ class CacheManager extends Logging {
       val inMemoryRelation = InMemoryRelation(
         sparkSession.sessionState.conf.useCompression,
         sparkSession.sessionState.conf.columnBatchSize, storageLevel,
-        sparkSession.sessionState.executePlan(planToCache).executedPlan,
+        sparkSession.sessionState.executePlan(AnalysisBarrier(planToCache)).executedPlan,
         tableName, planToCache.stats)
       cachedData.add(CachedData(planToCache, inMemoryRelation))
@@ -146,7 +146,7 @@ class CacheManager extends Logging {
           useCompression = cd.cachedRepresentation.useCompression,
           batchSize = cd.cachedRepresentation.batchSize,
           storageLevel = cd.cachedRepresentation.storageLevel,
-          child = spark.sessionState.executePlan(cd.plan).executedPlan,
+          child = spark.sessionState.executePlan(AnalysisBarrier(cd.plan)).executedPlan,
           tableName = cd.cachedRepresentation.tableName,
           statsOfPlanToCache = cd.plan.stats)
         needToRecache += cd.copy(cachedRepresentation = newCache)

http://git-wip-us.apache.org/repos/asf/spark/blob/6e1f5e01/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
index e0561ee..f6c760e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.storage.StorageLevel
@@ -96,4 +97,19 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext {
     agged.unpersist()
     assert(agged.storageLevel == StorageLevel.NONE, "The Dataset agged should not be cached.")
   }
+
+  test("SPARK-24613 Cache with UDF could not be matched with subsequent dependent caches") {
+    val udf1 = udf({x: Int => x + 1})
+    val df = spark.range(0, 10).toDF("a").withColumn("b", udf1($"a"))
+    val df2 = df.agg(sum(df("b")))
+
+    df.cache()
+    df.count()
+    df2.cache()
+
+    val plan = df2.queryExecution.withCachedData
+    assert(plan.isInstanceOf[InMemoryRelation])
+    val internalPlan = plan.asInstanceOf[InMemoryRelation].child
+    assert(internalPlan.find(_.isInstanceOf[InMemoryTableScanExec]).isDefined)
+  }
 }
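
For reference, a minimal sketch of the scenario this patch addresses, adapted from the new DatasetCacheSuite test above. It assumes a Spark 2.3.x session available as `spark` (for example in spark-shell); the names `plusOne`, `rel` and `reusesCache` are illustrative and not part of the patch:

    import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec}
    import org.apache.spark.sql.functions._

    // A DataFrame with a UDF-derived column, and a second DataFrame that depends on it.
    val plusOne = udf { x: Int => x + 1 }
    val df = spark.range(0, 10).toDF("a").withColumn("b", plusOne(col("a")))
    val df2 = df.agg(sum(df("b")))

    df.cache()
    df.count()     // materialize df's cache
    df2.cache()

    // With this patch, df2's cached representation is built on top of df's cache:
    // its InMemoryRelation scans df's in-memory data (an InMemoryTableScanExec)
    // instead of recomputing the UDF column from the source.
    val rel = df2.queryExecution.withCachedData.asInstanceOf[InMemoryRelation]
    val reusesCache = rel.child.find(_.isInstanceOf[InMemoryTableScanExec]).isDefined
    println(s"df2 reuses df's cache: $reusesCache")   // expected: true with this fix

Before this change, the same check would find no InMemoryTableScanExec under df2's InMemoryRelation, because the plan was analyzed again during cache compilation and the re-analyzed UDF expression no longer matched the plan already cached for df.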