This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d8d2736  [SPARK-26708][SQL][FOLLOWUP] put the special handling of non-cascade uncache in the uncache method
d8d2736 is described below

commit d8d2736fd1e3cd47941153327ad50a4d36099476
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Thu Jan 31 11:04:33 2019 +0800

    [SPARK-26708][SQL][FOLLOWUP] put the special handling of non-cascade uncache in the uncache method
    
    ## What changes were proposed in this pull request?
    
    This is a follow-up of https://github.com/apache/spark/pull/23644/files, to make these methods less coupled with each other.
    
    ## How was this patch tested?
    
    existing tests
    
    Closes #23687 from cloud-fan/cache.
    
    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../apache/spark/sql/execution/CacheManager.scala  | 48 +++++++++++-----------
 1 file changed, 23 insertions(+), 25 deletions(-)
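
At a glance, the patch replaces recacheByCondition's `LogicalPlan => Boolean` predicate plus `clearCache` flag with a single `CachedData => Boolean` predicate. A minimal sketch of that shape, using hypothetical stand-in types (the real `LogicalPlan`, `CachedData`, and `CacheManager` carry far more state and behavior than shown here):

```scala
// Hypothetical stand-ins; only the shape of the refactor is mirrored, not the
// actual CacheManager behavior (locking, re-planning, storage levels, etc.).
object SignatureSketch {
  case class LogicalPlan(id: String)
  case class CachedData(plan: LogicalPlan, buffersLoaded: Boolean)

  // Before: the predicate saw only the plan, and a separate `clearCache` flag
  // carried the non-cascading special case inside recacheByCondition itself.
  def recacheByConditionOld(
      entries: Seq[CachedData],
      condition: LogicalPlan => Boolean,
      clearCache: Boolean = true): Seq[CachedData] =
    entries.filter(cd => condition(cd.plan) && (clearCache || !cd.buffersLoaded))

  // After: the predicate sees the whole CachedData, so each caller folds its
  // own extra checks (if any) into the condition it passes in.
  def recacheByConditionNew(
      entries: Seq[CachedData],
      condition: CachedData => Boolean): Seq[CachedData] =
    entries.filter(condition)
}
```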

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 00c4461..398d7b4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -160,7 +160,22 @@ class CacheManager extends Logging {
     }
     // Re-compile dependent cached queries after removing the cached query.
     if (!cascade) {
-      recacheByCondition(spark, _.find(_.sameResult(plan)).isDefined, clearCache = false)
+      recacheByCondition(spark, cd => {
+        // If the cache buffer has already been loaded, we don't need to recompile the cached plan,
+        // as it no longer relies on the plan that has been uncached; it will just produce
+        // data from the cache buffer.
+        // Note that the `CachedRDDBuilder.isCachedColumnBuffersLoaded` call is a non-locking
+        // status test and may not return the most accurate cache buffer state. So the worst-case
+        // scenarios are:
+        // 1) The buffer has been loaded, but `isCachedColumnBuffersLoaded` returns false, then we
+        //    will clear the buffer and re-compile the plan. It is inefficient but doesn't affect
+        //    correctness.
+        // 2) The buffer has been cleared, but `isCachedColumnBuffersLoaded` returns true, then we
+        //    will keep it as it is. It means the physical plan has already been re-compiled in
+        //    another thread.
+        val cacheAlreadyLoaded = cd.cachedRepresentation.cacheBuilder.isCachedColumnBuffersLoaded
+        cd.plan.find(_.sameResult(plan)).isDefined && !cacheAlreadyLoaded
+      })
     }
   }
 
@@ -168,38 +183,21 @@ class CacheManager extends Logging {
    * Tries to re-cache all the cache entries that refer to the given plan.
    */
   def recacheByPlan(spark: SparkSession, plan: LogicalPlan): Unit = {
-    recacheByCondition(spark, _.find(_.sameResult(plan)).isDefined)
+    recacheByCondition(spark, _.plan.find(_.sameResult(plan)).isDefined)
   }
 
+  /**
+   * Re-caches all the cache entries that satisfy the given `condition`.
+   */
   private def recacheByCondition(
       spark: SparkSession,
-      condition: LogicalPlan => Boolean,
-      clearCache: Boolean = true): Unit = {
+      condition: CachedData => Boolean): Unit = {
     val needToRecache = scala.collection.mutable.ArrayBuffer.empty[CachedData]
     writeLock {
       val it = cachedData.iterator()
       while (it.hasNext) {
         val cd = it.next()
-        // If `clearCache` is false (which means the recache request comes from a non-cascading
-        // cache invalidation) and the cache buffer has already been loaded, we do not need to
-        // re-compile a physical plan because the old plan will not be used any more by the
-        // CacheManager although it still lives in compiled `Dataset`s and it could still work.
-        // Otherwise, it means either `clearCache` is true, then we have to clear the cache buffer
-        // and re-compile the physical plan; or it is a non-cascading cache invalidation and cache
-        // buffer is still empty, then we could have a more efficient new plan by removing
-        // dependency on the previously removed cache entries.
-        // Note that the `CachedRDDBuilder`.`isCachedColumnBuffersLoaded` call is a non-locking
-        // status test and may not return the most accurate cache buffer state. So the worse case
-        // scenario can be:
-        // 1) The buffer has been loaded, but `isCachedColumnBuffersLoaded` returns false, then we
-        //    will clear the buffer and build a new plan. It is inefficient but doesn't affect
-        //    correctness.
-        // 2) The buffer has been cleared, but `isCachedColumnBuffersLoaded` returns true, then we
-        //    will keep it as it is. It means the physical plan has been re-compiled already in the
-        //    other thread.
-        val buildNewPlan =
-          clearCache || !cd.cachedRepresentation.cacheBuilder.isCachedColumnBuffersLoaded
-        if (condition(cd.plan) && buildNewPlan) {
+        if (condition(cd)) {
           needToRecache += cd
          // Remove the cache entry before we create a new one, so that we can have a different
           // physical plan.
@@ -267,7 +265,7 @@ class CacheManager extends Logging {
       (fs, fs.makeQualified(path))
     }
 
-    recacheByCondition(spark, _.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined)
+    recacheByCondition(spark, _.plan.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined)
   }
 
   /**
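
The condition that the non-cascading uncache path now passes in skips entries whose column buffers are already materialized: those no longer need the uncached plan to produce data, so re-compiling them would be wasted work. Because `isCachedColumnBuffersLoaded` is a non-locking check, a stale `false` only costs a redundant re-compile and a stale `true` means another thread already re-compiled the plan, so correctness is unaffected either way. Below is a minimal, single-threaded sketch of that predicate with hypothetical stand-in types: `dependsOn` plays the role of `plan.find(_.sameResult(plan)).isDefined`, and `buffersLoaded` the role of `cachedRepresentation.cacheBuilder.isCachedColumnBuffersLoaded`.

```scala
// Hypothetical stand-ins; only the predicate logic from the patch is mirrored.
object NonCascadeUncacheSketch {
  case class LogicalPlan(table: String) {
    // Stand-in for "this plan still refers to the removed plan".
    def dependsOn(other: LogicalPlan): Boolean = table == other.table
  }
  case class CachedData(plan: LogicalPlan, buffersLoaded: Boolean)

  // After a non-cascading uncache of `removed`, only entries that still refer
  // to the removed plan AND whose column buffers are not loaded yet need a
  // re-compiled physical plan; a loaded buffer serves data without the old plan.
  def needsRecompile(entries: Seq[CachedData], removed: LogicalPlan): Seq[CachedData] =
    entries.filter(cd => cd.plan.dependsOn(removed) && !cd.buffersLoaded)

  def main(args: Array[String]): Unit = {
    val removed = LogicalPlan("t1")
    val entries = Seq(
      CachedData(LogicalPlan("t1"), buffersLoaded = true),  // kept as is
      CachedData(LogicalPlan("t1"), buffersLoaded = false), // needs re-compile
      CachedData(LogicalPlan("t2"), buffersLoaded = false)) // unrelated, untouched
    println(needsRecompile(entries, removed))
    // => List(CachedData(LogicalPlan(t1),false))
  }
}
```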


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
