Setting Executor memory
Hello,

I was looking for guidelines on what value to set executor memory to (via spark.executor.memory, for example). This seems to be important to avoid OOMs during tasks, especially in no-swap environments (like AWS EMR clusters).

This setting is really about the executor JVM heap. Hence, to come up with the maximum amount of heap memory for the executor, we need to list:
1. the memory taken by other processes (the Worker in standalone mode, ...)
2. all off-heap allocations in the executor

Fortunately, for #1, we can just look at memory consumption without any application running. For #2, it is trickier. What I suspect we should account for:
a. thread stack size
b. akka buffers (via akka framesize & number of akka threads)
c. kryo buffers
d. shuffle buffers
e. tachyon

Could anyone shed some light on this? Maybe a formula? Or should swap actually be turned on, as a safeguard against OOMs?

Thanks
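For illustration, a hedged sketch of how such a budget might be written down (all numbers are illustrative assumptions for a single-executor 60 GB node, not recommendations; the off-heap figure is exactly the part the question is asking how to estimate):

// Hedged sketch, not a formula: size the heap by subtracting everything else
// from physical memory. Numbers are assumptions for a 60 GB machine.
val physicalMemGb   = 60
val osAndWorkerGb   = 2    // item #1: OS + standalone Worker daemon, measured with no app running
val offHeapBudgetGb = 14   // item #2: thread stacks, akka/kryo/shuffle buffers, ...
val executorHeapGb  = physicalMemGb - osAndWorkerGb - offHeapBudgetGb  // = 44

val conf = new org.apache.spark.SparkConf()
  .set("spark.executor.memory", s"${executorHeapGb}g")
  // knobs that drive part of the off-heap budget:
  .set("spark.executor.extraJavaOptions", "-XX:ThreadStackSize=5m") // per-thread stack size
  .set("spark.akka.frameSize", "50")                                // MB per akka message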
Cores per executor
Hello,

I was wondering how Spark enforces the use of *only* X cores per executor. Is it simply running at most Y tasks in parallel on each executor, where X = Y * spark.task.cpus? (This is what I understood from browsing TaskSchedulerImpl.)

That would mean the processing power used for "map"-side (if any) and "reduce"-side shuffle sorting is unbounded (ExternalAppendOnlyMap and ExternalSorter, I guess)?

Thanks,
Thomas
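For what it is worth, a small sketch of that accounting (the variable names and values are mine, not Spark's; it only restates the X = Y * spark.task.cpus relationship, and says nothing about how much CPU a single running task may use internally):

// Hedged sketch: the scheduler hands out task slots, not OS-level CPU limits.
val executorCores = 8   // illustrative: cores the executor registered with
val cpusPerTask   = 2   // illustrative value of spark.task.cpus
val maxConcurrentTasks = executorCores / cpusPerTask
println(s"max concurrent tasks per executor: $maxConcurrentTasks")  // 4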
Shuffle files lifecycle
Hello,

It is my understanding that shuffle files are written to disk and that they act as checkpoints. I wonder if this is true only within a job, or across jobs. Please note that I use the words job and stage carefully here.

1. Can a shuffle created during JobN be used to skip many stages of JobN+1? Or is the lifecycle of the shuffle files bound to the job that created them?
2. When are shuffle files actually deleted? Is it TTL-based, or are they cleaned up when the job is over?
3. We have a very long batch application, and as it goes on, the number of total tasks for each job gets larger and larger. It is not really a problem, because most of those tasks will be skipped since we cache RDDs. We noticed, however, a delay in the actual start of a job of about 1 minute for every 2M tasks in the job. Are there suggested workarounds to avoid that delay? Maybe saving the RDD and re-loading it?

Thanks
Thomas
Re: Shuffle files lifecycle
Ah, for #3, maybe this is what *rdd.checkpoint* does!
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD

Thomas

On Mon, Jun 29, 2015 at 7:12 PM, Thomas Gerber thomas.ger...@radius.com wrote:
Hello,

It is my understanding that shuffle files are written to disk and that they act as checkpoints. I wonder if this is true only within a job, or across jobs. Please note that I use the words job and stage carefully here.

1. Can a shuffle created during JobN be used to skip many stages of JobN+1? Or is the lifecycle of the shuffle files bound to the job that created them?
2. When are shuffle files actually deleted? Is it TTL-based, or are they cleaned up when the job is over?
3. We have a very long batch application, and as it goes on, the number of total tasks for each job gets larger and larger. It is not really a problem, because most of those tasks will be skipped since we cache RDDs. We noticed, however, a delay in the actual start of a job of about 1 minute for every 2M tasks in the job. Are there suggested workarounds to avoid that delay? Maybe saving the RDD and re-loading it?

Thanks
Thomas
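For reference, a minimal sketch of how RDD checkpointing is wired up in Spark 1.x (the checkpoint directory, `input` and `parse` are illustrative placeholders, not from this thread):

// Hedged sketch of rdd.checkpoint usage.
sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")  // illustrative path

val derived = input.map(parse).reduceByKey(_ + _)
derived.checkpoint()  // only marks the RDD; the files are written on the next action
derived.count()       // forces the checkpoint; afterwards the lineage stops here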
Re: Shuffle files lifecycle
Thanks Silvio.

On Mon, Jun 29, 2015 at 7:41 PM, Silvio Fiorito silvio.fior...@granturing.com wrote:
Regarding 1 and 2: yes, shuffle output is stored on the worker local disks and will be reused across jobs as long as it’s available. You can identify when it is used by seeing skipped stages in the job UI. Shuffle files are periodically cleaned up based on the available space of the configured spark.local.dir paths.

From: Thomas Gerber
Date: Monday, June 29, 2015 at 10:12 PM
To: user
Subject: Shuffle files lifecycle

Hello,

It is my understanding that shuffle files are written to disk and that they act as checkpoints. I wonder if this is true only within a job, or across jobs. Please note that I use the words job and stage carefully here.

1. Can a shuffle created during JobN be used to skip many stages of JobN+1? Or is the lifecycle of the shuffle files bound to the job that created them?
2. When are shuffle files actually deleted? Is it TTL-based, or are they cleaned up when the job is over?
3. We have a very long batch application, and as it goes on, the number of total tasks for each job gets larger and larger. It is not really a problem, because most of those tasks will be skipped since we cache RDDs. We noticed, however, a delay in the actual start of a job of about 1 minute for every 2M tasks in the job. Are there suggested workarounds to avoid that delay? Maybe saving the RDD and re-loading it?

Thanks
Thomas
Re: GraphX - ConnectedComponents (Pregel) - longer and longer interval between jobs
It seems the root cause of the delay was the sheer size of the DAG for those jobs, which come towards the end of a long series of jobs.

To reduce it, you can probably try to checkpoint (rdd.checkpoint) some previous RDDs. That will:
1. save the RDD on disk
2. remove all references to the parents of this RDD

This means that when a job uses that RDD, the DAG stops at that RDD and does not look at its parents, since it no longer has them. It is very similar to saving your RDD and re-loading it as a fresh RDD.

On Fri, Jun 26, 2015 at 9:14 AM, Thomas Gerber thomas.ger...@radius.com wrote:
Note that this problem is probably NOT caused directly by GraphX; GraphX just reveals it, because as you go further down the iterations, you get further and further away from a shuffle you can rely on.

On Thu, Jun 25, 2015 at 7:43 PM, Thomas Gerber thomas.ger...@radius.com wrote:
Hello,

We run GraphX ConnectedComponents, and we notice a time gap between jobs that becomes larger and larger and that is not accounted for. In the screenshot attached, you will notice that each job only takes around 2.5 minutes. At first, the next job/iteration starts immediately after the previous one. But as we go through iterations, there is a gap (time when job N+1 starts - time when job N finishes) that grows, ultimately reaching 6 minutes around the 30th iteration.

I suspect it has to do with DAG computation on the driver, as evidenced by the very large (and growing at every iteration) number of pending stages that are ultimately skipped. So:
1. Is there anything obvious we can do to make that gap between iterations shorter?
2. Would dividing the number of partitions in the input RDD by 2 also divide the gap by 2?

I ask because a 3-minute gap on average for a job length of 2.5 minutes means we are wasting over 50% of the CPU time on the executors.

Thanks!
Thomas
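To make the suggestion concrete, a hedged sketch of periodic checkpointing inside an iterative loop (`initialRdd`, `step`, `numIterations` and the interval are illustrative placeholders, not part of this thread):

// Hedged sketch: checkpoint every few iterations so the driver never has to
// walk a 30-iteration-deep lineage when scheduling the next job.
sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")  // illustrative path

val checkpointInterval = 5
var current = initialRdd.cache()
for (i <- 1 to numIterations) {
  val next = step(current).cache()
  if (i % checkpointInterval == 0) {
    next.checkpoint()
    next.count()  // force materialization so the lineage is actually cut
  }
  current.unpersist()
  current = next
}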
Re: GraphX - ConnectedComponents (Pregel) - longer and longer interval between jobs
Note that this problem is probably NOT caused directly by GraphX; GraphX just reveals it, because as you go further down the iterations, you get further and further away from a shuffle you can rely on.

On Thu, Jun 25, 2015 at 7:43 PM, Thomas Gerber thomas.ger...@radius.com wrote:
Hello,

We run GraphX ConnectedComponents, and we notice a time gap between jobs that becomes larger and larger and that is not accounted for. In the screenshot attached, you will notice that each job only takes around 2.5 minutes. At first, the next job/iteration starts immediately after the previous one. But as we go through iterations, there is a gap (time when job N+1 starts - time when job N finishes) that grows, ultimately reaching 6 minutes around the 30th iteration.

I suspect it has to do with DAG computation on the driver, as evidenced by the very large (and growing at every iteration) number of pending stages that are ultimately skipped. So:
1. Is there anything obvious we can do to make that gap between iterations shorter?
2. Would dividing the number of partitions in the input RDD by 2 also divide the gap by 2?

I ask because a 3-minute gap on average for a job length of 2.5 minutes means we are wasting over 50% of the CPU time on the executors.

Thanks!
Thomas
Re: Error communicating with MapOutputTracker
Hi Imran,

Thanks for the advice; tweaking some akka parameters helped. See below.

Now, we noticed that we get Java heap OOM exceptions on the output tracker when we have too many tasks. I wonder:
1. Where does the map output tracker live? The driver? The master (when those are not the same)?
2. How can we increase the heap for it, especially when using spark-submit?

Thanks,
Thomas

PS: akka parameters that one might want to increase:
# akka timeouts/heartbeats settings multiplied by 10 to avoid problems
spark.akka.timeout 1000
spark.akka.heartbeat.pauses 6
spark.akka.failure-detector.threshold 3000.0
spark.akka.heartbeat.interval 1
# Hidden akka conf to avoid MapOutputTracker timeouts
# See https://github.com/apache/spark/blob/branch-1.3/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
spark.akka.askTimeout 300
spark.akka.lookupTimeout 300

On Fri, Mar 20, 2015 at 9:18 AM, Imran Rashid iras...@cloudera.com wrote:
Hi Thomas,

Sorry for such a late reply. I don't have any super-useful advice, but this seems like something that is important to follow up on.

To answer your immediate question: no, there should not be any hard limit on the number of tasks that MapOutputTracker can handle. Though of course as things get bigger, the overheads increase, which is why you might hit timeouts.

Two other minor suggestions:
(1) Increase spark.akka.askTimeout -- that's the timeout you are running into; it defaults to 30 seconds.
(2) As you've noted, you've needed to play with other timeouts because of long GC pauses -- it's possible some GC tuning might help, though it's a bit of a black art, so it's hard to say what you can try. You could always try Concurrent Mark Sweep to avoid the long pauses, but of course that will probably hurt overall performance.

Can you share any more details of what you are trying to do? Since you're fetching shuffle blocks in a shuffle map task, I guess you've got two shuffles back-to-back, e.g. someRDD.reduceByKey{...}.map{...}.filter{...}.combineByKey{...}. Do you expect to be doing a lot of GC in between the two shuffles? E.g., in the little example I have, if there were lots of objects being created in the map/filter steps that make it out of the eden space. One possible solution would be to force the first shuffle to complete before running any of the subsequent transformations, e.g. by forcing materialization first:

val intermediateRDD = someRDD.reduceByKey{...}.persist(StorageLevel.DISK_ONLY)
// force the shuffle to complete, without trying to do our complicated downstream logic at the same time
intermediateRDD.count()
val finalResult = intermediateRDD.map{...}.filter{...}.combineByKey{...}

Also, can you share your data size? Do you expect the shuffle to be skewed, or do you think it will be well-balanced? Not that I'll have any suggestions for you based on the answer, but it may help us reproduce it and try to fix whatever the root cause is.

Thanks,
Imran

On Wed, Mar 4, 2015 at 12:30 PM, Thomas Gerber thomas.ger...@radius.com wrote:
I meant spark.default.parallelism of course.

On Wed, Mar 4, 2015 at 10:24 AM, Thomas Gerber thomas.ger...@radius.com wrote:
Follow up: we retried, this time after *decreasing* spark.parallelism. It was set to 16000 before (5 times the number of cores in our cluster). It is now down to 6400 (2 times the number of cores), and it got past the point where it failed before.

Does the MapOutputTracker have a limit on the number of tasks it can track?
On Wed, Mar 4, 2015 at 8:15 AM, Thomas Gerber thomas.ger...@radius.com wrote:
Hello,

We are using Spark 1.2.1 on a very large cluster (100 c3.8xlarge workers). We use spark-submit to start an application. We got the following error, which leads to a failed stage:

Job aborted due to stage failure: Task 3095 in stage 140.0 failed 4 times, most recent failure: Lost task 3095.3 in stage 140.0 (TID 308697, ip-10-0-12-88.ec2.internal): org.apache.spark.SparkException: Error communicating with MapOutputTracker

We tried the whole application again, and it failed on the same stage (though it got more tasks completed on that stage) with the same error. We then looked at the executors' stderr, and all show similar logs, on both runs (see below). As far as we can tell, the executors and the master have disk space left.

*Any suggestion on where to look to understand why the communication with the MapOutputTracker fails?*

Thanks
Thomas

In case it matters, our akka settings:
spark.akka.frameSize 50
spark.akka.threads 8
// those below are 10x the default, to cope with large GCs
spark.akka.timeout 1000
spark.akka.heartbeat.pauses 6
spark.akka.failure-detector.threshold 3000.0
spark.akka.heartbeat.interval 1

Appendix: executor logs, where it starts going awry

15/03/04 11:45:00 INFO CoarseGrainedExecutorBackend: Got assigned task 298525
15/03/04 11:45:00 INFO Executor: Running task 3083.0 in stage
Re: java.lang.OutOfMemoryError: unable to create new native thread
So:
1. I reduced my -XX:ThreadStackSize to 5m (instead of 10m; the default is 1m), which is still OK for my needs.
2. I reduced the executor memory to 44GB for a 60GB machine (instead of 49GB).

This seems to have helped. Thanks to Matthew and Sean.
Thomas

On Tue, Mar 24, 2015 at 3:49 PM, Matt Silvey matt.sil...@videoamp.com wrote:
My memory is hazy on this, but aren't there hidden limitations to Linux-based threads? I ran into some issues a couple of years ago where, and here is the fuzzy part, the kernel wants to reserve virtual memory per thread equal to the stack size. When the total amount of reserved memory (not necessarily resident memory) exceeds the memory of the system, it throws an OOM. I'm looking for material to back this up. Sorry for the initial vague response.
Matthew

On Tue, Mar 24, 2015 at 12:53 PM, Thomas Gerber thomas.ger...@radius.com wrote:
Additional notes: I did not find anything wrong with the number of threads (ps -u USER -L | wc -l): around 780 on the master and 400 on the executors. I am running on 100 r3.2xlarge.

On Tue, Mar 24, 2015 at 12:38 PM, Thomas Gerber thomas.ger...@radius.com wrote:
Hello,

I am seeing various crashes in Spark on large jobs which all share a similar exception:

java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:714)

I increased nproc (i.e. ulimit -u) 10-fold, but it doesn't help. Does anyone know how to avoid those kinds of errors?

Noteworthy: I added -XX:ThreadStackSize=10m to both driver and executor extra java options, which might have amplified the problem.

Thanks for your help,
Thomas
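For reference, a hedged sketch of where the two settings discussed here would live in spark-defaults.conf (values are the ones from this thread; whether they fit another cluster depends on machine size and thread counts):

# smaller per-thread stacks lower the virtual memory reserved per thread
spark.executor.extraJavaOptions -XX:ThreadStackSize=5m
spark.driver.extraJavaOptions -XX:ThreadStackSize=5m
# heap kept well below the 60GB of physical RAM, leaving room for stacks and other off-heap use
spark.executor.memory 44g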
java.lang.OutOfMemoryError: unable to create new native thread
Hello,

I am seeing various crashes in Spark on large jobs which all share a similar exception:

java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:714)

I increased nproc (i.e. ulimit -u) 10-fold, but it doesn't help. Does anyone know how to avoid those kinds of errors?

Noteworthy: I added -XX:ThreadStackSize=10m to both driver and executor extra java options, which might have amplified the problem.

Thanks for your help,
Thomas
Re: java.lang.OutOfMemoryError: unable to create new native thread
Additional notes: I did not find anything wrong with the number of threads (ps -u USER -L | wc -l): around 780 on the master and 400 on the executors. I am running on 100 r3.2xlarge.

On Tue, Mar 24, 2015 at 12:38 PM, Thomas Gerber thomas.ger...@radius.com wrote:
Hello,

I am seeing various crashes in Spark on large jobs which all share a similar exception:

java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:714)

I increased nproc (i.e. ulimit -u) 10-fold, but it doesn't help. Does anyone know how to avoid those kinds of errors?

Noteworthy: I added -XX:ThreadStackSize=10m to both driver and executor extra java options, which might have amplified the problem.

Thanks for your help,
Thomas
Re: Driver disassociated
Thanks. I was already setting those (and I checked they were in use through the Environment tab in the UI). They were set at 10 times their default value: 6 and 1 respectively.

I'll start poking at spark.shuffle.io.retryWait.

Thanks!

On Wed, Mar 4, 2015 at 7:02 PM, Ted Yu yuzhih...@gmail.com wrote:
See this thread: https://groups.google.com/forum/#!topic/akka-user/X3xzpTCbEFs
Here are the relevant config parameters in Spark:

val akkaHeartBeatPauses = conf.getInt("spark.akka.heartbeat.pauses", 6000)
val akkaHeartBeatInterval = conf.getInt("spark.akka.heartbeat.interval", 1000)

Cheers

On Wed, Mar 4, 2015 at 4:09 PM, Thomas Gerber thomas.ger...@radius.com wrote:
Also, I was experiencing another problem which might be related: Error communicating with MapOutputTracker (see email on the ML today). I just thought I would mention it in case it is relevant.

On Wed, Mar 4, 2015 at 4:07 PM, Thomas Gerber thomas.ger...@radius.com wrote:
1.2.1

Also, I was using the following parameters, which are 10 times the default ones:
spark.akka.timeout 1000
spark.akka.heartbeat.pauses 6
spark.akka.failure-detector.threshold 3000.0
spark.akka.heartbeat.interval 1
which should have helped *avoid* the problem, if I understand correctly.

Thanks,
Thomas

On Wed, Mar 4, 2015 at 3:21 PM, Ted Yu yuzhih...@gmail.com wrote:
What release are you using? SPARK-3923 went into the 1.2.0 release.
Cheers

On Wed, Mar 4, 2015 at 1:39 PM, Thomas Gerber thomas.ger...@radius.com wrote:
Hello,

Sometimes, in the *middle* of a job, the job stops (the status is then seen as FINISHED in the master). There isn't anything wrong in the shell/submit output.

When looking at the executor logs, I see logs like this:

15/03/04 21:24:51 INFO MapOutputTrackerWorker: Doing the fetch; tracker actor = Actor[akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019/user/MapOutputTracker#893807065]
15/03/04 21:24:51 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 38, fetching them
15/03/04 21:24:55 ERROR CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@ip-10-0-11-9.ec2.internal:54766] - [akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019] disassociated! Shutting down.
15/03/04 21:24:55 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].

How can I investigate further?
Thanks
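For reference, a hedged note on the shuffle retry settings mentioned above: in Spark 1.2 they default to 3 retries of 5 seconds each, and the values below are only an illustrative increase (double-check the exact names and units against your Spark version):

spark.shuffle.io.maxRetries 6
spark.shuffle.io.retryWait 30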
Error communicating with MapOutputTracker
Hello,

We are using Spark 1.2.1 on a very large cluster (100 c3.8xlarge workers). We use spark-submit to start an application. We got the following error, which leads to a failed stage:

Job aborted due to stage failure: Task 3095 in stage 140.0 failed 4 times, most recent failure: Lost task 3095.3 in stage 140.0 (TID 308697, ip-10-0-12-88.ec2.internal): org.apache.spark.SparkException: Error communicating with MapOutputTracker

We tried the whole application again, and it failed on the same stage (though it got more tasks completed on that stage) with the same error. We then looked at the executors' stderr, and all show similar logs, on both runs (see below). As far as we can tell, the executors and the master have disk space left.

*Any suggestion on where to look to understand why the communication with the MapOutputTracker fails?*

Thanks
Thomas

In case it matters, our akka settings:
spark.akka.frameSize 50
spark.akka.threads 8
// those below are 10x the default, to cope with large GCs
spark.akka.timeout 1000
spark.akka.heartbeat.pauses 6
spark.akka.failure-detector.threshold 3000.0
spark.akka.heartbeat.interval 1

Appendix: executor logs, where it starts going awry

15/03/04 11:45:00 INFO CoarseGrainedExecutorBackend: Got assigned task 298525
15/03/04 11:45:00 INFO Executor: Running task 3083.0 in stage 140.0 (TID 298525)
15/03/04 11:45:00 INFO MemoryStore: ensureFreeSpace(1473) called with curMem=5543008799, maxMem=18127202549
15/03/04 11:45:00 INFO MemoryStore: Block broadcast_339_piece0 stored as bytes in memory (estimated size 1473.0 B, free 11.7 GB)
15/03/04 11:45:00 INFO BlockManagerMaster: Updated info of block broadcast_339_piece0
15/03/04 11:45:00 INFO TorrentBroadcast: Reading broadcast variable 339 took 224 ms
15/03/04 11:45:00 INFO MemoryStore: ensureFreeSpace(2536) called with curMem=5543010272, maxMem=18127202549
15/03/04 11:45:00 INFO MemoryStore: Block broadcast_339 stored as values in memory (estimated size 2.5 KB, free 11.7 GB)
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Doing the fetch; tracker actor = Actor[akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:52380/user/MapOutputTracker#-2057016370]
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 18, fetching them
[the same line is repeated many more times]
Re: Error communicating with MapOutputTracker
Follow up: we retried, this time after *decreasing* spark.parallelism. It was set to 16000 before (5 times the number of cores in our cluster). It is now down to 6400 (2 times the number of cores), and it got past the point where it failed before.

Does the MapOutputTracker have a limit on the number of tasks it can track?

On Wed, Mar 4, 2015 at 8:15 AM, Thomas Gerber thomas.ger...@radius.com wrote:
Hello,

We are using Spark 1.2.1 on a very large cluster (100 c3.8xlarge workers). We use spark-submit to start an application. We got the following error, which leads to a failed stage:

Job aborted due to stage failure: Task 3095 in stage 140.0 failed 4 times, most recent failure: Lost task 3095.3 in stage 140.0 (TID 308697, ip-10-0-12-88.ec2.internal): org.apache.spark.SparkException: Error communicating with MapOutputTracker

We tried the whole application again, and it failed on the same stage (though it got more tasks completed on that stage) with the same error. We then looked at the executors' stderr, and all show similar logs, on both runs (see below). As far as we can tell, the executors and the master have disk space left.

*Any suggestion on where to look to understand why the communication with the MapOutputTracker fails?*

Thanks
Thomas

In case it matters, our akka settings:
spark.akka.frameSize 50
spark.akka.threads 8
// those below are 10x the default, to cope with large GCs
spark.akka.timeout 1000
spark.akka.heartbeat.pauses 6
spark.akka.failure-detector.threshold 3000.0
spark.akka.heartbeat.interval 1

Appendix: executor logs, where it starts going awry

15/03/04 11:45:00 INFO CoarseGrainedExecutorBackend: Got assigned task 298525
15/03/04 11:45:00 INFO Executor: Running task 3083.0 in stage 140.0 (TID 298525)
15/03/04 11:45:00 INFO MemoryStore: ensureFreeSpace(1473) called with curMem=5543008799, maxMem=18127202549
15/03/04 11:45:00 INFO MemoryStore: Block broadcast_339_piece0 stored as bytes in memory (estimated size 1473.0 B, free 11.7 GB)
15/03/04 11:45:00 INFO BlockManagerMaster: Updated info of block broadcast_339_piece0
15/03/04 11:45:00 INFO TorrentBroadcast: Reading broadcast variable 339 took 224 ms
15/03/04 11:45:00 INFO MemoryStore: ensureFreeSpace(2536) called with curMem=5543010272, maxMem=18127202549
15/03/04 11:45:00 INFO MemoryStore: Block broadcast_339 stored as values in memory (estimated size 2.5 KB, free 11.7 GB)
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Doing the fetch; tracker actor = Actor[akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:52380/user/MapOutputTracker#-2057016370]
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 18, fetching them
[the same line is repeated many more times]
Spark logs in standalone clusters
Hello,

I was wondering where all the log files are located on a standalone cluster:

1. Executor logs: in the work directory on each slave machine (stdout/stderr).
- I've noticed that GC information goes to stdout and stage information to stderr. *Could we get more information on what is written to stdout vs stderr?*
2. The master log.
- The path to the log file is shown when you launch the master, e.g. /mnt/var/log/apps/spark-hadoop-org.apache.spark.deploy.master.Master-MACHINENAME.out. *Could we get more information on where this path is configured?*
3. Driver logs.
- It seems they only go to the console by default (although you can override that in the log4j.properties file).
4. Communication manager logs?
- *Are there any logs for the communication manager (a.k.a. the MapOutputTracker)?*
5. Any other log files?

Thanks,
Thomas
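Regarding #3, a hedged sketch of a log4j.properties override that sends driver logs to a file rather than the console (standard log4j 1.x syntax; the path and level are illustrative, not from this thread):

# route the root logger to a file instead of the console (illustrative path)
log4j.rootCategory=INFO, file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.File=/var/log/spark/driver.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n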
Re: Error communicating with MapOutputTracker
I meant spark.default.parallelism of course.

On Wed, Mar 4, 2015 at 10:24 AM, Thomas Gerber thomas.ger...@radius.com wrote:
Follow up: we retried, this time after *decreasing* spark.parallelism. It was set to 16000 before (5 times the number of cores in our cluster). It is now down to 6400 (2 times the number of cores), and it got past the point where it failed before.

Does the MapOutputTracker have a limit on the number of tasks it can track?

On Wed, Mar 4, 2015 at 8:15 AM, Thomas Gerber thomas.ger...@radius.com wrote:
Hello,

We are using Spark 1.2.1 on a very large cluster (100 c3.8xlarge workers). We use spark-submit to start an application. We got the following error, which leads to a failed stage:

Job aborted due to stage failure: Task 3095 in stage 140.0 failed 4 times, most recent failure: Lost task 3095.3 in stage 140.0 (TID 308697, ip-10-0-12-88.ec2.internal): org.apache.spark.SparkException: Error communicating with MapOutputTracker

We tried the whole application again, and it failed on the same stage (though it got more tasks completed on that stage) with the same error. We then looked at the executors' stderr, and all show similar logs, on both runs (see below). As far as we can tell, the executors and the master have disk space left.

*Any suggestion on where to look to understand why the communication with the MapOutputTracker fails?*

Thanks
Thomas

In case it matters, our akka settings:
spark.akka.frameSize 50
spark.akka.threads 8
// those below are 10x the default, to cope with large GCs
spark.akka.timeout 1000
spark.akka.heartbeat.pauses 6
spark.akka.failure-detector.threshold 3000.0
spark.akka.heartbeat.interval 1

Appendix: executor logs, where it starts going awry

15/03/04 11:45:00 INFO CoarseGrainedExecutorBackend: Got assigned task 298525
15/03/04 11:45:00 INFO Executor: Running task 3083.0 in stage 140.0 (TID 298525)
15/03/04 11:45:00 INFO MemoryStore: ensureFreeSpace(1473) called with curMem=5543008799, maxMem=18127202549
15/03/04 11:45:00 INFO MemoryStore: Block broadcast_339_piece0 stored as bytes in memory (estimated size 1473.0 B, free 11.7 GB)
15/03/04 11:45:00 INFO BlockManagerMaster: Updated info of block broadcast_339_piece0
15/03/04 11:45:00 INFO TorrentBroadcast: Reading broadcast variable 339 took 224 ms
15/03/04 11:45:00 INFO MemoryStore: ensureFreeSpace(2536) called with curMem=5543010272, maxMem=18127202549
15/03/04 11:45:00 INFO MemoryStore: Block broadcast_339 stored as values in memory (estimated size 2.5 KB, free 11.7 GB)
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Doing the fetch; tracker actor = Actor[akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:52380/user/MapOutputTracker#-2057016370]
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 18, fetching them
[the same line is repeated many more times]
Driver disassociated
Hello,

Sometimes, in the *middle* of a job, the job stops (the status is then seen as FINISHED in the master). There isn't anything wrong in the shell/submit output.

When looking at the executor logs, I see logs like this:

15/03/04 21:24:51 INFO MapOutputTrackerWorker: Doing the fetch; tracker actor = Actor[akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019/user/MapOutputTracker#893807065]
15/03/04 21:24:51 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 38, fetching them
15/03/04 21:24:55 ERROR CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@ip-10-0-11-9.ec2.internal:54766] - [akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019] disassociated! Shutting down.
15/03/04 21:24:55 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].

How can I investigate further?
Thanks
Re: Driver disassociated
Also, I was experiencing another problem which might be related: Error communicating with MapOutputTracker (see email on the ML today). I just thought I would mention it in case it is relevant.

On Wed, Mar 4, 2015 at 4:07 PM, Thomas Gerber thomas.ger...@radius.com wrote:
1.2.1

Also, I was using the following parameters, which are 10 times the default ones:
spark.akka.timeout 1000
spark.akka.heartbeat.pauses 6
spark.akka.failure-detector.threshold 3000.0
spark.akka.heartbeat.interval 1
which should have helped *avoid* the problem, if I understand correctly.

Thanks,
Thomas

On Wed, Mar 4, 2015 at 3:21 PM, Ted Yu yuzhih...@gmail.com wrote:
What release are you using? SPARK-3923 went into the 1.2.0 release.
Cheers

On Wed, Mar 4, 2015 at 1:39 PM, Thomas Gerber thomas.ger...@radius.com wrote:
Hello,

Sometimes, in the *middle* of a job, the job stops (the status is then seen as FINISHED in the master). There isn't anything wrong in the shell/submit output.

When looking at the executor logs, I see logs like this:

15/03/04 21:24:51 INFO MapOutputTrackerWorker: Doing the fetch; tracker actor = Actor[akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019/user/MapOutputTracker#893807065]
15/03/04 21:24:51 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 38, fetching them
15/03/04 21:24:55 ERROR CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@ip-10-0-11-9.ec2.internal:54766] - [akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019] disassociated! Shutting down.
15/03/04 21:24:55 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].

How can I investigate further?
Thanks
Re: Driver disassociated
1.2.1

Also, I was using the following parameters, which are 10 times the default ones:
spark.akka.timeout 1000
spark.akka.heartbeat.pauses 6
spark.akka.failure-detector.threshold 3000.0
spark.akka.heartbeat.interval 1
which should have helped *avoid* the problem, if I understand correctly.

Thanks,
Thomas

On Wed, Mar 4, 2015 at 3:21 PM, Ted Yu yuzhih...@gmail.com wrote:
What release are you using? SPARK-3923 went into the 1.2.0 release.
Cheers

On Wed, Mar 4, 2015 at 1:39 PM, Thomas Gerber thomas.ger...@radius.com wrote:
Hello,

Sometimes, in the *middle* of a job, the job stops (the status is then seen as FINISHED in the master). There isn't anything wrong in the shell/submit output.

When looking at the executor logs, I see logs like this:

15/03/04 21:24:51 INFO MapOutputTrackerWorker: Doing the fetch; tracker actor = Actor[akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019/user/MapOutputTracker#893807065]
15/03/04 21:24:51 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 38, fetching them
15/03/04 21:24:55 ERROR CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@ip-10-0-11-9.ec2.internal:54766] - [akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019] disassociated! Shutting down.
15/03/04 21:24:55 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].

How can I investigate further?
Thanks
Re: Executors dropping all memory stored RDDs?
I have a strong suspicion that it was caused by a full disk on an executor. I am not sure whether the executor was supposed to recover from it that way. I cannot be sure about it; I should have had enough disk space, but I think I had some data skew, which could have led some executors to run out of disk.

So, in case someone else notices a behavior like this, make sure you check your cluster monitor (like Ganglia).

On Wed, Jan 28, 2015 at 5:40 PM, Thomas Gerber thomas.ger...@radius.com wrote:
Hello,

I am storing RDDs with the MEMORY_ONLY_SER storage level, during the run of a big job. At some point during the job, I went to the Executors page and saw that 80% of my executors did not have stored RDDs anymore (executors.png). On the Storage page, everything seems there (storage.png). But if I look at a given RDD (RDD_83.png), although it tells me at the top that all 100 partitions are cached, when I look at the details only 17 are actually stored (RDD_83_partitions), all on the 20% of executors that still had stored RDDs according to the Executors page.

So I wonder:
1. Are those RDDs still cached (in which case we have a small reporting error), or not?
2. If not, what could cause an executor to drop its memory-stored RDD blocks? I guess a restart of an executor?

When I compare an executor that seems to have dropped blocks vs one that has not:
*** their *spark-hadoop-org.apache.spark.deploy.worker.Worker-1-ip-XX-XX-XX-XX.ec2.internal.out* contents look the same
*** they both have the same etime in ps (so, I guess no restart?)
*** I didn't see anything in the app log in the work folder (but it is large, so I might have missed it)

Also, I must mention that the cluster was doing a lot of GCs, which might be a cause of the trouble.

I would appreciate any pointers.
Thomas
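Regarding #1, a hedged way to cross-check the UI programmatically, using the driver-side storage info available in Spark 1.x (field names as in org.apache.spark.storage.RDDInfo; treat this as a sketch rather than an exact diagnostic):

// Prints, for each RDD the driver believes is persisted, how many
// partitions are actually cached and where the bytes live.
sc.getRDDStorageInfo.foreach { info =>
  println(s"RDD ${info.id} '${info.name}': " +
    s"${info.numCachedPartitions}/${info.numPartitions} partitions cached, " +
    s"${info.memSize} bytes in memory, ${info.diskSize} bytes on disk")
}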
Shuffle Spill
Hello,

In a stage with lots of tasks, I have a few tasks with a large amount of shuffle spill. I scoured the web to understand shuffle spill, and I did not find any simple explanation of the spill mechanism. What I put together is:
1. shuffle spill can happen when the shuffle is written to disk (i.e. by the last map stage, as opposed to when the shuffle is read by the reduce stage)
2. it happens when there is a lot to write to the shuffle; since that shuffle needs to be sorted by key, the spilling mechanism allows Spark to do that without holding everything in memory

I am unclear, however, whether a large task will systematically lead to shuffle spill, or whether the number of keys (for the next reduce stage) that particular task encounters also has an impact.

Concretely, let's say I have:
val ab = RDD[(a,b)]
val ac = RDD[(a,c)]
val bd = RDD[(b,d)]
and I do:
val bc = ab.join(ac).values // we investigate this task, triggered by values
val cd = bc.join(bd).values

The task we investigate reads from a previous shuffle, and will write to another shuffle to prepare for the second join. I know that I have data skew on a key of a, meaning a few tasks are expected to be large and I have stragglers. Now, is that the cause of the shuffle spill, or is it because those straggler tasks also happen to contain a very large number of distinct b's?

Thanks
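For reference, the Spark 1.x settings that govern this behavior, shown with their documented defaults (just context for the question above, not advice from the thread):

# allow the shuffle machinery (ExternalAppendOnlyMap / ExternalSorter) to spill to disk (default: true)
spark.shuffle.spill true
# fraction of the heap used for shuffle aggregation before spilling (default: 0.2)
spark.shuffle.memoryFraction 0.2
# fraction of the heap reserved for cached RDDs, competing with the above (default: 0.6)
spark.storage.memoryFraction 0.6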