Hi,

I'm trying to run groupBy(function) followed by saveAsTextFile on an RDD
of ~100 million records. The data size is 20GB, and the groupBy results
in an RDD of 1061 keys with values of type Iterable<Tuple4<String,
Integer, Double, String>>. The job runs on 3 hosts in a standalone
setup, with each host's executor having 100G RAM and 24 dedicated cores.
While the groupBy stage completes successfully with ~24GB of shuffle
write, the saveAsTextFile stage fails after repeated retries, with each
attempt failing due to an out of memory error *[1]*. I understand that a
few partitions may be overloaded as a result of the groupBy, and I've
tried the following config combinations unsuccessfully (sketched in code
below):

1) Repartition the initial RDD (44 input partitions but 1061 keys)
across 1061 partitions and set max cores = 3, so that each key is a
"logical" partition (though many partitions will end up on very few
hosts), and each host likely runs saveAsTextFile on a single key at a
time, since max cores = 3 with 3 hosts in the cluster. The level of
parallelism is left unspecified.

2) Leave max cores unspecified, set the level of parallelism to 72, and
leave the number of partitions unspecified (in which case the number of
input partitions, 44, is used).

Since I do not intend to cache RDDs, I have set
spark.storage.memoryFraction=0.2 in both cases.
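
To make the setup concrete, here is a rough sketch of the pipeline and
the two attempts. The SparkConf wiring, loadRecords and keyFunction are
illustrative placeholders, not the actual ModelTrainer code:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import scala.Tuple4;

// Common to both attempts: nothing is cached, so shrink the storage fraction.
SparkConf conf = new SparkConf()
    .setAppName("ModelTrainer")
    .set("spark.storage.memoryFraction", "0.2");

// Attempt (1): cap the job at 3 concurrent cores, i.e. one per host.
// conf.set("spark.cores.max", "3");

// Attempt (2): leave max cores and partition count alone, raise parallelism.
// conf.set("spark.default.parallelism", "72");

JavaSparkContext sc = new JavaSparkContext(conf);

// Placeholder for the real input (44 partitions, ~100 million Tuple4 records).
JavaRDD<Tuple4<String, Integer, Double, String>> records = loadRecords(sc);

// Attempt (1) additionally repartitions to one partition per key.
// records = records.repartition(1061);

// Placeholder for the real grouping logic; here, group on the first field.
Function<Tuple4<String, Integer, Double, String>, String> keyFunction =
    new Function<Tuple4<String, Integer, Double, String>, String>() {
      @Override
      public String call(Tuple4<String, Integer, Double, String> t) {
        return t._1();
      }
    };

// The failing pipeline: the groupBy stage completes (~24GB shuffle write),
// then the saveAsTextFile stage OOMs.
JavaPairRDD<String, Iterable<Tuple4<String, Integer, Double, String>>> grouped =
    records.groupBy(keyFunction);
grouped.saveAsTextFile("hdfs://...");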

My understanding is that if each host is processing a single logical
partition for saveAsTextFile and is reading from the other hosts to
write out the RDD, it is unlikely to run out of memory. My
interpretation of the Spark tuning guide is that the degree of
parallelism has little impact in case (1) above, since max cores =
number of hosts. Can someone explain why there are still OOMs with 100G
available? On a related note, intuitively (though I haven't read the
source), it appears that an entire key-value pair needn't fit into the
memory of a single host for saveAsTextFile, since each shuffle read from
a remote host could be written to HDFS before the next remote read is
carried out. That way, not all the data needs to be held in memory at
the same time.
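
Incidentally, the stack trace below *[1]* fails inside
scala.Tuple2.toString, i.e. while saveAsTextFile builds the single text
line for one (key, Iterable<...>) record, so the entire group's string
is being assembled in one array. If the output format permits one line
per value rather than one line per key, a sketch like the following
(where "grouped" and the output path stand in for the corresponding
objects in ModelTrainer, and the tab-separated layout is just an
example) avoids building that one giant string, though the groupBy
itself still buffers each key's values on a single executor:

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import scala.Tuple2;
import scala.Tuple4;

// Emit one output line per value instead of one line per key, so no single
// record string has to hold an entire group.
JavaRDD<String> lines = grouped.flatMap(
    new FlatMapFunction<Tuple2<String, Iterable<Tuple4<String, Integer, Double, String>>>, String>() {
      @Override
      public Iterable<String> call(
          Tuple2<String, Iterable<Tuple4<String, Integer, Double, String>>> kv) {
        List<String> out = new ArrayList<String>();
        for (Tuple4<String, Integer, Double, String> v : kv._2()) {
          out.add(kv._1() + "\t" + v._1() + "\t" + v._2() + "\t"
              + v._3() + "\t" + v._4());
        }
        return out;
      }
    });
lines.saveAsTextFile("hdfs://...");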

Lastly, if an OOM is a common occurrence in practice (even though, per
the tuning guide and even Datastax's Spark introduction, it shouldn't
be), there may need to be more documentation around the internals of
Spark to help users make better-informed tuning decisions about
parallelism, max cores, the number of partitions and other tunables. Is
there any ongoing effort on that front?

Thanks,
Bharath


*[1]* OOM stack trace and logs
14/11/01 12:26:37 WARN TaskSetManager: Lost task 61.0 in stage 36.0 (TID
1264, proc1.foo.bar.com): java.lang.OutOfMemoryError: Requested array size
exceeds VM limit
        java.util.Arrays.copyOf(Arrays.java:3326)
        java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
        java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
        java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
        java.lang.StringBuilder.append(StringBuilder.java:136)
        scala.collection.mutable.StringBuilder.append(StringBuilder.scala:197)
        scala.Tuple2.toString(Tuple2.scala:22)
        org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1158)
        org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1158)
        scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:984)
        org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
14/11/01 12:26:40 WARN TaskSetManager: Lost task 92.0 in stage 36.0 (TID
1295, proc1.foo.bar.com): FetchFailed(BlockManagerId(1, proc1.foo.bar.com,
43704, 0), shuffleId=0, mapId=13, reduceId=92)
14/11/01 12:26:40 INFO DAGScheduler: Marking Stage 36 (saveAsTextFile at
ModelTrainer.java:141) as failed due to a fetch failure from Stage 37
(groupBy at ModelTrainer.java:133)
14/11/01 12:26:40 INFO DAGScheduler: Stage 36 (saveAsTextFile at
ModelTrainer.java:141) failed in 55.259 s
14/11/01 12:26:40 INFO DAGScheduler: Resubmitting Stage 37 (groupBy at
ModelTrainer.java:133) and Stage 36 (saveAsTextFile at
ModelTrainer.java:141) due to fetch failure
