Re: Trouble with large Yarn job

2015-01-14 Thread Anders Arpteg
Interesting, sounds plausible. Another way to avoid the problem has been to
cache intermediate output for large jobs (i.e. split large jobs into smaller
ones and then union them together). It's unfortunate that this type of
tweaking is necessary, though; hopefully it will be better in 1.2.1.
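
A rough sketch of that split-and-union idea, reusing the names from the code
quoted further down (the 10-day chunk size is illustrative, and this is not
necessarily how it was actually done):

  // Build the per-(user, track) aggregate in smaller chunks, persist each
  // intermediate result, then union the chunks and reduce once more.
  // Assumes sum_vectors is associative, so partial reductions compose.
  val chunks = ds.dateInterval(startDate, date).grouped(10).toSeq
  val partials = chunks.map { days =>
    val part = sc.union(
      days.map(d =>
        sc.avroFile[LrDailyEndSong](daily_end_song_path + d)
          .map(s => (
            (s.getUsername, s.getTrackUri),
            UserItemData(s.getUsername, s.getTrackUri,
              build_vector1(d, s),
              build_vector2(s))))))
      .reduceByKey(sum_vectors)
    part.persist(StorageLevel.MEMORY_AND_DISK_SER)
    part.count()  // force the intermediate job to run before moving on
    part
  }
  val all_days = sc.union(partials).reduceByKey(sum_vectors)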

On Tue, Jan 13, 2015 at 3:29 AM, Sven Krasser  wrote:

> Anders,
>
> This could be related to this open ticket:
> https://issues.apache.org/jira/browse/SPARK-5077. A call to coalesce()
> also fixed that for us as a stopgap.
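
(For anyone searching the archives, the stopgap amounts to something like the
sketch below, applied to the all_days RDD from the code further down; the
target partition count is illustrative only.)

  // Cap the number of partitions right after the big union so that the
  // shuffle which follows has far fewer map outputs to track.
  val all_days_small = all_days.coalesce(6000)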
>
> Best,
> -Sven
>
>
> On Mon, Jan 12, 2015 at 10:18 AM, Anders Arpteg 
> wrote:
>
>> Yes, sure Sandy, I've checked the logs and it's not an OOM issue. I've
>> actually been able to solve the problem finally, and it seems to be an
>> issue with too many partitions. The repartitioning I tried initially
>> happened after the union, and by then it's too late. By repartitioning as
>> early as possible, and by significantly reducing the number of partitions
>> (going from 100,000+ to ~6,000), the job succeeds with no more "Error
>> communicating with MapOutputTracker" issues. It seems to be an issue with
>> handling too many partitions and executors at the same time.
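
A rough sketch of what "repartitioning as early as possible" could look like
against the code quoted further down (the per-day coalesce factor is purely
illustrative):

  // Shrink the partition count per daily RDD before the union, instead of
  // repartitioning the much larger unioned RDD afterwards. Roughly 200
  // partitions per day over ~30 days gives the ~6,000 total mentioned above.
  val all_days = sc.union(
    ds.dateInterval(startDate, date).map(date =>
      sc.avroFile[LrDailyEndSong](daily_end_song_path + date)
        .coalesce(200)
        .map(s => (
          (s.getUsername, s.getTrackUri),
          UserItemData(s.getUsername, s.getTrackUri,
            build_vector1(date, s),
            build_vector2(s))))))
    .reduceByKey(sum_vectors)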
>>
>> An "auto-repartition" function would be awesome: one that checks the
>> sizes of existing partitions and compares them with the HDFS block size.
>> If they are too small (or too large), it would repartition to partition
>> sizes similar to the block size...
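
As a rough illustration of that heuristic (estimateInputBytes is a hypothetical
helper, and 128 MB is an assumed HDFS block size):

  // Pick a partition count so that each partition is roughly one HDFS block.
  val blockSize = 128L * 1024 * 1024                        // assumed block size
  val totalBytes = estimateInputBytes(daily_end_song_path)  // hypothetical helper
  val target = math.max(1, (totalBytes.toDouble / blockSize).ceil.toInt)
  val resized =
    if (target < all_days.partitions.length)
      all_days.coalesce(target)      // narrowing, avoids a shuffle
    else
      all_days.repartition(target)   // widening requires a shuffle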
>>
>> Hope this helps others with similar issues.
>>
>> Best,
>> Anders
>>
>> On Mon, Jan 12, 2015 at 6:32 AM, Sandy Ryza 
>> wrote:
>>
>>> Hi Anders,
>>>
>>> Have you checked your NodeManager logs to make sure YARN isn't killing
>>> executors for exceeding memory limits?
>>>
>>> -Sandy
>>>
>>> On Tue, Jan 6, 2015 at 8:20 AM, Anders Arpteg 
>>> wrote:
>>>
 Hey,

 I have a job that keeps failing if too much data is processed, and I
 can't see how to get it working. I've tried repartitioning with more
 partitions and increasing the amount of memory for the executors (now
 about 12G and 400 executors). Here is a snippet of the first part of the
 code, which succeeds without any problems:

 // One RDD per day, unioned and then reduced by (user, track) key.
 val all_days = sc.union(
   ds.dateInterval(startDate, date).map(date =>
     sc.avroFile[LrDailyEndSong](daily_end_song_path + date)
       .map(s => (
         (s.getUsername, s.getTrackUri),
         UserItemData(s.getUsername, s.getTrackUri,
           build_vector1(date, s),
           build_vector2(s))))))
   .reduceByKey(sum_vectors)

 I want to process 30 days of data or more, but am only able to process
 about 10 days. With more days of data (a lower startDate in the code
 above), the union above succeeds but the code below fails with "Error
 communicating with MapOutputTracker" (see http://pastebin.com/fGDCXPkL
 for more detailed error messages). Here is a snippet of the code that
 fails:

 val top_tracks = all_days
   .map(t => (t._1._2.toString, 1))
   .reduceByKey(_ + _)
   .filter(trackFilter)
   .repartition(4)
   .persist(StorageLevel.MEMORY_AND_DISK_SER)

 val observation_data = all_days
   .mapPartitions(_.map(o => (o._1._2.toString, o._2)))
   .join(top_tracks)

 The calculation of top_tracks works, but the last mapPartitions task
 fails with the error message above when given more than 10 days of data.
 I also tried increasing the spark.akka.askTimeout setting, but it still
 fails even after raising the timeout tenfold to 300 seconds. I'm using
 Spark 1.2 and Kryo serialization.
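
 (For reference, a timeout bump like the one mentioned above amounts to
 something along these lines when building the SparkConf; 300 is ten times
 the 30-second default:)

   import org.apache.spark.SparkConf

   val conf = new SparkConf()
     .set("spark.akka.askTimeout", "300")  // seconds; ten times the 30s default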

 I realize that this is a rather long message, but I'm stuck and would
 appreciate any help or clues for resolving this issue. It seems to be an
 out-of-memory issue, but increasing the number of partitions does not
 seem to help.

 Thanks,
 Anders

>>>
>>>
>>
>
>
> --
> http://sites.google.com/site/krasser/?utm_source=sig
>

