[ https://issues.apache.org/jira/browse/SPARK-2398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14060113#comment-14060113 ]

Mridul Muralidharan commented on SPARK-2398:
--------------------------------------------


As discussed in the PR, I am attempting to list the various factors that 
contribute to overhead.
Note that this is not exhaustive (yet) - please add more to this JIRA - so that 
once we are reasonably confident in the list, we can model the expected 
overhead based on these factors.

These factors are typically off-heap, since anything within the heap is 
budgeted for by Xmx and enforced by the VM, and so should ideally (though not 
always in practice, see GC overheads) stay within the Xmx value.

1) 256 KB per socket accepted via ConnectionManager for inter-worker 
communication (setReceiveBufferSize).
Typically there will be (numExecutors - 1) such sockets open.
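
For reference, a minimal sketch (plain java.nio, not the actual 
ConnectionManager code) of how such a receive buffer gets sized on an accepted 
channel:

    import java.net.InetSocketAddress
    import java.nio.channels.ServerSocketChannel

    // Each accepted connection reserves roughly 256 KB of kernel-level socket
    // buffer outside the Java heap; with N executors that is about
    // (N - 1) * 256 KB per process.
    val server = ServerSocketChannel.open()
    server.socket().bind(new InetSocketAddress(0))
    val channel = server.accept()
    channel.socket().setReceiveBufferSize(256 * 1024)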

2) 128 KB per socket for writing output to DFS. For reads, this does not seem 
to be configured, and should be 8 KB per socket iirc.
Typically one per executor at a given point in time?

3) 256 KB per akka socket for the send/receive buffer.
One per worker (to talk to the master)? So 512 KB? Any other use of akka?

4) If I am not mistaken, netty might allocate multiple direct buffers of 
"spark.akka.frameSize" size. A few of these might be allocated and 
pooled/reused.
I did not go into the netty code in detail though; if someone else with more 
know-how can clarify, that would be great!
The default for spark.akka.frameSize is 10 MB.
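
If so, the worst-case direct-buffer footprint should scale with 
spark.akka.frameSize (value in MB), which is tunable; a sketch:

    import org.apache.spark.SparkConf

    // spark.akka.frameSize is specified in MB (default 10); if netty pools a
    // few direct buffers of this size, the off-heap cost is a small multiple
    // of this value, so it is worth keeping it no larger than needed.
    val conf = new SparkConf()
      .set("spark.akka.frameSize", "10")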

5) The default size of the assembled Spark jar is about 12x MB (and changing). 
Though not all classes get loaded, the overhead would be some function of this, 
and the actual footprint would be higher than the on-disk size.
IIRC this is outside of the heap - [~sowen], any comments on this? I have not 
looked into this in about 10 years now!

6) Per-thread (Xss) overhead of 1 MB (for a 64-bit VM).
Last I recall, we had about 220-odd threads - I am not sure if this was on the 
master or on the workers.
Of course, this depends on the various thread pools we use (IO, computation, 
etc.), the akka and netty config, and so on.
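
To put a rough bound on this piece (the 220 and 1 MB figures being the ones 
quoted above), and noting that the per-thread stack can be shrunk via the 
executor JVM options if needed:

    import org.apache.spark.SparkConf

    // ~220 threads * 1 MB default stack on a 64-bit VM ~= 220 MB outside the heap.
    val threadStackOverheadMb = 220 * 1

    // If needed, the stack can be shrunk (at the risk of StackOverflowError
    // for deeply recursive tasks), e.g. to 512 KB per thread:
    val conf = new SparkConf()
      .set("spark.executor.extraJavaOptions", "-Xss512k")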

7) Disk read overhead.
Thanks to [~pwendell]'s fix, at least for small files the overhead is not too 
high, since we do not mmap them but read them directly.
But for anything larger than 8 KB (the default), we use memory-mapped buffers.
The actual overhead depends on the number of files opened for read via 
DiskStore - the entire file contents get mmap'ed into virtual memory.
Note that there is also some non-virtual-memory overhead at the native level 
for these buffers.

The actual number of files opened should be carefully tracked to understand 
its effect on Spark overhead, since this aspect has been changing a lot of late.
The impact is on shuffle and disk-persisted RDDs, among others.
The actual value will be application dependent (how large the data is!)
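
If the mmap'ed footprint turns out to dominate, the cutover point is 
configurable - iirc via spark.storage.memoryMapThreshold (in bytes, 8192 by 
default) - so one knob is to raise it and keep more reads on-heap; a sketch:

    import org.apache.spark.SparkConf

    // Blocks smaller than the threshold are read into heap byte arrays;
    // larger blocks are mmap'ed (virtual memory plus some native overhead).
    // Raising the threshold trades mmap overhead for heap usage.
    val conf = new SparkConf()
      .set("spark.storage.memoryMapThreshold", (2 * 1024 * 1024).toString) // 2 MB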


8) The overhead introduced by the VM not being able to reclaim memory 
completely (the cost of moving data vs. the amount of space reclaimed).
Ideally this should be low, but it will depend on the heap size, the collector 
used, and other factors.
I am not very knowledgeable about recent advances in GC collectors, so I 
hesitate to put a number on this.
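
Putting the per-connection and per-thread pieces above into one back-of-envelope 
number (a rough sketch only - the constants are the ones listed in points 1-6, 
the jar footprint default is my own guess, and points 5, 7 and 8 are workload 
dependent so they are left as caller-supplied inputs):

    // Rough per-executor off-heap overhead estimate (in MB) using the figures
    // from points 1-6 above.
    def estimatedOverheadMb(
        numExecutors: Int,
        numThreads: Int = 220,        // point 6
        frameSizeMb: Int = 10,        // point 4, spark.akka.frameSize
        nettyBuffers: Int = 2,        // assumption: a couple of pooled direct buffers
        jarFootprintMb: Double = 150, // assumption for point 5
        mmapMb: Double = 0,           // point 7, application dependent
        gcSlackMb: Double = 0         // point 8, application dependent
    ): Double = {
      val connectionMgrMb = (numExecutors - 1) * 256.0 / 1024   // point 1
      val dfsSocketsMb    = (128.0 + 8.0) / 1024                // point 2
      val akkaSocketsMb   = 2 * 256.0 / 1024                    // point 3
      val nettyMb         = nettyBuffers * frameSizeMb          // point 4
      val threadStacksMb  = numThreads * 1.0                    // point 6, 1 MB Xss
      connectionMgrMb + dfsSocketsMb + akkaSocketsMb + nettyMb +
        jarFootprintMb + threadStacksMb + mmapMb + gcSlackMb
    }

    // e.g. estimatedOverheadMb(numExecutors = 300) is roughly 465 MB
    // before any mmap or GC slack is accounted for.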



I am sure this is not an exhaustive list, please do add to it.
In our case specifically - and [~tgraves] could add more - the number of 
containers can be high (300+ is easily possible), while the memory per 
container is modest (usually 8 GB).
To add details of observed overhead patterns (from the PR discussion):
a) I have had an in-house GBDT implementation run without customizing the 
overhead (so the default of 384 MB) with 12 GB containers and 22 nodes on a 
reasonably large dataset.
b) I have had to customize the overhead to 1.7 GB for collaborative filtering 
with 8 GB containers and 300 nodes (on a fairly large dataset).
c) I have had to minimally customize the overhead to do an in-house QR 
factorization of a 50k x 50k distributed dense matrix on 45 nodes at 12 GB 
each (this was incorrectly specified in the PR discussion).
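
For completeness, assuming the configurable overhead discussed in the PR lands 
as spark.yarn.executor.memoryOverhead (value in MB, default 384), case (b) 
above would translate to something like:

    import org.apache.spark.SparkConf

    // 300 executors at 8 GB heap each, with the off-heap overhead raised to
    // ~1.7 GB, so YARN is asked for roughly 8 GB + 1700 MB per container.
    val conf = new SparkConf()
      .set("spark.executor.memory", "8g")
      .set("spark.yarn.executor.memoryOverhead", "1700") // MB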

> Trouble running Spark 1.0 on Yarn 
> ----------------------------------
>
>                 Key: SPARK-2398
>                 URL: https://issues.apache.org/jira/browse/SPARK-2398
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.0.0
>            Reporter: Nishkam Ravi
>
> Trouble running workloads in Spark-on-YARN cluster mode for Spark 1.0. 
> For example: SparkPageRank when run in standalone mode goes through without 
> any errors (tested for up to 30GB input dataset on a 6-node cluster).  Also 
> runs fine for a 1GB dataset in yarn cluster mode. Starts to choke (in yarn 
> cluster mode) as the input data size is increased. Confirmed for 16GB input 
> dataset.
> The same workload runs fine with Spark 0.9 in both standalone and yarn 
> cluster mode (for up to 30 GB input dataset on a 6-node cluster).
> Commandline used:
> (/opt/cloudera/parcels/CDH/lib/spark/bin/spark-submit --master yarn 
> --deploy-mode cluster --properties-file pagerank.conf  --driver-memory 30g 
> --driver-cores 16 --num-executors 5 --class 
> org.apache.spark.examples.SparkPageRank 
> /opt/cloudera/parcels/CDH/lib/spark/examples/lib/spark-examples_2.10-1.0.0-cdh5.1.0-SNAPSHOT.jar
>  pagerank_in $NUM_ITER)
> pagerank.conf:
> spark.master            spark://c1704.halxg.cloudera.com:7077
> spark.home      /opt/cloudera/parcels/CDH/lib/spark
> spark.executor.memory   32g
> spark.default.parallelism       118
> spark.cores.max 96
> spark.storage.memoryFraction    0.6
> spark.shuffle.memoryFraction    0.3
> spark.shuffle.compress  true
> spark.shuffle.spill.compress    true
> spark.broadcast.compress        true
> spark.rdd.compress      false
> spark.io.compression.codec      org.apache.spark.io.LZFCompressionCodec
> spark.io.compression.snappy.block.size  32768
> spark.reducer.maxMbInFlight     48
> spark.local.dir  /var/lib/jenkins/workspace/tmp
> spark.driver.memory     30g
> spark.executor.cores    16
> spark.locality.wait     6000
> spark.executor.instances        5
> UI shows ExecutorLostFailure. Yarn logs contain numerous exceptions:
> 14/07/07 17:59:49 WARN network.SendingConnection: Error writing in connection 
> to ConnectionManagerId(a1016.halxg.cloudera.com,54105)
> java.nio.channels.AsynchronousCloseException
>         at 
> java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:205)
>         at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:496)
>         at 
> org.apache.spark.network.SendingConnection.write(Connection.scala:361)
>         at 
> org.apache.spark.network.ConnectionManager$$anon$5.run(ConnectionManager.scala:142)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> --------
> java.io.IOException: Filesystem closed
>         at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:703)
>         at 
> org.apache.hadoop.hdfs.DFSInputStream.close(DFSInputStream.java:619)
>         at java.io.FilterInputStream.close(FilterInputStream.java:181)
>         at org.apache.hadoop.util.LineReader.close(LineReader.java:150)
>         at 
> org.apache.hadoop.mapred.LineRecordReader.close(LineRecordReader.java:244)
>         at org.apache.spark.rdd.HadoopRDD$$anon$1.close(HadoopRDD.scala:226)
>         at 
> org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63)
>         at 
> org.apache.spark.rdd.HadoopRDD$$anon$1$$anonfun$1.apply$mcV$sp(HadoopRDD.scala:197)
>         at 
> org.apache.spark.TaskContext$$anonfun$executeOnCompleteCallbacks$1.apply(TaskContext.scala:63)
>         at 
> org.apache.spark.TaskContext$$anonfun$executeOnCompleteCallbacks$1.apply(TaskContext.scala:63)
>         at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at 
> org.apache.spark.TaskContext.executeOnCompleteCallbacks(TaskContext.scala:63)
>         at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:156)
>         at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:97)
>         at org.apache.spark.scheduler.Task.run(Task.scala:51)
>         at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> -------
> 14/07/07 17:59:52 WARN network.SendingConnection: Error finishing connection 
> to a1016.halxg.cloudera.com/10.20.184.116:54105
> java.net.ConnectException: Connection refused
>         at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>         at 
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
>         at 
> org.apache.spark.network.SendingConnection.finishConnect(Connection.scala:313)
>         at 
> org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:203)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)


