[ https://issues.apache.org/jira/browse/SPARK-45227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Mridul Muralidharan reassigned SPARK-45227:
-------------------------------------------

    Assignee: Bo Xiong

> Fix a subtle thread-safety issue with CoarseGrainedExecutorBackend where an executor process randomly gets stuck
> -----------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-45227
>                 URL: https://issues.apache.org/jira/browse/SPARK-45227
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.3.1, 3.5.0, 4.0.0
>            Reporter: Bo Xiong
>            Assignee: Bo Xiong
>            Priority: Critical
>              Labels: hang, infinite-loop, pull-request-available, race-condition, stuck, threadsafe
>             Fix For: 3.4.2, 4.0.0, 3.5.1
>
>         Attachments: hashtable1.png, hashtable2.png
>
>   Original Estimate: 4h
>  Remaining Estimate: 4h
>
> h2. Symptom
> Our Spark 3 app running on EMR 6.10.0 with Spark 3.3.1 got stuck in the very last step of writing a data frame to S3 by calling {{df.write}}. Looking at the Spark UI, we saw that an executor process hung for over 1 hour. After we manually killed the executor process, the app succeeded. Note that the same EMR cluster with two worker nodes was able to run the same app without any issue before and after the incident.
> h2. Observations
> Below is what we observed from the relevant container logs and a thread dump.
> * A regular task that was sent to the executor, launched there, and reported back to the driver upon completion.
> {quote}$zgrep 'task 150' container_1694029806204_12865_01_000001/stderr.gz
> 23/09/12 18:13:55 INFO TaskSetManager: Starting task 150.0 in stage 23.0 (TID 923) (ip-10-0-185-107.ec2.internal, executor 3, partition 150, NODE_LOCAL, 4432 bytes) taskResourceAssignments Map()
> 23/09/12 18:13:55 INFO TaskSetManager: Finished task 150.0 in stage 23.0 (TID 923) in 126 ms on ip-10-0-185-107.ec2.internal (executor 3) (16/200)
> $zgrep 'task 923' container_1694029806204_12865_01_000004/stderr.gz
> 23/09/12 18:13:55 INFO YarnCoarseGrainedExecutorBackend: Got assigned task 923
> $zgrep 'task 150' container_1694029806204_12865_01_000004/stderr.gz
> 23/09/12 18:13:55 INFO Executor: Running task 150.0 in stage 23.0 (TID 923)
> 23/09/12 18:13:55 INFO Executor: Finished task 150.0 in stage 23.0 (TID 923). 4495 bytes result sent to driver
> {quote}
> * Another task that was sent to the executor but never got launched, because the single-threaded dispatcher was stuck (presumably in an "infinite loop" as explained later).
> {quote}$zgrep 'task 153' container_1694029806204_12865_01_000001/stderr.gz
> 23/09/12 18:13:55 INFO TaskSetManager: Starting task 153.0 in stage 23.0 (TID 924) (ip-10-0-185-107.ec2.internal, executor 3, partition 153, NODE_LOCAL, 4432 bytes) taskResourceAssignments Map()
> $zgrep ' 924' container_1694029806204_12865_01_000004/stderr.gz
> 23/09/12 18:13:55 INFO YarnCoarseGrainedExecutorBackend: Got assigned task 924
> $zgrep 'task 153' container_1694029806204_12865_01_000004/stderr.gz
> >> Note that the above command has no matching result, indicating that task 153.0 in stage 23.0 (TID 924) was never launched.
> {quote}
> * Thread dump shows that the dispatcher-Executor thread has the following stack trace.
> {quote}"dispatcher-Executor" #40 daemon prio=5 os_prio=0 > tid=0x0000ffff98e37800 nid=0x1aff runnable [0x0000ffff73bba000] > java.lang.Thread.State: RUNNABLE > at scala.runtime.BoxesRunTime.equalsNumObject(BoxesRunTime.java:142) > at scala.runtime.BoxesRunTime.equals2(BoxesRunTime.java:131) > at scala.runtime.BoxesRunTime.equals(BoxesRunTime.java:123) > at scala.collection.mutable.HashTable.elemEquals(HashTable.scala:365) > at scala.collection.mutable.HashTable.elemEquals$(HashTable.scala:365) > at scala.collection.mutable.HashMap.elemEquals(HashMap.scala:44) > at scala.collection.mutable.HashTable.findEntry0(HashTable.scala:140) > at scala.collection.mutable.HashTable.findOrAddEntry(HashTable.scala:169) > at scala.collection.mutable.HashTable.findOrAddEntry$(HashTable.scala:167) > at scala.collection.mutable.HashMap.findOrAddEntry(HashMap.scala:44) > at scala.collection.mutable.HashMap.put(HashMap.scala:126) > at scala.collection.mutable.HashMap.update(HashMap.scala:131) > at > org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:200) > at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115) > at > org.apache.spark.rpc.netty.Inbox$$Lambda$323/1930826709.apply$mcV$sp(Unknown > Source) > at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213) > at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100) > at > org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75) > at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:750)}} > {quote} > h2. Relevant code paths > Within an executor process, there's a [dispatcher > thread|https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala#L170] > dedicated to CoarseGrainedExecutorBackend (a single RPC endpoint) that > launches tasks scheduled by the driver. Each task is run on a TaskRunner > thread backed by a thread pool created for the executor. The TaskRunner > thread and the dispatcher thread are different. However, they read and write > a common object (i.e., taskResources) that's a mutable hashmap without > thread-safety, in > [Executor|https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/executor/Executor.scala#L561] > and > [CoarseGrainedExecutorBackend|https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L189], > respectively. > h2. What's going on? > Based on the above observations, our hypothesis is that the dispatcher thread > runs into an "infinite loop" due to a race condition when two threads access > the same hashmap object. For illustration purpose, let's consider the > following scenario where two threads (Thread 1 and Thread 2) access a hash > table without thread-safety > * Thread 1 sees A.next = B, but then yields execution to Thread 2 > !hashtable1.png|width=357,height=182! 
> * Thread 2 triggers a resize operation resulting in B.next = A (note that a hashmap doesn't care about ordering), and then yields execution to Thread 1.
> !hashtable2.png|width=383,height=329!
> * After taking over the CPU, Thread 1 runs into an "infinite loop" when traversing the list in the last bucket, given that A.next = B and B.next = A in its view (see the first sketch after this description).
> h2. Proposed fix
> * Replace {{scala.collection.mutable.HashMap}} in CoarseGrainedExecutorBackend with {{java.util.concurrent.ConcurrentHashMap}} for thread safety (see the second sketch after this description).
> * As a mitigation before the fix is released, consider using fewer threads per executor process (i.e., a smaller spark.executor.cores) to reduce the likelihood of such a race condition.
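To make the failure mode described under "What's going on?" concrete, here is a minimal, self-contained Scala sketch of the same unsafe pattern: two threads mutating a single {{scala.collection.mutable.HashMap}} with no synchronization. This is illustrative only, not Spark code; the object name, the String value type, and the iteration counts are made up. Depending on thread timing, a run may finish cleanly, drop entries, or spin forever inside the HashMap internals during a resize, which is the behavior captured in the stack trace above.

{code:scala}
import scala.collection.mutable

object UnsafeSharedMapDemo {
  // Stands in for taskResources: task id -> resource info (simplified to String here).
  private val taskResources = new mutable.HashMap[Long, String]

  def main(args: Array[String]): Unit = {
    // "Dispatcher-like" thread: keeps registering task ids, which forces hash table resizes.
    val writer: Runnable = () => {
      for (taskId <- 0L until 100000L) {
        taskResources(taskId) = s"resources-for-$taskId" // HashMap.update, not thread-safe
      }
    }
    // "TaskRunner-like" thread: concurrently removes entries for finished tasks.
    val remover: Runnable = () => {
      for (taskId <- 0L until 100000L) {
        taskResources.remove(taskId) // concurrent structural mutation of the same map
      }
    }

    val t1 = new Thread(writer, "dispatcher-like")
    val t2 = new Thread(remover, "task-runner-like")
    t1.start(); t2.start()
    t1.join(); t2.join()
    // Reaching this line means this run got lucky; unlucky runs can hang before it.
    println(s"Finished; map size = ${taskResources.size}")
  }
}
{code}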
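And here is a minimal sketch of the direction proposed in the fix, swapping the unsynchronized Scala map for {{java.util.concurrent.ConcurrentHashMap}}. It illustrates the approach rather than the actual patch: the object and method names are invented, and the key/value types are simplified (a Long task id mapped to a String) so the snippet compiles without Spark on the classpath.

{code:scala}
import java.util.concurrent.ConcurrentHashMap

object ThreadSafeTaskResources {
  // Before (unsafe): new scala.collection.mutable.HashMap[Long, String]
  // After (thread-safe): ConcurrentHashMap lets the dispatcher thread and
  // TaskRunner threads update the map concurrently without corrupting it.
  private val taskResources = new ConcurrentHashMap[Long, String]()

  def onLaunchTask(taskId: Long, resources: String): Unit =
    taskResources.put(taskId, resources) // called from the dispatcher thread

  def onTaskFinished(taskId: Long): Unit =
    taskResources.remove(taskId) // called from a TaskRunner thread

  def main(args: Array[String]): Unit = {
    onLaunchTask(923L, "resources for TID 923")
    onTaskFinished(923L)
    println(s"remaining entries = ${taskResources.size()}")
  }
}
{code}

Until a release containing the fix is picked up, the mitigation mentioned above (a smaller spark.executor.cores) only reduces how many TaskRunner threads race against the single dispatcher thread; it lowers the likelihood of hitting the race but does not eliminate it.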