Anders,

This could be related to this open ticket:
https://issues.apache.org/jira/browse/SPARK-5077. A call to coalesce() also
fixed that for us as a stopgap.

Best,
-Sven


On Mon, Jan 12, 2015 at 10:18 AM, Anders Arpteg <arp...@spotify.com> wrote:

> Yes sure Sandy, I've checked the logs and it's not a OOM issue. I've
> actually been able to solve the problem finally, and it seems to be an
> issue with too many partitions. The repartitioning I tried initially did so
> after the union, and then it's too late. By repartitioning as early as
> possible, and significantly reducing number of partitions (going from
> 100,000+ to ~6,000 partitions), the job succeeds and no more "Error
> communicating with MapOutputTracker" issues. Seems like an issue with
> handling too many partitions and executors as the same time.
>
> Would be awesome with an "auto-repartition" function, that checks sizes of
> existing partitions and compares with the HDFS block size. If too small (or
> too large), it would repartition to partition sizes similar to the block
> size...
>
> Hope this help others with similar issues.
>
> Best,
> Anders
>
> On Mon, Jan 12, 2015 at 6:32 AM, Sandy Ryza <sandy.r...@cloudera.com>
> wrote:
>
>> Hi Anders,
>>
>> Have you checked your NodeManager logs to make sure YARN isn't killing
>> executors for exceeding memory limits?
>>
>> -Sandy
>>
>> On Tue, Jan 6, 2015 at 8:20 AM, Anders Arpteg <arp...@spotify.com> wrote:
>>
>>> Hey,
>>>
>>> I have a job that keeps failing if too much data is processed, and I
>>> can't see how to get it working. I've tried repartitioning with more
>>> partitions and increasing amount of memory for the executors (now about 12G
>>> and 400 executors. Here is a snippets of the first part of the code, which
>>> succeeds without any problems:
>>>
>>>     val all_days = sc.union(
>>>       ds.dateInterval(startDate, date).map(date =>
>>>         sc.avroFile[LrDailyEndSong](daily_end_song_path + date)
>>>           .map(s => (
>>>             (s.getUsername, s.getTrackUri),
>>>             UserItemData(s.getUsername, s.getTrackUri,
>>>               build_vector1(date, s),
>>>               build_vector2(s))))
>>>       )
>>>     )
>>>       .reduceByKey(sum_vectors)
>>>
>>> I want to process 30 days of data or more, but am only able to process
>>> about 10 days. If having more days of data (lower startDate in code
>>> above), the union above succeeds but the code below fails with "Error
>>> communicating with MapOutputTracker" (see http://pastebin.com/fGDCXPkL
>>> for more detailed error messages). Here is a snippet of the code that fails:
>>>
>>>     val top_tracks = all_days.map(t => (t._1._2.toString, 1)).
>>> reduceByKey(_+_)
>>>       .filter(trackFilter)
>>>       .repartition(40000)
>>>       .persist(StorageLevel.MEMORY_AND_DISK_SER)
>>>
>>>     val observation_data = all_days
>>>       .mapPartitions(_.map(o => (o._1._2.toString, o._2)))
>>>       .join(top_tracks)
>>>
>>> The calculation of top_tracks works, but the last mapPartitions task
>>> fails with given error message if given more than 10 days of data. Also
>>> tried increasing the spark.akka.askTimeout setting, but it still fails
>>> even if 10-folding the timeout setting to 300 seconds. I'm using Spark 1.2
>>> and the kryo serialization.
>>>
>>> Realize that this is a rather long message, but I'm stuck and would
>>> appreciate any help or clues for resolving this issue. Seems to be a
>>> out-of-memory issue, but it does not seems to help to increase the number
>>> of partitions.
>>>
>>> Thanks,
>>> Anders
>>>
>>
>>
>


-- 
http://sites.google.com/site/krasser/?utm_source=sig

Reply via email to