Executor and BlockManager memory size

2014-10-09 Thread Larry Xiao

Hi all,

I'm confused about the Executor and the BlockManager: why do they report different 
memory sizes?
14/10/10 08:50:02 INFO AppClient$ClientActor: Executor added: 
app-20141010085001-/2 on worker-20141010004933-brick6-35657 
(brick6:35657) with 6 cores
14/10/10 08:50:02 INFO SparkDeploySchedulerBackend: Granted executor 
ID app-20141010085001-/2 on hostPort brick6:35657 with 6 cores, 
50.0 GB RAM


14/10/10 08:50:07 INFO BlockManagerMasterActor: Registering block 
manager brick6:53296 with 26.8 GB RAM

and on the WebUI,
Executor ID  Address       RDD Blocks  Memory Used       Disk Used  Active Tasks  Failed Tasks  Complete Tasks  Total Tasks  Task Time  Input  Shuffle Read  Shuffle Write
0            brick3:37607  0           0.0 B / 26.8 GB   0.0 B      6             0             0               6            0 ms       0.0 B  0.0 B         0.0 B
1            brick1:59493  0           0.0 B / 26.8 GB   0.0 B      6             0             0               6            0 ms       0.0 B  0.0 B         0.0 B
2            brick6:53296  0           0.0 B / 26.8 GB   0.0 B      6             0             0               6            0 ms       0.0 B  0.0 B         0.0 B
3            brick5:38543  0           0.0 B / 26.8 GB   0.0 B      6             0             0               6            0 ms       0.0 B  0.0 B         0.0 B
4            brick2:44937  0           0.0 B / 26.8 GB   0.0 B      6             0             0               6            0 ms       0.0 B  0.0 B         0.0 B
5            brick4:46798  0           0.0 B / 26.8 GB   0.0 B      6             0             0               6            0 ms       0.0 B  0.0 B         0.0 B
driver       brick0:57692  0           0.0 B / 274.6 MB  0.0 B      0             0             0               0            0 ms       0.0 B  0.0 B         0.0 B
As I understand it, a worker consists of a daemon and an executor, and the 
executor handles both execution and storage.
So does it mean that 26.8 GB is reserved for storage and the rest is for 
execution?
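
My rough guess (just a sketch, assuming the Spark 1.x defaults 
spark.storage.memoryFraction = 0.6 and spark.storage.safetyFraction = 0.9, 
which I haven't changed; the variable names below are mine):

  // Sketch of how the BlockManager's storage budget could be derived
  // from the 50 GB granted to the executor, using the default fractions.
  val executorMemoryGB = 50.0
  val memoryFraction   = 0.6   // spark.storage.memoryFraction (default)
  val safetyFraction   = 0.9   // spark.storage.safetyFraction (default)
  val storageMemoryGB  = executorMemoryGB * memoryFraction * safetyFraction
  // storageMemoryGB = 27.0, close to the 26.8 GB the BlockManager registers

Is that where the 26.8 GB comes from?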


Another question: throughout execution, the BlockManager always seems to be 
almost free.
14/10/05 14:33:44 INFO BlockManagerInfo: Added broadcast_21_piece0 in 
memory on brick2:57501 (size: 1669.0 B, free: 21.2 GB)

I don't know what I'm missing here.

Best regards,
Larry




PageRank execution imbalance, might hurt performance by 6x

2014-09-27 Thread Larry Xiao

Hi all!

I'm running PageRank on GraphX, and I find that some tasks on one machine 
can take 5-6 times longer than on others, while the rest are perfectly 
balanced (around 1 second to finish).
Since the time for a stage (iteration) is determined by the slowest task, 
this hurts performance.


I don't know if there are any internals that might make execution 
unstable, like scheduling, garbage collection ...


A stage for mapPartitions at GraphImpl.scala:409, in mapReduceTriplets:

408     // Map and combine.
409     val preAgg = view.edges.partitionsRDD.mapPartitions(_.flatMap {
410       case (pid, edgePartition) =>
411         // Choose scan method

The tasks for this stage look like this:


   Tasks (all with Attempt 0, Status SUCCESS, Locality Level PROCESS_LOCAL, 
   launched at 2014/09/28 03:04:42; sorted by Duration, descending; the GC 
   Time, Accumulators, and Errors columns were empty)

Index  ID   Executor  Duration  Input              Shuffle Read  Write Time  Shuffle Write
21     787  brick0    7 s       333.3 MB (memory)  4.9 MB        1 ms        652.3 KB
0      768  brick2    7 s       531.5 MB (memory)  8.0 MB        2 ms        1321.5 KB
9      775  brick0    6 s       270.4 MB (memory)  4.1 MB        1 ms        659.3 KB
15     781  brick0    6 s       272.7 MB (memory)  4.3 MB        1 ms        658.9 KB
3      769  brick0    6 s       285.5 MB (memory)  4.4 MB        1 ms        658.5 KB
6      774  brick2    6 s       346.8 MB (memory)  4.6 MB        1 ms        657.0 KB
12     780  brick2    6 s       313.2 MB (memory)  4.4 MB        1 ms        645.5 KB
18     786  brick2    6 s       281.7 MB (memory)  4.2 MB        1 ms        660.1 KB
1      771  brick3    2 s       339.1 MB (memory)  5.1 MB        1 ms        657.4 KB
7      777  brick3    2 s       322.8 MB (memory)  4.9 MB        1 ms        654.5 KB
13     783  brick3    2 s       279.8 MB (memory)  4.6 MB        1 ms        655.4 KB
19     789  brick3    2 s       268.4 MB (memory)  4.4 MB        1 ms        658.5 KB
16     784  brick4    1 s       339.1 MB (memory)  5.1 MB        1 ms        660.1 KB
11     776  brick1    1 s       341.0 MB (memory)  5.3 MB        1 ms        655.4 KB
2      773  brick5    1 s       320.9 MB (memory)  4.9 MB        1 ms        655.3 KB
22     790  brick4    1 s       301.7 MB (memory)  4.9 MB        1 ms        659.5 KB
17     782  brick1    1 s       317.1 MB (memory)  5.2 MB        1 ms        653.7 KB
23     788  brick1    1 s       268.3 MB (memory)  4.9 MB        1 ms        664.3 KB
8      779  brick5    1 s       291.3 MB (memory)  4.6 MB        1 ms        660.6 KB
20     791  brick5    1 s       272.7 MB (memory)  4.5 MB        1 ms        661.7 KB
10     778  brick4    1 s       276.5 MB (memory)  4.4 MB        1 ms        656.4 KB
4      772  brick4    1 s       260.8 MB (memory)  4.4 MB        1 ms        661.7 KB
14     785  brick5    1 s       262.7 MB (memory)  4.3 MB        1 ms        651.7 KB
5      770  brick1    1 s       276.5 MB (memory)  4.7 MB        1 ms        655.1 KB
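
To check the garbage-collection hypothesis, my plan is to turn on GC logging 
for the executors. A minimal sketch, assuming that 
spark.executor.extraJavaOptions set on the SparkConf reaches the executors 
(the app name is just a placeholder):

  // Sketch: enable GC logging on every executor, so the slow tasks on
  // brick0/brick2 above can be correlated with collection pauses.
  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf()
    .setAppName("PageRank")   // placeholder
    .set("spark.executor.extraJavaOptions",
      "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")
  val sc = new SparkContext(conf)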


Thanks!
Larry


VertexRDD partition imbalance

2014-09-25 Thread Larry Xiao

Hi all

The VertexRDD is partitioned with a HashPartitioner, and it exhibits some 
task imbalance.

For example, Connected Components with partition strategy Edge2D:


   Aggregated Metrics by Executor

Executor ID  Task Time  Total Tasks  Failed Tasks  Succeeded Tasks  Input     Shuffle Read  Shuffle Write  Shuffle Spill (Memory)  Shuffle Spill (Disk)
1            10 s       10           0             10               234.6 MB  0.0 B         43.2 MB        0.0 B                   0.0 B
2            3 s        3            0             3                70.4 MB   0.0 B         13.0 MB        0.0 B                   0.0 B
3            6 s        6            0             6                140.7 MB  0.0 B         25.9 MB        0.0 B                   0.0 B
4            9 s        8            0             8                187.9 MB  0.0 B         34.6 MB        0.0 B                   0.0 B
5            10 s       9            0             9                211.4 MB  0.0 B         38.9 MB        0.0 B                   0.0 B

For a stage on mapPartitions at VertexRDD.scala:347:

343
344   /** Generates an RDD of vertex attributes suitable for shipping to the edge partitions. */
345   private[graphx] def shipVertexAttributes(
346       shipSrc: Boolean, shipDst: Boolean): RDD[(PartitionID, VertexAttributeBlock[VD])] = {
347     partitionsRDD.mapPartitions(_.flatMap(_.shipVertexAttributes(shipSrc, shipDst)))
348   }
349

This is executed for every iteration in Pregel, so the imbalance is bad 
for performance.


However, when running PageRank with Edge2D, the tasks are evenly spread 
across executors (each finishes 6 tasks).

Our configuration is 6 nodes and 36 partitions.

My question is:

   What decides the number of tasks assigned to each executor? And how can
   I make it balanced?
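
For reference, this is the small diagnostic I use to see how evenly the 
vertices land in the 36 partitions (my own sketch, not a GraphX API; graph 
is the loaded Graph):

   // Sketch: count the (vertexId, attr) pairs in each VertexRDD partition,
   // to check whether the HashPartitioner itself is the source of the skew.
   val vertexCounts = graph.vertices
     .mapPartitionsWithIndex((pid, iter) => Iterator((pid, iter.size)))
     .collect()
     .sortBy(_._1)
   vertexCounts.foreach { case (pid, n) => println(s"vertex partition $pid: $n vertices") }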

Thanks!
Larry



Re: Specifying Spark Executor Java options using Spark Submit

2014-09-24 Thread Larry Xiao

Hi Arun!

I think you can find info at 
https://spark.apache.org/docs/latest/configuration.html

quote:


Spark provides three locations to configure the system:

  * Spark properties
    (https://spark.apache.org/docs/latest/configuration.html#spark-properties)
    control most application parameters and can be set by using a SparkConf
    object
    (https://spark.apache.org/docs/latest/api/core/index.html#org.apache.spark.SparkConf),
    or through Java system properties.
  * Environment variables
    (https://spark.apache.org/docs/latest/configuration.html#environment-variables)
    can be used to set per-machine settings, such as the IP address, through
    the conf/spark-env.sh script on each node.
  * Logging
    (https://spark.apache.org/docs/latest/configuration.html#configuring-logging)
    can be configured through log4j.properties.


for your question, I guess you can use:

spark.executor.extraJavaOptions  (default: none)
    A string of extra JVM options to pass to executors. For instance, GC
    settings or other logging. Note that it is illegal to set Spark properties
    or heap size settings with this option. Spark properties should be set
    using a SparkConf object or the spark-defaults.conf file used with the
    spark-submit script. Heap size settings can be set with
    spark.executor.memory.


You can find it under "Runtime Environment" on that page.
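
For example (a sketch based on the docs quoted above; I haven't tested the 
exact lines, and the --conf flag depends on your spark-submit version):

   # in conf/spark-defaults.conf, picked up by spark-submit
   spark.executor.extraJavaOptions  -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps

   # or on the command line, if your spark-submit supports --conf
   spark-submit --conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" ...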

Larry

On 9/24/14 10:52 PM, Arun Ahuja wrote:
What is the proper way to specify Java options for the Spark executors 
using spark-submit?  We had previously done this using


export SPARK_JAVA_OPTS='..

for example to attach a debugger to each executor or to add 
-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps


With spark-submit I see --driver-java-options, but is there an equivalent 
for individual executors?


Thanks,
Arun




Re: spark.shuffle.consolidateFiles seems not working

2014-07-30 Thread Larry Xiao

Hi Jianshi,

I've met a similar situation before, and my solution was 'ulimit'. You can use:

   -a to see your current settings
   -n to set the open-files limit
   (and other limits as well)

I set -n to 10240.
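
Concretely, something like this (shell; 10240 is just the value we use, 
adjust for your cluster, and set it in whatever environment launches the 
Spark workers):

   ulimit -n          # show the current open-files limit
   ulimit -n 10240    # raise it for the current shell and its children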

I see spark.shuffle.consolidateFiles helps by reusing open files 
(though I don't know to what extent it helps).

Hope it helps.

Larry

On 7/30/14, 4:01 PM, Jianshi Huang wrote:

I'm using Spark 1.0.1 on Yarn-Client mode.

sortByKey always reports a FileNotFoundException with a message saying 
there are too many open files.


I already set spark.shuffle.consolidateFiles to true:

  conf.set("spark.shuffle.consolidateFiles", "true")

But it doesn't seem to be working. What other possible reasons are there? 
How can I fix it?


Jianshi

--
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/