If you thinking of the yarn memory overhead, then yes, I have increased
that as well. However, I'm glad to say that my job finished successfully
finally. Besides the timeout and memory settings, performing repartitioning
(with shuffling) at the right time seems to be the key to make this large
job succeed. With all the transformations in the job, the partition
distribution was becoming increasingly skewed. Not easy to figure out when
and to what number of partitions to set, and takes forever to tweak these
settings since it's works perfectly for small datasets and you'll have to
experiment with large time-consuming jobs. Imagine if there was an
automatic partition reconfiguration function that automagically did that...

On Tue, Feb 24, 2015 at 3:20 AM, Corey Nolet <cjno...@gmail.com> wrote:

> I *think* this may have been related to the default memory overhead
> setting being too low. I raised the value to 1G it and tried my job again
> but i had to leave the office before it finished. It did get further but
> I'm not exactly sure if that's just because i raised the memory. I'll see
> tomorrow- but i have a suspicion this may have been the cause of the
> executors being killed by the application master.
> On Feb 23, 2015 5:25 PM, "Corey Nolet" <cjno...@gmail.com> wrote:
>
>> I've got the opposite problem with regards to partitioning. I've got over
>> 6000 partitions for some of these RDDs which immediately blows the heap
>> somehow- I'm still not exactly sure how. If I coalesce them down to about
>> 600-800 partitions, I get the problems where the executors are dying
>> without any other error messages (other than telling me the executor was
>> lost in the UI). If I don't coalesce, I pretty immediately get Java heap
>> space exceptions that kill the job altogether.
>>
>> Putting in the timeouts didn't seem to help the case where I am
>> coalescing. Also, I don't see any dfferences between 'disk only' and
>> 'memory and disk' storage levels- both of them are having the same
>> problems. I notice large shuffle files (30-40gb) that only seem to spill a
>> few hundred mb.
>>
>> On Mon, Feb 23, 2015 at 4:28 PM, Anders Arpteg <arp...@spotify.com>
>> wrote:
>>
>>> Sounds very similar to what I experienced Corey. Something that seems to
>>> at least help with my problems is to have more partitions. Am already
>>> fighting between ending up with too many partitions in the end and having
>>> too few in the beginning. By coalescing at late as possible and avoiding
>>> too few in the beginning, the problems seems to decrease. Also, increasing
>>> spark.akka.askTimeout and spark.core.connection.ack.wait.timeout
>>> significantly (~700 secs), the problems seems to almost disappear. Don't
>>> wont to celebrate yet, still long way left before the job complete but it's
>>> looking better...
>>>
>>> On Mon, Feb 23, 2015 at 9:54 PM, Corey Nolet <cjno...@gmail.com> wrote:
>>>
>>>> I'm looking @ my yarn container logs for some of the executors which
>>>> appear to be failing (with the missing shuffle files). I see exceptions
>>>> that say "client.TransportClientFactor: Found inactive connection to
>>>> host/ip:port, closing it."
>>>>
>>>> Right after that I see "shuffle.RetryingBlockFetcher: Exception while
>>>> beginning fetch of 1 outstanding blocks. java.io.IOException: Failed to
>>>> connect to host/ip:port"
>>>>
>>>> Right after that exception I see "RECEIVED SIGNAL 15: SIGTERM"
>>>>
>>>> Finally, following the sigterm, I see "FileNotFoundExcception:
>>>> /hdfs/01/yarn/nm/usercache....../spark-local-uuid/shuffle_5_09_0.data (No
>>>> such file for directory)"
>>>>
>>>> I'm looking @ the nodemanager and application master logs and I see no
>>>> indications whatsoever that there were any memory issues during this period
>>>> of time. The Spark UI is telling me none of the executors are really using
>>>> too much memory when this happens. It is a big job that's catching several
>>>> 100's of GB but each node manager on the cluster has 64gb of ram just for
>>>> yarn containers (physical nodes have 128gb). On this cluster, we have 128
>>>> nodes. I've also tried using DISK_ONLY storage level but to no avail.
>>>>
>>>> Any further ideas on how to track this down? Again, we're able to run
>>>> this same job on about 1/5th of the data just fine.The only thing that's
>>>> pointing me towards a memory issue is that it seems to be happening in the
>>>> same stages each time and when I lower the memory that each executor has
>>>> allocated it happens in earlier stages but I can't seem to find anything
>>>> that says an executor (or container for that matter) has run low on memory.
>>>>
>>>>
>>>>
>>>> On Mon, Feb 23, 2015 at 9:24 AM, Anders Arpteg <arp...@spotify.com>
>>>> wrote:
>>>>
>>>>> No, unfortunately we're not making use of dynamic allocation or the
>>>>> external shuffle service. Hoping that we could reconfigure our cluster to
>>>>> make use of it, but since it requires changes to the cluster itself (and
>>>>> not just the Spark app), it could take some time.
>>>>>
>>>>> Unsure if task 450 was acting as a reducer or not, but seems possible.
>>>>> Probably due to a crashed executor as you say. Seems like I need to do 
>>>>> some
>>>>> more advanced partition tuning to make this job work, as it's currently
>>>>> rather high number of partitions.
>>>>>
>>>>> Thanks for the help so far! It's certainly a frustrating task to debug
>>>>> when everything's working perfectly on sample data locally and crashes 
>>>>> hard
>>>>> when running on the full dataset on the cluster...
>>>>>
>>>>> On Sun, Feb 22, 2015 at 9:27 AM, Sameer Farooqui <
>>>>> same...@databricks.com> wrote:
>>>>>
>>>>>> Do you guys have dynamic allocation turned on for YARN?
>>>>>>
>>>>>> Anders, was Task 450 in your job acting like a Reducer and fetching
>>>>>> the Map spill output data from a different node?
>>>>>>
>>>>>> If a Reducer task can't read the remote data it needs, that could
>>>>>> cause the stage to fail. Sometimes this forces the previous stage to also
>>>>>> be re-computed if it's a wide dependency.
>>>>>>
>>>>>> But like Petar said, if you turn the external shuffle service on,
>>>>>> YARN NodeManager process on the slave machines will serve out the map 
>>>>>> spill
>>>>>> data, instead of the Executor JVMs (by default unless you turn external
>>>>>> shuffle on, the Executor JVM itself serves out the shuffle data which
>>>>>> causes problems if an Executor dies).
>>>>>>
>>>>>> Core, how often are Executors crashing in your app? How many
>>>>>> Executors do you have total? And what is the memory size for each? You 
>>>>>> can
>>>>>> change what fraction of the Executor heap will be used for your user code
>>>>>> vs the shuffle vs RDD caching with the spark.storage.memoryFraction 
>>>>>> setting.
>>>>>>
>>>>>> On Sat, Feb 21, 2015 at 2:58 PM, Petar Zecevic <
>>>>>> petar.zece...@gmail.com> wrote:
>>>>>>
>>>>>>>
>>>>>>> Could you try to turn on the external shuffle service?
>>>>>>>
>>>>>>> spark.shuffle.service.enable = true
>>>>>>>
>>>>>>>
>>>>>>> On 21.2.2015. 17:50, Corey Nolet wrote:
>>>>>>>
>>>>>>> I'm experiencing the same issue. Upon closer inspection I'm noticing
>>>>>>> that executors are being lost as well. Thing is, I can't figure out how
>>>>>>> they are dying. I'm using MEMORY_AND_DISK_SER and i've got over 1.3TB of
>>>>>>> memory allocated for the application. I was thinking perhaps it was
>>>>>>> possible that a single executor was getting a single or a couple large
>>>>>>> partitions but shouldn't the disk persistence kick in at that point?
>>>>>>>
>>>>>>> On Sat, Feb 21, 2015 at 11:20 AM, Anders Arpteg <arp...@spotify.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> For large jobs, the following error message is shown that seems to
>>>>>>>> indicate that shuffle files for some reason are missing. It's a rather
>>>>>>>> large job with many partitions. If the data size is reduced, the 
>>>>>>>> problem
>>>>>>>> disappears. I'm running a build from Spark master post 1.2 (build at
>>>>>>>> 2015-01-16) and running on Yarn 2.2. Any idea of how to resolve this
>>>>>>>> problem?
>>>>>>>>
>>>>>>>>  User class threw exception: Job aborted due to stage failure: Task
>>>>>>>> 450 in stage 450.1 failed 4 times, most recent failure: Lost task 
>>>>>>>> 450.3 in
>>>>>>>> stage 450.1 (TID 167370, lon4-hadoopslave-b77.lon4.spotify.net):
>>>>>>>> java.io.FileNotFoundException:
>>>>>>>> /disk/hd06/yarn/local/usercache/arpteg/appcache/application_1424333823218_21217/spark-local-20150221154811-998c/03/rdd_675_450
>>>>>>>> (No such file or directory)
>>>>>>>>  at java.io.FileOutputStream.open(Native Method)
>>>>>>>>  at java.io.FileOutputStream.(FileOutputStream.java:221)
>>>>>>>>  at java.io.FileOutputStream.(FileOutputStream.java:171)
>>>>>>>>  at
>>>>>>>> org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:76)
>>>>>>>>  at
>>>>>>>> org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:786)
>>>>>>>>  at
>>>>>>>> org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:637)
>>>>>>>>  at
>>>>>>>> org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:149)
>>>>>>>>  at
>>>>>>>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:74)
>>>>>>>>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>>>>>>>>  at
>>>>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>>>>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:264)
>>>>>>>>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:231)
>>>>>>>>  at
>>>>>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>>>>>>  at
>>>>>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>>>>>  at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>>>>>>  at
>>>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:192)
>>>>>>>>  at
>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>>
>>>>>>>>  at
>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>>
>>>>>>>>  at java.lang.Thread.run(Thread.java:745)
>>>>>>>>
>>>>>>>>  TIA,
>>>>>>>> Anders
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>

Reply via email to