Thanks, it worked.

2014-05-30 1:53 GMT+08:00 Matei Zaharia <matei.zaha...@gmail.com>:

> That hash map is just a list of where each task ran; it’s not the actual
> data. How many map and reduce tasks do you have? Maybe you need to give the
> driver a bit more memory, or use fewer tasks (e.g. do reduceByKey(_ + _,
> 100) to use only 100 tasks).
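For anyone hitting the same OOM: here is a minimal sketch of the two fixes Matei suggests above, i.e. capping the number of reduce tasks with an explicit partition count and giving the driver more memory. The app name, paths, partition count, and memory value below are made-up examples, not values from the original job:

```
import org.apache.spark.{SparkConf, SparkContext}

// Driver memory is normally raised outside the program, e.g.
//   spark-submit --driver-memory 2g ...   (2g is an example value;
// the master URL is also supplied by spark-submit here)
val sc = new SparkContext(new SparkConf().setAppName("reduceByKey-sketch"))

val pairs = sc.textFile("hdfs:///path/to/input")   // hypothetical input path
  .map(line => (line, 1))

// Passing an explicit partition count caps the number of reduce tasks,
// which also caps the per-task map-status bookkeeping the driver holds.
val counts = pairs.reduceByKey(_ + _, 100)          // 100 reduce tasks instead of the default
counts.saveAsTextFile("hdfs:///path/to/output")     // hypothetical output path
```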
>
> Matei
>
> On May 29, 2014, at 2:03 AM, haitao .yao <yao.e...@gmail.com> wrote:
>
> > Hi,
> >
> > I used 1 GB of memory for the driver Java process and got an OOM error on
> > the driver side before reduceByKey. After analyzing the heap dump, the
> > biggest object was org.apache.spark.MapStatus, which occupied over 900 MB
> > of memory.
> >
> > Here's my question:
> >
> >
> > 1. Are there any optimization switches I can tune to avoid this? I have
> > already enabled output compression with spark.io.compression.codec (see the
> > sketch after question 2).
> >
> > 2. Why do the workers send all the data back to the driver to run
> > reduceByKey? With the current implementation, if I use reduceByKey on TBs
> > of data, that will be a disaster for the driver. Maybe my assumption about
> > the Spark implementation is wrong.
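Regarding the codec setting mentioned in question 1, this is roughly how it is applied; a sketch only, where the app name and the codec class name are examples (newer Spark versions also accept short names like "snappy"), not the original configuration:

```
import org.apache.spark.{SparkConf, SparkContext}

// Example only: select the codec used for shuffle and RDD block compression.
val conf = new SparkConf()
  .setAppName("compression-example")
  .set("spark.io.compression.codec", "org.apache.spark.io.SnappyCompressionCodec")
val sc = new SparkContext(conf)
```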
> >
> >
> > And here's my code snippet:
> >
> >
> > ```
> >     val cntNew = spark.accumulator(0)
> >     val cntOld = spark.accumulator(0)
> >     val cntErr = spark.accumulator(0)
> >
> >     val sequenceFileUrl = args(0)
> >     val seq = spark.sequenceFile[Text, BytesWritable](sequenceFileUrl)
> >     val stat = seq.map(pair => convertData(
> >       pair._2, cntNew, cntOld, cntErr
> >     )).reduceByKey(_ + _)
> >     stat.saveAsSequenceFile(args(1))
> > ```
> >
> >
> > Thanks.
> >
> >
> > --
> >
> > haitao.yao@China
>
>


-- 
haitao.yao@Beijing
