That hash map is just a list of where each task ran; it's not the actual data. 
How many map and reduce tasks do you have? Maybe you need to give the driver a 
bit more memory, or use fewer tasks (e.g. do reduceByKey(_ + _, 100) to use 
only 100 tasks).
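
For concreteness, here is a minimal sketch of that suggestion, assuming roughly the 
Spark Scala API of this era; the object name, the stand-in aggregation used in place 
of the original convertData, and the figures of 100 reduce tasks and 2g of driver 
memory are illustrative assumptions, not tested recommendations:

```scala
import org.apache.hadoop.io.{BytesWritable, Text}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // implicit pair-RDD functions (pre-1.3 style)

// Hypothetical example application, not the poster's actual job.
object ReduceWithFewerTasks {
  def main(args: Array[String]): Unit = {
    // Driver memory is best raised at launch time, e.g.
    //   spark-submit --driver-memory 2g ...
    // (it cannot be changed from inside an already-running driver JVM).
    val sc = new SparkContext(new SparkConf().setAppName("ReduceWithFewerTasks"))

    val seq = sc.sequenceFile[Text, BytesWritable](args(0))

    // Stand-in for convertData: key each record by its text key and sum payload sizes.
    // The part that matters is the explicit reduce-task count passed to reduceByKey.
    val stat = seq
      .map { case (k, v) => (k.toString, v.getLength.toLong) }
      .reduceByKey(_ + _, 100)   // cap the shuffle at 100 reduce tasks

    stat.saveAsTextFile(args(1))
    sc.stop()
  }
}
```

The only change that matters relative to the original snippet is the second argument 
to reduceByKey, which bounds the number of reduce partitions whose map-output statuses 
the driver has to track.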

Matei

On May 29, 2014, at 2:03 AM, haitao .yao <yao.e...@gmail.com> wrote:

> Hi,
> 
>      I used 1 GB of memory for the driver Java process and got an OOM error on the 
> driver side before reduceByKey. After analyzing the heap dump, I found that the 
> biggest object is org.apache.spark.MapStatus, which occupied over 900 MB of memory. 
> 
> Here are my questions: 
> 
> 
> 1. Are there any optimization switches I can tune to avoid this? I have already 
> enabled compression on the output with spark.io.compression.codec.  
> 
> 2. Why do the workers send all the data back to the driver to run reduceByKey? With 
> the current implementation, if I use reduceByKey on TBs of data, that would be a 
> disaster for the driver. Maybe my assumption about how Spark implements this is 
> wrong.
> 
> 
> And here's my code snippet:
> 
> 
> ```
>     val cntNew = spark.accumulator(0)
>     val cntOld = spark.accumulator(0)
>     val cntErr = spark.accumulator(0)
> 
>     val sequenceFileUrl = args(0)
>     val seq = spark.sequenceFile[Text, BytesWritable](sequenceFileUrl)
>     val stat = seq.map(pair => convertData(
>       pair._2, cntNew, cntOld, cntErr
>     )).reduceByKey(_ + _)
>     stat.saveAsSequenceFile(args(1))
> ```
> 
> 
> Thanks.
> 
> 
> -- 
> 
> haitao.yao@China
