This is an automated email from the ASF dual-hosted git repository.

irashid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7043aee  [SPARK-27112][CORE] : Create a resource ordering between 
threads to resolve the deadlocks encountered …
7043aee is described below

commit 7043aee1ba95e92e1cbd0ebafcc5b09b69ee3082
Author: pgandhi <pgan...@verizonmedia.com>
AuthorDate: Mon Mar 18 10:33:51 2019 -0500

    [SPARK-27112][CORE] : Create a resource ordering between threads to resolve 
the deadlocks encountered …
    
    …when trying to kill executors either due to dynamic allocation or 
blacklisting
    
    ## What changes were proposed in this pull request?
    
    There are two deadlocks as a result of the interplay between three 
different threads:
    
    **task-result-getter thread**
    
    **spark-dynamic-executor-allocation thread**
    
    **dispatcher-event-loop thread (makeOffers())**
    
    The fix enforces a consistent lock-acquisition order: the lock on 
`TaskSchedulerImpl` is always acquired before the lock on 
`CoarseGrainedSchedulerBackend`, both in `makeOffers()` and in the 
`killExecutors()` method. Because every thread now takes the two locks in the 
same order, the deadlocks are resolved.
    
    ## How was this patch tested?
    
    Manual Tests
    
    Closes #24072 from pgandhi999/SPARK-27112-2.
    
    Authored-by: pgandhi <pgan...@verizonmedia.com>
    Signed-off-by: Imran Rashid <iras...@cloudera.com>
---
 .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala   | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index dc0f21c..808ef08 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -258,7 +258,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
     // Make fake resource offers on all executors
     private def makeOffers() {
       // Make sure no executor is killed while some task is launching on it
-      val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
+      val taskDescs = withLock {
         // Filter out executors under killing
         val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
         val workOffers = activeExecutors.map {
@@ -284,7 +284,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
     // Make fake resource offers on just one executor
     private def makeOffers(executorId: String) {
       // Make sure no executor is killed while some task is launching on it
-      val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
+      val taskDescs = withLock {
         // Filter out executors under killing
         if (executorIsAlive(executorId)) {
           val executorData = executorDataMap(executorId)
@@ -631,7 +631,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
       force: Boolean): Seq[String] = {
     logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
 
-    val response = synchronized {
+    val response = withLock {
       val (knownExecutors, unknownExecutors) = 
executorIds.partition(executorDataMap.contains)
       unknownExecutors.foreach { id =>
         logWarning(s"Executor to kill $id does not exist!")
@@ -730,6 +730,13 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 
   protected def currentDelegationTokens: Array[Byte] = delegationTokens.get()
 
+  // SPARK-27112: We need to ensure that there is ordering of lock acquisition
+  // between TaskSchedulerImpl and CoarseGrainedSchedulerBackend objects in 
order to fix
+  // the deadlock issue exposed in SPARK-27112
+  private def withLock[T](fn: => T): T = scheduler.synchronized {
+    CoarseGrainedSchedulerBackend.this.synchronized { fn }
+  }
+
 }
 
 private[spark] object CoarseGrainedSchedulerBackend {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to