Hi,

FYI, see the excerpt below on the error in your logs. Could you post your heap
size settings as well as your Spark app code?
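
For reference, what I'm after are the driver and executor JVM heap settings,
i.e. the equivalents of the following (illustrative values only, not a
recommendation):

    # spark-defaults.conf entries (or the matching spark-submit flags)
    spark.driver.memory     8g      # --driver-memory; the driver JVM's -Xmx
    spark.executor.memory   100g    # --executor-memory; each executor JVM's -Xmx in standalone mode

plus any explicit -Xmx/-Xms you pass through spark.driver.extraJavaOptions or
spark.executor.extraJavaOptions.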

Regards
Arthur

3.1.3 Detail Message: Requested array size exceeds VM limit

"The detail message Requested array size exceeds VM limit indicates that the 
application (or APIs used by that application) attempted to allocate an array 
that is larger than the heap size. For example, if an application attempts to 
allocate an array of 512MB but the maximum heap size is 256MB, then 
OutOfMemoryError will be thrown with the reason Requested array size exceeds VM 
limit. In most cases the problem is either a configuration issue (heap size too 
small) or a bug that results in an application attempting to create a huge 
array, for example, when the number of elements in the array is computed using 
an algorithm that computes an incorrect size."
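
In the stack trace below, the oversized array appears to be the char[] backing a 
StringBuilder: saveAsTextFile writes each record as one line by calling toString 
on it, and after the groupBy each record is a (key, Iterable<Tuple4<...>>) pair, 
so an entire key's group gets rendered into a single string. As a minimal, 
standalone illustration of how such a doubling buffer can fail (a hypothetical 
demo class, not from your app; the class name and chunk size are made up):

    // ArrayLimitDemo.java -- illustrative only. Repeatedly appending to a
    // StringBuilder doubles its backing char[] via Arrays.copyOf (the same
    // frames that appear in your trace). Depending on the heap size and the
    // JDK version, the failed allocation is reported as OutOfMemoryError with
    // either "Java heap space" or "Requested array size exceeds VM limit".
    public class ArrayLimitDemo {
        public static void main(String[] args) {
            StringBuilder sb = new StringBuilder();
            String chunk = new String(new char[1 << 20]); // ~1M chars per append
            while (true) {
                sb.append(chunk); // backing array roughly doubles on overflow
            }
        }
    }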




On 2 Nov, 2014, at 12:25 pm, Bharath Ravi Kumar <reachb...@gmail.com> wrote:

> Resurfacing the thread. An OOM shouldn't be the norm for a common groupBy / sort 
> use case in a framework that leads the sorting benchmarks, should it? Or is 
> there something fundamentally wrong with the usage?
> 
> On 02-Nov-2014 1:06 am, "Bharath Ravi Kumar" <reachb...@gmail.com> wrote:
> Hi,
> 
> I'm trying to run groupBy(function) followed by saveAsTextFile on an RDD of 
> ~100 million records. The data size is 20GB, and groupBy results in an RDD of 
> 1061 keys with values being Iterable<Tuple4<String, Integer, Double, 
> String>>. The job runs on 3 hosts in a standalone setup, with each host's 
> executor having 100G RAM and 24 cores dedicated to it. While the groupBy 
> stage completes successfully with ~24GB of shuffle write, the saveAsTextFile 
> fails after repeated retries, with each attempt failing due to an out of 
> memory error [1]. I understand that a few partitions may be overloaded as a 
> result of the groupBy, and I've tried the following config combinations 
> unsuccessfully:
> 
> 1) Repartition the initial rdd (44 input partitions but 1061 keys) across 
> 1061 partitions and set max cores = 3 so that each key is a "logical" 
> partition (though many partitions will end up on very few hosts), and each 
> host likely runs saveAsTextFile on a single key at a time due to max cores = 
> 3 with 3 hosts in the cluster. The level of parallelism is left unspecified.
> 
> 2) Leave max cores unspecified, set the level of parallelism to 72, and leave 
> the number of partitions unspecified (in which case the # of input partitions, 
> 44, is used).
> 
> Since I do not intend to cache RDDs, I have set 
> spark.storage.memoryFraction=0.2 in both cases. A rough sketch of both 
> attempts follows below.
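> 
> Roughly, the two attempts look like this (an illustrative sketch only, not the 
> actual ModelTrainer code; the paths, key extractor and record parsing below 
> are placeholders):
> 
>     // Sketch.java -- illustrative fragment, not the real ModelTrainer
>     import org.apache.spark.SparkConf;
>     import org.apache.spark.api.java.JavaRDD;
>     import org.apache.spark.api.java.JavaSparkContext;
>     import scala.Tuple4;
> 
>     public class Sketch {
>       public static void main(String[] args) {
>         SparkConf conf = new SparkConf()
>             .setAppName("ModelTrainer")
>             .set("spark.cores.max", "3")                 // attempt (1) only
>             .set("spark.default.parallelism", "72")      // attempt (2) only
>             .set("spark.storage.memoryFraction", "0.2"); // no RDDs are cached
>         JavaSparkContext sc = new JavaSparkContext(conf);
> 
>         JavaRDD<Tuple4<String, Integer, Double, String>> records =
>             sc.textFile("hdfs://.../input")                // placeholder path; 44 input partitions
>               .map(line -> new Tuple4<>(line, 0, 0.0, "")) // placeholder parse; real fields differ
>               .repartition(1061);                          // attempt (1) only
> 
>         records.groupBy(t -> t._1())         // 1061 keys -> (key, Iterable<Tuple4<...>>)
>                .saveAsTextFile("hdfs://.../output"); // placeholder path; the failing stage
>       }
>     }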
> 
> My understanding is that if each host is processing a single logical 
> partition to saveAsTextFile and is reading from other hosts to write out the 
> RDD, it is unlikely that it would run out of memory. My interpretation of the 
> Spark tuning guide is that the degree of parallelism has little impact in 
> case (1) above, since max cores = number of hosts. Can someone explain why 
> there are still OOMs with 100G available? On a related note, intuitively 
> (though I haven't read the source), it appears that an entire key-value pair 
> needn't fit into the memory of a single host for saveAsTextFile, since a 
> single shuffle read from a remote host can be written to HDFS before the 
> next remote read is carried out. This way, not all the data needs to be held 
> in memory at the same time. 
> 
> Lastly, if an OOM is (but shouldn't be) a common occurrence (as per the 
> tuning guide and even as per Datastax's Spark introduction), there may need 
> to be more documentation around the internals of Spark to help users make 
> better-informed tuning decisions about parallelism, max cores, the number of 
> partitions and other tunables. Is there any ongoing effort on that front?
> 
> Thanks,
> Bharath
> 
> 
> [1] OOM stack trace and logs
> 14/11/01 12:26:37 WARN TaskSetManager: Lost task 61.0 in stage 36.0 (TID 1264, proc1.foo.bar.com): java.lang.OutOfMemoryError: Requested array size exceeds VM limit
>         java.util.Arrays.copyOf(Arrays.java:3326)
>         java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
>         java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
>         java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
>         java.lang.StringBuilder.append(StringBuilder.java:136)
>         scala.collection.mutable.StringBuilder.append(StringBuilder.scala:197)
>         scala.Tuple2.toString(Tuple2.scala:22)
>         org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1158)
>         org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1158)
>         scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:984)
>         org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
>         org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>         org.apache.spark.scheduler.Task.run(Task.scala:54)
>         org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         java.lang.Thread.run(Thread.java:745)
> 14/11/01 12:26:40 WARN TaskSetManager: Lost task 92.0 in stage 36.0 (TID 1295, proc1.foo.bar.com): FetchFailed(BlockManagerId(1, proc1.foo.bar.com, 43704, 0), shuffleId=0, mapId=13, reduceId=92)
> 14/11/01 12:26:40 INFO DAGScheduler: Marking Stage 36 (saveAsTextFile at ModelTrainer.java:141) as failed due to a fetch failure from Stage 37 (groupBy at ModelTrainer.java:133)
> 14/11/01 12:26:40 INFO DAGScheduler: Stage 36 (saveAsTextFile at ModelTrainer.java:141) failed in 55.259 s
> 14/11/01 12:26:40 INFO DAGScheduler: Resubmitting Stage 37 (groupBy at ModelTrainer.java:133) and Stage 36 (saveAsTextFile at ModelTrainer.java:141) due to fetch failure
> 