This is an automated email from the ASF dual-hosted git repository.

joshrosen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 295dd57c13c [SPARK-40235][CORE] Use interruptible lock instead of synchronized in Executor.updateDependencies()
295dd57c13c is described below

commit 295dd57c13caaa9f9e78cd46dfda4e17ced7c449
Author: Josh Rosen <joshro...@databricks.com>
AuthorDate: Mon Aug 29 16:47:38 2022 -0700

    [SPARK-40235][CORE] Use interruptible lock instead of synchronized in Executor.updateDependencies()
    
    ### What changes were proposed in this pull request?
    
    This patch modifies the synchronization in `Executor.updateDependencies()` in order to allow tasks to be interrupted while they are blocked and waiting on other tasks to finish downloading dependencies.
    
    This synchronization was added years ago in https://github.com/mesos/spark/commit/7b9e96c99206c0679d9925e0161fde738a5c7c3a in order to prevent concurrently-launching tasks from performing concurrent dependency updates. If one task is downloading dependencies, all other newly-launched tasks will block until the original dependency download is complete.
    
    Let's say that a Spark task launches, becomes blocked on an `updateDependencies()` call, then is canceled while it is blocked. Although Spark will send a `Thread.interrupt()` to the canceled task, the task will continue waiting because a thread that is blocked trying to enter a `synchronized` block does not throw an InterruptedException in response to the interrupt. As a result, the blocked thread will continue to wait until the other thread exits the synchronized block.
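
    A minimal standalone sketch (not part of this patch; names are illustrative) of the behavior described above, showing that a monitor-blocked thread does not respond to `Thread.interrupt()`:

    ```scala
    object MonitorBlockingDemo extends App {
      val monitor = new Object

      // Thread A acquires the monitor and holds it for a long time:
      val holder = new Thread(() => monitor.synchronized { Thread.sleep(60000) })
      holder.start()
      Thread.sleep(100) // give `holder` time to acquire the monitor

      // Thread B blocks while trying to enter the same monitor:
      val waiter = new Thread(() => monitor.synchronized { })
      waiter.start()
      Thread.sleep(100) // `waiter` is now in state BLOCKED

      // Interrupting `waiter` only sets its interrupt flag; no InterruptedException
      // is thrown, so it stays BLOCKED until `holder` releases the monitor:
      waiter.interrupt()
      Thread.sleep(100)
      println(waiter.getState) // prints BLOCKED
    }
    ```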
    
    This PR aims to fix this problem by replacing the `synchronized` block with a `ReentrantLock`, which has a `lockInterruptibly` method.
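
    For illustration, a minimal sketch (not the exact code in this patch; names are illustrative) of the lock-then-try/finally pattern that `lockInterruptibly` enables:

    ```scala
    import java.util.concurrent.locks.ReentrantLock

    class DependencyUpdater {
      private val updateLock = new ReentrantLock()

      def update(): Unit = {
        // Unlike entering a synchronized block, lockInterruptibly() throws
        // InterruptedException if the thread is interrupted while waiting for
        // the lock, so a canceled task can exit promptly instead of staying BLOCKED.
        updateLock.lockInterruptibly()
        try {
          // ... fetch missing files / JARs / archives ...
        } finally {
          updateLock.unlock()
        }
      }
    }
    ```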
    
    ### Why are the changes needed?
    
    In a real-world scenario, we hit a case where a task was canceled right after being launched while another task was blocked in a slow library download. The slow library download took so long that the TaskReaper killed the executor because the canceled task could not exit in a timely fashion. This patch's fix prevents this issue.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    New unit test case.
    
    Closes #37681 from JoshRosen/SPARK-40235-update-dependencies-lock.
    
    Authored-by: Josh Rosen <joshro...@databricks.com>
    Signed-off-by: Josh Rosen <joshro...@databricks.com>
---
 .../scala/org/apache/spark/executor/Executor.scala | 22 +++++++--
 .../org/apache/spark/executor/ExecutorSuite.scala  | 53 ++++++++++++++++++++++
 2 files changed, 72 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index ab2bd1b7801..db507bd176b 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -25,6 +25,7 @@ import java.nio.ByteBuffer
 import java.util.{Locale, Properties}
 import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.locks.ReentrantLock
 import javax.annotation.concurrent.GuardedBy
 import javax.ws.rs.core.UriBuilder
 
@@ -85,6 +86,11 @@ private[spark] class Executor(
 
   private[executor] val conf = env.conf
 
+  // SPARK-40235: updateDependencies() uses a ReentrantLock instead of the `synchronized` keyword
+  // so that tasks can exit quickly if they are interrupted while waiting on another task to
+  // finish downloading dependencies.
+  private val updateDependenciesLock = new ReentrantLock()
+
   // No ip or host:port - just hostname
   Utils.checkHost(executorHostname)
   // must not have port specified.
@@ -978,13 +984,19 @@ private[spark] class Executor(
   /**
   * Download any missing dependencies if we receive a new set of files and JARs from the
    * SparkContext. Also adds any new JARs we fetched to the class loader.
+   * Visible for testing.
    */
-  private def updateDependencies(
+  private[executor] def updateDependencies(
       newFiles: Map[String, Long],
       newJars: Map[String, Long],
-      newArchives: Map[String, Long]): Unit = {
+      newArchives: Map[String, Long],
+      testStartLatch: Option[CountDownLatch] = None,
+      testEndLatch: Option[CountDownLatch] = None): Unit = {
     lazy val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
-    synchronized {
+    updateDependenciesLock.lockInterruptibly()
+    try {
+      // For testing, so we can simulate a slow file download:
+      testStartLatch.foreach(_.countDown())
       // Fetch missing dependencies
      for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
         logInfo(s"Fetching $name with timestamp $timestamp")
@@ -1027,6 +1039,10 @@ private[spark] class Executor(
           }
         }
       }
+      // For testing, so we can simulate a slow file download:
+      testEndLatch.foreach(_.await())
+    } finally {
+      updateDependenciesLock.unlock()
     }
   }
 
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index 14871efac5b..bef36d08e8a 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -514,6 +514,59 @@ class ExecutorSuite extends SparkFunSuite
     }
   }
 
+  test("SPARK-40235: updateDependencies is interruptible when waiting on 
lock") {
+    val conf = new SparkConf
+    val serializer = new JavaSerializer(conf)
+    val env = createMockEnv(conf, serializer)
+    withExecutor("id", "localhost", env) { executor =>
+      val startLatch = new CountDownLatch(1)
+      val endLatch = new CountDownLatch(1)
+
+      // Start a thread to simulate a task that begins executing updateDependencies()
+      // and takes a long time to finish because file download is slow:
+      val slowLibraryDownloadThread = new Thread(() => {
+        executor.updateDependencies(
+          Map.empty,
+          Map.empty,
+          Map.empty,
+          Some(startLatch),
+          Some(endLatch))
+      })
+      slowLibraryDownloadThread.start()
+
+      // Wait for that thread to acquire the lock:
+      startLatch.await()
+
+      // Start a second thread to simulate a task that blocks on the other task's
+      // dependency update:
+      val blockedLibraryDownloadThread = new Thread(() => {
+        executor.updateDependencies(
+          Map.empty,
+          Map.empty,
+          Map.empty)
+      })
+      blockedLibraryDownloadThread.start()
+      eventually(timeout(10.seconds), interval(100.millis)) {
+        val threadState = blockedLibraryDownloadThread.getState
+        assert(Set(Thread.State.BLOCKED, Thread.State.WAITING).contains(threadState))
+      }
+
+      // Interrupt the blocked thread:
+      blockedLibraryDownloadThread.interrupt()
+
+      // The thread should exit:
+      eventually(timeout(10.seconds), interval(100.millis)) {
+        assert(blockedLibraryDownloadThread.getState == Thread.State.TERMINATED)
+      }
+
+      // Allow the first thread to finish and exit:
+      endLatch.countDown()
+      eventually(timeout(10.seconds), interval(100.millis)) {
+        assert(slowLibraryDownloadThread.getState == Thread.State.TERMINATED)
+      }
+    }
+  }
+
  private def createMockEnv(conf: SparkConf, serializer: JavaSerializer): SparkEnv = {
     val mockEnv = mock[SparkEnv]
     val mockRpcEnv = mock[RpcEnv]


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
