I am setting spark.executor.memory to 1024m on a 3-node cluster, with each node having 4 cores and 7 GB of RAM. The combiner functions take Scala case classes as input and produce a mutable.ListBuffer of Scala case classes. Since the compiler generates hashCode and equals for case classes, I am guessing those should already be taken care of.
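For reference, here is a minimal sketch of the aggregation pattern I described (Event and its fields are placeholder names, not my actual types):

  import scala.collection.mutable
  import org.apache.spark.{SparkConf, SparkContext}

  // Placeholder record type; the real job uses different case classes.
  case class Event(key: String, value: Long)

  val conf = new SparkConf()
    .setAppName("combine-sketch")
    .set("spark.executor.memory", "1024m")
  val sc = new SparkContext(conf)

  val events = sc.parallelize(Seq(
    ("a", Event("a", 1L)),
    ("a", Event("a", 2L)),
    ("b", Event("b", 3L))))

  // Collect each key's events into a mutable.ListBuffer.
  val grouped = events.combineByKey(
    (e: Event) => mutable.ListBuffer(e),                    // createCombiner
    (buf: mutable.ListBuffer[Event], e: Event) => buf += e, // mergeValue
    (b1: mutable.ListBuffer[Event], b2: mutable.ListBuffer[Event]) => b1 ++= b2) // mergeCombiners

  grouped.collect().foreach(println)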
Thanks,
Aniket

On Wed, Apr 15, 2015 at 1:00 PM Xianjin YE <advance...@gmail.com> wrote:

> What are your JVM heap size settings? The OOM in SizeEstimator is caused
> by a large number of entries in an IdentityHashMap.
> A quick guess is that the objects in your dataset are of a custom class
> and you didn't implement the hashCode and equals methods correctly.
>
> On Wednesday, April 15, 2015 at 3:10 PM, Aniket Bhatnagar wrote:
>
> > I am aggregating a dataset using the combineByKey method and, for a
> > certain input size, the job fails with the following error. I have
> > enabled heap dumps to better analyze the issue and will report back if
> > I have any findings. Meanwhile, if you have any idea of what could
> > possibly cause this error or how to better debug it, please let me know.
> >
> > java.lang.OutOfMemoryError: Java heap space
> >   at java.util.IdentityHashMap.resize(IdentityHashMap.java:469)
> >   at java.util.IdentityHashMap.put(IdentityHashMap.java:445)
> >   at org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:132)
> >   at org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:178)
> >   at org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:177)
> >   at scala.collection.immutable.List.foreach(List.scala:381)
> >   at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:177)
> >   at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:161)
> >   at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:155)
> >   at org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
> >   at org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
> >   at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:33)
> >   at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)
> >   at org.apache.spark.util.collection.ExternalAppendOnlyMap.insert(ExternalAppendOnlyMap.scala:105)
> >   at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:93)
> >   at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:44)
> >   at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
> >   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> >   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> >   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> >   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> >   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> >   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> >   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> >   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> >   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> >   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> >   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> >   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> >   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> >   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> >   at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
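P.S. For anyone following along, this is roughly how I enabled heap dumps on the executors (the class name, jar, and dump path below are illustrative, not my actual job):

  spark-submit \
    --class com.example.MyAggregationJob \
    --conf spark.executor.memory=1024m \
    --conf "spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp" \
    my-aggregation-job.jar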