Repository: spark
Updated Branches:
  refs/heads/branch-2.3 db538b25a -> 6e1f5e018


[SPARK-24613][SQL] Cache with UDF could not be matched with subsequent 
dependent caches

Wrap the logical plan with an `AnalysisBarrier` for execution plan compilation 
in CacheManager, in order to avoid the plan being analyzed again.

Add one test in `DatasetCacheSuite`

Author: Maryann Xue <maryann...@apache.org>

Closes #21602 from maryannxue/cache-mismatch.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6e1f5e01
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6e1f5e01
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6e1f5e01

Branch: refs/heads/branch-2.3
Commit: 6e1f5e0182e1d111f7252c24c576674d1d2c7b91
Parents: db538b2
Author: Maryann Xue <maryann...@apache.org>
Authored: Thu Jun 21 11:45:30 2018 -0700
Committer: Xiao Li <gatorsm...@gmail.com>
Committed: Wed Jun 27 13:20:57 2018 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/execution/CacheManager.scala   |  6 +++---
 .../org/apache/spark/sql/DatasetCacheSuite.scala    | 16 ++++++++++++++++
 2 files changed, 19 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6e1f5e01/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index d68aeb2..dbdda27 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ResolvedHint}
+import org.apache.spark.sql.catalyst.plans.logical.{AnalysisBarrier, LogicalPlan, ResolvedHint}
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 import org.apache.spark.storage.StorageLevel
@@ -97,7 +97,7 @@ class CacheManager extends Logging {
       val inMemoryRelation = InMemoryRelation(
         sparkSession.sessionState.conf.useCompression,
         sparkSession.sessionState.conf.columnBatchSize, storageLevel,
-        sparkSession.sessionState.executePlan(planToCache).executedPlan,
+        sparkSession.sessionState.executePlan(AnalysisBarrier(planToCache)).executedPlan,
         tableName,
         planToCache.stats)
       cachedData.add(CachedData(planToCache, inMemoryRelation))
@@ -146,7 +146,7 @@ class CacheManager extends Logging {
           useCompression = cd.cachedRepresentation.useCompression,
           batchSize = cd.cachedRepresentation.batchSize,
           storageLevel = cd.cachedRepresentation.storageLevel,
-          child = spark.sessionState.executePlan(cd.plan).executedPlan,
+          child = spark.sessionState.executePlan(AnalysisBarrier(cd.plan)).executedPlan,
           tableName = cd.cachedRepresentation.tableName,
           statsOfPlanToCache = cd.plan.stats)
         needToRecache += cd.copy(cachedRepresentation = newCache)

http://git-wip-us.apache.org/repos/asf/spark/blob/6e1f5e01/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
index e0561ee..f6c760e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.storage.StorageLevel
@@ -96,4 +97,19 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext {
     agged.unpersist()
     assert(agged.storageLevel == StorageLevel.NONE, "The Dataset agged should not be cached.")
   }
+
+  test("SPARK-24613 Cache with UDF could not be matched with subsequent dependent caches") {
+    val udf1 = udf({x: Int => x + 1})
+    val df = spark.range(0, 10).toDF("a").withColumn("b", udf1($"a"))
+    val df2 = df.agg(sum(df("b")))
+
+    df.cache()
+    df.count()
+    df2.cache()
+
+    val plan = df2.queryExecution.withCachedData
+    assert(plan.isInstanceOf[InMemoryRelation])
+    val internalPlan = plan.asInstanceOf[InMemoryRelation].child
+    assert(internalPlan.find(_.isInstanceOf[InMemoryTableScanExec]).isDefined)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to