Re: Hadoop's Configuration object isn't threadsafe

2014-07-17 Thread Patrick Wendell
Hey Andrew,

I think you are correct and a follow up to SPARK-2521 will end up
fixing this. The design of SPARK-2521 automatically broadcasts RDD
data in tasks and the approach creates a new copy of the RDD and
associated data for each task. A natural follow-up to that patch is to
stop handling the jobConf separately (since we will now broadcast all
referents of the RDD itself) and just have it broadcasted with the
RDD. I'm not sure if Reynold plans to include this in SPARK-2521 or
afterwards, but it's likely we'd do that soon.
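
Roughly, I'd expect the follow-up to look something like the sketch
below. This is an illustration only; the names (closureSerializer,
rddBroadcast) and the placement are assumptions, not the actual
SPARK-2521 code.

    // Instead of HadoopRDD broadcasting its JobConf on its own, broadcast
    // the serialized RDD object graph once from the driver...
    val rddBytes: java.nio.ByteBuffer = closureSerializer.serialize(rdd)
    val rddBroadcast = sc.broadcast(rddBytes.array())

    // ...and have each task deserialize a private copy of the RDD. The
    // copy carries a private JobConf with it, so no Configuration object
    // is ever shared between task threads.
    val privateRdd = closureSerializer.deserialize[RDD[_]](
      java.nio.ByteBuffer.wrap(rddBroadcast.value))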

- Patrick

Re: Hadoop's Configuration object isn't threadsafe

2014-07-17 Thread Andrew Ash
Sounds good -- I added comments to the ticket.

Since SPARK-2521 is scheduled for a 1.1.0 release and we can work around
with spark.speculation, I don't personally see a need for a 1.0.2 backport.

Thanks for looking through this issue!


Re: Hadoop's Configuration object isn't threadsafe

2014-07-16 Thread Andrew Ash
Hi Patrick, thanks for taking a look.  I filed as
https://issues.apache.org/jira/browse/SPARK-2546

Would you recommend I pursue the cloned Configuration object approach now
and send in a PR?

Reynold's recent announcement of the broadcast RDD object patch may also
have implications for the right path forward here.  I'm not sure I fully
understand the implications though:
https://github.com/apache/spark/pull/1452

Once this is committed, we can also remove the JobConf broadcast in
HadoopRDD.

Thanks!
Andrew


Re: Hadoop's Configuration object isn't threadsafe

2014-07-15 Thread yao
Good catch Andrew. In addition to your proposed solution, would it be
possible to fix the Configuration class itself and make it thread-safe? I
think the fix should be trivial, just use a ConcurrentHashMap, but I am not
sure if we can push this change upstream (will the Hadoop folks accept it?
It seems they never expected a Configuration object to be accessed by
multiple threads).
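
Something like the following, purely to illustrate the idea (this is not
Hadoop's actual internals; the class, field, and method names here are
made up):

    import java.util.concurrent.ConcurrentHashMap

    // Hypothetical: back the Configuration's key/value state with a
    // concurrent map, so racing set() calls can't corrupt the table
    // while it resizes.
    class ThreadSafeConf {
      private val props = new ConcurrentHashMap[String, String]()
      def set(key: String, value: String): Unit = props.put(key, value)
      def get(key: String): String = props.get(key) // null when absent
    }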

-Shengzhe


On Mon, Jul 14, 2014 at 10:22 PM, Andrew Ash and...@andrewash.com wrote:

 Hi Spark devs,

 We discovered a very interesting bug in Spark at work last week in Spark
 0.9.1 — that the way Spark uses the Hadoop Configuration object is prone to
 thread safety issues.  I believe it still applies in Spark 1.0.1 as well.
  Let me explain:


 *Observations*

- Was running a relatively simple job (read from Avro files, do a map,
do another map, write back to Avro files)
- 412 of 413 tasks completed, but the last task was hung in RUNNING
state
- The 412 successful tasks completed in median time 3.4s
- The last hung task didn't finish even in 20 hours
- The executor with the hung task was responsible for 100% of one core
of CPU usage
- Jstack of the executor attached (relevant thread pasted below)


 *Diagnosis*

 After doing some code spelunking, we determined the issue was concurrent
 use of a Configuration object for each task on an executor.  In Hadoop each
 task runs in its own JVM, but in Spark multiple tasks can run in the same
 JVM, so the single-threaded access assumptions of the Configuration object
 no longer hold in Spark.

 The specific issue is that the AvroRecordReader actually _modifies_ the
 JobConf it's given when it's instantiated!  It adds a key for the RPC
 protocol engine in the process of connecting to the Hadoop FileSystem.
  When many tasks start at the same time (like at the start of a job), many
 tasks are adding this configuration item to the one Configuration object at
 once.  Internally Configuration uses a java.util.HashMap, which isn't
 threadsafe… The below post is an excellent explanation of what happens in
 the situation where multiple threads insert into a HashMap at the same time.

 http://mailinator.blogspot.com/2009/06/beautiful-race-condition.html

 The gist is that you have a thread following a cycle of linked list nodes
 indefinitely.  This exactly matches our observations of the 100% CPU core
 and also the final location in the stack trace.
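
 A minimal repro of that failure mode, for anyone curious, looks roughly
 like this (a sketch assuming a JDK 7-era HashMap; JDK 8 changed the table
 layout, so the livelock is much harder to hit there):

     import java.util.HashMap

     // Many threads inserting into one unsynchronized HashMap can corrupt
     // the bucket chains during a concurrent resize; afterwards a put() or
     // get() can chase a cycle of entries forever, pinning one core.
     val shared = new HashMap[String, String]()
     val threads = (1 to 8).map { t =>
       new Thread(new Runnable {
         def run(): Unit = for (i <- 1 to 100000) shared.put(s"k-$t-$i", "v")
       })
     }
     threads.foreach(_.start())
     threads.foreach(_.join()) // may never return if a thread is spinning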

 So it seems the way Spark shares a Configuration object between task
 threads in an executor is incorrect.  We need some way to prevent
 concurrent access to a single Configuration object.


 *Proposed fix*

 We can clone the JobConf object in HadoopRDD.getJobConf() so each task
 gets its own JobConf object (and thus Configuration object).  The
 optimization of broadcasting the Configuration object across the cluster
 can remain, but on the other side I think it needs to be cloned for each
 task to allow for concurrent access.  I'm not sure of the performance
 implications, but the comments suggest that the Configuration object is
 ~10KB so I would expect a clone on the object to be relatively speedy.
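
 Concretely, I'm picturing a change along these lines (a sketch against
 HadoopRDD; the field names are assumptions, not the actual code):

     import org.apache.hadoop.mapred.JobConf

     // Hypothetical HadoopRDD.getJobConf(): keep broadcasting the conf
     // across the cluster, but hand every task its own copy, so a record
     // reader that mutates its JobConf (like AvroRecordReader) can't
     // corrupt state shared with other task threads.
     protected def getJobConf(): JobConf = {
       val shared = broadcastedConf.value.value // the one broadcast conf
       new JobConf(shared)                      // ~10KB copy per task
     }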

 Has this been observed before?  Does my suggested fix make sense?  I'd be
 happy to file a Jira ticket and continue discussion there for the right way
 to fix it.


 Thanks!
 Andrew


 P.S.  For others seeing this issue, our temporary workaround is to enable
 spark.speculation, which retries failed (or hung) tasks on other machines.
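
 For example, one way to turn it on when building the context (adjust to
 however you configure your jobs):

     // enable speculative re-execution of slow or hung tasks
     val conf = new org.apache.spark.SparkConf()
       .set("spark.speculation", "true")
     val sc = new org.apache.spark.SparkContext(conf)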



 "Executor task launch worker-6" daemon prio=10 tid=0x7f91f01fe000
 nid=0x54b1 runnable [0x7f92d74f1000]
    java.lang.Thread.State: RUNNABLE
 at java.util.HashMap.transfer(HashMap.java:601)
 at java.util.HashMap.resize(HashMap.java:581)
 at java.util.HashMap.addEntry(HashMap.java:879)
 at java.util.HashMap.put(HashMap.java:505)
 at org.apache.hadoop.conf.Configuration.set(Configuration.java:803)
 at org.apache.hadoop.conf.Configuration.set(Configuration.java:783)
 at org.apache.hadoop.conf.Configuration.setClass(Configuration.java:1662)
 at org.apache.hadoop.ipc.RPC.setProtocolEngine(RPC.java:193)
 at org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:343)
 at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:168)
 at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:129)
 at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:436)
 at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:403)
 at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:125)
 at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2262)
 at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:86)
 at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2296)
 at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2278)
 at 

Re: Hadoop's Configuration object isn't threadsafe

2014-07-15 Thread Andrew Ash
Hi Shengzhe,

Even if we did make Configuration threadsafe, it'd take quite some time for
that to trickle down to a Hadoop release that we could actually rely on
Spark users having installed.  I agree we should consider whether making
Configuration threadsafe is something that Hadoop should do, but for the
short term I think Spark needs to be able to handle the common scenario of
Configuration being single-threaded.

Thanks!
Andrew


Re: Hadoop's Configuration object isn't threadsafe

2014-07-15 Thread Patrick Wendell
Hey Andrew,

Cloning the conf might be a good/simple fix for this particular
problem. It's definitely worth looking into.

There are a few things we can probably do in Spark to deal with
non-thread-safety inside of the Hadoop FileSystem and Configuration
classes. One thing we can do in general is to add barriers around the
locations where we knowingly access Hadoop FileSystem and
Configuration state from multiple threads (e.g. during our own calls
to getRecordReader in this case). But this will only deal with
writer-writer conflicts, where multiple calls mutate the same object
at the same time. It won't deal with reader-writer conflicts, where
some of our initialization code touches state that is needed during
normal execution of other tasks.
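
For the barrier idea, I'm picturing something like this (a sketch only;
the lock object and its placement are assumptions, not existing Spark
code):

    import org.apache.hadoop.mapred.{InputFormat, InputSplit, JobConf, Reporter}

    // One process-wide lock for our own known-mutating touches of the
    // shared conf. This serializes writer-writer access, but does nothing
    // for readers racing with these writes.
    object ConfBarrier

    def openReader[K, V](fmt: InputFormat[K, V], split: InputSplit,
                         conf: JobConf) =
      ConfBarrier.synchronized {
        fmt.getRecordReader(split, conf, Reporter.NULL)
      }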

- Patrick
