This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 673654b  [SPARK-35714][CORE] Bug fix for deadlock during the executor shutdown
673654b is described below

commit 673654b27a5fca800974b43495c6c0d0db9b1479
Author: Kun Wan <wan...@apache.org>
AuthorDate: Sun Jun 13 16:01:00 2021 -0500

    [SPARK-35714][CORE] Bug fix for deadlock during the executor shutdown
    
    ### What changes were proposed in this pull request?
    
    Fix a deadlock during executor shutdown: instead of calling System.exit(-1) directly on the RPC thread, WorkerWatcher now runs the exit call on a separate thread and waits at most 5 seconds for it to complete.
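
    As a minimal sketch of the pattern using only the Scala standard library (the actual patch uses Spark's ThreadUtils.awaitResult, as shown in the diff below): run the possibly-blocking exit call on a separate thread and bound how long the calling thread waits for it.

    ```scala
    import java.util.concurrent.TimeoutException

    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._

    object ExitGuard {
      // Sketch only: run exit() on a separate thread so the caller is never
      // parked forever on the java.lang.Shutdown class lock.
      def exitNonZero(): Unit = {
        val exiting = Future {
          System.exit(-1) // may block if a JVM shutdown is already in progress
        }
        try {
          Await.result(exiting, 5.seconds)
        } catch {
          case _: TimeoutException =>
            // A shutdown is already underway; let it finish instead of deadlocking.
        }
      }
    }
    ```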
    
    ### Why are the changes needed?
    
    When an executor receives a TERM signal, it (the second TERM signal) locks the java.lang.Shutdown class and then calls the Shutdown.exit() method to exit the JVM.
    Shutdown will call SparkShutdownHook to shut down the executor.
    During the executor shutdown phase, a RemoteProcessDisconnected event will be sent to the RPC inbox, and then WorkerWatcher will try to call System.exit(-1) again.
    Because java.lang.Shutdown is already locked, a deadlock occurs.
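
    For illustration, a hypothetical standalone reproducer of the same pattern (not part of this patch): the first exiting thread holds the java.lang.Shutdown class lock while it joins the shutdown-hook thread, so any other thread that calls System.exit() in the meantime blocks on that lock forever and the JVM never exits.

    ```scala
    // Hypothetical sketch: this program hangs instead of exiting.
    object ShutdownDeadlockDemo {
      def main(args: Array[String]): Unit = {
        val rpcThread = new Thread(() => {
          Thread.sleep(100) // simulate handling RemoteProcessDisconnected
          System.exit(-1)   // blocks: the Shutdown.class lock is already held
        })
        Runtime.getRuntime.addShutdownHook(new Thread(() => {
          rpcThread.start()
          rpcThread.join()  // waits on a thread that is itself stuck in exit()
        }))
        System.exit(0)      // first exit: takes the Shutdown.class lock, runs hooks
      }
    }
    ```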
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Test case "task reaper kills JVM if killed tasks keep running for too long" 
in JobCancellationSuite
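
    A typical way to run just this test locally (assuming the standard Spark sbt build and ScalaTest's -z substring filter; adjust the filter as needed):

    ```
    build/sbt "core/testOnly *JobCancellationSuite -- -z \"task reaper\""
    ```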
    
    Closes #32868 from wankunde/SPARK-35714.
    
    Authored-by: Kun Wan <wan...@apache.org>
    Signed-off-by: Sean Owen <sro...@gmail.com>
    (cherry picked from commit 69aa7ad11f68e96e045b5eb915e21708e018421a)
    Signed-off-by: Sean Owen <sro...@gmail.com>
---
 .../org/apache/spark/deploy/worker/WorkerWatcher.scala     | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
index 23efcab..43ec492 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
@@ -17,8 +17,13 @@
 
 package org.apache.spark.deploy.worker
 
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
+import scala.concurrent.duration._
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc._
+import org.apache.spark.util.ThreadUtils
 
 /**
  * Endpoint which connects to a worker process and terminates the JVM if the
@@ -45,7 +50,14 @@ private[spark] class WorkerWatcher(
   private val expectedAddress = RpcAddress.fromURIString(workerUrl)
   private def isWorker(address: RpcAddress) = expectedAddress == address
 
-  private def exitNonZero() = if (isTesting) isShutDown = true else System.exit(-1)
+  private def exitNonZero() =
+    if (isTesting) {
+      isShutDown = true
+    } else {
+      ThreadUtils.awaitResult(Future {
+        System.exit(-1)
+      }, 5.seconds)
+    }
 
   override def receive: PartialFunction[Any, Unit] = {
     case e => logWarning(s"Received unexpected message: $e")
