This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e6b1603a66 [SPARK-45227][CORE] Fix a subtle thread-safety issue with CoarseGrainedExecutorBackend
8e6b1603a66 is described below

commit 8e6b1603a66706ee27a0b16d850f5ee56d633354
Author: Bo Xiong <xion...@amazon.com>
AuthorDate: Thu Sep 28 22:53:37 2023 -0500

    [SPARK-45227][CORE] Fix a subtle thread-safety issue with CoarseGrainedExecutorBackend
    
    ### What changes were proposed in this pull request?
    Fix a subtle thread-safety issue in CoarseGrainedExecutorBackend that can cause an executor process to get stuck intermittently.
    
    ### Why are the changes needed?
    For each executor, the single-threaded dispatcher can run into an "infinite loop" (as explained in SPARK-45227). Once an executor process enters that state, it stops launching tasks from the driver and stops reporting task status back.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    ```
    $ build/mvn package -DskipTests -pl core
    $ build/mvn -Dtest=none -DwildcardSuites=org.apache.spark.executor.CoarseGrainedExecutorBackendSuite test
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    
    ******************************************************************************
    **_Please feel free to skip reading unless you're interested in details_**
    ******************************************************************************
    
    ### Symptom
    
    Our Spark 3 app, running on EMR 6.10.0 with Spark 3.3.1, got stuck in the very last step of writing a data frame to S3 via `df.write`. Looking at the Spark UI, we saw that one executor process had been hanging for over an hour. After we manually killed that executor process, the app succeeded. Note that the same EMR cluster with two worker nodes had run the same app without any issue both before and after the incident.
    
    Below is what we observed from the relevant container logs and the thread dump.
    
    - A regular task that was sent to the executor, which also reported its completion back to the driver.
    
    ```
        $zgrep 'task 150' container_1694029806204_12865_01_000001/stderr.gz
        23/09/12 18:13:55 INFO TaskSetManager: Starting task 150.0 in stage 23.0 (TID 923) (ip-10-0-185-107.ec2.internal, executor 3, partition 150, NODE_LOCAL, 4432 bytes) taskResourceAssignments Map()
        23/09/12 18:13:55 INFO TaskSetManager: Finished task 150.0 in stage 23.0 (TID 923) in 126 ms on ip-10-0-185-107.ec2.internal (executor 3) (16/200)
    
        $zgrep ' 923' container_1694029806204_12865_01_000004/stderr.gz
        23/09/12 18:13:55 INFO YarnCoarseGrainedExecutorBackend: Got assigned task 923
    
        $zgrep 'task 150' container_1694029806204_12865_01_000004/stderr.gz
        23/09/12 18:13:55 INFO Executor: Running task 150.0 in stage 23.0 (TID 923)
        23/09/12 18:13:55 INFO Executor: Finished task 150.0 in stage 23.0 (TID 923). 4495 bytes result sent to driver
    ```
    
    - Another task that was sent to the executor but never got launched, because the single-threaded dispatcher was stuck (presumably in the "infinite loop" explained later).
    
    ```
        $zgrep 'task 153' container_1694029806204_12865_01_000001/stderr.gz
        23/09/12 18:13:55 INFO TaskSetManager: Starting task 153.0 in stage 23.0 (TID 924) (ip-10-0-185-107.ec2.internal, executor 3, partition 153, NODE_LOCAL, 4432 bytes) taskResourceAssignments Map()
    
        $zgrep ' 924' container_1694029806204_12865_01_000004/stderr.gz
        23/09/12 18:13:55 INFO YarnCoarseGrainedExecutorBackend: Got assigned task 924
    
        $zgrep 'task 153' container_1694029806204_12865_01_000004/stderr.gz
        >> note that the above command has no matching result, indicating that task 153.0 in stage 23.0 (TID 924) was never launched
    ```
    
    - The thread dump shows that the dispatcher-Executor thread has the following stack trace.
    
    ```
        "dispatcher-Executor" #40 daemon prio=5 os_prio=0 
tid=0x0000ffff98e37800 nid=0x1aff runnable [0x0000ffff73bba000]
        java.lang.Thread.State: RUNNABLE
        at scala.runtime.BoxesRunTime.equalsNumObject(BoxesRunTime.java:142)
        at scala.runtime.BoxesRunTime.equals2(BoxesRunTime.java:131)
        at scala.runtime.BoxesRunTime.equals(BoxesRunTime.java:123)
        at scala.collection.mutable.HashTable.elemEquals(HashTable.scala:365)
        at scala.collection.mutable.HashTable.elemEquals$(HashTable.scala:365)
        at scala.collection.mutable.HashMap.elemEquals(HashMap.scala:44)
        at scala.collection.mutable.HashTable.findEntry0(HashTable.scala:140)
        at 
scala.collection.mutable.HashTable.findOrAddEntry(HashTable.scala:169)
        at 
scala.collection.mutable.HashTable.findOrAddEntry$(HashTable.scala:167)
        at scala.collection.mutable.HashMap.findOrAddEntry(HashMap.scala:44)
        at scala.collection.mutable.HashMap.put(HashMap.scala:126)
        at scala.collection.mutable.HashMap.update(HashMap.scala:131)
        at 
org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:200)
        at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
        at 
org.apache.spark.rpc.netty.Inbox$$Lambda$323/1930826709.apply$mcV$sp(Unknown 
Source)
        at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
        at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
        at 
org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
        at 
org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
    ```
    
    ### Relevant code paths
    
    Within an executor process, there's a [dispatcher thread](https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala#L170) dedicated to CoarseGrainedExecutorBackend (a single RPC endpoint) that launches tasks scheduled by the driver. Each task runs on a TaskRunner thread backed by a thread pool created for the executor. The TaskRunner thread and the dispatcher thread are different. However, they read [...]
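    
    To make the sharing concrete, here is a heavily simplified, hypothetical sketch of the access pattern (a stand-in, not the real backend class; names follow the diff below): the dispatcher thread writes to `taskResources` when a task is launched, while a TaskRunner thread reads it, and removes the entry once the task finishes, in `statusUpdate`.
    
    ```
    // Simplified, hypothetical sketch of the shared state -- not the actual backend class.
    import java.nio.ByteBuffer
    import scala.collection.mutable

    class BackendSketch {
      // Before the fix: a plain mutable.HashMap shared without any synchronization.
      private val taskResources = new mutable.HashMap[Long, Map[String, String]]

      // Runs on the single "dispatcher-Executor" thread when a LaunchTask message arrives.
      def onLaunchTask(taskId: Long, resources: Map[String, String]): Unit = {
        taskResources(taskId) = resources                          // concurrent write #1
        // ... executor.launchTask(...) hands the task to a TaskRunner thread ...
      }

      // Called from the TaskRunner thread as the task progresses and finishes.
      def statusUpdate(taskId: Long, finished: Boolean, data: ByteBuffer): Unit = {
        val res = taskResources.getOrElse(taskId, Map.empty[String, String])  // concurrent read
        if (finished) taskResources.remove(taskId)                 // concurrent write #2
        // ... send StatusUpdate(..., res) to the driver ...
      }
    }
    ```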
    
    ### What's going on?
    
    Based on the above observations, our hypothesis is that the dispatcher thread runs into an "infinite loop" due to a race condition when two threads access the same hashmap object. For illustration purposes, let's consider the following scenario where two threads (Thread 1 and Thread 2) access a hash table without thread safety:
    
    - Thread 1 sees A.next = B, but then yields execution to Thread 2.
    <img src="https://issues.apache.org/jira/secure/attachment/13063040/13063040_hashtable1.png" width="400">
    
    - Thread 2 triggers a resize operation, resulting in B.next = A (note that the hashmap doesn't care about ordering), and then yields execution to Thread 1.
    <img src="https://issues.apache.org/jira/secure/attachment/13063041/13063041_hashtable2.png" width="400">
    
    - After taking back the CPU, Thread 1 runs into an "infinite loop" while traversing the list in the last bucket, since A.next = B and B.next = A in its view, as the sketch below illustrates.
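    
    A minimal, self-contained sketch (illustration only, not Spark code) of why such a cycle never terminates: a findEntry-style scan walks the bucket's chain until it hits null, which never happens once two entries point at each other.
    
    ```
    // Illustration only: a tiny singly-linked bucket like the one
    // scala.collection.mutable.HashTable traverses internally.
    object HashBucketCycleDemo {
      final class Entry(val key: Int, var next: Entry)

      // A findEntry-style scan; on a healthy chain it stops at null.
      def find(head: Entry, key: Int): Entry = {
        var e = head
        while (e != null && e.key != key) e = e.next   // never exits on a cyclic chain
        e
      }

      def main(args: Array[String]): Unit = {
        // Suppose a racy resize left A.next = B and B.next = A, as pictured above.
        val a = new Entry(1, null)
        val b = new Entry(2, a)
        a.next = b
        // find(a, 3) would now spin forever -- the RUNNABLE "infinite loop" seen in
        // the dispatcher-Executor thread dump. ConcurrentHashMap (the fix below)
        // is designed for exactly this kind of concurrent access.
      }
    }
    ```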
    
    Closes #43021 from xiongbo-sjtu/master.
    
    Authored-by: Bo Xiong <xion...@amazon.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../apache/spark/executor/CoarseGrainedExecutorBackend.scala  | 11 +++++++----
 .../spark/executor/CoarseGrainedExecutorBackendSuite.scala    |  6 +++---
 2 files changed, 10 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 4a5de8cd5da..299148a912a 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -20,9 +20,9 @@ package org.apache.spark.executor
 import java.net.URL
 import java.nio.ByteBuffer
 import java.util.Locale
+import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicBoolean
 
-import scala.collection.mutable
 import scala.util.{Failure, Success}
 import scala.util.control.NonFatal
 
@@ -71,9 +71,12 @@ private[spark] class CoarseGrainedExecutorBackend(
   /**
    * Map each taskId to the information about the resource allocated to it, Please refer to
    * [[ResourceInformation]] for specifics.
+   * CHM is used to ensure thread-safety (https://issues.apache.org/jira/browse/SPARK-45227)
    * Exposed for testing only.
    */
-  private[executor] val taskResources = new mutable.HashMap[Long, Map[String, ResourceInformation]]
+  private[executor] val taskResources = new ConcurrentHashMap[
+    Long, Map[String, ResourceInformation]
+  ]
 
   private var decommissioned = false
 
@@ -186,7 +189,7 @@ private[spark] class CoarseGrainedExecutorBackend(
       } else {
         val taskDesc = TaskDescription.decode(data.value)
         logInfo("Got assigned task " + taskDesc.taskId)
-        taskResources(taskDesc.taskId) = taskDesc.resources
+        taskResources.put(taskDesc.taskId, taskDesc.resources)
         executor.launchTask(this, taskDesc)
       }
 
@@ -266,7 +269,7 @@ private[spark] class CoarseGrainedExecutorBackend(
   }
 
   override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer): Unit = {
-    val resources = taskResources.getOrElse(taskId, Map.empty[String, ResourceInformation])
+    val resources = taskResources.getOrDefault(taskId, Map.empty[String, ResourceInformation])
     val cpus = executor.runningTasks.get(taskId).taskDescription.cpus
     val msg = StatusUpdate(executorId, taskId, state, data, cpus, resources)
     if (TaskState.isFinished(state)) {
diff --git a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
index 0dcc7c7f9b4..909d6054425 100644
--- a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
@@ -302,7 +302,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
           resourceProfile = ResourceProfile.getOrCreateDefaultProfile(conf))
       assert(backend.taskResources.isEmpty)
 
-      val taskId = 1000000
+      val taskId = 1000000L
       // We don't really verify the data, just pass it around.
       val data = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4))
       val taskDescription = new TaskDescription(taskId, 2, "1", "TASK 1000000", 19,
@@ -339,14 +339,14 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
       backend.self.send(LaunchTask(new SerializableBuffer(serializedTaskDescription)))
       eventually(timeout(10.seconds)) {
         assert(backend.taskResources.size == 1)
-        val resources = backend.taskResources(taskId)
+        val resources = backend.taskResources.get(taskId)
         assert(resources(GPU).addresses sameElements Array("0", "1"))
       }
 
       // Update the status of a running task shall not affect `taskResources` map.
       backend.statusUpdate(taskId, TaskState.RUNNING, data)
       assert(backend.taskResources.size == 1)
-      val resources = backend.taskResources(taskId)
+      val resources = backend.taskResources.get(taskId)
       assert(resources(GPU).addresses sameElements Array("0", "1"))
 
       // Update the status of a finished task shall remove the entry from `taskResources` map.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
