Turns out my ulimit settings were too low. I bumped them up and the job
now completes successfully. Here's what I have now:

$ ulimit -u    # max user processes
81920
$ ulimit -n    # open files
81920

I was thrown off by the OutOfMemoryError into thinking it was Spark running
out of memory in the shuffle stage. My previous settings were 1024 for
both, and while that was enough for shuffles on small jobs (tens of gigs),
it would choke on the large ones. It would be good to document these limits
in the tuning / configuration section, along the lines of section 2.5,
"ulimit and nproc", of the HBase book:
https://hbase.apache.org/book.html
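
In case it helps anyone else, one way to make the higher limits stick
across logins is /etc/security/limits.conf. This is just a sketch, assuming
a Linux box with pam_limits enabled; "sparkuser" below is a placeholder for
whichever account runs the Spark workers and HDFS daemons:

# /etc/security/limits.conf (picked up at the next login of that account)
sparkuser   soft   nproc    81920
sparkuser   hard   nproc    81920
sparkuser   soft   nofile   81920
sparkuser   hard   nofile   81920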

For reference, here's the exception that threw me off:

14/04/10 15:16:58 WARN DFSClient: DataStreamer Exception
java.lang.OutOfMemoryError: unable to create new native thread
    at java.lang.Thread.start0(Native Method)
    at java.lang.Thread.start(Thread.java:657)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.initDataStreaming(DFSOutputStream.java:408)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:488)
14/04/10 15:16:58 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, minRequest: 10066329
14/04/10 15:16:58 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, minRequest: 10066329
14/04/10 15:16:58 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 0 non-zero-bytes blocks out of 7773 blocks
14/04/10 15:16:58 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote gets in  1 ms
14/04/10 15:16:58 INFO Ingest: Working on partition 6215 with rep = (3, 3)
14/04/10 15:16:58 ERROR Executor: Exception in task ID 21756
java.io.IOException: DFSOutputStream is closed
    at org.apache.hadoop.hdfs.DFSOutputStream.isClosed(DFSOutputStream.java:1265)
    at org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:1715)
    at org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:1694)
    at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:1778)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:66)
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:99)
    at org.apache.hadoop.io.SequenceFile$Writer.close(SequenceFile.java:1240)
    at org.apache.hadoop.io.MapFile$Writer.close(MapFile.java:300)
    at geotrellis.spark.cmd.Ingest$$anonfun$4.apply(Ingest.scala:189)
    at geotrellis.spark.cmd.Ingest$$anonfun$4.apply(Ingest.scala:176)
    at org.apache.spark.rdd.RDD$$anonfun$2.apply(RDD.scala:466)
    at org.apache.spark.rdd.RDD$$anonfun$2.apply(RDD.scala:466)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:416)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:679)

Thanks,
Ameet


On Wed, Apr 9, 2014 at 10:48 PM, Ameet Kini <ameetk...@gmail.com> wrote:

> val hrdd = sc.hadoopRDD(..)
> val res = hrdd.partitionBy(myCustomPartitioner)
>   .reduceByKey(..)
>   .mapPartitionsWithIndex( some code to save those partitions )
>
> I'm getting OutOfMemoryErrors on the read side of the partitionBy shuffle.
> My custom partitioner generates over 20,000 partitions, so there are
> 20,000 tasks reading the shuffle files. On smaller jobs with fewer
> partitions (~1,000), the job completes successfully.
>
> On my cluster, each worker gets 24 GB (SPARK_WORKER_MEMORY = 24 GB) and
> each executor gets 21 GB (SPARK_MEM = 21 GB). I have tried assigning 6
> cores per executor and then bringing that down to 3, and I still get
> OutOfMemoryErrors at 20,000 partitions. I have set
> spark.shuffle.memoryFraction=0.5 and spark.storage.memoryFraction=0.2,
> since I am not caching any RDDs.
>
> Do those config params look reasonable for my shuffle size? I'm not sure
> what to increase - shuffle.memoryFraction or the memory that the reduce
> tasks get. The latter, I'm guessing, is whatever is left over after
> storage.memoryFraction and shuffle.memoryFraction are accounted for.
>
> Thanks,
> Ameet
>
>
>
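
Re: the memoryFraction question in the quoted message above - for
completeness, here is a sketch of how I understand those properties get
passed along on a 0.9.x standalone cluster, via SPARK_JAVA_OPTS in
conf/spark-env.sh. The values are just the ones from the quoted message;
adjust as needed:

# conf/spark-env.sh (0.9.x-era way of passing spark.* system properties)
export SPARK_JAVA_OPTS="-Dspark.shuffle.memoryFraction=0.5 -Dspark.storage.memoryFraction=0.2"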
