That hash map is just a list of where each task ran; it’s not the actual data. How many map and reduce tasks do you have? Maybe you need to give the driver a bit more memory, or use fewer tasks (e.g. do reduceByKey(_ + _, 100) to use only 100 tasks).
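The "fewer tasks" suggestion above can be sketched as follows. This is a minimal, hedged example and not the original poster's job: the object name `ReduceWithFewerTasks`, the helper `sumByKey`, the `local[2]` master, and the sample data are all assumptions made for illustration. The only call taken from the thread is `reduceByKey(_ + _, 100)`, whose second argument sets the number of reduce tasks.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // brings in pair-RDD operations on Spark 1.x
import org.apache.spark.rdd.RDD

// Hypothetical names throughout; only reduceByKey(func, numPartitions) comes from the thread.
object ReduceWithFewerTasks {

  // Sum the values for each key, using an explicit number of reduce tasks
  // instead of the default derived from the parent RDD.
  def sumByKey(pairs: RDD[(String, Long)], numReduceTasks: Int): RDD[(String, Long)] =
    pairs.reduceByKey(_ + _, numReduceTasks)

  def main(args: Array[String]): Unit = {
    // Local master chosen only so the sketch runs without a cluster (assumption).
    val conf = new SparkConf().setAppName("reduce-with-fewer-tasks").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      // Illustrative data spread over 4 map-side partitions.
      val pairs = sc.parallelize(Seq(("a", 1L), ("b", 2L), ("a", 3L)), 4)
      val summed = sumByKey(pairs, 100)

      // The shuffled RDD has as many partitions as reduce tasks were requested.
      println(s"reduce tasks: ${summed.partitions.length}")
      summed.collect().sortBy(_._1).foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

For the other suggestion, driver memory can be raised when launching through spark-submit with the `--driver-memory` option, for example `--driver-memory 2g`; the right value depends on how many map and reduce tasks the job has.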
Matei

On May 29, 2014, at 2:03 AM, haitao .yao <yao.e...@gmail.com> wrote:

> Hi,
>
> I used 1g of memory for the driver Java process and got an OOM error on the
> driver side before reduceByKey. After analyzing the heap dump, I found that
> the biggest object is org.apache.spark.MapStatus, which occupied over 900MB
> of memory.
>
> Here are my questions:
>
> 1. Are there any optimization switches that I can tune to avoid this? I have
> already enabled output compression with spark.io.compression.codec.
>
> 2. Why do the workers send all the data back to the driver to run
> reduceByKey? With the current implementation, if I use reduceByKey on TBs of
> data, that will be a disaster for the driver. Maybe my assumption about the
> Spark implementation is wrong.
>
> And here's my code snippet:
>
> ```
> val cntNew = spark.accumulator(0)
> val cntOld = spark.accumulator(0)
> val cntErr = spark.accumulator(0)
>
> val sequenceFileUrl = args(0)
> val seq = spark.sequenceFile[Text, BytesWritable](sequenceFileUrl)
> val stat = seq.map(pair => convertData(
>   pair._2, cntNew, cntOld, cntErr
> )).reduceByKey(_ + _)
> stat.saveAsSequenceFile(args(1))
> ```
>
> Thanks.
>
> --
> haitao.yao@China