Sounds good -- I added comments to the ticket. Since SPARK-2521 is scheduled for a 1.1.0 release and we can work around the issue with spark.speculation in the meantime, I don't personally see a need for a 1.0.2 backport.
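For anyone else who hits this before 1.1.0, here's roughly what the workaround looks like (Spark 1.0.x-era property names; the app name is illustrative and the tuning values shown are just the defaults, spelled out for visibility):

    import org.apache.spark.{SparkConf, SparkContext}

    // Speculation re-launches tasks that run much slower than their peers,
    // so a task hung on the corrupted HashMap gets retried on another executor.
    val conf = new SparkConf()
      .setAppName("avro-read-map-write")          // illustrative
      .set("spark.speculation", "true")
      .set("spark.speculation.interval", "100")   // ms between speculation checks (default)
      .set("spark.speculation.quantile", "0.75")  // fraction of tasks done before checking (default)
      .set("spark.speculation.multiplier", "1.5") // how much slower than median counts as slow (default)
    val sc = new SparkContext(conf)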
Thanks for looking through this issue!

On Thu, Jul 17, 2014 at 2:14 AM, Patrick Wendell <pwend...@gmail.com> wrote:

> Hey Andrew,
>
> I think you are correct and a follow-up to SPARK-2521 will end up
> fixing this. The design of SPARK-2521 automatically broadcasts RDD
> data in tasks, and the approach creates a new copy of the RDD and
> associated data for each task. A natural follow-up to that patch is to
> stop handling the jobConf separately (since we will now broadcast all
> referents of the RDD itself) and just have it broadcast with the
> RDD. I'm not sure if Reynold plans to include this in SPARK-2521 or
> afterwards, but it's likely we'd do that soon.
>
> - Patrick
>
> On Wed, Jul 16, 2014 at 10:24 PM, Andrew Ash <and...@andrewash.com> wrote:
>
> > Hi Patrick, thanks for taking a look. I filed this as
> > https://issues.apache.org/jira/browse/SPARK-2546
> >
> > Would you recommend I pursue the cloned Configuration object approach
> > now and send in a PR?
> >
> > Reynold's recent announcement of the broadcast RDD object patch may
> > also have implications for the right path forward here. I'm not sure
> > I fully understand the implications though:
> > https://github.com/apache/spark/pull/1452
> >
> > "Once this is committed, we can also remove the JobConf broadcast in
> > HadoopRDD."
> >
> > Thanks!
> > Andrew
> >
> > On Tue, Jul 15, 2014 at 5:20 PM, Patrick Wendell <pwend...@gmail.com> wrote:
> >
> >> Hey Andrew,
> >>
> >> Cloning the conf might be a good/simple fix for this particular
> >> problem. It's definitely worth looking into.
> >>
> >> There are a few things we can probably do in Spark to deal with
> >> non-thread-safety inside of the Hadoop FileSystem and Configuration
> >> classes. One thing we can do in general is to add barriers around the
> >> locations where we knowingly access Hadoop FileSystem and
> >> Configuration state from multiple threads (e.g. during our own calls
> >> to getRecordReader in this case). But this will only deal with
> >> "writer-writer" conflicts, where we have multiple calls mutating the
> >> same object at the same time. It won't deal with "reader-writer"
> >> conflicts, where some of our initialization code touches state that
> >> is needed during normal execution of other tasks.
> >>
> >> - Patrick
> >>
> >> On Tue, Jul 15, 2014 at 12:56 PM, Andrew Ash <and...@andrewash.com> wrote:
> >>
> >> > Hi Shengzhe,
> >> >
> >> > Even if we did make Configuration thread-safe, it'd take quite some
> >> > time for that to trickle down to a Hadoop release that we could
> >> > actually rely on Spark users having installed. I agree we should
> >> > consider whether making Configuration thread-safe is something that
> >> > Hadoop should do, but for the short term I think Spark needs to be
> >> > able to handle the common scenario of Configuration being
> >> > single-threaded.
> >> >
> >> > Thanks!
> >> > Andrew
> >> >
> >> > On Tue, Jul 15, 2014 at 2:43 PM, yao <yaosheng...@gmail.com> wrote:
> >> >
> >> >> Good catch Andrew. In addition to your proposed solution, would it
> >> >> be possible to fix the Configuration class and make it thread-safe?
> >> >> I think the fix should be trivial -- just use a ConcurrentHashMap --
> >> >> but I am not sure if we can push this change upstream (will the
> >> >> Hadoop folks accept it? It seems they never expected a Configuration
> >> >> object to be accessed by multiple threads).
> >> >>
> >> >> -Shengzhe
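(To make the "barrier" idea Patrick describes above concrete, here's a minimal sketch -- the wrapper object and lock are illustrative, not what Spark actually does:)

    import org.apache.hadoop.mapred.{InputFormat, InputSplit, JobConf, RecordReader, Reporter}

    object RecordReaderBarrier {
      // One JVM-wide lock shared by every task thread in the executor.
      private val confLock = new Object

      // Serialize the one call we know mutates the shared JobConf, so two
      // tasks can't interleave writes into its underlying HashMap (the
      // "writer-writer" case). Plain reads of the conf elsewhere stay
      // unsynchronized (the "reader-writer" case), which is why this is
      // only a partial fix.
      def getRecordReader[K, V](fmt: InputFormat[K, V],
                                split: InputSplit,
                                conf: JobConf): RecordReader[K, V] =
        confLock.synchronized {
          fmt.getRecordReader(split, conf, Reporter.NULL)
        }
    }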
> >> >> On Mon, Jul 14, 2014 at 10:22 PM, Andrew Ash <and...@andrewash.com> wrote:
> >> >>
> >> >> > Hi Spark devs,
> >> >> >
> >> >> > We discovered a very interesting bug at work last week, in Spark
> >> >> > 0.9.1: the way Spark uses the Hadoop Configuration object is prone
> >> >> > to thread safety issues. I believe it still applies in Spark 1.0.1
> >> >> > as well. Let me explain:
> >> >> >
> >> >> > *Observations*
> >> >> >
> >> >> > - Was running a relatively simple job (read from Avro files, do a
> >> >> >   map, do another map, write back to Avro files)
> >> >> > - 412 of 413 tasks completed, but the last task was hung in
> >> >> >   RUNNING state
> >> >> > - The 412 successful tasks completed in a median time of 3.4s
> >> >> > - The last hung task didn't finish even in 20 hours
> >> >> > - The executor with the hung task was responsible for 100% of one
> >> >> >   core of CPU usage
> >> >> > - Jstack of the executor attached (relevant thread pasted below)
> >> >> >
> >> >> > *Diagnosis*
> >> >> >
> >> >> > After doing some code spelunking, we determined the issue was
> >> >> > concurrent use of a single Configuration object by all the tasks
> >> >> > on an executor. In Hadoop each task runs in its own JVM, but in
> >> >> > Spark multiple tasks can run in the same JVM, so the
> >> >> > single-threaded access assumptions of the Configuration object no
> >> >> > longer hold in Spark.
> >> >> >
> >> >> > The specific issue is that the AvroRecordReader actually
> >> >> > _modifies_ the JobConf it's given when it's instantiated! It adds
> >> >> > a key for the RPC protocol engine in the process of connecting to
> >> >> > the Hadoop FileSystem. When many tasks start at the same time
> >> >> > (like at the start of a job), many tasks are adding this
> >> >> > configuration item to the one Configuration object at once.
> >> >> > Internally Configuration uses a java.util.HashMap, which isn't
> >> >> > thread-safe... The below post is an excellent explanation of what
> >> >> > happens when multiple threads insert into a HashMap at the same
> >> >> > time:
> >> >> >
> >> >> > http://mailinator.blogspot.com/2009/06/beautiful-race-condition.html
> >> >> >
> >> >> > The gist is that you end up with a thread following a cycle of
> >> >> > linked-list nodes indefinitely. This exactly matches our
> >> >> > observations of the 100% CPU core and also the final location in
> >> >> > the stack trace.
> >> >> >
> >> >> > So it seems the way Spark shares a Configuration object between
> >> >> > task threads in an executor is incorrect. We need some way to
> >> >> > prevent concurrent access to a single Configuration object.
> >> >> >
> >> >> > *Proposed fix*
> >> >> >
> >> >> > We can clone the JobConf object in HadoopRDD.getJobConf() so each
> >> >> > task gets its own JobConf object (and thus its own Configuration
> >> >> > object). The optimization of broadcasting the Configuration object
> >> >> > across the cluster can remain, but on the receiving side I think
> >> >> > it needs to be cloned for each task to allow for concurrent
> >> >> > access. I'm not sure of the performance implications, but the
> >> >> > comments suggest that the Configuration object is ~10KB, so I
> >> >> > would expect a clone of the object to be relatively speedy.
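(A minimal sketch of that clone-per-task idea, under illustrative names -- Spark's actual HadoopRDD.getJobConf involves more than this:)

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.mapred.JobConf

    object TaskLocalConf {
      // Copying reads the shared broadcast conf, so the copy itself must
      // not race with a concurrent writer; hence the lock around it.
      private val cloneLock = new Object

      // Hand each task its own JobConf instead of sharing one mutable
      // instance. JobConf's copy constructor duplicates the ~10KB of
      // key/value state, so AvroRecordReader's write of the RPC
      // protocol-engine key stays local to this task's copy.
      def forTask(broadcastConf: Configuration): JobConf =
        cloneLock.synchronized { new JobConf(broadcastConf) }
    }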
> >> >> > Has this been observed before? Does my suggested fix make sense?
> >> >> > I'd be happy to file a Jira ticket and continue discussion there
> >> >> > about the right way to fix it.
> >> >> >
> >> >> > Thanks!
> >> >> > Andrew
> >> >> >
> >> >> > P.S. For others seeing this issue, our temporary workaround is to
> >> >> > enable spark.speculation, which retries failed (or hung) tasks on
> >> >> > other machines.
> >> >> >
> >> >> > "Executor task launch worker-6" daemon prio=10 tid=0x00007f91f01fe000 nid=0x54b1 runnable [0x00007f92d74f1000]
> >> >> >    java.lang.Thread.State: RUNNABLE
> >> >> >     at java.util.HashMap.transfer(HashMap.java:601)
> >> >> >     at java.util.HashMap.resize(HashMap.java:581)
> >> >> >     at java.util.HashMap.addEntry(HashMap.java:879)
> >> >> >     at java.util.HashMap.put(HashMap.java:505)
> >> >> >     at org.apache.hadoop.conf.Configuration.set(Configuration.java:803)
> >> >> >     at org.apache.hadoop.conf.Configuration.set(Configuration.java:783)
> >> >> >     at org.apache.hadoop.conf.Configuration.setClass(Configuration.java:1662)
> >> >> >     at org.apache.hadoop.ipc.RPC.setProtocolEngine(RPC.java:193)
> >> >> >     at org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:343)
> >> >> >     at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:168)
> >> >> >     at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:129)
> >> >> >     at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:436)
> >> >> >     at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:403)
> >> >> >     at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:125)
> >> >> >     at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2262)
> >> >> >     at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:86)
> >> >> >     at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2296)
> >> >> >     at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2278)
> >> >> >     at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:316)
> >> >> >     at org.apache.hadoop.fs.Path.getFileSystem(Path.java:194)
> >> >> >     at org.apache.avro.mapred.FsInput.<init>(FsInput.java:37)
> >> >> >     at org.apache.avro.mapred.AvroRecordReader.<init>(AvroRecordReader.java:43)
> >> >> >     at org.apache.avro.mapred.AvroInputFormat.getRecordReader(AvroInputFormat.java:52)
> >> >> >     at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:156)
> >> >> >     at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:149)
> >> >> >     at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:64)
> >> >> >     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
> >> >> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
> >> >> >     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
> >> >> >     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
> >> >> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
> >> >> >     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
> >> >> >     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
> >> >> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
> >> >> >     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
> >> >> >     at org.apache.spark.scheduler.Task.run(Task.scala:53)
> >> >> >     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)
> >> >> >     at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
> >> >> >     at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
> >> >> >     at java.security.AccessController.doPrivileged(Native Method)
> >> >> >     at javax.security.auth.Subject.doAs(Subject.java:415)
> >> >> >     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
> >> >> >     at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
> >> >> >     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
> >> >> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> >> >> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> >> >> >     at java.lang.Thread.run(Thread.java:745)
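(For the curious, a minimal reproduction of the race the mailinator post describes, against a pre-Java-8 java.util.HashMap. This is a sketch that can genuinely hang at 100% CPU -- the same signature as the hung task above -- so run it in a throwaway JVM:)

    import java.util.HashMap

    object HashMapRace {
      def main(args: Array[String]): Unit = {
        val shared = new HashMap[Integer, Integer]()
        // Several threads insert disjoint key ranges; the concurrent
        // resizes are what can corrupt the table, linking a bucket's
        // entries into a cycle.
        val writers = (1 to 8).map { t =>
          new Thread(new Runnable {
            def run(): Unit = {
              var i = t * 1000000
              while (i < t * 1000000 + 100000) { shared.put(i, i); i += 1 }
            }
          })
        }
        writers.foreach(_.start())
        // A writer can itself get stuck in HashMap.transfer(), so join()
        // may never return -- exactly what the jstack above shows.
        writers.foreach(_.join())
        // With a corrupted bucket, this lookup can loop forever; otherwise
        // it just prints null (the key was never inserted).
        println(shared.get(42))
      }
    }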