[ 
https://issues.apache.org/jira/browse/SPARK-55939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bo Xiong updated SPARK-55939:
-----------------------------
          Component/s: SQL
                           (was: Spark Core)
        Fix Version/s: 4.2.0
                           (was: 4.0.0)
                           (was: 3.4.2)
                           (was: 3.5.1)
                           (was: 3.3.4)
    Affects Version/s: 4.2.0
                           (was: 3.3.1)
                           (was: 3.5.0)
                           (was: 4.0.0)
          Description:     (was: h2. Symptom

Our Spark 3 app running on EMR 6.10.0 with Spark 3.3.1 got stuck in the very 
last step of writing a data frame to S3 by calling {{df.write}}. Looking at the 
Spark UI, we saw that an executor process had hung for over an hour. After we 
manually killed the executor process, the app succeeded. Note that the same EMR 
cluster with two worker nodes was able to run the same app without any issue 
before and after the incident.
h2. Observations

Below is what we observed from the relevant container logs and the thread dump.
 * A regular task that was sent to the executor, which reported back to the 
driver upon task completion.

{quote}$zgrep 'task 150' container_1694029806204_12865_01_000001/stderr.gz
23/09/12 18:13:55 INFO TaskSetManager: Starting task 150.0 in stage 23.0 (TID 
923) (ip-10-0-185-107.ec2.internal, executor 3, partition 150, NODE_LOCAL, 4432 
bytes) taskResourceAssignments Map()
23/09/12 18:13:55 INFO TaskSetManager: Finished task 150.0 in stage 23.0 (TID 
923) in 126 ms on ip-10-0-185-107.ec2.internal (executor 3) (16/200)

$zgrep 'task 923' container_1694029806204_12865_01_000004/stderr.gz
23/09/12 18:13:55 INFO YarnCoarseGrainedExecutorBackend: Got assigned task 923

$zgrep 'task 150' container_1694029806204_12865_01_000004/stderr.gz
23/09/12 18:13:55 INFO Executor: Running task 150.0 in stage 23.0 (TID 923)
23/09/12 18:13:55 INFO Executor: Finished task 150.0 in stage 23.0 (TID 923). 
4495 bytes result sent to driver
{quote} * Another task that was sent to the executor but never got launched, 
since the single-threaded dispatcher was stuck (presumably in an "infinite 
loop", as explained later).

{quote}$zgrep 'task 153' container_1694029806204_12865_01_000001/stderr.gz
23/09/12 18:13:55 INFO TaskSetManager: Starting task 153.0 in stage 23.0 (TID 
924) (ip-10-0-185-107.ec2.internal, executor 3, partition 153, NODE_LOCAL, 4432 
bytes) taskResourceAssignments Map()

$zgrep ' 924' container_1694029806204_12865_01_000004/stderr.gz
23/09/12 18:13:55 INFO YarnCoarseGrainedExecutorBackend: Got assigned task 924

$zgrep 'task 153' container_1694029806204_12865_01_000004/stderr.gz
>> note that the above command has no matching result, indicating that task 
>> 153.0 in stage 23.0 (TID 924) was never launched
{quote} * The thread dump shows that the dispatcher-Executor thread has the 
following stack trace.

{quote}"dispatcher-Executor" #40 daemon prio=5 os_prio=0 tid=0x0000ffff98e37800 
nid=0x1aff runnable [0x0000ffff73bba000]
java.lang.Thread.State: RUNNABLE
at scala.runtime.BoxesRunTime.equalsNumObject(BoxesRunTime.java:142)
at scala.runtime.BoxesRunTime.equals2(BoxesRunTime.java:131)
at scala.runtime.BoxesRunTime.equals(BoxesRunTime.java:123)
at scala.collection.mutable.HashTable.elemEquals(HashTable.scala:365)
at scala.collection.mutable.HashTable.elemEquals$(HashTable.scala:365)
at scala.collection.mutable.HashMap.elemEquals(HashMap.scala:44)
at scala.collection.mutable.HashTable.findEntry0(HashTable.scala:140)
at scala.collection.mutable.HashTable.findOrAddEntry(HashTable.scala:169)
at scala.collection.mutable.HashTable.findOrAddEntry$(HashTable.scala:167)
at scala.collection.mutable.HashMap.findOrAddEntry(HashMap.scala:44)
at scala.collection.mutable.HashMap.put(HashMap.scala:126)
at scala.collection.mutable.HashMap.update(HashMap.scala:131)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:200)
at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
at org.apache.spark.rpc.netty.Inbox$$Lambda$323/1930826709.apply$mcV$sp(Unknown 
Source)
at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
at 
org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
{quote}
h2. Relevant code paths

Within an executor process, there's a [dispatcher 
thread|https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala#L170]
 dedicated to CoarseGrainedExecutorBackend (a single RPC endpoint) that 
launches tasks scheduled by the driver. Each task runs on a TaskRunner thread 
drawn from the executor's task thread pool, so the TaskRunner threads and the 
dispatcher thread are distinct. However, they read and write a common object 
(i.e., taskResources) that is a mutable hashmap with no thread-safety 
guarantees, in 
[Executor|https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/executor/Executor.scala#L561]
 and 
[CoarseGrainedExecutorBackend|https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L189],
 respectively.
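
To make the access pattern concrete, here is a simplified sketch (illustrative only; the object and method names are hypothetical, not the actual Spark code): the dispatcher thread inserts into the map when a task-launch message arrives, while a TaskRunner thread removes the entry when the task finishes, with no synchronization between the two.
{code:scala}
import scala.collection.mutable

// Simplified illustration of the unsynchronized sharing described above.
object SharedTaskResources {
  // Shared, non-thread-safe map keyed by task id.
  private val taskResources = new mutable.HashMap[Long, Map[String, String]]()

  // Runs on the single dispatcher thread when a LaunchTask message arrives.
  def onLaunchTask(taskId: Long, resources: Map[String, String]): Unit =
    taskResources(taskId) = resources // unsynchronized write

  // Runs on a TaskRunner thread from the executor's task thread pool.
  def onTaskFinished(taskId: Long): Unit =
    taskResources.remove(taskId) // unsynchronized write from another thread
}
{code}
Because {{mutable.HashMap}} rebuilds its internal bucket chains when it resizes, unsynchronized writes from two threads can leave a bucket's linked list in an inconsistent state.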
h2. What's going on?

Based on the above observations, our hypothesis is that the dispatcher thread 
runs into an "infinite loop" due to a race condition when two threads access 
the same hashmap object. For illustration purposes, consider the following 
scenario, where two threads (Thread 1 and Thread 2) access a hash table without 
thread safety (a runnable sketch follows this list):
 * Thread 1 sees A.next = B, but then yields execution to Thread 2.

!hashtable1.png|width=357,height=182!
 * Thread 2 triggers a resize operation that results in B.next = A (note that 
the hashmap doesn't preserve insertion order), and then yields execution to 
Thread 1.

!hashtable2.png|width=383,height=329!
 * After taking the CPU back, Thread 1 runs into an "infinite loop" when 
traversing the linked list in the last bucket, since A.next = B and B.next = A 
from its point of view.
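
The same failure mode can be demonstrated outside Spark. Below is a minimal, illustrative sketch (not taken from the incident; names and counts are arbitrary) in which two threads write to a single {{scala.collection.mutable.HashMap}}; depending on timing it may lose entries, throw, or spin during a resize, which is the class of failure hypothesized above.
{code:scala}
import scala.collection.mutable

// Illustrative only: concurrent, unsynchronized use of mutable.HashMap.
// Behavior is timing-dependent; it can lose updates, throw, or hang while
// the map resizes. Do not write production code like this.
object UnsafeHashMapDemo {
  def main(args: Array[String]): Unit = {
    val map = new mutable.HashMap[Int, Int]()
    val writers = (0 until 2).map { t =>
      new Thread(() => {
        var i = 0
        while (i < 1000000) {
          map(t * 1000000 + i) = i // concurrent puts force repeated resizes
          i += 1
        }
      })
    }
    writers.foreach(_.start())
    writers.foreach(_.join())
    // With proper synchronization the size would be exactly 2,000,000.
    println(s"expected 2000000, observed ${map.size}")
  }
}
{code}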

h2. Proposed fix
 * Replace {{scala.collection.mutable.HashMap}} in CoarseGrainedExecutorBackend 
with {{java.util.concurrent.ConcurrentHashMap}} for thread safety (see the 
sketch after this list).
 * As a mitigation until the fix is released, consider using fewer threads per 
executor process (i.e., a lower {{spark.executor.cores}}) to reduce the 
likelihood of hitting the race condition.
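A minimal sketch of what the first bullet proposes (the shape is assumed; the actual patch may differ in field names and value types):
{code:scala}
import java.util.concurrent.ConcurrentHashMap

// Sketch of the proposed change: replace the unsynchronized
// scala.collection.mutable.HashMap with a ConcurrentHashMap so the dispatcher
// thread and TaskRunner threads can update it safely without external locking.
// Names below are illustrative, not the real Spark fields.
object ThreadSafeTaskResources {
  private val taskResources = new ConcurrentHashMap[Long, Map[String, String]]()

  // Dispatcher thread, on LaunchTask:
  def onLaunchTask(taskId: Long, resources: Map[String, String]): Unit =
    taskResources.put(taskId, resources)

  // TaskRunner thread, on task completion:
  def onTaskFinished(taskId: Long): Unit =
    taskResources.remove(taskId)
}
{code}
For the mitigation bullet, lowering per-executor concurrency, e.g. passing {{--conf spark.executor.cores=2}} to spark-submit, reduces the chance that two threads touch the map at the same time, at some cost in throughput.)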
           Issue Type: New Feature  (was: Bug)
             Priority: Major  (was: Critical)
              Summary: Add built-in DataSketches ItemsSketch (Frequent Items) 
functions to Spark SQL  (was: CLONE - Fix a subtle thread-safety issue with 
CoarseGrainedExecutorBackend where an executor process randomly gets stuck)

> Add built-in DataSketches ItemsSketch (Frequent Items) functions to Spark SQL
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-55939
>                 URL: https://issues.apache.org/jira/browse/SPARK-55939
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>    Affects Versions: 4.2.0
>            Reporter: Bo Xiong
>            Assignee: Bo Xiong
>            Priority: Major
>              Labels: hang, infinite-loop, pull-request-available, 
> race-condition, stuck, threadsafe
>             Fix For: 4.2.0
>
>   Original Estimate: 4h
>  Remaining Estimate: 4h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
