Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/21594#discussion_r197600101 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala --- @@ -105,24 +105,58 @@ class CacheManager extends Logging { } /** - * Un-cache all the cache entries that refer to the given plan. + * Un-cache the given plan or all the cache entries that refer to the given plan. + * @param query The [[Dataset]] to be un-cached. + * @param cascade If true, un-cache all the cache entries that refer to the given + * [[Dataset]]; otherwise un-cache the given [[Dataset]] only. + * @param blocking Whether to block until all blocks are deleted. */ - def uncacheQuery(query: Dataset[_], blocking: Boolean = true): Unit = writeLock { - uncacheQuery(query.sparkSession, query.logicalPlan, blocking) + def uncacheQuery(query: Dataset[_], + cascade: Boolean, blocking: Boolean = true): Unit = writeLock { + uncacheQuery(query.sparkSession, query.logicalPlan, cascade, blocking) } /** - * Un-cache all the cache entries that refer to the given plan. + * Un-cache the given plan or all the cache entries that refer to the given plan. + * @param spark The Spark session. + * @param plan The plan to be un-cached. + * @param cascade If true, un-cache all the cache entries that refer to the given + * plan; otherwise un-cache the given plan only. + * @param blocking Whether to block until all blocks are deleted. */ - def uncacheQuery(spark: SparkSession, plan: LogicalPlan, blocking: Boolean): Unit = writeLock { + def uncacheQuery(spark: SparkSession, plan: LogicalPlan, + cascade: Boolean, blocking: Boolean): Unit = writeLock { + val shouldRemove: LogicalPlan => Boolean = + if (cascade) { + _.find(_.sameResult(plan)).isDefined + } else { + _.sameResult(plan) + } val it = cachedData.iterator() while (it.hasNext) { val cd = it.next() - if (cd.plan.find(_.sameResult(plan)).isDefined) { + if (shouldRemove(cd.plan)) { cd.cachedRepresentation.cacheBuilder.clearCache(blocking) it.remove() } } + // Re-compile dependent cached queries after removing the cached query. + if (!cascade) { + val it = cachedData.iterator() + val needToRecache = scala.collection.mutable.ArrayBuffer.empty[CachedData] + while (it.hasNext) { + val cd = it.next() + if (cd.plan.find(_.sameResult(plan)).isDefined) { + it.remove() + val plan = spark.sessionState.executePlan(AnalysisBarrier(cd.plan)).executedPlan + val newCache = InMemoryRelation( + cacheBuilder = cd.cachedRepresentation.cacheBuilder.withCachedPlan(plan), + logicalPlan = cd.plan) + needToRecache += cd.copy(cachedRepresentation = newCache) + } + } + needToRecache.foreach(cachedData.add) --- End diff -- It's almost the same logic as "recache", except that it tries to reuse the cached buffer here. It would be nice to integrate these two, but it would look so clean given the inconvenience of copying a CacheBuilder. I'll try though.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org