This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 8819eab [SPARK-26917][SQL] Further reduce locks in CacheManager 8819eab is described below commit 8819eaba4db9a20c60a423fe08bee82cb8595ad0 Author: Dave DeCaprio <da...@alum.mit.edu> AuthorDate: Fri Mar 15 10:13:34 2019 +0800 [SPARK-26917][SQL] Further reduce locks in CacheManager ## What changes were proposed in this pull request? Further load increases in our production environment have shown that even the read locks can cause some contention: to keep writers from being starved, `ReentrantReadWriteLock` makes new readers block while a writer is waiting for the lock, so a single pending writer can effectively serialize all subsequent readers. This PR reduces the potential for lock contention even further than https://github.com/apache/spark/pull/23833. Additionally, it uses more idiomatic Scala than the previous implementation. cloud-fan & gatorsmile This is a relatively minor improvement to the previous CacheManager changes. At this point, I think we are finally doing the minimum possible amount of locking. ## How was this patch tested? It has been tested on a live system where the blocking was causing major issues, and it is working well. CacheManager has no explicit unit test, but it is used in many places internally as part of the SharedState. Closes #24028 from DaveDeCaprio/read-locks-master. 
Lead-authored-by: Dave DeCaprio <da...@alum.mit.edu> Co-authored-by: David DeCaprio <da...@alum.mit.edu> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../apache/spark/sql/execution/CacheManager.scala | 93 +++++++--------------- 1 file changed, 28 insertions(+), 65 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala index 2815a28..5a11a8f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala @@ -19,8 +19,7 @@ package org.apache.spark.sql.execution import java.util.concurrent.locks.ReentrantReadWriteLock -import scala.collection.JavaConverters._ -import scala.collection.mutable +import scala.collection.immutable.IndexedSeq import org.apache.hadoop.fs.{FileSystem, Path} @@ -46,38 +45,22 @@ case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation) */ class CacheManager extends Logging { - @transient - private val cachedData = new java.util.LinkedList[CachedData] - - @transient - private val cacheLock = new ReentrantReadWriteLock - - /** Acquires a read lock on the cache for the duration of `f`. */ - private def readLock[A](f: => A): A = { - val lock = cacheLock.readLock() - lock.lock() - try f finally { - lock.unlock() - } - } - - /** Acquires a write lock on the cache for the duration of `f`. */ - private def writeLock[A](f: => A): A = { - val lock = cacheLock.writeLock() - lock.lock() - try f finally { - lock.unlock() - } - } + /** + * Maintains the list of cached plans as an immutable sequence. Any updates to the list + * should be protected in a "this.synchronized" block which includes the reading of the + * existing value and the update of the cachedData var. + */ + @transient @volatile + private var cachedData = IndexedSeq[CachedData]() /** Clears all cached tables. 
*/ - def clearCache(): Unit = writeLock { - cachedData.asScala.foreach(_.cachedRepresentation.cacheBuilder.clearCache()) - cachedData.clear() + def clearCache(): Unit = this.synchronized { + cachedData.foreach(_.cachedRepresentation.cacheBuilder.clearCache()) + cachedData = IndexedSeq[CachedData]() } /** Checks if the cache is empty. */ - def isEmpty: Boolean = readLock { + def isEmpty: Boolean = { cachedData.isEmpty } @@ -101,11 +84,11 @@ class CacheManager extends Logging { sparkSession.sessionState.executePlan(planToCache).executedPlan, tableName, planToCache) - writeLock { + this.synchronized { if (lookupCachedData(planToCache).nonEmpty) { logWarning("Data has already been cached.") } else { - cachedData.add(CachedData(planToCache, inMemoryRelation)) + cachedData = CachedData(planToCache, inMemoryRelation) +: cachedData } } } @@ -144,22 +127,12 @@ class CacheManager extends Logging { } else { _.sameResult(plan) } - val plansToUncache = mutable.Buffer[CachedData]() - readLock { - val it = cachedData.iterator() - while (it.hasNext) { - val cd = it.next() - if (shouldRemove(cd.plan)) { - plansToUncache += cd - } - } - } - plansToUncache.foreach { cd => - writeLock { - cachedData.remove(cd) - } - cd.cachedRepresentation.cacheBuilder.clearCache(blocking) + val plansToUncache = cachedData.filter(cd => shouldRemove(cd.plan)) + this.synchronized { + cachedData = cachedData.filterNot(cd => plansToUncache.exists(_ eq cd)) } + plansToUncache.foreach { _.cachedRepresentation.cacheBuilder.clearCache(blocking) } + // Re-compile dependent cached queries after removing the cached query. 
if (!cascade) { recacheByCondition(spark, cd => { @@ -194,46 +167,36 @@ class CacheManager extends Logging { private def recacheByCondition( spark: SparkSession, condition: CachedData => Boolean): Unit = { - val needToRecache = scala.collection.mutable.ArrayBuffer.empty[CachedData] - readLock { - val it = cachedData.iterator() - while (it.hasNext) { - val cd = it.next() - if (condition(cd)) { - needToRecache += cd - } - } + val needToRecache = cachedData.filter(condition) + this.synchronized { + // Remove the cache entry before creating a new ones. + cachedData = cachedData.filterNot(cd => needToRecache.exists(_ eq cd)) } needToRecache.map { cd => - writeLock { - // Remove the cache entry before we create a new one, so that we can have a different - // physical plan. - cachedData.remove(cd) - } cd.cachedRepresentation.cacheBuilder.clearCache() val plan = spark.sessionState.executePlan(cd.plan).executedPlan val newCache = InMemoryRelation( cacheBuilder = cd.cachedRepresentation.cacheBuilder.copy(cachedPlan = plan), logicalPlan = cd.plan) val recomputedPlan = cd.copy(cachedRepresentation = newCache) - writeLock { + this.synchronized { if (lookupCachedData(recomputedPlan.plan).nonEmpty) { logWarning("While recaching, data was already added to cache.") } else { - cachedData.add(recomputedPlan) + cachedData = recomputedPlan +: cachedData } } } } /** Optionally returns cached data for the given [[Dataset]] */ - def lookupCachedData(query: Dataset[_]): Option[CachedData] = readLock { + def lookupCachedData(query: Dataset[_]): Option[CachedData] = { lookupCachedData(query.logicalPlan) } /** Optionally returns cached data for the given [[LogicalPlan]]. 
*/ - def lookupCachedData(plan: LogicalPlan): Option[CachedData] = readLock { - cachedData.asScala.find(cd => plan.sameResult(cd.plan)) + def lookupCachedData(plan: LogicalPlan): Option[CachedData] = { + cachedData.find(cd => plan.sameResult(cd.plan)) } /** Replaces segments of the given logical plan with cached versions where possible. */ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org