Anders,

This could be related to this open ticket:
https://issues.apache.org/jira/browse/SPARK-5077. A call to coalesce() also
fixed that for us as a stopgap.
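Roughly, the stopgap looks like this (a sketch only: `all_days` is the RDD from the quoted code below, `coalesceTarget` is a hypothetical helper, and the ~6,000 cap is taken from Anders' numbers in this thread):

```scala
// Hypothetical helper: pick a coalesce target without ever *increasing*
// the partition count (coalesce with shuffle = false can only shrink).
def coalesceTarget(currentPartitions: Int, cap: Int): Int =
  math.max(1, math.min(currentPartitions, cap))

// Usage with Spark (not compiled here; requires a live SparkContext):
//   val shrunk = all_days.coalesce(coalesceTarget(all_days.getNumPartitions, 6000))
```

Unlike repartition(), coalesce() with the default shuffle = false merges existing partitions without a full shuffle, which is why it can sidestep shuffle-tracking problems like the one in that ticket.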
Best,
-Sven

On Mon, Jan 12, 2015 at 10:18 AM, Anders Arpteg <arp...@spotify.com> wrote:

> Yes sure Sandy, I've checked the logs and it's not an OOM issue. I've
> actually been able to solve the problem finally, and it seems to be an
> issue with too many partitions. The repartitioning I tried initially did
> so after the union, and then it's too late. By repartitioning as early as
> possible, and significantly reducing the number of partitions (going from
> 100,000+ to ~6,000 partitions), the job succeeds and there are no more
> "Error communicating with MapOutputTracker" issues. Seems like an issue
> with handling too many partitions and executors at the same time.
>
> It would be awesome to have an "auto-repartition" function that checks
> the sizes of existing partitions and compares them with the HDFS block
> size. If too small (or too large), it would repartition to partition
> sizes similar to the block size...
>
> Hope this helps others with similar issues.
>
> Best,
> Anders
>
> On Mon, Jan 12, 2015 at 6:32 AM, Sandy Ryza <sandy.r...@cloudera.com> wrote:
>
>> Hi Anders,
>>
>> Have you checked your NodeManager logs to make sure YARN isn't killing
>> executors for exceeding memory limits?
>>
>> -Sandy
>>
>> On Tue, Jan 6, 2015 at 8:20 AM, Anders Arpteg <arp...@spotify.com> wrote:
>>
>>> Hey,
>>>
>>> I have a job that keeps failing if too much data is processed, and I
>>> can't see how to get it working. I've tried repartitioning with more
>>> partitions and increasing the amount of memory for the executors (now
>>> about 12G and 400 executors).
>>> Here is a snippet of the first part of the code, which succeeds
>>> without any problems:
>>>
>>> val all_days = sc.union(
>>>   ds.dateInterval(startDate, date).map(date =>
>>>     sc.avroFile[LrDailyEndSong](daily_end_song_path + date)
>>>       .map(s => (
>>>         (s.getUsername, s.getTrackUri),
>>>         UserItemData(s.getUsername, s.getTrackUri,
>>>           build_vector1(date, s),
>>>           build_vector2(s))))
>>>   )
>>> )
>>> .reduceByKey(sum_vectors)
>>>
>>> I want to process 30 days of data or more, but am only able to process
>>> about 10 days. With more days of data (a lower startDate in the code
>>> above), the union above succeeds but the code below fails with "Error
>>> communicating with MapOutputTracker" (see http://pastebin.com/fGDCXPkL
>>> for more detailed error messages). Here is a snippet of the code that
>>> fails:
>>>
>>> val top_tracks = all_days.map(t => (t._1._2.toString, 1))
>>>   .reduceByKey(_ + _)
>>>   .filter(trackFilter)
>>>   .repartition(40000)
>>>   .persist(StorageLevel.MEMORY_AND_DISK_SER)
>>>
>>> val observation_data = all_days
>>>   .mapPartitions(_.map(o => (o._1._2.toString, o._2)))
>>>   .join(top_tracks)
>>>
>>> The calculation of top_tracks works, but the last mapPartitions task
>>> fails with the given error message when given more than 10 days of
>>> data. I also tried increasing the spark.akka.askTimeout setting, but it
>>> still fails even after 10-folding the timeout to 300 seconds. I'm using
>>> Spark 1.2 and Kryo serialization.
>>>
>>> I realize this is a rather long message, but I'm stuck and would
>>> appreciate any help or clues for resolving this issue. It seems to be
>>> an out-of-memory issue, but increasing the number of partitions does
>>> not seem to help.
>>>
>>> Thanks,
>>> Anders

--
http://sites.google.com/site/krasser/?utm_source=sig
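The "auto-repartition" function Anders wishes for can be sketched as follows. This is pure Scala and only illustrates the sizing arithmetic; Spark has no such built-in, the function name is hypothetical, and estimating `totalBytes` (e.g. by summing the sizes of the HDFS input files) is left out:

```scala
// Sketch of the "auto-repartition" idea from this thread: choose a
// partition count so that each partition is roughly one HDFS block.
// The 128 MB default matches a common HDFS block size (dfs.blocksize),
// but your cluster may differ.
def autoPartitionCount(totalBytes: Long,
                       hdfsBlockBytes: Long = 128L * 1024 * 1024): Int =
  math.max(1, math.ceil(totalBytes.toDouble / hdfsBlockBytes).toInt)

// Usage with Spark (not compiled here; needs an input-size estimate):
//   val n = autoPartitionCount(estimatedInputBytes)
//   val all_days_sized = all_days.repartition(n)
```

Applied early (right after the union, before reduceByKey), this would have produced a partition count in the low thousands for Anders' data rather than the 100,000+ that triggered the MapOutputTracker errors.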