Good catch, Andrew. In addition to your proposed solution, is it possible
to fix the Configuration class and make it thread-safe? I think the fix should
be trivial, just use a ConcurrentHashMap, but I am not sure if we can push
this change upstream (will the Hadoop guys accept this change? For them, it
seems a Configuration object is never expected to be accessed by multiple
threads).
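A minimal sketch of the ConcurrentHashMap idea, in Java because both Hadoop and the executor JVM are Java-based. `ThreadSafeSettings` is a hypothetical stand-in for a Configuration-like key/value store, not Hadoop's actual Configuration class or API. Eight threads write into one shared instance at the same time. This is the access pattern that hung the executor when the backing map was a plain HashMap.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a Configuration-like key/value store.
// This is NOT Hadoop's Configuration; it only illustrates the ConcurrentHashMap idea.
public class ThreadSafeSettings {
    // ConcurrentHashMap supports concurrent put/get without external locking,
    // so a resize during concurrent inserts cannot corrupt the table.
    private final Map<String, String> props = new ConcurrentHashMap<>();

    public void set(String key, String value) {
        props.put(key, value);
    }

    public String get(String key) {
        return props.get(key);
    }

    public int size() {
        return props.size();
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadSafeSettings shared = new ThreadSafeSettings();
        ExecutorService pool = Executors.newFixedThreadPool(8);
        // Simulate many "tasks" starting at once, all writing into one shared object.
        for (int task = 0; task < 8; task++) {
            final int id = task;
            pool.submit(() -> {
                for (int i = 0; i < 10_000; i++) {
                    shared.set("task" + id + ".key" + i, "v" + i);
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        // Every distinct key survives: 8 threads x 10,000 keys each.
        System.out.println(shared.size()); // 80000
    }
}
```

Swapping the map only makes individual get/put calls safe. A real change to Configuration would also have to cover any other mutable state the class keeps, plus any read-modify-write sequences, so the upstream change may be less trivial than it looks.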


On Mon, Jul 14, 2014 at 10:22 PM, Andrew Ash wrote:

> Hi Spark devs,
> We discovered a very interesting bug in Spark at work last week in Spark
> 0.9.1: the way Spark uses the Hadoop Configuration object is prone to
> thread-safety issues.  I believe it still applies in Spark 1.0.1 as well.
> Let me explain:
> *Observations*
>    - Was running a relatively simple job (read from Avro files, do a map,
>    do another map, write back to Avro files)
>    - 412 of 413 tasks completed, but the last task was hung in RUNNING
>    state
>    - The 412 successful tasks completed in median time 3.4s
>    - The last hung task didn't finish even in 20 hours
>    - The executor with the hung task was using 100% of one CPU core
>    - jstack of the executor attached (relevant thread pasted below)
> *Diagnosis*
> After doing some code spelunking, we determined the issue was concurrent
> use of a single Configuration object by all the tasks on an executor.  In
> Hadoop each task runs in its own JVM, but in Spark multiple tasks can run
> in the same JVM, so the single-threaded access assumptions of the
> Configuration object no longer hold in Spark.
> The specific issue is that the AvroRecordReader actually _modifies_ the
> JobConf it's given when it's instantiated!  It adds a key for the RPC
> protocol engine in the process of connecting to the Hadoop FileSystem.
> When many tasks start at the same time (like at the start of a job), many
> tasks add this configuration item to the one shared Configuration object
> at once.  Internally Configuration uses a java.util.HashMap, which isn't
> thread-safe…  The below post is an excellent explanation of what happens
> when multiple threads insert into a HashMap at the same time.
> The gist is that you have a thread following a cycle of linked list nodes
> indefinitely.  This exactly matches our observations of the 100% CPU core
> and also the final location in the stack trace.
> So it seems the way Spark shares a Configuration object between task
> threads in an executor is incorrect.  We need some way to prevent
> concurrent access to a single Configuration object.
> *Proposed fix*
> We can clone the JobConf object in HadoopRDD.getJobConf() so each task
> gets its own JobConf object (and thus Configuration object).  The
> optimization of broadcasting the Configuration object across the cluster
> can remain, but on the other side I think it needs to be cloned for each
> task to allow for concurrent access.  I'm not sure of the performance
> implications, but the comments suggest that the Configuration object is
> ~10KB, so I would expect cloning it to be relatively fast.
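A sketch of the per-task clone, again in Java with hypothetical names: `TaskConf` stands in for JobConf, and `confForTask` stands in for a getJobConf() that hands each task its own copy of the shared (e.g. broadcast) template. In real Hadoop code the copy step would presumably be the `JobConf(Configuration)` copy constructor, but that is an assumption about the fix, not a description of what Spark currently does.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of per-task cloning; TaskConf is a stand-in, not Hadoop's JobConf.
public class PerTaskConfDemo {

    static final class TaskConf {
        // A plain HashMap is acceptable here only because each instance
        // is confined to a single task thread after it is copied.
        private final Map<String, String> props;

        TaskConf() {
            this.props = new HashMap<>();
        }

        // Copy constructor: the clone gets its own map, so writes to the copy
        // never touch the shared template.
        TaskConf(TaskConf template) {
            this.props = new HashMap<>(template.props);
        }

        void set(String key, String value) { props.put(key, value); }

        String get(String key) { return props.get(key); }
    }

    // Hypothetical analogue of a per-task getJobConf(): return a private copy
    // of the shared template instead of the template itself.
    static TaskConf confForTask(TaskConf sharedTemplate) {
        return new TaskConf(sharedTemplate);
    }

    public static void main(String[] args) {
        TaskConf shared = new TaskConf();
        shared.set("input.path", "/data/in");

        TaskConf t1 = confForTask(shared);
        TaskConf t2 = confForTask(shared);
        // A record reader mutating its conf (as AvroRecordReader does) now only
        // affects its own task's copy. The key name is made up for illustration.
        t1.set("example.rpc.engine", "engine-A");

        System.out.println(t1.get("example.rpc.engine"));     // engine-A
        System.out.println(t2.get("example.rpc.engine"));     // null
        System.out.println(shared.get("example.rpc.engine")); // null
        System.out.println(t2.get("input.path"));             // /data/in
    }
}
```

Copying is only safe while nothing writes to the shared template. That holds as long as tasks only ever receive copies and never the template itself.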
> Has this been observed before?  Does my suggested fix make sense?  I'd be
> happy to file a Jira ticket and continue discussion there for the right way
> to fix.
> Thanks!
> Andrew
> P.S.  For others seeing this issue, our temporary workaround is to enable
> spark.speculation, which re-launches slow-running (or hung) tasks on other
> machines.
>
> "Executor task launch worker-6" daemon prio=10 tid=0x00007f91f01fe000 nid=0x54b1 runnable [0x00007f92d74f1000]
>    java.lang.Thread.State: RUNNABLE
>     at java.util.HashMap.transfer(
>     at java.util.HashMap.resize(
>     at java.util.HashMap.addEntry(
>     at java.util.HashMap.put(
>     at org.apache.hadoop.conf.Configuration.set(
>     at org.apache.hadoop.conf.Configuration.set(
>     at org.apache.hadoop.conf.Configuration.setClass(
>     at org.apache.hadoop.ipc.RPC.setProtocolEngine(
>     at org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(
>     at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(
>     at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(
>     at org.apache.hadoop.hdfs.DFSClient.<init>(
>     at org.apache.hadoop.hdfs.DFSClient.<init>(
>     at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(
>     at org.apache.hadoop.fs.FileSystem.createFileSystem(
>     at org.apache.hadoop.fs.FileSystem.access$200(
>     at org.apache.hadoop.fs.FileSystem$Cache.getInternal(
>     at org.apache.hadoop.fs.FileSystem$Cache.get(
>     at org.apache.hadoop.fs.FileSystem.get(
>     at org.apache.hadoop.fs.Path.getFileSystem(
>     at org.apache.avro.mapred.FsInput.<init>(
>     at org.apache.avro.mapred.AvroRecordReader.<init>(
>     at org.apache.avro.mapred.AvroInputFormat.getRecordReader(
>     at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:156)
>     at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:149)
>     at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:64)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
>     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
>     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
>     at
>     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)
>     at org.apache.spark.deploy.SparkHadoopUtil$$anon$
>     at org.apache.spark.deploy.SparkHadoopUtil$$anon$
>     at Method)
>     at
>     at
>     at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
>     at org.apache.spark.executor.Executor$
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(
>     at java.util.concurrent.ThreadPoolExecutor$
>     at