[ https://issues.apache.org/jira/browse/SPARK-2546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14148244#comment-14148244 ]

Andrew Ash commented on SPARK-2546:
-----------------------------------

Another proposed fix: extend JobConf with a thread-safe shim and swap it 
in for the Hadoop one
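
A minimal sketch of that idea, assuming it's enough to route the mutating 
calls through one lock (the class name and the particular set of 
overridden methods here are hypothetical, not an actual patch):

{noformat}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf

// Hypothetical shim: serialize access so concurrent tasks can't corrupt
// the java.util.HashMap inside the underlying Configuration.
class ThreadSafeJobConf(conf: Configuration) extends JobConf(conf) {
  override def set(name: String, value: String): Unit =
    synchronized { super.set(name, value) }

  // Reads race with a concurrent resize() too, so they take the same lock.
  override def get(name: String): String =
    synchronized { super.get(name) }
}
{noformat}

The catch is that any Configuration method that touches the internal map 
but isn't overridden would still bypass the lock, so the shim would have 
to cover every mutating entry point to be safe.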

> Configuration object thread safety issue
> ----------------------------------------
>
>                 Key: SPARK-2546
>                 URL: https://issues.apache.org/jira/browse/SPARK-2546
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 0.9.1
>            Reporter: Andrew Ash
>            Assignee: Josh Rosen
>            Priority: Critical
>
> // observed in 0.9.1 but expected to exist in 1.0.1 as well
> This ticket is copy-pasted from a thread on the dev@ list:
> {quote}
> We discovered a very interesting bug at work last week in Spark 0.9.1: 
> the way Spark uses the Hadoop Configuration object is prone to thread 
> safety issues.  I believe it still applies in Spark 1.0.1 as well.  
> Let me explain:
> Observations
>  - Was running a relatively simple job (read from Avro files, do a map, do 
> another map, write back to Avro files)
>  - 412 of 413 tasks completed, but the last task was hung in RUNNING state
>  - The 412 successful tasks completed in median time 3.4s
>  - The last hung task didn't finish even in 20 hours
>  - The executor with the hung task was responsible for 100% of one core of 
> CPU usage
>  - Jstack of the executor attached (relevant thread pasted below)
> Diagnosis
> After doing some code spelunking, we determined the issue was concurrent use 
> of a Configuration object for each task on an executor.  In Hadoop each task 
> runs in its own JVM, but in Spark multiple tasks can run in the same JVM, so 
> the single-threaded access assumptions of the Configuration object no longer 
> hold in Spark.
> The specific issue is that the AvroRecordReader actually _modifies_ the 
> JobConf it's given when it's instantiated!  It adds a key for the RPC 
> protocol engine in the process of connecting to the Hadoop FileSystem.  When 
> many tasks start at the same time (like at the start of a job), they all 
> add this configuration item to the same Configuration object at once.  
> Internally Configuration uses a java.util.HashMap, which isn't 
> thread-safe.  The post below is an excellent explanation of what happens 
> when multiple threads insert into a HashMap at the same time:
> http://mailinator.blogspot.com/2009/06/beautiful-race-condition.html
> The gist is that you have a thread following a cycle of linked list nodes 
> indefinitely.  This exactly matches our observations of the 100% CPU core and 
> also the final location in the stack trace.
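> To make the failure mode concrete, here's a small standalone program 
> (not Spark code; the thread count and key range are arbitrary) that 
> exercises the same kind of race.  On a pre-Java-8 HashMap it can hang 
> inside resize()/transfer(), exactly as in the trace below:
> {noformat}
> import java.util.HashMap
> 
> object HashMapRace {
>   def main(args: Array[String]): Unit = {
>     val map = new HashMap[Int, Int]()
>     // Eight writers inserting disjoint keys into one unsynchronized map.
>     val writers = (0 until 8).map { t =>
>       new Thread {
>         override def run(): Unit = {
>           var i = 0
>           while (i < 1000000) {
>             map.put(t * 10000000 + i, i) // races with concurrent resize()
>             i += 1
>           }
>         }
>       }
>     }
>     writers.foreach(_.start())
>     writers.foreach(_.join()) // may never return if a bucket went cyclic
>     println("size = " + map.size) // likely wrong even when it terminates
>   }
> }
> {noformat}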
> So it seems the way Spark shares a Configuration object between task threads 
> in an executor is incorrect.  We need some way to prevent concurrent access 
> to a single Configuration object.
> Proposed fix
> We can clone the JobConf object in HadoopRDD.getJobConf() so each task gets 
> its own JobConf object (and thus Configuration object).  The optimization of 
> broadcasting the Configuration object across the cluster can remain, but on 
> the other side I think it needs to be cloned for each task to allow for 
> concurrent access.  I'm not sure of the performance implications, but the 
> comments suggest that the Configuration object is ~10KB, so I would expect 
> a clone of the object to be relatively speedy.
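> As a rough sketch, the per-task copy could look like this (the method 
> and variable names are illustrative, not the actual 
> HadoopRDD.getJobConf() code; note the copy constructor itself iterates 
> the source map, so it still has to be guarded against concurrent writers):
> {noformat}
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.mapred.JobConf
> 
> // Give each task its own JobConf, cloned from the broadcast copy.
> def cloneJobConf(broadcastConf: Configuration): JobConf =
>   broadcastConf.synchronized {
>     // JobConf(Configuration) copies the entries of the source conf.
>     new JobConf(broadcastConf)
>   }
> {noformat}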
> Has this been observed before?  Does my suggested fix make sense?  I'd be 
> happy to file a Jira ticket and continue discussion there for the right way 
> to fix.
> Thanks!
> Andrew
> P.S.  For others seeing this issue, our temporary workaround is to enable 
> spark.speculation, which retries failed (or hung) tasks on other machines.
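> For example (the interval and multiplier values here are illustrative, 
> not recommendations):
> {noformat}
> import org.apache.spark.SparkConf
> 
> val conf = new SparkConf()
>   .set("spark.speculation", "true")           // re-launch suspiciously slow tasks
>   .set("spark.speculation.interval", "1000")  // ms between speculation checks
>   .set("spark.speculation.multiplier", "1.5") // slower-than-median threshold
> {noformat}
> Here is the jstack of the hung task's thread: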
> {noformat}
> "Executor task launch worker-6" daemon prio=10 tid=0x00007f91f01fe000 nid=0x54b1 runnable [0x00007f92d74f1000]
>    java.lang.Thread.State: RUNNABLE
>     at java.util.HashMap.transfer(HashMap.java:601)
>     at java.util.HashMap.resize(HashMap.java:581)
>     at java.util.HashMap.addEntry(HashMap.java:879)
>     at java.util.HashMap.put(HashMap.java:505)
>     at org.apache.hadoop.conf.Configuration.set(Configuration.java:803)
>     at org.apache.hadoop.conf.Configuration.set(Configuration.java:783)
>     at org.apache.hadoop.conf.Configuration.setClass(Configuration.java:1662)
>     at org.apache.hadoop.ipc.RPC.setProtocolEngine(RPC.java:193)
>     at org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:343)
>     at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:168)
>     at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:129)
>     at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:436)
>     at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:403)
>     at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:125)
>     at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2262)
>     at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:86)
>     at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2296)
>     at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2278)
>     at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:316)
>     at org.apache.hadoop.fs.Path.getFileSystem(Path.java:194)
>     at org.apache.avro.mapred.FsInput.<init>(FsInput.java:37)
>     at org.apache.avro.mapred.AvroRecordReader.<init>(AvroRecordReader.java:43)
>     at org.apache.avro.mapred.AvroInputFormat.getRecordReader(AvroInputFormat.java:52)
>     at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:156)
>     at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:149)
>     at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:64)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
>     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
>     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
>     at org.apache.spark.scheduler.Task.run(Task.scala:53)
>     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)
>     at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
>     at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:415)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
>     at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> {noformat}
> {quote}


