[ 
https://issues.apache.org/jira/browse/SPARK-45227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bo Xiong updated SPARK-45227:
-----------------------------
    Attachment:     (was: Screenshot 2023-09-19 at 7.55.31 PM.png)

> Fix an issue where an executor process randomly gets stuck by making 
> CoarseGrainedExecutorBackend.taskResources thread-safe
> ---------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-45227
>                 URL: https://issues.apache.org/jira/browse/SPARK-45227
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.3.1, 3.5.0, 4.0.0
>            Reporter: Bo Xiong
>            Priority: Critical
>              Labels: hang, infinite-loop, race-condition, stuck, threadsafe
>             Fix For: 4.0.0, 3.5.1
>
>         Attachments: hashtable1.png, hashtable2.png
>
>   Original Estimate: 4h
>  Remaining Estimate: 4h
>
> h3. Symptom
> Our Spark 3 app running on EMR 6.10.0 with Spark 3.3.1 got stuck in the very 
> last step of writing a data frame to S3 by calling {{df.write}}. Looking at 
> the Spark UI, we saw that an executor process hung for over an hour. After we 
> manually killed the executor process, the app succeeded.
> Note that the same EMR cluster with two worker nodes was able to run the same 
> app without any issue before and after the incident.
> h3. Observations
> Below is what we observed from the relevant container logs and a thread dump.
>  * A regular task was sent to the executor, which then reported the task's 
> completion back to the driver.
>  
> {quote}{{$zgrep 'task 150' container_1694029806204_12865_01_000001/stderr.gz
> 23/09/12 18:13:55 INFO TaskSetManager: Starting task 150.0 in stage 23.0 (TID 
> 923) (ip-10-0-185-107.ec2.internal, executor 3, partition 150, NODE_LOCAL, 
> 4432 bytes) taskResourceAssignments Map()
> 23/09/12 18:13:55 INFO TaskSetManager: Finished task 150.0 in stage 23.0 (TID 
> 923) in 126 ms on ip-10-0-185-107.ec2.internal (executor 3) (16/200)
> $zgrep 'task 923' container_1694029806204_12865_01_000004/stderr.gz
> 23/09/12 18:13:55 INFO YarnCoarseGrainedExecutorBackend: Got assigned task 923
> $zgrep 'task 150' container_1694029806204_12865_01_000004/stderr.gz
> 23/09/12 18:13:55 INFO Executor: Running task 150.0 in stage 23.0 (TID 923)
> 23/09/12 18:13:55 INFO Executor: Finished task 150.0 in stage 23.0 (TID 923). 
> 4495 bytes result sent to driver}}
> {quote}
> * Another task was sent to the executor but never got launched, because the 
> single-threaded dispatcher was stuck (presumably in an "infinite loop", as 
> explained later).
>  
>  
> {quote}{{$zgrep 'task 153' container_1694029806204_12865_01_000001/stderr.gz
> 23/09/12 18:13:55 INFO TaskSetManager: Starting task 153.0 in stage 23.0 (TID 
> 924) (ip-10-0-185-107.ec2.internal, executor 3, partition 153, NODE_LOCAL, 
> 4432 bytes) taskResourceAssignments Map()
> $zgrep ' 924' container_1694029806204_12865_01_000004/stderr.gz
> 23/09/12 18:13:55 INFO YarnCoarseGrainedExecutorBackend: Got assigned task 924
> $zgrep 'task 153' container_1694029806204_12865_01_000004/stderr.gz
> >> note that the above command has no matching result, indicating that task 
> >> 153.0 in stage 23.0 (TID 924) was never launched}}
> {quote}
>  * Thread dump shows that the dispatcher-Executor thread has the 
> following stack trace.
>  
> {quote}{{"dispatcher-Executor" #40 daemon prio=5 os_prio=0 
> tid=0x0000ffff98e37800 nid=0x1aff runnable [0x0000ffff73bba000]
> java.lang.Thread.State: RUNNABLE
> at scala.runtime.BoxesRunTime.equalsNumObject(BoxesRunTime.java:142)
> at scala.runtime.BoxesRunTime.equals2(BoxesRunTime.java:131)
> at scala.runtime.BoxesRunTime.equals(BoxesRunTime.java:123)
> at scala.collection.mutable.HashTable.elemEquals(HashTable.scala:365)
> at scala.collection.mutable.HashTable.elemEquals$(HashTable.scala:365)
> at scala.collection.mutable.HashMap.elemEquals(HashMap.scala:44)
> at scala.collection.mutable.HashTable.findEntry0(HashTable.scala:140)
> at scala.collection.mutable.HashTable.findOrAddEntry(HashTable.scala:169)
> at scala.collection.mutable.HashTable.findOrAddEntry$(HashTable.scala:167)
> at scala.collection.mutable.HashMap.findOrAddEntry(HashMap.scala:44)
> at scala.collection.mutable.HashMap.put(HashMap.scala:126)
> at scala.collection.mutable.HashMap.update(HashMap.scala:131)
> at 
> org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:200)
> at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
> at 
> org.apache.spark.rpc.netty.Inbox$$Lambda$323/1930826709.apply$mcV$sp(Unknown 
> Source)
> at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
> at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
> at 
> org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
> at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:750)}}
> {quote}
> * Relevant code paths
>  
> {quote}Within an executor process, there's a [dispatcher 
> thread|https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala#L170]
>  dedicated to CoarseGrainedExecutorBackend (a single RPC endpoint) that 
> launches tasks scheduled by the driver. Each task is run on a TaskRunner 
> thread backed by a thread pool created for the executor. The TaskRunner 
> thread and the dispatcher thread are different. However, both read and write 
> a shared object, {{taskResources}}, which is a mutable hashmap with no 
> thread-safety guarantees, in 
> [Executor|https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/executor/Executor.scala#L561]
>  and 
> [CoarseGrainedExecutorBackend|https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L189],
>  respectively.
> {quote}
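> For reference, below is a simplified sketch of the access pattern described 
> above. It is an illustration only, not the actual Spark source; the class and 
> method names are made up for clarity.
> {code:scala}
> import scala.collection.mutable
>
> // Simplified sketch (not the actual Spark source): a single map is read,
> // written, and cleaned up by two different threads with no synchronization.
> object BackendSketch {
>   // Shared, non-thread-safe map keyed by task id.
>   val taskResources = new mutable.HashMap[Long, Map[String, String]]()
>
>   // Runs on the single dispatcher thread when a LaunchTask message arrives.
>   def onLaunchTask(taskId: Long, resources: Map[String, String]): Unit = {
>     taskResources(taskId) = resources // write from the dispatcher thread
>   }
>
>   // Called from a TaskRunner thread when the task reports its status.
>   def onStatusUpdate(taskId: Long, finished: Boolean): Map[String, String] = {
>     val resources = taskResources.getOrElse(taskId, Map.empty) // concurrent read
>     if (finished) {
>       taskResources.remove(taskId) // concurrent removal
>     }
>     resources
>   }
> }
> {code}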
> h3. What's going on?
> Based on the above observations, our hypothesis is that the dispatcher thread 
> runs into an "infinite loop" due to a race condition when two threads access 
> the same hashmap object.
> For illustration purposes, consider the following scenario, where two threads 
> (Thread 1 and Thread 2) access a hash table that is not thread-safe (a minimal 
> code sketch of the unsafe pattern follows the illustration below):
>  * Thread 1 sees A.next = B, but then yields execution to Thread 2.
> !image-2023-09-19-19-56-29-514.png!
>  
> * Thread 2 triggers a resize operation that results in B.next = A (note that 
> a hashmap doesn't guarantee any entry ordering), and then yields execution to 
> Thread 1.
> !image-2023-09-19-19-57-06-725.png!
>  
> * After taking over the CPU, Thread 1 runs into an "infinite loop" when 
> traversing the linked list in the last bucket, because it sees A.next = B and 
> B.next = A.
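> A minimal, hypothetical sketch of the unsafe pattern is shown below: two 
> threads put into the same {{scala.collection.mutable.HashMap}} with no 
> synchronization. An interleaved resize can corrupt a bucket chain, after which 
> a later traversal may never terminate. The hang is timing-dependent, so it is 
> not guaranteed to reproduce on any particular run.
> {code:scala}
> import scala.collection.mutable
>
> // Hypothetical reproduction sketch: two threads mutate the same
> // scala.collection.mutable.HashMap with no synchronization. Typical outcomes
> // range from a wrong size to a thread spinning forever inside put/update
> // while walking a corrupted (cyclic) bucket chain. Timing-dependent.
> object UnsafeHashMapSketch {
>   def main(args: Array[String]): Unit = {
>     val table = new mutable.HashMap[Long, String]()
>     val writers = (1 to 2).map { t =>
>       new Thread(() => {
>         var i = 0L
>         while (i < 1000000L) {
>           table(t * 10000000L + i) = "v" // unsynchronized concurrent puts
>           i += 1
>         }
>       })
>     }
>     writers.foreach(_.start())
>     writers.foreach(_.join())
>     println(s"size = ${table.size}") // often incorrect even when it returns
>   }
> }
> {code}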
> h3. Proposed fix
>  * Replace {{scala.collection.mutable.HashMap}} in 
> CoarseGrainedExecutorBackend with {{java.util.concurrent.ConcurrentHashMap}} 
> for thread safety (see the sketch after this list).
>  * Before the fix is released, consider using fewer threads per executor 
> process (i.e., a lower {{spark.executor.cores}}) to reduce the likelihood of 
> such a race condition.
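> Below is a rough sketch of the proposed direction, mirroring the earlier 
> illustration. It is illustrative only and not the actual patch; the class and 
> method names are made up.
> {code:scala}
> import java.util.concurrent.ConcurrentHashMap
>
> // Rough sketch of the proposed direction (illustrative only, not the actual
> // patch): back taskResources with a ConcurrentHashMap so the dispatcher
> // thread and TaskRunner threads can safely read, write, and remove entries.
> class BackendWithFixSketch {
>   private val taskResources =
>     new ConcurrentHashMap[Long, Map[String, String]]()
>
>   // Dispatcher thread: record the resources assigned to a launched task.
>   def onLaunchTask(taskId: Long, resources: Map[String, String]): Unit =
>     taskResources.put(taskId, resources)
>
>   // TaskRunner thread: look up the entry and drop it once the task finishes.
>   def onStatusUpdate(taskId: Long, finished: Boolean): Map[String, String] = {
>     val resources = Option(taskResources.get(taskId)).getOrElse(Map.empty)
>     if (finished) {
>       taskResources.remove(taskId)
>     }
>     resources
>   }
> }
> {code}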



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
