This is an automated email from the ASF dual-hosted git repository.

joshrosen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 295dd57c13c  [SPARK-40235][CORE] Use interruptible lock instead of synchronized in Executor.updateDependencies()
295dd57c13c is described below

commit 295dd57c13caaa9f9e78cd46dfda4e17ced7c449
Author: Josh Rosen <joshro...@databricks.com>
AuthorDate: Mon Aug 29 16:47:38 2022 -0700

    [SPARK-40235][CORE] Use interruptible lock instead of synchronized in Executor.updateDependencies()

    ### What changes were proposed in this pull request?

    This patch modifies the synchronization in `Executor.updateDependencies()` in order to allow tasks to be interrupted while they are blocked and waiting on other tasks to finish downloading dependencies.

    This synchronization was added years ago in https://github.com/mesos/spark/commit/7b9e96c99206c0679d9925e0161fde738a5c7c3a in order to prevent concurrently-launching tasks from performing concurrent dependency updates. If one task is downloading dependencies, all other newly-launched tasks will block until the original dependency download is complete.

    Let's say that a Spark task launches, becomes blocked on an `updateDependencies()` call, and is then canceled while it is blocked. Although Spark will send a `Thread.interrupt()` to the canceled task, the task will continue waiting because threads blocked on a `synchronized` block won't throw an InterruptedException in response to the interrupt. As a result, the blocked thread will continue to wait until the other thread exits the synchronized block.

    This PR aims to fix this problem by replacing the `synchronized` with a `ReentrantLock`, which has a `lockInterruptibly` method.

    ### Why are the changes needed?

    In a real-world scenario, we hit a case where a task was canceled right after being launched while another task was blocked in a slow library download. The slow library download took so long that the TaskReaper killed the executor because the canceled task could not exit in a timely fashion. This patch's fix prevents this issue.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    New unit test case.

    Closes #37681 from JoshRosen/SPARK-40235-update-dependencies-lock.

    Authored-by: Josh Rosen <joshro...@databricks.com>
    Signed-off-by: Josh Rosen <joshro...@databricks.com>
---
 .../scala/org/apache/spark/executor/Executor.scala | 22 +++++++--
 .../org/apache/spark/executor/ExecutorSuite.scala  | 53 ++++++++++++++++++++++
 2 files changed, 72 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index ab2bd1b7801..db507bd176b 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -25,6 +25,7 @@ import java.nio.ByteBuffer
 import java.util.{Locale, Properties}
 import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.locks.ReentrantLock
 import javax.annotation.concurrent.GuardedBy
 import javax.ws.rs.core.UriBuilder
 
@@ -85,6 +86,11 @@ private[spark] class Executor(
 
   private[executor] val conf = env.conf
 
+  // SPARK-40235: updateDependencies() uses a ReentrantLock instead of the `synchronized` keyword
+  // so that tasks can exit quickly if they are interrupted while waiting on another task to
+  // finish downloading dependencies.
+  private val updateDependenciesLock = new ReentrantLock()
+
   // No ip or host:port - just hostname
   Utils.checkHost(executorHostname)
   // must not have port specified.
@@ -978,13 +984,19 @@ private[spark] class Executor(
   /**
    * Download any missing dependencies if we receive a new set of files and JARs from the
    * SparkContext. Also adds any new JARs we fetched to the class loader.
+   * Visible for testing.
    */
-  private def updateDependencies(
+  private[executor] def updateDependencies(
       newFiles: Map[String, Long],
       newJars: Map[String, Long],
-      newArchives: Map[String, Long]): Unit = {
+      newArchives: Map[String, Long],
+      testStartLatch: Option[CountDownLatch] = None,
+      testEndLatch: Option[CountDownLatch] = None): Unit = {
     lazy val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
-    synchronized {
+    updateDependenciesLock.lockInterruptibly()
+    try {
+      // For testing, so we can simulate a slow file download:
+      testStartLatch.foreach(_.countDown())
       // Fetch missing dependencies
       for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
         logInfo(s"Fetching $name with timestamp $timestamp")
@@ -1027,6 +1039,10 @@ private[spark] class Executor(
         }
       }
     }
+      // For testing, so we can simulate a slow file download:
+      testEndLatch.foreach(_.await())
+    } finally {
+      updateDependenciesLock.unlock()
     }
   }
 
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index 14871efac5b..bef36d08e8a 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -514,6 +514,59 @@ class ExecutorSuite extends SparkFunSuite
     }
   }
 
+  test("SPARK-40235: updateDependencies is interruptible when waiting on lock") {
+    val conf = new SparkConf
+    val serializer = new JavaSerializer(conf)
+    val env = createMockEnv(conf, serializer)
+    withExecutor("id", "localhost", env) { executor =>
+      val startLatch = new CountDownLatch(1)
+      val endLatch = new CountDownLatch(1)
+
+      // Start a thread to simulate a task that begins executing updateDependencies()
+      // and takes a long time to finish because file download is slow:
+      val slowLibraryDownloadThread = new Thread(() => {
+        executor.updateDependencies(
+          Map.empty,
+          Map.empty,
+          Map.empty,
+          Some(startLatch),
+          Some(endLatch))
+      })
+      slowLibraryDownloadThread.start()
+
+      // Wait for that thread to acquire the lock:
+      startLatch.await()
+
+      // Start a second thread to simulate a task that blocks on the other task's
+      // dependency update:
+      val blockedLibraryDownloadThread = new Thread(() => {
+        executor.updateDependencies(
+          Map.empty,
+          Map.empty,
+          Map.empty)
+      })
+      blockedLibraryDownloadThread.start()
+      eventually(timeout(10.seconds), interval(100.millis)) {
+        val threadState = blockedLibraryDownloadThread.getState
+        assert(Set(Thread.State.BLOCKED, Thread.State.WAITING).contains(threadState))
+      }
+
+      // Interrupt the blocked thread:
+      blockedLibraryDownloadThread.interrupt()
+
+      // The thread should exit:
+      eventually(timeout(10.seconds), interval(100.millis)) {
+        assert(blockedLibraryDownloadThread.getState == Thread.State.TERMINATED)
+      }
+
+      // Allow the first thread to finish and exit:
+      endLatch.countDown()
+      eventually(timeout(10.seconds), interval(100.millis)) {
+        assert(slowLibraryDownloadThread.getState == Thread.State.TERMINATED)
+      }
+    }
+  }
+
   private def createMockEnv(conf: SparkConf, serializer: JavaSerializer): SparkEnv = {
     val mockEnv = mock[SparkEnv]
     val mockRpcEnv = mock[RpcEnv]
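The JVM behavior that motivates this change can be reproduced outside of Spark. Below is a minimal, self-contained Scala sketch, illustrative only (`InterruptibleLockDemo` and its thread names are not from the patch): a thread parked in `ReentrantLock.lockInterruptibly()` exits with an `InterruptedException` as soon as it is interrupted, whereas a thread blocked entering a `synchronized` block merely has its interrupt flag set and keeps waiting until the lock holder releases the monitor.

import java.util.concurrent.CountDownLatch
import java.util.concurrent.locks.ReentrantLock

// Illustrative demo of why SPARK-40235 switches to lockInterruptibly():
// the "waiter" thread below unblocks promptly when it is interrupted.
object InterruptibleLockDemo {
  private val lock = new ReentrantLock()

  def main(args: Array[String]): Unit = {
    val holderReady = new CountDownLatch(1)

    // Holds the lock indefinitely, standing in for a task stuck in a slow
    // dependency download inside updateDependencies().
    val holder = new Thread(() => {
      lock.lockInterruptibly()
      try {
        holderReady.countDown()
        Thread.sleep(Long.MaxValue) // never finishes on its own
      } catch {
        case _: InterruptedException => // interrupted at the end of the demo
      } finally {
        lock.unlock()
      }
    })
    holder.start()
    holderReady.await()

    // Stands in for a second task that blocks on the same lock and is then canceled.
    val waiter = new Thread(() => {
      try {
        lock.lockInterruptibly()
        try { println("lock acquired (not expected in this demo)") } finally { lock.unlock() }
      } catch {
        case _: InterruptedException =>
          println("waiter exited promptly after interrupt") // the behavior the patch wants
      }
    })
    waiter.start()
    Thread.sleep(500) // crude: give the waiter time to park on the lock

    // With a `synchronized` block this interrupt would not unblock the waiter;
    // with lockInterruptibly() it does.
    waiter.interrupt()
    waiter.join()

    holder.interrupt() // let the lock holder finish and release the lock
    holder.join()
  }
}

This is the same scenario the new `ExecutorSuite` test exercises against the real `Executor`: one thread holds the dependency-update lock while a second, blocked thread is interrupted and is expected to terminate promptly.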