Minor clarification: I'm running Spark 1.1.0 on JDK 1.8, 64-bit Linux.

On Sun, Nov 2, 2014 at 1:06 AM, Bharath Ravi Kumar <reachb...@gmail.com> wrote:
> Hi,
>
> I'm trying to run groupBy(function) followed by saveAsTextFile on an RDD of ~100 million records. The data size is 20GB, and the groupBy results in an RDD of 1061 keys with values of type Iterable<Tuple4<String, Integer, Double, String>>. The job runs on 3 hosts in a standalone setup, with each host's executor having 100G RAM and 24 cores dedicated to it. While the groupBy stage completes successfully with ~24GB of shuffle write, the saveAsTextFile fails after repeated retries, with each attempt failing due to an out-of-memory error [1]. I understand that a few partitions may be overloaded as a result of the groupBy, and I've tried the following config combinations unsuccessfully:
>
> 1) Repartition the initial RDD (44 input partitions but 1061 keys) across 1061 partitions and set max cores = 3, so that each key is a "logical" partition (though many partitions will end up on very few hosts), and each host likely runs saveAsTextFile on a single key at a time due to max cores = 3 with 3 hosts in the cluster. The level of parallelism is left unspecified.
>
> 2) Leave max cores unspecified, set the level of parallelism to 72, and leave the number of partitions unspecified (in which case the number of input partitions, 44, is used).
>
> Since I do not intend to cache RDDs, I have set spark.storage.memoryFraction=0.2 in both cases.
>
> My understanding is that if each host is processing a single logical partition for saveAsTextFile and is reading from other hosts to write out the RDD, it is unlikely to run out of memory. My interpretation of the Spark tuning guide is that the degree of parallelism has little impact in case (1) above, since max cores = number of hosts. Can someone explain why there are still OOMs with 100G available? On a related note, intuitively (though I haven't read the source), it appears that an entire key-value pair needn't fit into the memory of a single host for saveAsTextFile, since a single shuffle read from a remote host can be written to HDFS before the next remote read is carried out. That way, not all the data needs to be collected at the same time.
>
> Lastly, if an OOM is (but shouldn't be) a common occurrence (as per the tuning guide and even as per DataStax's Spark introduction), there may need to be more documentation around the internals of Spark to help users make better-informed tuning decisions about parallelism, max cores, number of partitions, and other tunables. Is there any ongoing effort on that front?
>
> Thanks,
> Bharath
>
> [1] OOM stack trace and logs:
>
> 14/11/01 12:26:37 WARN TaskSetManager: Lost task 61.0 in stage 36.0 (TID 1264, proc1.foo.bar.com): java.lang.OutOfMemoryError: Requested array size exceeds VM limit
>         java.util.Arrays.copyOf(Arrays.java:3326)
>         java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
>         java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
>         java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
>         java.lang.StringBuilder.append(StringBuilder.java:136)
>         scala.collection.mutable.StringBuilder.append(StringBuilder.scala:197)
>         scala.Tuple2.toString(Tuple2.scala:22)
>         org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1158)
>         org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1158)
>         scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:984)
>         org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
>         org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>         org.apache.spark.scheduler.Task.run(Task.scala:54)
>         org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         java.lang.Thread.run(Thread.java:745)
> 14/11/01 12:26:40 WARN TaskSetManager: Lost task 92.0 in stage 36.0 (TID 1295, proc1.foo.bar.com): FetchFailed(BlockManagerId(1, proc1.foo.bar.com, 43704, 0), shuffleId=0, mapId=13, reduceId=92)
> 14/11/01 12:26:40 INFO DAGScheduler: Marking Stage 36 (saveAsTextFile at ModelTrainer.java:141) as failed due to a fetch failure from Stage 37 (groupBy at ModelTrainer.java:133)
> 14/11/01 12:26:40 INFO DAGScheduler: Stage 36 (saveAsTextFile at ModelTrainer.java:141) failed in 55.259 s
> 14/11/01 12:26:40 INFO DAGScheduler: Resubmitting Stage 37 (groupBy at ModelTrainer.java:133) and Stage 36 (saveAsTextFile at ModelTrainer.java:141) due to fetch failure
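For reference, a minimal sketch of the setup described above, reconstructed from the mail (the input/output paths, the tab-separated record layout, and the parse helper are hypothetical stand-ins; this is not the actual ModelTrainer code). It reflects case (1): storage fraction 0.2, max cores = 3, and 1061 output partitions so each key is roughly one "logical" partition.

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple4;

    public class GroupBySaveSketch {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf()
                    .setAppName("GroupBySaveSketch")
                    // No RDD caching intended, so shrink the storage fraction
                    .set("spark.storage.memoryFraction", "0.2")
                    // Case (1): max cores = 3, i.e. one concurrent task per host
                    .set("spark.cores.max", "3");
            JavaSparkContext sc = new JavaSparkContext(conf);

            // Hypothetical input path and record layout (tab-separated fields)
            JavaRDD<Tuple4<String, Integer, Double, String>> records =
                    sc.textFile("hdfs:///input/records").map(GroupBySaveSketch::parse);

            // Grouping on the first field yields the ~1061 keys; passing 1061 as
            // the partition count makes each key roughly one "logical" partition
            JavaPairRDD<String, Iterable<Tuple4<String, Integer, Double, String>>> grouped =
                    records.groupBy(t -> t._1(), 1061);

            grouped.saveAsTextFile("hdfs:///output/grouped");  // hypothetical path
            sc.stop();
        }

        private static Tuple4<String, Integer, Double, String> parse(String line) {
            String[] f = line.split("\t");
            return new Tuple4<>(f[0], Integer.parseInt(f[1]),
                    Double.parseDouble(f[2]), f[3]);
        }
    }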
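On the OOM itself: the top frames (scala.Tuple2.toString feeding StringBuilder.append and Arrays.copyOf) show saveAsTextFile calling toString on each element, and after groupBy each element is an entire (key, Iterable<Tuple4<...>>) pair. A heavily loaded key is therefore rendered into a single in-memory string, which can exceed the JVM's maximum array size ("Requested array size exceeds VM limit") no matter how large the heap is. As an illustration only, not something proposed in this thread, one way to avoid stringifying whole groups is to flatten each group to one line per tuple before saving; the line layout below is a hypothetical tab-separated format.

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import scala.Tuple4;

    public class FlattenBeforeSave {
        // One output line per tuple, so toString never has to render a whole
        // (key, Iterable) pair as a single giant String.
        static JavaRDD<String> flatten(
                JavaPairRDD<String, Iterable<Tuple4<String, Integer, Double, String>>> grouped) {
            return grouped.flatMap(pair -> {
                List<String> lines = new ArrayList<>();
                for (Tuple4<String, Integer, Double, String> t : pair._2()) {
                    lines.add(pair._1() + "\t" + t._2() + "\t" + t._3() + "\t" + t._4());
                }
                return lines;  // Spark 1.x FlatMapFunction returns an Iterable
            });
        }
    }

Usage would be flatten(grouped).saveAsTextFile("hdfs:///output/grouped-flat"). Note the group itself is still materialized in executor memory by the groupBy; this sketch only avoids the single oversized char[] that the stack trace points at.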