A typo - I meant section 2.1.2.5 "ulimit and nproc" of
https://hbase.apache.org/book.html

Ameet


On Fri, Apr 11, 2014 at 10:32 AM, Ameet Kini <ameetk...@gmail.com> wrote:

>
> Turns out that my ulimit settings were too low. I bumped them up and the
> job completes successfully. Here's what I have now:
>
> $ ulimit -u   # max user processes
> 81920
> $ ulimit -n   # open files
> 81920
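>
> To make these limits survive logout/reboot (assuming pam_limits is
> enabled, as it is on most Linux distributions), they can go in
> /etc/security/limits.conf. A sketch, with "sparkuser" standing in for
> whatever account runs the Spark workers:
>
> # /etc/security/limits.conf
> sparkuser  -  nofile  81920    # open files (ulimit -n)
> sparkuser  -  nproc   81920    # max user processes (ulimit -u)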
>
> I was thrown off by the OutOfMemoryError into thinking it was Spark
> running out of memory in the shuffle stage. My previous settings were 1024
> for both, and while that worked for shuffles on small jobs (tens of
> gigabytes), it would choke on the large ones. It would be good to document
> these in the tuning / configuration section. Something like section 2.5
> "ulimit and nproc" of https://hbase.apache.org/book.html
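>
> For anyone searching for this later: the "unable to create new native
> thread" flavor of OutOfMemoryError usually means the OS refused to spawn a
> thread (the nproc limit, or native memory), not that the heap is
> exhausted. A quick way I could have caught it is to count the threads
> (LWPs) of the user running the executors while the job runs, e.g.:
>
> $ ps -L -u $USER | wc -l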
>
>
> 14/04/10 15:16:58 WARN DFSClient: DataStreamer Exception
> java.lang.OutOfMemoryError: unable to create new native thread
>     at java.lang.Thread.start0(Native Method)
>     at java.lang.Thread.start(Thread.java:657)
>     at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.initDataStreaming(DFSOutputStream.java:408)
>     at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:488)
> 14/04/10 15:16:58 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, minRequest: 10066329
> 14/04/10 15:16:58 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, minRequest: 10066329
> 14/04/10 15:16:58 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 0 non-zero-bytes blocks out of 7773 blocks
> 14/04/10 15:16:58 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote gets in  1 ms
> 14/04/10 15:16:58 INFO Ingest: Working on partition 6215 with rep = (3, 3)
> 14/04/10 15:16:58 ERROR Executor: Exception in task ID 21756
> java.io.IOException: DFSOutputStream is closed
>     at org.apache.hadoop.hdfs.DFSOutputStream.isClosed(DFSOutputStream.java:1265)
>     at org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:1715)
>     at org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:1694)
>     at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:1778)
>     at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:66)
>     at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:99)
>     at org.apache.hadoop.io.SequenceFile$Writer.close(SequenceFile.java:1240)
>     at org.apache.hadoop.io.MapFile$Writer.close(MapFile.java:300)
>     at geotrellis.spark.cmd.Ingest$$anonfun$4.apply(Ingest.scala:189)
>     at geotrellis.spark.cmd.Ingest$$anonfun$4.apply(Ingest.scala:176)
>     at org.apache.spark.rdd.RDD$$anonfun$2.apply(RDD.scala:466)
>     at org.apache.spark.rdd.RDD$$anonfun$2.apply(RDD.scala:466)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
>     at org.apache.spark.scheduler.Task.run(Task.scala:53)
>     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)
>     at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
>     at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:416)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
>     at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:679)
>
> Thanks,
> Ameet
>
>
> On Wed, Apr 9, 2014 at 10:48 PM, Ameet Kini <ameetk...@gmail.com> wrote:
>
>> val hrdd = sc.hadoopRDD(..)
>> val res = hrdd.partitionBy(myCustomPartitioner)
>>   .reduceByKey(..)
>>   .mapPartitionsWithIndex( /* some code to save those partitions */ )
>>
>> I'm getting OutOfMemoryErrors on the read side of the partitionBy
>> shuffle. My custom partitioner generates over 20,000 partitions, so there
>> are 20,000 tasks reading the shuffle files. On problems with fewer
>> partitions (~1000), the job completes successfully.
>>
>> On my cluster, each worker gets 24 GB (SPARK_WORKER_MEMORY = 24 GB) and
>> each executor gets 21 GB (SPARK_MEM = 21 GB). I have tried assigning 6
>> cores per executor and bringing it down to 3, and I still get
>> OutOfMemoryErrors at 20,000 partitions. I have
>> spark.shuffle.memoryFraction=0.5 and spark.storage.memoryFraction=0.2
>> since I am not caching any RDDs.
>>
>> Do those config params look reasonable for my shuffle size? I'm not sure
>> what to increase: shuffle.memoryFraction, or the memory that the reduce
>> tasks get. The latter, I am guessing, is whatever is left after
>> accounting for storage.memoryFraction and shuffle.memoryFraction.
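>>
>> For reference, this is roughly how I'm setting those fractions (a
>> sketch; as of Spark 0.9 they can be set on a SparkConf before creating
>> the context):
>>
>> import org.apache.spark.{SparkConf, SparkContext}
>>
>> val conf = new SparkConf()
>>   .setAppName("ingest")
>>   .set("spark.shuffle.memoryFraction", "0.5") // heap share for shuffle aggregation
>>   .set("spark.storage.memoryFraction", "0.2") // heap share for cached RDDs
>> val sc = new SparkContext(conf)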
>>
>> Thanks,
>> Ameet
>>
>>
>>
>
