I *think* this may have been related to the default memory overhead setting being too low. I raised the value to 1G and tried my job again, but I had to leave the office before it finished. It did get further, but I'm not exactly sure whether that's just because I raised the memory. I'll see tomorrow, but I have a suspicion this may have been the cause of the executors being killed by the application master.
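For reference, a minimal sketch of what that change looks like when set from the app, assuming the setting in question is spark.yarn.executor.memoryOverhead on a Spark 1.2-era YARN deployment (the app name is a placeholder, and for yarn-cluster mode these would normally go on the spark-submit command line instead):

    import org.apache.spark.{SparkConf, SparkContext}

    // The overhead settings are in megabytes in this era of Spark, so "1G" becomes 1024.
    // They cover off-heap usage (JVM overhead, netty buffers, etc.) on top of the executor
    // heap; if a container grows past heap + overhead, YARN kills it.
    val conf = new SparkConf()
      .setAppName("large-shuffle-job")                    // placeholder name
      .set("spark.yarn.executor.memoryOverhead", "1024")
      .set("spark.yarn.driver.memoryOverhead", "1024")    // often worth raising together

    val sc = new SparkContext(conf)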
On Feb 23, 2015 5:25 PM, "Corey Nolet" <cjno...@gmail.com> wrote:

> I've got the opposite problem with regards to partitioning. I've got over 6000 partitions for some of these RDDs, which immediately blows the heap somehow; I'm still not exactly sure how. If I coalesce them down to about 600-800 partitions, I get the problems where the executors are dying without any other error messages (other than telling me the executor was lost in the UI). If I don't coalesce, I pretty immediately get Java heap space exceptions that kill the job altogether.
>
> Putting in the timeouts didn't seem to help the case where I am coalescing. Also, I don't see any differences between the 'disk only' and 'memory and disk' storage levels; both of them are having the same problems. I notice large shuffle files (30-40GB) that only seem to spill a few hundred MB.
>
> On Mon, Feb 23, 2015 at 4:28 PM, Anders Arpteg <arp...@spotify.com> wrote:
>
>> Sounds very similar to what I experienced, Corey. Something that seems to at least help with my problems is to have more partitions. I'm already fighting between ending up with too many partitions at the end and having too few at the beginning. By coalescing as late as possible and avoiding too few in the beginning, the problems seem to decrease. Also, after increasing spark.akka.askTimeout and spark.core.connection.ack.wait.timeout significantly (~700 secs), the problems seem to almost disappear. Don't want to celebrate yet, there's still a long way left before the job completes, but it's looking better...
>>
>> On Mon, Feb 23, 2015 at 9:54 PM, Corey Nolet <cjno...@gmail.com> wrote:
>>
>>> I'm looking at my YARN container logs for some of the executors which appear to be failing (with the missing shuffle files). I see exceptions that say "client.TransportClientFactory: Found inactive connection to host/ip:port, closing it."
>>>
>>> Right after that I see "shuffle.RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks. java.io.IOException: Failed to connect to host/ip:port"
>>>
>>> Right after that exception I see "RECEIVED SIGNAL 15: SIGTERM"
>>>
>>> Finally, following the sigterm, I see "FileNotFoundException: /hdfs/01/yarn/nm/usercache....../spark-local-uuid/shuffle_5_09_0.data (No such file or directory)"
>>>
>>> I'm looking at the NodeManager and application master logs and I see no indications whatsoever that there were any memory issues during this period of time. The Spark UI is telling me none of the executors are really using too much memory when this happens. It is a big job that's caching several hundred GB, but each NodeManager on the cluster has 64GB of RAM just for YARN containers (physical nodes have 128GB). On this cluster, we have 128 nodes. I've also tried using the DISK_ONLY storage level, but to no avail.
>>>
>>> Any further ideas on how to track this down? Again, we're able to run this same job on about 1/5th of the data just fine. The only thing that's pointing me towards a memory issue is that it seems to be happening in the same stages each time, and when I lower the memory that each executor has allocated it happens in earlier stages, but I can't seem to find anything that says an executor (or container for that matter) has run low on memory.
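As an aside for anyone reading along, a rough sketch of the two knobs discussed above — the long timeouts Anders raised and coalescing as late as possible — assuming a Spark 1.2-era job; the input path, key function, and partition counts below are placeholders, not the actual job:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._
    import org.apache.spark.storage.StorageLevel

    // Timeouts bumped to ~700 seconds, as Anders described (both values are in seconds here).
    val conf = new SparkConf()
      .setAppName("partition-tuning-sketch")               // placeholder name
      .set("spark.akka.askTimeout", "700")
      .set("spark.core.connection.ack.wait.timeout", "700")
    val sc = new SparkContext(conf)

    // Keep the partition count high through the shuffle-heavy stages...
    val wide = sc.textFile("hdfs:///some/large/input")      // placeholder path
      .map(line => (line.take(8), line))                    // placeholder key
      .reduceByKey(_ + _, 6000)                             // many partitions while shuffling
      .persist(StorageLevel.MEMORY_AND_DISK_SER)

    // ...and only coalesce down at the very end, just before writing out.
    wide.coalesce(600, shuffle = false)
      .saveAsTextFile("hdfs:///some/output")                // placeholder path

Whether 600 is the right target obviously depends on the data; the point is only the ordering — wide operations first at high parallelism, coalesce last.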
>>> On Mon, Feb 23, 2015 at 9:24 AM, Anders Arpteg <arp...@spotify.com> wrote:
>>>
>>>> No, unfortunately we're not making use of dynamic allocation or the external shuffle service. I'm hoping that we could reconfigure our cluster to make use of it, but since it requires changes to the cluster itself (and not just the Spark app), it could take some time.
>>>>
>>>> Unsure if task 450 was acting as a reducer or not, but it seems possible. Probably due to a crashed executor, as you say. Seems like I need to do some more advanced partition tuning to make this job work, as it currently has a rather high number of partitions.
>>>>
>>>> Thanks for the help so far! It's certainly a frustrating task to debug when everything works perfectly on sample data locally but crashes hard when running on the full dataset on the cluster...
>>>>
>>>> On Sun, Feb 22, 2015 at 9:27 AM, Sameer Farooqui <same...@databricks.com> wrote:
>>>>
>>>>> Do you guys have dynamic allocation turned on for YARN?
>>>>>
>>>>> Anders, was Task 450 in your job acting like a Reducer and fetching the Map spill output data from a different node?
>>>>>
>>>>> If a Reducer task can't read the remote data it needs, that could cause the stage to fail. Sometimes this forces the previous stage to also be re-computed if it's a wide dependency.
>>>>>
>>>>> But like Petar said, if you turn the external shuffle service on, the YARN NodeManager process on the slave machines will serve out the map spill data instead of the Executor JVMs (by default, unless you turn the external shuffle service on, the Executor JVM itself serves out the shuffle data, which causes problems if an Executor dies).
>>>>>
>>>>> Corey, how often are Executors crashing in your app? How many Executors do you have in total? And what is the memory size for each? You can change what fraction of the Executor heap will be used for your user code vs. the shuffle vs. RDD caching with the spark.storage.memoryFraction setting.
>>>>>
>>>>> On Sat, Feb 21, 2015 at 2:58 PM, Petar Zecevic <petar.zece...@gmail.com> wrote:
>>>>>
>>>>>> Could you try to turn on the external shuffle service?
>>>>>>
>>>>>> spark.shuffle.service.enabled = true
>>>>>>
>>>>>> On 21.2.2015. 17:50, Corey Nolet wrote:
>>>>>>
>>>>>> I'm experiencing the same issue. Upon closer inspection I'm noticing that executors are being lost as well. Thing is, I can't figure out how they are dying. I'm using MEMORY_AND_DISK_SER and I've got over 1.3TB of memory allocated for the application. I was thinking it was perhaps possible that a single executor was getting a single or a couple of large partitions, but shouldn't the disk persistence kick in at that point?
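For completeness, a minimal sketch of the Spark-side half of Petar's and Sameer's suggestion, assuming Spark 1.2+. The cluster-side half — registering the Spark shuffle aux-service with the NodeManagers in yarn-site.xml — is the part Anders notes can't be done from the app alone, and the exact YARN property names should be checked against the docs for your version:

    import org.apache.spark.SparkConf

    // App-side settings only; the YARN shuffle service must already be running
    // inside each NodeManager for this to have any effect.
    val conf = new SparkConf()
      .set("spark.shuffle.service.enabled", "true")
      // Optional once the shuffle service is in place:
      // .set("spark.dynamicAllocation.enabled", "true")
      // Sameer's memory-fraction knob (0.6 is the default in this era of Spark):
      // .set("spark.storage.memoryFraction", "0.5")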
>>>>>> On Sat, Feb 21, 2015 at 11:20 AM, Anders Arpteg <arp...@spotify.com> wrote:
>>>>>>
>>>>>>> For large jobs, the following error message is shown, which seems to indicate that shuffle files are for some reason missing. It's a rather large job with many partitions. If the data size is reduced, the problem disappears. I'm running a build from Spark master post 1.2 (built 2015-01-16) and running on YARN 2.2. Any idea of how to resolve this problem?
>>>>>>>
>>>>>>> User class threw exception: Job aborted due to stage failure: Task 450 in stage 450.1 failed 4 times, most recent failure: Lost task 450.3 in stage 450.1 (TID 167370, lon4-hadoopslave-b77.lon4.spotify.net):
>>>>>>> java.io.FileNotFoundException: /disk/hd06/yarn/local/usercache/arpteg/appcache/application_1424333823218_21217/spark-local-20150221154811-998c/03/rdd_675_450 (No such file or directory)
>>>>>>>   at java.io.FileOutputStream.open(Native Method)
>>>>>>>   at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
>>>>>>>   at java.io.FileOutputStream.<init>(FileOutputStream.java:171)
>>>>>>>   at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:76)
>>>>>>>   at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:786)
>>>>>>>   at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:637)
>>>>>>>   at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:149)
>>>>>>>   at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:74)
>>>>>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>>>>>>>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:264)
>>>>>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:231)
>>>>>>>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>>>>>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>>>>   at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>>>>>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:192)
>>>>>>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>   at java.lang.Thread.run(Thread.java:745)
>>>>>>>
>>>>>>> TIA,
>>>>>>> Anders