Hi Ryan,

This is an issue from the sort-based shuffle, not consolidated hash-based shuffle. 
In my experience this mostly occurs when the Spark cluster is in an abnormal state, 
for example under long GC pauses. You could check the system status and see whether 
there are any other exceptions besides this one.
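
For example, one way to check for long GC pauses is to turn on GC logging for the 
executors. A minimal sketch (the app name is a placeholder; the flags are standard 
JVM GC-logging options passed through spark.executor.extraJavaOptions):

    import org.apache.spark.{SparkConf, SparkContext}

    // Sketch: surface GC activity in each executor's stdout so long pauses
    // are visible alongside the shuffle errors.
    val conf = new SparkConf()
      .setAppName("gc-logging-sketch")  // placeholder app name
      .set("spark.executor.extraJavaOptions",
           "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")
    val sc = new SparkContext(conf)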

Thanks
Jerry

From: nobigdealst...@gmail.com [mailto:nobigdealst...@gmail.com] On Behalf Of 
Ryan Williams
Sent: Wednesday, October 29, 2014 1:31 PM
To: user
Subject: FileNotFoundException in appcache shuffle files

My job is failing with the following error:
14/10/29 02:59:14 WARN scheduler.TaskSetManager: Lost task 1543.0 in stage 3.0 (TID 6266, demeter-csmau08-19.demeter.hpc.mssm.edu): java.io.FileNotFoundException: /data/05/dfs/dn/yarn/nm/usercache/willir31/appcache/application_1413512480649_0108/spark-local-20141028214722-43f1/26/shuffle_0_312_0.index (No such file or directory)
        java.io.FileOutputStream.open(Native Method)
        java.io.FileOutputStream.<init>(FileOutputStream.java:221)
        org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123)
        org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:192)
        org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4$$anonfun$apply$2.apply(ExternalSorter.scala:733)
        org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4$$anonfun$apply$2.apply(ExternalSorter.scala:732)
        scala.collection.Iterator$class.foreach(Iterator.scala:727)
        org.apache.spark.util.collection.ExternalSorter$IteratorForPartition.foreach(ExternalSorter.scala:790)
        org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4.apply(ExternalSorter.scala:732)
        org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4.apply(ExternalSorter.scala:728)
        scala.collection.Iterator$class.foreach(Iterator.scala:727)
        scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:728)
        org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:70)
        org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        org.apache.spark.scheduler.Task.run(Task.scala:56)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:744)

I get 4 of those on task 1543 before the job aborts. Interspersed with the 4 
task-1543 failures are a few instances of this failure on another task. Here is 
the entire App Master stdout dump[1] (~2MB; stack traces towards the bottom, of 
course). I am running Spark 1.1 on Hadoop 2.3.0.

Here's a summary of the RDD manipulations I've done up to the point of failure (a code sketch follows the list):

  *   val A = [read a file in 1419 shards]

     *   the file is 177GB compressed but ends up being ~5TB uncompressed / 
hydrated into Scala objects (I think; see below for more discussion on this 
point).
     *   some relevant Spark options:

        *   spark.default.parallelism=2000
        *   --master yarn-client
        *   --executor-memory 50g
        *   --driver-memory 10g
        *   --num-executors 100
        *   --executor-cores 4

  *   A.repartition(3000)

     *   3000 was chosen in an attempt to mitigate the shuffle disk spillage that 
previous job attempts with 1000 or 1419 shards were mired in

  *   A.persist()

  *   A.count()  // succeeds

     *   screenshot of web UI with stats: http://cl.ly/image/3e130w3J1B2v
     *   I don't know why each task reports "8 TB" of "Input"; that metric always 
seems ludicrously high, and I typically don't pay attention to it.
     *   Each task shuffle-writes 3.5GB, for a total of 4.9TB

        *   Does that mean that 4.9TB is the uncompressed size of the file that 
A was read from?
        *   4.9TB is pretty close to the total amount of memory I've configured 
the job to use: (50GB/executor) * (100 executors) ~= 5TB.
        *   Is that a coincidence, or are my executors shuffle-writing an 
amount equal to all of their memory for some reason?

  *   val B = A.groupBy(...).filter(_._2.size == 2).map(_._2).flatMap(x => 
x).persist()

     *   my expectation is that ~all elements pass the filter step, so B should be 
~equal to A, just to give a sense of the expected memory blowup.

  *   B.count()

     *   this fails while executing .groupBy(...) above
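
For concreteness, the whole pipeline looks roughly like this in code (a sketch: the 
input path, record type, and groupBy key are placeholders, not the real job):

    import org.apache.spark.{SparkConf, SparkContext}

    object PipelineSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("pipeline-sketch"))

        val A = sc.textFile("hdfs:///path/to/input")  // placeholder path; ~1419 shards, 177GB compressed
                  .repartition(3000)                  // 3000 partitions, to reduce per-task shuffle spill
                  .persist()
        A.count()                                     // this succeeds

        val B = A.groupBy(line => line.split("\t")(0))  // stand-in key; this groupBy's shuffle is what fails
                 .filter(_._2.size == 2)                // expectation: ~all elements pass
                 .map(_._2)
                 .flatMap(x => x)
                 .persist()
        B.count()                                     // fails with the FileNotFoundException above
      }
    }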

I've found a few discussions of issues whose manifestations look *like* this, 
but nothing that is obviously the same issue. The closest hit I've seen is 
"Stage failure in BlockManager..."[2] on this list on 8/20; some key excerpts:

  *   "likely due to a bug in shuffle file consolidation"
  *   "hopefully fixed in 1.1 with this patch: 
https://github.com/apache/spark/commit/78f2af582286b81e6dc9fa9d455ed2b369d933bd";

     *   78f2af5[3] implements pieces of #1609[4], on which mridulm has a 
comment[5] saying: "it got split into four issues, two of which got committed, 
not sure of the other other two .... And the first one was regressed upon in 
1.1.already."

  *   "Until 1.0.3 or 1.1 are released, the simplest solution is to disable 
spark.shuffle.consolidateFiles."

     *   I've not tried this yet as I'm waiting on a re-run with some other 
parameters tweaked first.
     *   Also, I can't tell if it's expected that this was fixed, known that it 
subsequently regressed, etc., so hoping for some guidance there.
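
Setting that flag would, I believe, look roughly like this (a sketch; I'm assuming 
spark.shuffle.consolidateFiles can simply be set on the SparkConf, or equivalently 
in spark-defaults.conf, and the app name is a placeholder):

    import org.apache.spark.{SparkConf, SparkContext}

    // Sketch: explicitly disable shuffle file consolidation before creating the context.
    val conf = new SparkConf()
      .setAppName("my-job")  // placeholder
      .set("spark.shuffle.consolidateFiles", "false")
    val sc = new SparkContext(conf)
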
So! Anyone else seen this? Is this related to the "bug in shuffle file 
consolidation"? Was it fixed? Did it regress? Are my confs or other steps 
unreasonable in some way? Any assistance would be appreciated, thanks.

-Ryan


[1] https://www.dropbox.com/s/m8c4o73o0bh7kf8/adam.108?dl=0
[2] 
http://mail-archives.apache.org/mod_mbox/spark-user/201408.mbox/%3ccangvg8qtk57frws+kaqtiuz9jsls5qjkxxjxttq9eh2-gsr...@mail.gmail.com%3E
[3] https://github.com/apache/spark/commit/78f2af5
[4] https://github.com/apache/spark/pull/1609
[5] https://github.com/apache/spark/pull/1609#issuecomment-54393908

