Thanks, it worked.
2014-05-30 1:53 GMT+08:00 Matei Zaharia <matei.zaha...@gmail.com>:
> That hash map is just a list of where each task ran; it's not the actual
> data. How many map and reduce tasks do you have? Maybe you need to give the
> driver a bit more memory, or use fewer tasks (e.g. do reduceByKey(_ + _,
> 100) to use only 100 tasks).
>
> Matei
>
> On May 29, 2014, at 2:03 AM, haitao .yao <yao.e...@gmail.com> wrote:
>
> > Hi,
> >
> > I used 1g of memory for the driver Java process and got an OOM error on
> > the driver side before reduceByKey. After analyzing the heap dump, the
> > biggest object is org.apache.spark.MapStatus, which occupied over 900 MB
> > of memory.
> >
> > Here are my questions:
> >
> > 1. Are there any optimization switches that I can tune to avoid this? I
> > have already used compression on output with spark.io.compression.codec.
> >
> > 2. Why do the workers send all the data back to the driver to run
> > reduceByKey? With the current implementation, if I use reduceByKey on TBs
> > of data, that will be a disaster for the driver. Maybe my assumption about
> > the Spark implementation is wrong.
> >
> > And here's my code snippet:
> >
> > ```
> > import org.apache.hadoop.io.{BytesWritable, Text}
> > import org.apache.spark.SparkContext._ // pair-RDD implicits (reduceByKey, saveAsSequenceFile) in Spark 1.x
> >
> > // `spark` is the SparkContext; convertData is my own function returning a (key, value) pair
> > val cntNew = spark.accumulator(0)
> > val cntOld = spark.accumulator(0)
> > val cntErr = spark.accumulator(0)
> >
> > val sequenceFileUrl = args(0)
> > val seq = spark.sequenceFile[Text, BytesWritable](sequenceFileUrl)
> > val stat = seq.map(pair => convertData(
> >   pair._2, cntNew, cntOld, cntErr
> > )).reduceByKey(_ + _)
> > stat.saveAsSequenceFile(args(1))
> > ```
> >
> > Thanks.
> >
> > --
> > haitao.yao@China

--
haitao.yao@Beijing
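A rough way to see why Matei asks about the number of map and reduce tasks: the driver tracks one status per map task, and (as I understand Spark 1.0's MapStatus) each status stores about one compressed byte per reduce partition. So the driver-side size grows roughly with mapTasks * reduceTasks. When reduceByKey is called without a partition count and spark.default.parallelism is unset, Spark 1.x uses the parent RDD's partition count, so the reduce side inherits the map side's count and the product becomes quadratic. The sketch below is only that back-of-the-envelope model, not Spark code; the object and function names and the 20,000-task figure are hypothetical.

```scala
// Rough model only (an assumption, not Spark's exact accounting): the driver
// keeps one status per map task, and each status holds about one byte per
// reduce partition, so total size grows with mapTasks * reduceTasks.
object MapStatusEstimate {

  // Models reduceByKey without an explicit partition count: use
  // spark.default.parallelism if it is set, otherwise the parent RDD's
  // partition count (i.e. the number of map tasks).
  def defaultReduceTasks(numMapTasks: Long, defaultParallelism: Option[Long]): Long =
    defaultParallelism.getOrElse(numMapTasks)

  // Approximate bytes of map-output status held on the driver.
  def estimateBytes(numMapTasks: Long, numReduceTasks: Long): Long =
    numMapTasks * numReduceTasks

  def main(args: Array[String]): Unit = {
    val mapTasks = 20000L // hypothetical input split count, not from the thread
    val implicitReduce = defaultReduceTasks(mapTasks, None)
    val before = estimateBytes(mapTasks, implicitReduce) // reduceByKey(_ + _)
    val after = estimateBytes(mapTasks, 100L)            // reduceByKey(_ + _, 100)
    println(s"reduceByKey(_ + _):      ~${before / 1000000} MB")
    println(s"reduceByKey(_ + _, 100): ~${after / 1000000} MB")
  }
}
```

With these hypothetical numbers the model gives about 400 MB for the implicit reduce count versus about 2 MB with 100 reduce tasks. That is the intuition behind passing an explicit partition count to reduceByKey (or setting spark.default.parallelism): it turns the quadratic term into a linear one in the number of map tasks.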