This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new d8d2736  [SPARK-26708][SQL][FOLLOWUP] put the special handling of non-cascade uncache in the uncache method
d8d2736 is described below

commit d8d2736fd1e3cd47941153327ad50a4d36099476
Author:     Wenchen Fan <wenc...@databricks.com>
AuthorDate: Thu Jan 31 11:04:33 2019 +0800

    [SPARK-26708][SQL][FOLLOWUP] put the special handling of non-cascade uncache in the uncache method

    ## What changes were proposed in this pull request?

    This is a follow up of https://github.com/apache/spark/pull/23644/files , to make these methods less coupled with each other.

    ## How was this patch tested?

    existing tests

    Closes #23687 from cloud-fan/cache.

    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../apache/spark/sql/execution/CacheManager.scala | 48 +++++++++++-----------
 1 file changed, 23 insertions(+), 25 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 00c4461..398d7b4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -160,7 +160,22 @@ class CacheManager extends Logging {
     }
     // Re-compile dependent cached queries after removing the cached query.
     if (!cascade) {
-      recacheByCondition(spark, _.find(_.sameResult(plan)).isDefined, clearCache = false)
+      recacheByCondition(spark, cd => {
+        // If the cache buffer has already been loaded, we don't need to recompile the cached plan,
+        // as it does not rely on the plan that has been uncached anymore, it will just produce
+        // data from the cache buffer.
+        // Note that the `CachedRDDBuilder.isCachedColumnBuffersLoaded` call is a non-locking
+        // status test and may not return the most accurate cache buffer state. So the worse case
+        // scenario can be:
+        // 1) The buffer has been loaded, but `isCachedColumnBuffersLoaded` returns false, then we
+        //    will clear the buffer and re-compiled the plan. It is inefficient but doesn't affect
+        //    correctness.
+        // 2) The buffer has been cleared, but `isCachedColumnBuffersLoaded` returns true, then we
+        //    will keep it as it is. It means the physical plan has been re-compiled already in the
+        //    other thread.
+        val cacheAlreadyLoaded = cd.cachedRepresentation.cacheBuilder.isCachedColumnBuffersLoaded
+        cd.plan.find(_.sameResult(plan)).isDefined && !cacheAlreadyLoaded
+      })
     }
   }
 
@@ -168,38 +183,21 @@ class CacheManager extends Logging {
    * Tries to re-cache all the cache entries that refer to the given plan.
    */
   def recacheByPlan(spark: SparkSession, plan: LogicalPlan): Unit = {
-    recacheByCondition(spark, _.find(_.sameResult(plan)).isDefined)
+    recacheByCondition(spark, _.plan.find(_.sameResult(plan)).isDefined)
   }
 
+  /**
+   * Re-caches all the cache entries that satisfies the given `condition`.
+   */
   private def recacheByCondition(
       spark: SparkSession,
-      condition: LogicalPlan => Boolean,
-      clearCache: Boolean = true): Unit = {
+      condition: CachedData => Boolean): Unit = {
     val needToRecache = scala.collection.mutable.ArrayBuffer.empty[CachedData]
     writeLock {
       val it = cachedData.iterator()
       while (it.hasNext) {
         val cd = it.next()
-        // If `clearCache` is false (which means the recache request comes from a non-cascading
-        // cache invalidation) and the cache buffer has already been loaded, we do not need to
-        // re-compile a physical plan because the old plan will not be used any more by the
-        // CacheManager although it still lives in compiled `Dataset`s and it could still work.
-        // Otherwise, it means either `clearCache` is true, then we have to clear the cache buffer
-        // and re-compile the physical plan; or it is a non-cascading cache invalidation and cache
-        // buffer is still empty, then we could have a more efficient new plan by removing
-        // dependency on the previously removed cache entries.
-        // Note that the `CachedRDDBuilder`.`isCachedColumnBuffersLoaded` call is a non-locking
-        // status test and may not return the most accurate cache buffer state. So the worse case
-        // scenario can be:
-        // 1) The buffer has been loaded, but `isCachedColumnBuffersLoaded` returns false, then we
-        //    will clear the buffer and build a new plan. It is inefficient but doesn't affect
-        //    correctness.
-        // 2) The buffer has been cleared, but `isCachedColumnBuffersLoaded` returns true, then we
-        //    will keep it as it is. It means the physical plan has been re-compiled already in the
-        //    other thread.
-        val buildNewPlan =
-          clearCache || !cd.cachedRepresentation.cacheBuilder.isCachedColumnBuffersLoaded
-        if (condition(cd.plan) && buildNewPlan) {
+        if (condition(cd)) {
           needToRecache += cd
           // Remove the cache entry before we create a new one, so that we can have a different
           // physical plan.
@@ -267,7 +265,7 @@ class CacheManager extends Logging {
       (fs, fs.makeQualified(path))
     }
 
-    recacheByCondition(spark, _.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined)
+    recacheByCondition(spark, _.plan.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined)
   }
 
   /**
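For readers following the refactoring, here is a minimal, self-contained Scala sketch of the behavior the new `CachedData => Boolean` predicate encodes for the non-cascading uncache path. This is not the Spark source: `RecacheSketch`, `Plan`, `CacheBuilder`, `CachedEntry`, and `entriesToRecacheAfterUncache` are simplified, hypothetical stand-ins; only the predicate logic mirrors the diff above.

// All names below are hypothetical stand-ins, not Spark classes.
object RecacheSketch {

  // Simplified stand-ins for LogicalPlan, CachedRDDBuilder and CachedData.
  final case class Plan(id: Int) {
    def sameResult(other: Plan): Boolean = id == other.id
  }
  final case class CacheBuilder(isCachedColumnBuffersLoaded: Boolean)
  final case class CachedEntry(plan: Plan, cacheBuilder: CacheBuilder)

  private val cachedData = List(
    CachedEntry(Plan(1), CacheBuilder(isCachedColumnBuffersLoaded = true)),
    CachedEntry(Plan(1), CacheBuilder(isCachedColumnBuffersLoaded = false)),
    CachedEntry(Plan(2), CacheBuilder(isCachedColumnBuffersLoaded = false)))

  // After this commit the condition sees the whole cache entry, so callers can
  // inspect both the logical plan and the cache-buffer state.
  def recacheByCondition(condition: CachedEntry => Boolean): Seq[CachedEntry] =
    cachedData.filter(condition)

  // Non-cascading uncache: only entries that depend on `plan` AND whose column
  // buffers are not yet loaded need a re-compiled plan; an entry with loaded
  // buffers can keep serving data without the uncached plan.
  def entriesToRecacheAfterUncache(plan: Plan): Seq[CachedEntry] =
    recacheByCondition { cd =>
      val cacheAlreadyLoaded = cd.cacheBuilder.isCachedColumnBuffersLoaded
      cd.plan.sameResult(plan) && !cacheAlreadyLoaded
    }

  def main(args: Array[String]): Unit = {
    // Only the second entry (same plan, buffers not loaded) is selected.
    entriesToRecacheAfterUncache(Plan(1)).foreach(println)
  }
}

Passing the whole cache entry to the predicate is what lets the non-cascade special case live entirely inside the uncache method, so recacheByCondition no longer needs the clearCache flag.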