Sounds very similar to what I experienced, Corey. Something that seems to at
least help with my problems is to have more partitions. I'm already fighting
a trade-off between ending up with too many partitions at the end and having
too few at the beginning. By coalescing as late as possible and avoiding too
few partitions at the beginning, the problems seem to decrease. Also, after
increasing spark.akka.askTimeout and spark.core.connection.ack.wait.timeout
significantly (~700 secs), the problems seem to almost disappear. Don't
want to celebrate yet, there's still a long way left before the job
completes, but it's looking better...
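
For reference, a minimal Scala sketch of the two changes described above
(higher timeouts, and coalescing only at the very end). It is not the
actual job: the object name, the tab-separated key extraction, and the
partition counts 4000 and 200 are hypothetical placeholders; only the two
timeout keys and the ~700 second value come from this thread.

```scala
import org.apache.spark.{SparkConf, SparkContext}
// Brings the pair-RDD implicits into scope on Spark 1.2 and earlier;
// newer versions pick them up automatically.
import org.apache.spark.SparkContext._

object LateCoalesceSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("late-coalesce-sketch")
      // Both values are in seconds in Spark 1.x.
      .set("spark.akka.askTimeout", "700")
      .set("spark.core.connection.ack.wait.timeout", "700")
    val sc = new SparkContext(conf)

    // Start with many partitions (4000 is a placeholder, tune per job).
    val input = sc.textFile(args(0), 4000)

    // Keep the partition count high through the shuffle.
    val counts = input
      .map(line => (line.split("\t")(0), 1L)) // hypothetical key extraction
      .reduceByKey(_ + _, 4000)

    // Coalesce only right before writing, to limit the number of output files.
    counts.coalesce(200).saveAsTextFile(args(1))

    sc.stop()
  }
}
```

Note that coalesce without a shuffle also reduces the number of tasks in
the stage it belongs to, so the final count should not be set too low.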

On Mon, Feb 23, 2015 at 9:54 PM, Corey Nolet <cjno...@gmail.com> wrote:

> I'm looking @ my yarn container logs for some of the executors which
> appear to be failing (with the missing shuffle files). I see exceptions
> that say "client.TransportClientFactory: Found inactive connection to
> host/ip:port, closing it."
>
> Right after that I see "shuffle.RetryingBlockFetcher: Exception while
> beginning fetch of 1 outstanding blocks. java.io.IOException: Failed to
> connect to host/ip:port"
>
> Right after that exception I see "RECEIVED SIGNAL 15: SIGTERM"
>
> Finally, following the SIGTERM, I see "FileNotFoundException:
> /hdfs/01/yarn/nm/usercache....../spark-local-uuid/shuffle_5_09_0.data (No
> such file or directory)"
>
> I'm looking @ the nodemanager and application master logs and I see no
> indications whatsoever that there were any memory issues during this period
> of time. The Spark UI is telling me none of the executors are really using
> too much memory when this happens. It is a big job that's caching several
> hundred GB, but each node manager on the cluster has 64 GB of RAM just for
> YARN containers (physical nodes have 128 GB). On this cluster, we have 128
> nodes. I've also tried using the DISK_ONLY storage level, but to no avail.
>
> Any further ideas on how to track this down? Again, we're able to run this
> same job on about 1/5th of the data just fine. The only thing that's
> pointing me towards a memory issue is that it seems to be happening in the
> same stages each time and when I lower the memory that each executor has
> allocated it happens in earlier stages but I can't seem to find anything
> that says an executor (or container for that matter) has run low on memory.
>
>
>
> On Mon, Feb 23, 2015 at 9:24 AM, Anders Arpteg <arp...@spotify.com> wrote:
>
>> No, unfortunately we're not making use of dynamic allocation or the
>> external shuffle service. Hoping that we could reconfigure our cluster to
>> make use of it, but since it requires changes to the cluster itself (and
>> not just the Spark app), it could take some time.
>>
>> Unsure if task 450 was acting as a reducer or not, but seems possible.
>> Probably due to a crashed executor as you say. Seems like I need to do some
>> more advanced partition tuning to make this job work, as it currently has
>> a rather high number of partitions.
>>
>> Thanks for the help so far! It's certainly a frustrating task to debug
>> when everything's working perfectly on sample data locally and crashes hard
>> when running on the full dataset on the cluster...
>>
>> On Sun, Feb 22, 2015 at 9:27 AM, Sameer Farooqui <same...@databricks.com>
>> wrote:
>>
>>> Do you guys have dynamic allocation turned on for YARN?
>>>
>>> Anders, was Task 450 in your job acting like a Reducer and fetching the
>>> Map spill output data from a different node?
>>>
>>> If a Reducer task can't read the remote data it needs, that could cause
>>> the stage to fail. Sometimes this forces the previous stage to also be
>>> re-computed if it's a wide dependency.
>>>
>>> But like Petar said, if you turn the external shuffle service on, the
>>> YARN NodeManager process on the slave machines will serve out the map spill
>>> data, instead of the Executor JVMs (by default unless you turn external
>>> shuffle on, the Executor JVM itself serves out the shuffle data which
>>> causes problems if an Executor dies).
>>>
>>> Corey, how often are Executors crashing in your app? How many Executors
>>> do you have total? And what is the memory size for each? You can change
>>> what fraction of the Executor heap will be used for your user code vs the
>>> shuffle vs RDD caching with the spark.storage.memoryFraction setting.
>>>
>>> On Sat, Feb 21, 2015 at 2:58 PM, Petar Zecevic <petar.zece...@gmail.com>
>>> wrote:
>>>
>>>>
>>>> Could you try to turn on the external shuffle service?
>>>>
>>>> spark.shuffle.service.enabled = true
>>>>
>>>>
>>>> On 21.2.2015. 17:50, Corey Nolet wrote:
>>>>
>>>> I'm experiencing the same issue. Upon closer inspection I'm noticing
>>>> that executors are being lost as well. Thing is, I can't figure out how
>>>> they are dying. I'm using MEMORY_AND_DISK_SER and I've got over 1.3TB of
>>>> memory allocated for the application. I was thinking perhaps it was
>>>> possible that a single executor was getting one or a couple of large
>>>> partitions, but shouldn't the disk persistence kick in at that point?
>>>>
>>>> On Sat, Feb 21, 2015 at 11:20 AM, Anders Arpteg <arp...@spotify.com>
>>>> wrote:
>>>>
>>>>> For large jobs, the following error message is shown that seems to
>>>>> indicate that shuffle files for some reason are missing. It's a rather
>>>>> large job with many partitions. If the data size is reduced, the problem
>>>>> disappears. I'm running a build from Spark master post 1.2 (built on
>>>>> 2015-01-16) and running on Yarn 2.2. Any idea how to resolve this
>>>>> problem?
>>>>>
>>>>>  User class threw exception: Job aborted due to stage failure: Task
>>>>> 450 in stage 450.1 failed 4 times, most recent failure: Lost task 450.3 in
>>>>> stage 450.1 (TID 167370, lon4-hadoopslave-b77.lon4.spotify.net):
>>>>> java.io.FileNotFoundException:
>>>>> /disk/hd06/yarn/local/usercache/arpteg/appcache/application_1424333823218_21217/spark-local-20150221154811-998c/03/rdd_675_450
>>>>> (No such file or directory)
>>>>>  at java.io.FileOutputStream.open(Native Method)
>>>>>  at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
>>>>>  at java.io.FileOutputStream.<init>(FileOutputStream.java:171)
>>>>>  at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:76)
>>>>>  at
>>>>> org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:786)
>>>>>  at
>>>>> org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:637)
>>>>>  at
>>>>> org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:149)
>>>>>  at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:74)
>>>>>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>>>>>  at
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:264)
>>>>>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:231)
>>>>>  at
>>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>>>  at
>>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>>  at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>>>  at
>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:192)
>>>>>  at
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>
>>>>>  at
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>
>>>>>  at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>>  TIA,
>>>>> Anders
>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>
