Turns out that my ulimit settings were too low. I bumped them up and the job now completes successfully. Here's what I have now:
$ ulimit -u        # max user processes
81920
$ ulimit -n        # open files
81920

I was thrown off by the OutOfMemoryError into thinking it was Spark running out of memory in the shuffle stage. My previous settings were 1024 for both, and while that worked for shuffles on small jobs (tens of gigs), it would choke on the large ones. It would be good to document these in the tuning / configuration section, something like section 2.5 "ulimit and nproc" of https://hbase.apache.org/book.html

14/04/10 15:16:58 WARN DFSClient: DataStreamer Exception
java.lang.OutOfMemoryError: unable to create new native thread
        at java.lang.Thread.start0(Native Method)
        at java.lang.Thread.start(Thread.java:657)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.initDataStreaming(DFSOutputStream.java:408)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:488)
14/04/10 15:16:58 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, minRequest: 10066329
14/04/10 15:16:58 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, minRequest: 10066329
14/04/10 15:16:58 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 0 non-zero-bytes blocks out of 7773 blocks
14/04/10 15:16:58 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote gets in 1 ms
14/04/10 15:16:58 INFO Ingest: Working on partition 6215 with rep = (3, 3)
14/04/10 15:16:58 ERROR Executor: Exception in task ID 21756
java.io.IOException: DFSOutputStream is closed
        at org.apache.hadoop.hdfs.DFSOutputStream.isClosed(DFSOutputStream.java:1265)
        at org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:1715)
        at org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:1694)
        at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:1778)
        at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:66)
        at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:99)
        at org.apache.hadoop.io.SequenceFile$Writer.close(SequenceFile.java:1240)
        at org.apache.hadoop.io.MapFile$Writer.close(MapFile.java:300)
        at geotrellis.spark.cmd.Ingest$$anonfun$4.apply(Ingest.scala:189)
        at geotrellis.spark.cmd.Ingest$$anonfun$4.apply(Ingest.scala:176)
        at org.apache.spark.rdd.RDD$$anonfun$2.apply(RDD.scala:466)
        at org.apache.spark.rdd.RDD$$anonfun$2.apply(RDD.scala:466)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
        at org.apache.spark.scheduler.Task.run(Task.scala:53)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:416)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
        at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:679)

Thanks,
Ameet

On Wed, Apr 9, 2014 at 10:48 PM, Ameet Kini <ameetk...@gmail.com> wrote:
> val hrdd = sc.hadoopRDD(..)
> val res =
>   hrdd.partitionBy(myCustomPartitioner).reduceByKey(..).mapPartitionsWithIndex(
>     some code to save those partitions )
>
> I'm getting OutOfMemoryErrors on the read side of the partitionBy shuffle.
> My custom partitioner generates over 20,000 partitions, so there are 20,000
> tasks reading the shuffle files. On problems with fewer partitions (~1,000),
> the job completes successfully.
>
> On my cluster, each worker gets 24 GB (SPARK_WORKER_MEMORY = 24 GB) and
> each executor gets 21 GB (SPARK_MEM = 21 GB). I have tried assigning 6
> cores per executor and brought it down to 3, and I still get
> OutOfMemoryErrors at 20,000 partitions. I have
> spark.shuffle.memoryFraction=0.5 and spark.storage.memoryFraction=0.2 since
> I am not caching any RDDs.
>
> Do those config params look reasonable for my shuffle size? I'm not sure
> what to increase - shuffle.memoryFraction or the memory that the reduce
> tasks get. The latter, I am guessing, is whatever is left after giving
> storage.memoryFraction and shuffle.memoryFraction.
>
> Thanks,
> Ameet
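P.S. To make the memoryFraction question above concrete, here's the rough back-of-the-envelope split of a 21 GB executor heap under those settings. This is just the naive arithmetic implied by the two fractions, not Spark's exact internal accounting (which also applies safety fractions):

```python
executor_heap_gb = 21.0   # SPARK_MEM

shuffle_fraction = 0.5    # spark.shuffle.memoryFraction
storage_fraction = 0.2    # spark.storage.memoryFraction

# Budget reserved for in-memory shuffle buffers on the reduce side:
shuffle_gb = executor_heap_gb * shuffle_fraction
# Budget reserved for cached RDD blocks (unused here, since nothing is cached):
storage_gb = executor_heap_gb * storage_fraction
# Roughly what remains for task objects and everything else:
task_gb = executor_heap_gb - shuffle_gb - storage_gb

print(f"shuffle={shuffle_gb:.1f} GB, storage={storage_gb:.1f} GB, tasks~={task_gb:.1f} GB")
```

Under these settings roughly half the heap is earmarked for shuffle, which is why the fractions alone looked like the obvious suspect before the ulimit cause surfaced.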
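P.P.S. For anyone hitting the same "unable to create new native thread" error: the sketch below shows how to inspect the relevant limits, with the persistent fix in comments. The user name "sparkuser" and the 81920 values are illustrative examples matching my settings above, not anything prescribed by the Spark docs:

```shell
# Inspect the limits the current shell (and any Spark worker launched
# from it) runs with:
ulimit -Su    # soft limit on user processes/threads ("nproc")
ulimit -Sn    # soft limit on open files ("nofile")
ulimit -Hn    # hard limit on open files

# To persist higher limits across logins on Linux, entries like the
# following can be added to /etc/security/limits.conf (run as the user
# that launches the Spark workers; log out and back in to pick them up):
#   sparkuser  soft  nproc   81920
#   sparkuser  hard  nproc   81920
#   sparkuser  soft  nofile  81920
#   sparkuser  hard  nofile  81920
```

Note that `ulimit` in a shell only changes limits for that session and its children, which is why the limits.conf route (or the service manager's equivalent) is needed for workers started at boot.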