This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8819eab  [SPARK-26917][SQL] Further reduce locks in CacheManager
8819eab is described below

commit 8819eaba4db9a20c60a423fe08bee82cb8595ad0
Author: Dave DeCaprio <da...@alum.mit.edu>
AuthorDate: Fri Mar 15 10:13:34 2019 +0800

    [SPARK-26917][SQL] Further reduce locks in CacheManager
    
    ## What changes were proposed in this pull request?
    
    Further load increases in our production environment have shown that even
    the read locks can cause some contention, since they contain a mechanism
    that turns a read lock into an exclusive lock if a writer has been starved
    out. This PR reduces the potential for lock contention even further than
    https://github.com/apache/spark/pull/23833. Additionally, it uses more
    idiomatic Scala than the previous implementation.
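    
    As a rough illustration only (the class and method names below are
    hypothetical and not part of this commit), the copy-on-write pattern
    adopted here publishes an immutable snapshot through a `@volatile` var, so
    reads take no lock at all, while writers serialize their read-modify-write
    through `this.synchronized`:
    
    ```scala
    import scala.collection.immutable.IndexedSeq
    
    // Hypothetical sketch of the copy-on-write pattern described above.
    class CopyOnWriteRegistry[A] {
      // Readers dereference this volatile var and see a consistent immutable snapshot.
      @volatile private var entries: IndexedSeq[A] = IndexedSeq.empty[A]
    
      // Lock-free read: no read lock is taken, so readers cannot contend with writers.
      def find(p: A => Boolean): Option[A] = entries.find(p)
    
      // Writers take the monitor so the read-modify-write of `entries` is atomic.
      def add(a: A): Unit = this.synchronized {
        entries = a +: entries
      }
    
      // Remove matching entries and return them, again under the monitor.
      def removeAll(p: A => Boolean): IndexedSeq[A] = this.synchronized {
        val removed = entries.filter(p)
        entries = entries.filterNot(p)
        removed
      }
    }
    ```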
    
    cloud-fan & gatorsmile: This is a relatively minor improvement to the
    previous CacheManager changes. At this point, I think we are finally doing
    the minimum possible amount of locking.
    
    ## How was this patch tested?
    
    This has been tested on a live system where the blocking was causing major
    issues, and it is working well.
    CacheManager has no explicit unit test, but it is used in many places
    internally as part of the SharedState.
    
    Closes #24028 from DaveDeCaprio/read-locks-master.
    
    Lead-authored-by: Dave DeCaprio <da...@alum.mit.edu>
    Co-authored-by: David DeCaprio <da...@alum.mit.edu>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../apache/spark/sql/execution/CacheManager.scala  | 93 +++++++---------------
 1 file changed, 28 insertions(+), 65 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 2815a28..5a11a8f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -19,8 +19,7 @@ package org.apache.spark.sql.execution
 
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import scala.collection.immutable.IndexedSeq
 
 import org.apache.hadoop.fs.{FileSystem, Path}
 
@@ -46,38 +45,22 @@ case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation)
  */
 class CacheManager extends Logging {
 
-  @transient
-  private val cachedData = new java.util.LinkedList[CachedData]
-
-  @transient
-  private val cacheLock = new ReentrantReadWriteLock
-
-  /** Acquires a read lock on the cache for the duration of `f`. */
-  private def readLock[A](f: => A): A = {
-    val lock = cacheLock.readLock()
-    lock.lock()
-    try f finally {
-      lock.unlock()
-    }
-  }
-
-  /** Acquires a write lock on the cache for the duration of `f`. */
-  private def writeLock[A](f: => A): A = {
-    val lock = cacheLock.writeLock()
-    lock.lock()
-    try f finally {
-      lock.unlock()
-    }
-  }
+  /**
+   * Maintains the list of cached plans as an immutable sequence.  Any updates to the list
+   * should be protected in a "this.synchronized" block which includes the reading of the
+   * existing value and the update of the cachedData var.
+   */
+  @transient @volatile
+  private var cachedData = IndexedSeq[CachedData]()
 
   /** Clears all cached tables. */
-  def clearCache(): Unit = writeLock {
-    cachedData.asScala.foreach(_.cachedRepresentation.cacheBuilder.clearCache())
-    cachedData.clear()
+  def clearCache(): Unit = this.synchronized {
+    cachedData.foreach(_.cachedRepresentation.cacheBuilder.clearCache())
+    cachedData = IndexedSeq[CachedData]()
   }
 
   /** Checks if the cache is empty. */
-  def isEmpty: Boolean = readLock {
+  def isEmpty: Boolean = {
     cachedData.isEmpty
   }
 
@@ -101,11 +84,11 @@ class CacheManager extends Logging {
         sparkSession.sessionState.executePlan(planToCache).executedPlan,
         tableName,
         planToCache)
-      writeLock {
+      this.synchronized {
         if (lookupCachedData(planToCache).nonEmpty) {
           logWarning("Data has already been cached.")
         } else {
-          cachedData.add(CachedData(planToCache, inMemoryRelation))
+          cachedData = CachedData(planToCache, inMemoryRelation) +: cachedData
         }
       }
     }
@@ -144,22 +127,12 @@ class CacheManager extends Logging {
       } else {
         _.sameResult(plan)
       }
-    val plansToUncache = mutable.Buffer[CachedData]()
-    readLock {
-      val it = cachedData.iterator()
-      while (it.hasNext) {
-        val cd = it.next()
-        if (shouldRemove(cd.plan)) {
-          plansToUncache += cd
-        }
-      }
-    }
-    plansToUncache.foreach { cd =>
-      writeLock {
-        cachedData.remove(cd)
-      }
-      cd.cachedRepresentation.cacheBuilder.clearCache(blocking)
+    val plansToUncache = cachedData.filter(cd => shouldRemove(cd.plan))
+    this.synchronized {
+      cachedData = cachedData.filterNot(cd => plansToUncache.exists(_ eq cd))
     }
+    plansToUncache.foreach { _.cachedRepresentation.cacheBuilder.clearCache(blocking) }
+
     // Re-compile dependent cached queries after removing the cached query.
     if (!cascade) {
       recacheByCondition(spark, cd => {
@@ -194,46 +167,36 @@ class CacheManager extends Logging {
   private def recacheByCondition(
       spark: SparkSession,
       condition: CachedData => Boolean): Unit = {
-    val needToRecache = scala.collection.mutable.ArrayBuffer.empty[CachedData]
-    readLock {
-      val it = cachedData.iterator()
-      while (it.hasNext) {
-        val cd = it.next()
-        if (condition(cd)) {
-          needToRecache += cd
-        }
-      }
+    val needToRecache = cachedData.filter(condition)
+    this.synchronized {
+      // Remove the cache entries before creating new ones.
+      cachedData = cachedData.filterNot(cd => needToRecache.exists(_ eq cd))
     }
     needToRecache.map { cd =>
-      writeLock {
-        // Remove the cache entry before we create a new one, so that we can have a different
-        // physical plan.
-        cachedData.remove(cd)
-      }
       cd.cachedRepresentation.cacheBuilder.clearCache()
       val plan = spark.sessionState.executePlan(cd.plan).executedPlan
       val newCache = InMemoryRelation(
         cacheBuilder = cd.cachedRepresentation.cacheBuilder.copy(cachedPlan = plan),
         logicalPlan = cd.plan)
       val recomputedPlan = cd.copy(cachedRepresentation = newCache)
-      writeLock {
+      this.synchronized {
         if (lookupCachedData(recomputedPlan.plan).nonEmpty) {
           logWarning("While recaching, data was already added to cache.")
         } else {
-          cachedData.add(recomputedPlan)
+          cachedData = recomputedPlan +: cachedData
         }
       }
     }
   }
 
   /** Optionally returns cached data for the given [[Dataset]] */
-  def lookupCachedData(query: Dataset[_]): Option[CachedData] = readLock {
+  def lookupCachedData(query: Dataset[_]): Option[CachedData] = {
     lookupCachedData(query.logicalPlan)
   }
 
   /** Optionally returns cached data for the given [[LogicalPlan]]. */
-  def lookupCachedData(plan: LogicalPlan): Option[CachedData] = readLock {
-    cachedData.asScala.find(cd => plan.sameResult(cd.plan))
+  def lookupCachedData(plan: LogicalPlan): Option[CachedData] = {
+    cachedData.find(cd => plan.sameResult(cd.plan))
   }
 
   /** Replaces segments of the given logical plan with cached versions where possible. */

