Hi Anders,

Have you checked your NodeManager logs to make sure YARN isn't killing executors for exceeding memory limits?
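
If YARN is the one killing them, giving the containers more off-heap headroom usually helps. A minimal sketch, assuming you configure things through SparkConf (the keys are standard Spark 1.x settings, the values are just placeholders to adjust for your cluster):

import org.apache.spark.{SparkConf, SparkContext}

// Placeholder values -- the overhead is the extra headroom (in MB) that YARN
// grants each executor container on top of the executor's JVM heap.
val conf = new SparkConf()
  .set("spark.executor.memory", "12g")
  .set("spark.yarn.executor.memoryOverhead", "2048")

val sc = new SparkContext(conf)

The same keys can also be passed to spark-submit with --conf.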
-Sandy

On Tue, Jan 6, 2015 at 8:20 AM, Anders Arpteg <arp...@spotify.com> wrote:

> Hey,
>
> I have a job that keeps failing when too much data is processed, and I
> can't see how to get it working. I've tried repartitioning with more
> partitions and increasing the amount of memory for the executors (now
> about 12G and 400 executors). Here is a snippet of the first part of the
> code, which succeeds without any problems:
>
> val all_days = sc.union(
>   ds.dateInterval(startDate, date).map(date =>
>     sc.avroFile[LrDailyEndSong](daily_end_song_path + date)
>       .map(s => (
>         (s.getUsername, s.getTrackUri),
>         UserItemData(s.getUsername, s.getTrackUri,
>           build_vector1(date, s),
>           build_vector2(s))))
>   )
> )
> .reduceByKey(sum_vectors)
>
> I want to process 30 days of data or more, but am only able to process
> about 10 days. With more days of data (a lower startDate in the code
> above), the union above succeeds but the code below fails with "Error
> communicating with MapOutputTracker" (see http://pastebin.com/fGDCXPkL
> for more detailed error messages). Here is a snippet of the code that
> fails:
>
> val top_tracks = all_days.map(t => (t._1._2.toString, 1)).reduceByKey(_ + _)
>   .filter(trackFilter)
>   .repartition(40000)
>   .persist(StorageLevel.MEMORY_AND_DISK_SER)
>
> val observation_data = all_days
>   .mapPartitions(_.map(o => (o._1._2.toString, o._2)))
>   .join(top_tracks)
>
> The calculation of top_tracks works, but the last mapPartitions task
> fails with the given error message when given more than 10 days of data.
> I also tried increasing the spark.akka.askTimeout setting, but it still
> fails even after increasing the timeout tenfold to 300 seconds. I'm using
> Spark 1.2 and kryo serialization.
>
> I realize that this is a rather long message, but I'm stuck and would
> appreciate any help or clues for resolving this issue. It seems to be an
> out-of-memory issue, but it does not seem to help to increase the number
> of partitions.
>
> Thanks,
> Anders
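
For reference, I'm reading the settings you describe above as roughly the following SparkConf (keys are the standard Spark 1.2 names; values taken from your mail):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.memory", "12g")                                    // ~12G per executor, 400 executors
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")  // kryo serialization
  .set("spark.akka.askTimeout", "300")                                    // seconds; ten times the 30s default

None of those will help if YARN itself is killing the containers, which is why the NodeManager logs are the first place to look.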