Minor clarification: I'm running Spark 1.1.0 on JDK 1.8, 64-bit Linux.

On Sun, Nov 2, 2014 at 1:06 AM, Bharath Ravi Kumar <reachb...@gmail.com>
wrote:

> Hi,
>
> I'm trying to run groupBy(function) followed by saveAsTextFile on an RDD
> of ~100 million records. The data size is 20GB, and the groupBy results in
> an RDD of 1061 keys with values of type Iterable<Tuple4<String, Integer,
> Double, String>>. The job runs on 3 hosts in a standalone setup, with each
> host's executor having 100G RAM and 24 cores dedicated to it. While the
> groupBy stage completes successfully with ~24GB of shuffle write, the
> saveAsTextFile fails after repeated retries, with each attempt failing due
> to an out of memory error *[1]*. I understand that a few partitions may
> be overloaded as a result of the groupBy, and I've tried the following
> config combinations unsuccessfully:
>
> 1) Repartition the initial RDD (44 input partitions but 1061 keys) across
> 1061 partitions and set max cores = 3, so that each key is a "logical"
> partition (though many partitions will end up on very few hosts), and each
> host likely runs saveAsTextFile on a single key at a time due to max cores
> = 3 with 3 hosts in the cluster. The level of parallelism is unspecified.
>
> 2) Leave max cores unspecified, set the level of parallelism to 72, and
> leave the number of partitions unspecified (in which case the number of
> input partitions, 44, was used).
> Since I do not intend to cache RDDs, I have set
> spark.storage.memoryFraction=0.2 in both cases (a rough sketch of both
> setups follows below).
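>
> For reference, here is a minimal sketch of the job shape and the two
> attempts above (the class name, input format and paths below are made up;
> my actual code in ModelTrainer.java differs in detail):
>
>   import org.apache.spark.SparkConf;
>   import org.apache.spark.api.java.JavaPairRDD;
>   import org.apache.spark.api.java.JavaRDD;
>   import org.apache.spark.api.java.JavaSparkContext;
>   import scala.Tuple4;
>
>   public class GroupBySketch {
>     public static void main(String[] args) {
>       SparkConf conf = new SparkConf()
>           .setAppName("groupBy-saveAsTextFile-sketch")
>           // attempt (2): fix the level of parallelism at 72
>           .set("spark.default.parallelism", "72")
>           // no RDDs are cached, so shrink the storage region in both cases
>           .set("spark.storage.memoryFraction", "0.2");
>           // attempt (1) instead sets spark.cores.max=3 and leaves
>           // spark.default.parallelism unspecified
>       JavaSparkContext sc = new JavaSparkContext(conf);
>
>       // ~100 million records in 44 input partitions (hypothetical path)
>       JavaRDD<Tuple4<String, Integer, Double, String>> records =
>           sc.textFile("hdfs:///data/input").map(GroupBySketch::parse);
>
>       // attempt (1): one partition per key
>       // records = records.repartition(1061);
>
>       // 1061 distinct keys -> one Iterable per key
>       JavaPairRDD<String, Iterable<Tuple4<String, Integer, Double, String>>>
>           grouped = records.groupBy(t -> t._1());
>
>       grouped.saveAsTextFile("hdfs:///data/output");
>       sc.stop();
>     }
>
>     private static Tuple4<String, Integer, Double, String> parse(String line) {
>       String[] f = line.split("\t");
>       return new Tuple4<>(f[0], Integer.parseInt(f[1]),
>           Double.parseDouble(f[2]), f[3]);
>     }
>   }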
>
> My understanding is that if each host is processing a single logical
> partition to saveAsTextFile and is reading from other hosts to write out
> the RDD, it is unlikely that it would run out of memory. My interpretation
> of the Spark tuning guide is that the degree of parallelism has little
> impact in case (1) above, since max cores = number of hosts. Can someone
> explain why there are still OOMs with 100G available? On a related
> note, intuitively (though I haven't read the source), it appears that an
> entire key-value pair needn't fit into the memory of a single host for
> saveAsTextFile, since a single shuffle read from a remote host can be
> written to HDFS before the next remote read is carried out. This way, not
> all data needs to be collected at the same time.
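>
> For context, the frames in *[1]* suggest that the write path of each
> saveAsTextFile task looks roughly like the following per record (a sketch
> I inferred from the trace, not Spark's actual source):
>
>   import java.io.BufferedWriter;
>   import java.io.IOException;
>   import java.util.Iterator;
>   import scala.Tuple2;
>
>   class SaveAsTextSketch {
>     // Each (key, value) pair is rendered as one String via Tuple2.toString()
>     // and written out as a single line. After a groupBy, the value is the
>     // whole Iterable for that key, so the line for one key is built in
>     // memory in a single StringBuilder (the expandCapacity frame in [1]).
>     static <K, V> void writePartition(Iterator<Tuple2<K, V>> records,
>                                       BufferedWriter out) throws IOException {
>       while (records.hasNext()) {
>         String line = records.next().toString();
>         out.write(line);
>         out.newLine();
>       }
>     }
>   }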
>
> Lastly, if OOMs are a common occurrence (though, per the tuning guide and
> even Datastax's Spark introduction, they shouldn't be), there may need
> to be more documentation around the internals of Spark to help users make
> better-informed tuning decisions about parallelism, max cores, number of
> partitions and other tunables. Is there any ongoing effort on that front?
>
> Thanks,
> Bharath
>
>
> *[1]* OOM stack trace and logs
> 14/11/01 12:26:37 WARN TaskSetManager: Lost task 61.0 in stage 36.0 (TID
> 1264, proc1.foo.bar.com): java.lang.OutOfMemoryError: Requested array
> size exceeds VM limit
>         java.util.Arrays.copyOf(Arrays.java:3326)
>         java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
>         java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
>         java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
>         java.lang.StringBuilder.append(StringBuilder.java:136)
>         scala.collection.mutable.StringBuilder.append(StringBuilder.scala:197)
>         scala.Tuple2.toString(Tuple2.scala:22)
>         org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1158)
>         org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1158)
>         scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:984)
>         org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
>         org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>         org.apache.spark.scheduler.Task.run(Task.scala:54)
>         org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         java.lang.Thread.run(Thread.java:745)
> 14/11/01 12:26:40 WARN TaskSetManager: Lost task 92.0 in stage 36.0 (TID
> 1295, proc1.foo.bar.com): FetchFailed(BlockManagerId(1, proc1.foo.bar.com,
> 43704, 0), shuffleId=0, mapId=13, reduceId=92)
> 14/11/01 12:26:40 INFO DAGScheduler: Marking Stage 36 (saveAsTextFile at
> ModelTrainer.java:141) as failed due to a fetch failure from Stage 37
> (groupBy at ModelTrainer.java:133)
> 14/11/01 12:26:40 INFO DAGScheduler: Stage 36 (saveAsTextFile at
> ModelTrainer.java:141) failed in 55.259 s
> 14/11/01 12:26:40 INFO DAGScheduler: Resubmitting Stage 37 (groupBy at
> ModelTrainer.java:133) and Stage 36 (saveAsTextFile at
> ModelTrainer.java:141) due to fetch failure
