Executor and BlockManager memory size
Hi all, I'm confused about the Executor and the BlockManager: why do they report different memory sizes?

14/10/10 08:50:02 INFO AppClient$ClientActor: Executor added: app-20141010085001-/2 on worker-20141010004933-brick6-35657 (brick6:35657) with 6 cores
14/10/10 08:50:02 INFO SparkDeploySchedulerBackend: Granted executor ID app-20141010085001-/2 on hostPort brick6:35657 with 6 cores, 50.0 GB RAM
14/10/10 08:50:07 INFO BlockManagerMasterActor: Registering block manager brick6:53296 with 26.8 GB RAM

And on the WebUI:

Executor ID  Address       RDD Blocks  Memory Used       Disk Used  Active Tasks  Failed Tasks  Complete Tasks  Total Tasks  Task Time  Input  Shuffle Read  Shuffle Write
0            brick3:37607  0           0.0 B / 26.8 GB   0.0 B      6             0             0               6            0 ms       0.0 B  0.0 B         0.0 B
1            brick1:59493  0           0.0 B / 26.8 GB   0.0 B      6             0             0               6            0 ms       0.0 B  0.0 B         0.0 B
2            brick6:53296  0           0.0 B / 26.8 GB   0.0 B      6             0             0               6            0 ms       0.0 B  0.0 B         0.0 B
3            brick5:38543  0           0.0 B / 26.8 GB   0.0 B      6             0             0               6            0 ms       0.0 B  0.0 B         0.0 B
4            brick2:44937  0           0.0 B / 26.8 GB   0.0 B      6             0             0               6            0 ms       0.0 B  0.0 B         0.0 B
5            brick4:46798  0           0.0 B / 26.8 GB   0.0 B      6             0             0               6            0 ms       0.0 B  0.0 B         0.0 B
driver       brick0:57692  0           0.0 B / 274.6 MB  0.0 B      0             0             0               0            0 ms       0.0 B  0.0 B         0.0 B

As I understand it, a worker consists of a daemon and an executor, and the executor is in charge of both execution and storage. So does this mean that 26.8 GB is reserved for storage and the rest is used for execution?

Another question: throughout execution, the BlockManager always seems to be almost free.

14/10/05 14:33:44 INFO BlockManagerInfo: Added broadcast_21_piece0 in memory on brick2:57501 (size: 1669.0 B, free: 21.2 GB)

I don't know what I'm missing here.

Best regards,
Larry
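A rough way to see where the 26.8 GB figure may come from, assuming the Spark 1.x defaults spark.storage.memoryFraction = 0.6 and spark.storage.safetyFraction = 0.9 (a sketch of the arithmetic, not the actual Spark source; the real value is computed from the JVM's reported max heap, which is slightly below the configured 50 GB, so the numbers only match approximately):

    // Approximate storage capacity reported by the BlockManager in Spark 1.x,
    // assuming default memory/safety fractions (not the actual Spark code path).
    val executorMemoryGb = 50.0   // spark.executor.memory
    val memoryFraction   = 0.6    // spark.storage.memoryFraction (default)
    val safetyFraction   = 0.9    // spark.storage.safetyFraction (default)
    val storageGb = executorMemoryGb * memoryFraction * safetyFraction
    println(f"storage memory ~ $storageGb%.1f GB")   // ~27 GB, close to the reported 26.8 GB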
PageRank execution imbalance, might hurt performance by 6x
Hi all! I'm running PageRank on GraphX, and I find that tasks on one machine can take 5-6 times longer than tasks on the other machines, which are perfectly balanced (around 1 second to finish). Since the time for a stage (iteration) is determined by its slowest task, this hurts performance. Are there any internals that might make execution unstable, such as scheduling or garbage collection?

A stage for mapPartitions at GraphImpl.scala:409 in mapReduceTriplets:

408     // Map and combine.
409     val preAgg = view.edges.partitionsRDD.mapPartitions(_.flatMap {
410       case (pid, edgePartition) =>
411         // Choose scan method

looks like this:

Tasks
Index  ID   Attempt  Status   Locality Level  Executor  Launch Time          Duration  GC Time  Accumulators  Input              Shuffle Read  Write Time  Shuffle Write  Errors
21     787  0        SUCCESS  PROCESS_LOCAL   brick0    2014/09/28 03:04:42  7 s                              333.3 MB (memory)  4.9 MB        1 ms        652.3 KB
0      768  0        SUCCESS  PROCESS_LOCAL   brick2    2014/09/28 03:04:42  7 s                              531.5 MB (memory)  8.0 MB        2 ms        1321.5 KB
9      775  0        SUCCESS  PROCESS_LOCAL   brick0    2014/09/28 03:04:42  6 s                              270.4 MB (memory)  4.1 MB        1 ms        659.3 KB
15     781  0        SUCCESS  PROCESS_LOCAL   brick0    2014/09/28 03:04:42  6 s                              272.7 MB (memory)  4.3 MB        1 ms        658.9 KB
3      769  0        SUCCESS  PROCESS_LOCAL   brick0    2014/09/28 03:04:42  6 s                              285.5 MB (memory)  4.4 MB        1 ms        658.5 KB
6      774  0        SUCCESS  PROCESS_LOCAL   brick2    2014/09/28 03:04:42  6 s                              346.8 MB (memory)  4.6 MB        1 ms        657.0 KB
12     780  0        SUCCESS  PROCESS_LOCAL   brick2    2014/09/28 03:04:42  6 s                              313.2 MB (memory)  4.4 MB        1 ms        645.5 KB
18     786  0        SUCCESS  PROCESS_LOCAL   brick2    2014/09/28 03:04:42  6 s                              281.7 MB (memory)  4.2 MB        1 ms        660.1 KB
1      771  0        SUCCESS  PROCESS_LOCAL   brick3    2014/09/28 03:04:42  2 s                              339.1 MB (memory)  5.1 MB        1 ms        657.4 KB
7      777  0        SUCCESS  PROCESS_LOCAL   brick3    2014/09/28 03:04:42  2 s                              322.8 MB (memory)  4.9 MB        1 ms        654.5 KB
13     783  0        SUCCESS  PROCESS_LOCAL   brick3    2014/09/28 03:04:42  2 s                              279.8 MB (memory)  4.6 MB        1 ms        655.4 KB
19     789  0        SUCCESS  PROCESS_LOCAL   brick3    2014/09/28 03:04:42  2 s                              268.4 MB (memory)  4.4 MB        1 ms        658.5 KB
16     784  0        SUCCESS  PROCESS_LOCAL   brick4    2014/09/28 03:04:42  1 s                              339.1 MB (memory)  5.1 MB        1 ms        660.1 KB
11     776  0        SUCCESS  PROCESS_LOCAL   brick1    2014/09/28 03:04:42  1 s                              341.0 MB (memory)  5.3 MB        1 ms        655.4 KB
2      773  0        SUCCESS  PROCESS_LOCAL   brick5    2014/09/28 03:04:42  1 s                              320.9 MB (memory)  4.9 MB        1 ms        655.3 KB
22     790  0        SUCCESS  PROCESS_LOCAL   brick4    2014/09/28 03:04:42  1 s                              301.7 MB (memory)  4.9 MB        1 ms        659.5 KB
17     782  0        SUCCESS  PROCESS_LOCAL   brick1    2014/09/28 03:04:42  1 s                              317.1 MB (memory)  5.2 MB        1 ms        653.7 KB
23     788  0        SUCCESS  PROCESS_LOCAL   brick1    2014/09/28 03:04:42  1 s                              268.3 MB (memory)  4.9 MB        1 ms        664.3 KB
8      779  0        SUCCESS  PROCESS_LOCAL   brick5    2014/09/28 03:04:42  1 s                              291.3 MB (memory)  4.6 MB        1 ms        660.6 KB
20     791  0        SUCCESS  PROCESS_LOCAL   brick5    2014/09/28 03:04:42  1 s                              272.7 MB (memory)  4.5 MB        1 ms        661.7 KB
10     778  0        SUCCESS  PROCESS_LOCAL   brick4    2014/09/28 03:04:42  1 s                              276.5 MB (memory)  4.4 MB        1 ms        656.4 KB
4      772  0        SUCCESS  PROCESS_LOCAL   brick4    2014/09/28 03:04:42  1 s                              260.8 MB (memory)  4.4 MB        1 ms        661.7 KB
14     785  0        SUCCESS  PROCESS_LOCAL   brick5    2014/09/28 03:04:42  1 s                              262.7 MB (memory)  4.3 MB        1 ms        651.7 KB
5      770  0        SUCCESS  PROCESS_LOCAL   brick1    2014/09/28 03:04:42  1 s                              276.5 MB (memory)  4.7 MB        1 ms        655.1 KB

Thanks!
Larry
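One way to separate data skew from scheduling or GC effects is to count the edges in each partition: if the counts are even while the task durations are not, the slowdown is probably not skew. A minimal sketch follows, assuming a spark-shell session (where sc is already defined); the edge-list path is a placeholder, not taken from the original post.

    import org.apache.spark.graphx.GraphLoader

    // Sketch only: load the graph that PageRank runs on (hypothetical path).
    val graph = GraphLoader.edgeListFile(sc, "hdfs:///path/to/edges.txt")

    // Count edges in each partition. Even counts with uneven task durations
    // point at GC or scheduling rather than data skew.
    graph.edges
      .mapPartitionsWithIndex((pid, iter) => Iterator((pid, iter.size)))
      .collect()
      .sortBy(_._1)
      .foreach { case (pid, n) => println(s"edge partition $pid: $n edges") }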
VertexRDD partition imbalance
Hi all,

VertexRDD is partitioned with a HashPartitioner, and it exhibits some task imbalance. For example, Connected Components with partition strategy EdgePartition2D:

Aggregated Metrics by Executor
Executor ID  Task Time  Total Tasks  Failed Tasks  Succeeded Tasks  Input     Shuffle Read  Shuffle Write  Shuffle Spill (Memory)  Shuffle Spill (Disk)
1            10 s       10           0             10               234.6 MB  0.0 B         43.2 MB        0.0 B                   0.0 B
2            3 s        3            0             3                70.4 MB   0.0 B         13.0 MB        0.0 B                   0.0 B
3            6 s        6            0             6                140.7 MB  0.0 B         25.9 MB        0.0 B                   0.0 B
4            9 s        8            0             8                187.9 MB  0.0 B         34.6 MB        0.0 B                   0.0 B
5            10 s       9            0             9                211.4 MB  0.0 B         38.9 MB        0.0 B                   0.0 B

This is for a stage on mapPartitions at VertexRDD.scala:347:

343
344   /** Generates an RDD of vertex attributes suitable for shipping to the edge partitions. */
345   private[graphx] def shipVertexAttributes(
346       shipSrc: Boolean, shipDst: Boolean): RDD[(PartitionID, VertexAttributeBlock[VD])] = {
347     partitionsRDD.mapPartitions(_.flatMap(_.shipVertexAttributes(shipSrc, shipDst)))
348   }
349

This stage is executed on every iteration in Pregel, so the imbalance is bad for performance. However, when running PageRank with EdgePartition2D, the tasks are even across executors (each finishes 6 tasks). Our configuration is 6 nodes, 36 partitions.

My questions are: what decides the number of tasks each executor gets, and how can I make it balanced?

Thanks!
Larry
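A small check can show whether the HashPartitioner itself spreads the vertices unevenly, or whether the data is balanced and the imbalance comes from how the 36 partitions are assigned to the 6 executors. A sketch, again assuming a spark-shell session; the path is a placeholder, while the 36 partitions and EdgePartition2D match the configuration described above.

    import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}

    // Sketch only: hypothetical input; 3rd arg = canonicalOrientation, 4th = edge partitions.
    val graph = GraphLoader
      .edgeListFile(sc, "hdfs:///path/to/edges.txt", false, 36)
      .partitionBy(PartitionStrategy.EdgePartition2D)

    // Vertices per VertexRDD partition: if these counts are roughly even, the
    // HashPartitioner is not the cause, and the imbalance would come from how
    // tasks are placed on executors (e.g. locality-aware scheduling) instead.
    graph.vertices
      .mapPartitionsWithIndex((pid, iter) => Iterator((pid, iter.size)))
      .collect()
      .sortBy(_._1)
      .foreach { case (pid, n) => println(s"vertex partition $pid: $n vertices") }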
Re: Specifying Spark Executor Java options using Spark Submit
Hi Arun!

I think you can find the info at https://spark.apache.org/docs/latest/configuration.html

Quote:

Spark provides three locations to configure the system:

* Spark properties (https://spark.apache.org/docs/latest/configuration.html#spark-properties) control most application parameters and can be set by using a SparkConf (https://spark.apache.org/docs/latest/api/core/index.html#org.apache.spark.SparkConf) object, or through Java system properties.
* Environment variables (https://spark.apache.org/docs/latest/configuration.html#environment-variables) can be used to set per-machine settings, such as the IP address, through the conf/spark-env.sh script on each node.
* Logging (https://spark.apache.org/docs/latest/configuration.html#configuring-logging) can be configured through log4j.properties.

For your question, I guess you can use:

spark.executor.extraJavaOptions  (none)
    A string of extra JVM options to pass to executors. For instance, GC settings or other logging. Note that it is illegal to set Spark properties or heap size settings with this option. Spark properties should be set using a SparkConf object or the spark-defaults.conf file used with the spark-submit script. Heap size settings can be set with spark.executor.memory.

You can find it under Runtime Environment.

Larry

On 9/24/14 10:52 PM, Arun Ahuja wrote:
> What is the proper way to specify Java options for the Spark executors using spark-submit? We had done this previously using export SPARK_JAVA_OPTS='..', for example to attach a debugger to each executor or to add -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps. On spark-submit I see --driver-java-options, but is there an equivalent for individual executors?
>
> Thanks,
> Arun
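A minimal sketch of setting spark.executor.extraJavaOptions programmatically, with the GC flags from Arun's question; the app name and executor memory value are placeholders, not from the original thread.

    import org.apache.spark.{SparkConf, SparkContext}

    // Turn on GC logging in every executor via spark.executor.extraJavaOptions.
    val conf = new SparkConf()
      .setAppName("executor-gc-logging")                 // placeholder app name
      // JVM flags only -- no Spark properties and no -Xmx here.
      .set("spark.executor.extraJavaOptions",
           "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")
      // Heap size is set separately, as the docs note (placeholder value).
      .set("spark.executor.memory", "4g")
    val sc = new SparkContext(conf)

The same property can also be supplied on the command line with spark-submit --conf spark.executor.extraJavaOptions="..." or put in conf/spark-defaults.conf.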
Re: spark.shuffle.consolidateFiles seems not working
Hi Jianshi,

I've met a similar situation before, and my solution was ulimit: ulimit -a shows your current limits, and ulimit -n sets the open-files limit (other limits can be set the same way). I set -n to 10240. As I understand it, spark.shuffle.consolidateFiles helps by reusing open files, but I don't know to what extent it helps.

Hope it helps.

Larry

On 7/30/14, 4:01 PM, Jianshi Huang wrote:
> I'm using Spark 1.0.1 in Yarn-Client mode. sortByKey always fails with a FileNotFoundException whose message says there are too many open files. I already set spark.shuffle.consolidateFiles to true:
>
>     conf.set("spark.shuffle.consolidateFiles", "true")
>
> But it seems not to be working. What are the other possible reasons, and how can I fix it?
>
> Jianshi
> --
> Jianshi Huang
> LinkedIn: jianshi
> Twitter: @jshuang
> Github Blog: http://huangjs.github.com/
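For reference, a small sketch of the configuration side, assuming Spark 1.x where spark.shuffle.consolidateFiles is honored; the app name is a placeholder. Note that both arguments to SparkConf.set are strings, so the values must be quoted, and the OS open-files limit still has to be raised on every worker, since consolidation only reduces the number of open shuffle files.

    import org.apache.spark.{SparkConf, SparkContext}

    // Enable shuffle file consolidation (Spark 1.x); values are strings.
    val conf = new SparkConf()
      .setAppName("shuffle-consolidation")               // placeholder app name
      .set("spark.shuffle.consolidateFiles", "true")
    val sc = new SparkContext(conf)

    // Raising the OS limit (e.g. `ulimit -n 10240` in the shell that starts
    // each worker) is still needed; consolidation only lowers the file count.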