A typo - I meant section 2.1.2.5 "ulimit and nproc" of https://hbase.apache.org/book.html
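For reference, here is roughly what the relevant settings look like. The limits.conf entries are just one common way to make the higher limits persistent on Linux; the user name is a placeholder for whatever account runs the Spark workers, and the values are the ones from the thread below:

    # check the limits in effect for the user running the workers
    ulimit -u    # max user processes (nproc)
    ulimit -n    # max open files (nofile)

    # /etc/security/limits.conf - raise both limits persistently
    # ("sparkuser" is a placeholder account name)
    sparkuser  soft  nproc   81920
    sparkuser  hard  nproc   81920
    sparkuser  soft  nofile  81920
    sparkuser  hard  nofile  81920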
Ameet


On Fri, Apr 11, 2014 at 10:32 AM, Ameet Kini <ameetk...@gmail.com> wrote:
>
> Turns out that my ulimit settings were too low. I bumped them up and the
> job completes successfully. Here's what I have now:
>
> $ ulimit -u   // for max user processes
> 81920
> $ ulimit -n   // for open files
> 81920
>
> I was thrown off by the OutOfMemoryError into thinking it was Spark
> running out of memory in the shuffle stage. My previous settings were 1024
> for both, and while that worked for shuffles on small jobs (10s of gigs),
> it would choke on the large ones. It would be good to document these in
> the tuning / configuration section. Something like section 2.5 "ulimit and
> nproc" of https://hbase.apache.org/book.html
>
> 14/04/10 15:16:58 WARN DFSClient: DataStreamer Exception
> java.lang.OutOfMemoryError: unable to create new native thread
>     at java.lang.Thread.start0(Native Method)
>     at java.lang.Thread.start(Thread.java:657)
>     at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.initDataStreaming(DFSOutputStream.java:408)
>     at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:488)
> 14/04/10 15:16:58 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, minRequest: 10066329
> 14/04/10 15:16:58 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, minRequest: 10066329
> 14/04/10 15:16:58 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 0 non-zero-bytes blocks out of 7773 blocks
> 14/04/10 15:16:58 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote gets in 1 ms
> 14/04/10 15:16:58 INFO Ingest: Working on partition 6215 with rep = (3, 3)
> 14/04/10 15:16:58 ERROR Executor: Exception in task ID 21756
> java.io.IOException: DFSOutputStream is closed
>     at org.apache.hadoop.hdfs.DFSOutputStream.isClosed(DFSOutputStream.java:1265)
>     at org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:1715)
>     at org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:1694)
>     at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:1778)
>     at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:66)
>     at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:99)
>     at org.apache.hadoop.io.SequenceFile$Writer.close(SequenceFile.java:1240)
>     at org.apache.hadoop.io.MapFile$Writer.close(MapFile.java:300)
>     at geotrellis.spark.cmd.Ingest$$anonfun$4.apply(Ingest.scala:189)
>     at geotrellis.spark.cmd.Ingest$$anonfun$4.apply(Ingest.scala:176)
>     at org.apache.spark.rdd.RDD$$anonfun$2.apply(RDD.scala:466)
>     at org.apache.spark.rdd.RDD$$anonfun$2.apply(RDD.scala:466)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
>     at org.apache.spark.scheduler.Task.run(Task.scala:53)
>     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)
>     at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
>     at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:416)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
>     at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:679)
>
> Thanks,
> Ameet
>
>
> On Wed, Apr 9, 2014 at 10:48 PM, Ameet Kini <ameetk...@gmail.com> wrote:
>
>> val hrdd = sc.hadoopRDD(..)
>> val res = hrdd.partitionBy(myCustomPartitioner)
>>   .reduceByKey(..)
>>   .mapPartitionsWithIndex( /* some code to save those partitions */ )
>>
>> I'm getting OutOfMemoryErrors on the read side of the partitionBy shuffle.
>> My custom partitioner generates over 20,000 partitions, so there are
>> 20,000 tasks reading the shuffle files. On jobs with fewer partitions
>> (~1,000), the job completes successfully.
>>
>> On my cluster, each worker gets 24 GB (SPARK_WORKER_MEMORY = 24 GB) and
>> each executor gets 21 GB (SPARK_MEM = 21 GB). I have tried assigning 6
>> cores per executor and brought it down to 3, and I still get
>> OutOfMemoryErrors at 20,000 partitions. I have
>> spark.shuffle.memoryFraction=0.5 and spark.storage.memoryFraction=0.2,
>> since I am not caching any RDDs.
>>
>> Do those config params look reasonable for my shuffle size? I'm not sure
>> what to increase - shuffle.memoryFraction or the memory that the reduce
>> tasks get. The latter, I'm guessing, is whatever is left over after
>> storage.memoryFraction and shuffle.memoryFraction are accounted for.
>>
>> Thanks,
>> Ameet
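For completeness, a sketch of how the memory settings mentioned above might be wired together in conf/spark-env.sh on a standalone cluster. The SPARK_WORKER_MEMORY / SPARK_MEM values and the two memoryFraction properties are the ones from the thread; passing those properties through SPARK_JAVA_OPTS is an assumption about how they were set, not something stated in the thread:

    # conf/spark-env.sh (sketch)
    export SPARK_WORKER_MEMORY=24g   # memory available to each worker
    export SPARK_MEM=21g             # memory given to each executor
    # shuffle/storage fractions passed as system properties (mechanism assumed)
    export SPARK_JAVA_OPTS="-Dspark.shuffle.memoryFraction=0.5 -Dspark.storage.memoryFraction=0.2"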