Setting Executor memory

2015-09-14 Thread Thomas Gerber
Hello,

I was looking for guidelines on what value to set executor memory to
(via spark.executor.memory for example).

This seems to be important to avoid OOMs during tasks, especially in no-swap
environments (like AWS EMR clusters).

This setting is really about the executor JVM heap. Hence, in order to come
up with the maximal amount of heap memory for the executor, we need to list:
1. the memory taken by other processes (Worker in standalone mode, ...)
2. all off-heap allocations in the executor

Fortunately, for #1, we can just look at memory consumption without any
application running.

For #2, it is trickier. What I suspect we should account for:
a. thread stack size
b. akka buffers (via akka framesize & number of akka threads)
c. kryo buffers
d. shuffle buffers
(e. tachyon)
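
For illustration, here is a rough back-of-the-envelope starting point; every
number below is an assumption to be replaced by actual measurements, not a
recommendation:

  total machine memory                          60g
  - OS, Worker and other daemons                 3g  (measured per #1, no application running)
  - thread stacks (e.g. 400 threads * 5m each)   2g
  - akka frames, kryo and shuffle buffers        1g  (rough allowance for #2 b-d)
  => spark.executor.memory somewhere below      54g, rounded down generously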

Could anyone shed some light on this? Maybe a formula? Or maybe swap should
actually be turned on, as a safeguard against OOMs?

Thanks


Cores per executors

2015-09-09 Thread Thomas Gerber
Hello,

I was wondering how Spark enforces using *only* X cores per
executor.

Is it simply running at most Y tasks in parallel on each executor, where X = Y
* spark.task.cpus? (This is what I understood from browsing
TaskSchedulerImpl.)

Which would mean the processing power used for "map"-side (if any) and
"reduce"-side shuffle sorting is unbounded (ExternalAppendOnlyMap and
ExternalSorter, I guess)?
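
For reference, a minimal sketch of the relationship as I read it from
TaskSchedulerImpl (illustrative Scala, not the actual scheduler code):

import org.apache.spark.SparkConf

val conf = new SparkConf()
val cpusPerTask = conf.getInt("spark.task.cpus", 1)  // real Spark setting, default 1
// an executor advertising X cores runs at most X / cpusPerTask tasks concurrently;
// nothing bounds the CPU a single task uses internally (e.g. shuffle sorting)
def maxConcurrentTasks(executorCores: Int): Int = executorCores / cpusPerTask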

Thanks,
Thomas


Shuffle files lifecycle

2015-06-29 Thread Thomas Gerber
Hello,

It is my understanding that shuffles are written to disk and that they act
as checkpoints.

I wonder if this is true only within a job, or across jobs. Please note
that I use the words job and stage carefully here.

1. can a shuffle created during JobN be used to skip many stages from
JobN+1? Or is the lifecycle of the shuffle files bound to the job that
created them?

2. when are shuffle files actually deleted? Is it TTL based or is it
cleaned when the job is over?

3. we have a very long batch application, and as it goes on, the number of
total tasks for each job gets larger and larger. It is not really a
problem, because most of those tasks will be skipped since we cache RDDs.
We noticed however that the actual start of a job is delayed by about 1
min for every 2M tasks in the job. Are there suggested workarounds to
avoid that delay? Maybe saving the RDD and re-loading it?

Thanks
Thomas


Re: Shuffle files lifecycle

2015-06-29 Thread Thomas Gerber
Ah, for #3, maybe this is what *rdd.checkpoint* does!
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD

Thomas


On Mon, Jun 29, 2015 at 7:12 PM, Thomas Gerber thomas.ger...@radius.com
wrote:

 Hello,

 It is my understanding that shuffle are written on disk and that they act
 as checkpoints.

 I wonder if this is true only within a job, or across jobs. Please note
 that I use the words job and stage carefully here.

 1. can a shuffle created during JobN be used to skip many stages from
 JobN+1? Or is the lifecycle of the shuffle files bound to the job that
 created them?

 2. when are shuffle files actually deleted? Is it TTL based or is it
 cleaned when the job is over?

 3. we have a very long batch application, and as it goes on, the number of
 total tasks for each job gets larger and larger. It is not really a
 problem, because most of those tasks will be skipped since we cache RDDs.
 We noticed however that there is a delay in the actual start of a job of 1
 min for every 2M tasks in your job. Are there suggested workarounds to
 avoid that delay? Maybe saving the RDD and re-loading it?

 Thanks
 Thomas




Re: Shuffle files lifecycle

2015-06-29 Thread Thomas Gerber
Thanks Silvio.


On Mon, Jun 29, 2015 at 7:41 PM, Silvio Fiorito 
silvio.fior...@granturing.com wrote:

   Regarding 1 and 2, yes shuffle output is stored on the worker local
 disks and will be reused across jobs as long as they’re available. You can
 identify when they’re used by seeing skipped stages in the job UI. They are
 periodically cleaned up based on available space of the configured
 spark.local.dirs paths.

   From: Thomas Gerber
 Date: Monday, June 29, 2015 at 10:12 PM
 To: user
 Subject: Shuffle files lifecycle

   Hello,

  It is my understanding that shuffle are written on disk and that they
 act as checkpoints.

  I wonder if this is true only within a job, or across jobs. Please note
 that I use the words job and stage carefully here.

  1. can a shuffle created during JobN be used to skip many stages from
 JobN+1? Or is the lifecycle of the shuffle files bound to the job that
 created them?

  2. when are shuffle files actually deleted? Is it TTL based or is it
 cleaned when the job is over?

  3. we have a very long batch application, and as it goes on, the number
 of total tasks for each job gets larger and larger. It is not really a
 problem, because most of those tasks will be skipped since we cache RDDs.
 We noticed however that there is a delay in the actual start of a job of 1
 min for every 2M tasks in your job. Are there suggested workarounds to
 avoid that delay? Maybe saving the RDD and re-loading it?

  Thanks
 Thomas




Re: GraphX - ConnectedComponents (Pregel) - longer and longer interval between jobs

2015-06-29 Thread Thomas Gerber
It seems the root cause of the delay was the sheer size of the DAG for
those jobs, which are towards the end of a long series of jobs.

To reduce it, you can probably try to checkpoint (rdd.checkpoint) some
previous RDDs. That will:
1. save the RDD on disk
2. remove all references to the parents of this RDD

Which means that when a job uses that RDD, the DAG stops at that RDD and
does not look at its parents, as it doesn't have them anymore. It is very
similar to saving your RDD and re-loading it as a fresh RDD.
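
A minimal sketch of that approach, assuming an existing SparkContext sc and
an RDD called previous from earlier in the pipeline (the checkpoint
directory is a placeholder):

import org.apache.spark.storage.StorageLevel

sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")             // placeholder path
val trimmed = previous.persist(StorageLevel.MEMORY_AND_DISK_SER) // avoid recomputing the lineage while checkpointing
trimmed.checkpoint()  // marks the RDD; the data is actually written by the next action that computes it
trimmed.count()       // force materialization; the DAG of later jobs now starts here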

On Fri, Jun 26, 2015 at 9:14 AM, Thomas Gerber thomas.ger...@radius.com
wrote:

 Note that this problem is probably NOT caused directly by GraphX, but
 GraphX reveals it because as you go further down the iterations, you get
 further and further away from a shuffle you can rely on.

 On Thu, Jun 25, 2015 at 7:43 PM, Thomas Gerber thomas.ger...@radius.com
 wrote:

 Hello,

 We run GraphX ConnectedComponents, and we notice that there is a time gap
 that becomes larger and larger during Jobs, that is not accounted for.

 In the screenshot attached, you will notice that each job only takes
 around 2 1/2min. At first, the next job/iteration starts immediately after
 the previous one. But as we go through iterations, there is a gap (time
 where job N+1 starts - time where job N finishes) that grows, reaching
  ultimately 6 minutes around the 30th iteration.

 I suspect it has to do with DAG computation on the driver, as evidenced
  by the very large (and getting larger at every iteration) number of pending
  stages that are ultimately skipped.

 So,
 1. is there anything obvious we can do to make that gap between
 iterations shorter?
 2. would dividing the number of partitions in the input RDD per 2 divide
 the gap by 2 as well?

  I ask because a 3 min gap on average for a job length of 2 1/2 min means we
  are wasting ~50% of the CPU time on the Executors.

 Thanks!
 Thomas





Re: GraphX - ConnectedComponents (Pregel) - longer and longer interval between jobs

2015-06-26 Thread Thomas Gerber
Note that this problem is probably NOT caused directly by GraphX, but
GraphX reveals it because as you go further down the iterations, you get
further and further away from a shuffle you can rely on.

On Thu, Jun 25, 2015 at 7:43 PM, Thomas Gerber thomas.ger...@radius.com
wrote:

 Hello,

 We run GraphX ConnectedComponents, and we notice that there is a time gap
 that becomes larger and larger during Jobs, that is not accounted for.

 In the screenshot attached, you will notice that each job only takes
 around 2 1/2min. At first, the next job/iteration starts immediately after
 the previous one. But as we go through iterations, there is a gap (time
 where job N+1 starts - time where job N finishes) that grows, reaching
  ultimately 6 minutes around the 30th iteration.

 I suspect it has to do with DAG computation on the driver, as evidenced by
  the very large (and getting larger at every iteration) number of pending
  stages that are ultimately skipped.

 So,
 1. is there anything obvious we can do to make that gap between
 iterations shorter?
 2. would dividing the number of partitions in the input RDD per 2 divide
 the gap by 2 as well?

  I ask because a 3 min gap on average for a job length of 2 1/2 min means we
  are wasting ~50% of the CPU time on the Executors.

 Thanks!
 Thomas



Re: Error communicating with MapOutputTracker

2015-05-15 Thread Thomas Gerber
Hi Imran,

Thanks for the advice, tweaking with some akka parameters helped. See below.

Now, we noticed that we get java heap OOM exceptions on the output tracker
when we have too many tasks. I wonder:
1. where does the map output tracker live? The driver? The master (when
those are not the same)?
2. how can we increase the heap for it? Especially when using spark-submit?

Thanks,
Thomas

PS: akka parameter that one might want to increase:
# akka timeouts/heartbeats settings multiplied by 10 to avoid problems
spark.akka.timeout 1000
spark.akka.heartbeat.pauses 60000
spark.akka.failure-detector.threshold 3000.0
spark.akka.heartbeat.interval 10000

# Hidden akka conf to avoid MapOutputTracker timeouts
# See
https://github.com/apache/spark/blob/branch-1.3/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
spark.akka.askTimeout 300
spark.akka.lookupTimeout 300
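
For what it's worth on questions 1 and 2 above, my understanding (worth
double-checking against the Spark source) is that the MapOutputTracker
master lives in the driver process, so its heap is the driver heap; with
spark-submit that would be raised along these lines (class, jar and size
are placeholders):

spark-submit --driver-memory 16g --class com.example.MyApp my-app.jar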

On Fri, Mar 20, 2015 at 9:18 AM, Imran Rashid iras...@cloudera.com wrote:

 Hi Thomas,

 sorry for such a late reply.  I don't have any super-useful advice, but
 this seems like something that is important to follow up on.  to answer
 your immediate question, No, there should not be any hard limit to the
 number of tasks that MapOutputTracker can handle.  Though of course as
 things get bigger, the overheads increase which is why you might hit
 timeouts.

 Two other minor suggestions:
 (1) increase spark.akka.askTimeout -- that's the timeout you are running
 into; it defaults to 30 seconds
 (2) as you've noted, you've needed to play w/ other timeouts b/c of long
 GC pauses -- it's possible some GC tuning might help, though it's a bit of a
 black art so it's hard to say what you can try.  You could always try
 Concurrent Mark Sweep to avoid the long pauses, but of course that will
 probably hurt overall performance.

 can you share any more details of what you are trying to do?

 Since you're fetching shuffle blocks in a shuffle map task, I guess you've
 got two shuffles back-to-back, eg.
 someRDD.reduceByKey{...}.map{...}.filter{...}.combineByKey{...}.  Do you
 expect to be doing a lot of GC in between the two shuffles?? -eg., in the
 little example I have, if there were lots of objects being created in the
 map and filter steps that will make it out of the eden space.  One possible
 solution to this would be to force the first shuffle to complete, before
 running any of the subsequent transformations, eg. by forcing
 materialization to the cache first

 val intermediateRDD = someRDD.reduceByKey{...}.persist(StorageLevel.DISK_ONLY)
 intermediateRDD.count() // force the shuffle to complete, without trying
 to do our complicated downstream logic at the same time

 val finalResult = intermediateRDD.map{...}.filter{...}.combineByKey{...}

 Also, can you share your data size?  Do you expect the shuffle to be
 skewed, or do you think it will be well-balanced?  Not that I'll have any
 suggestions for you based on the answer, but it may help us reproduce it
 and try to fix whatever the root cause is.

 thanks,
 Imran



 On Wed, Mar 4, 2015 at 12:30 PM, Thomas Gerber thomas.ger...@radius.com
 wrote:

 I meant spark.default.parallelism of course.

 On Wed, Mar 4, 2015 at 10:24 AM, Thomas Gerber thomas.ger...@radius.com
 wrote:

 Follow up:
 We re-retried, this time after *decreasing* spark.parallelism. It was
 set to 16000 before, (5 times the number of cores in our cluster). It is
 now down to 6400 (2 times the number of cores).

 And it got past the point where it failed before.

 Does the MapOutputTracker have a limit on the number of tasks it can
 track?


 On Wed, Mar 4, 2015 at 8:15 AM, Thomas Gerber thomas.ger...@radius.com
 wrote:

 Hello,

 We are using spark 1.2.1 on a very large cluster (100 c3.8xlarge
 workers). We use spark-submit to start an application.

 We got the following error which leads to a failed stage:

 Job aborted due to stage failure: Task 3095 in stage 140.0 failed 4 times, 
 most recent failure: Lost task 3095.3 in stage 140.0 (TID 308697, 
 ip-10-0-12-88.ec2.internal): org.apache.spark.SparkException: Error 
 communicating with MapOutputTracker


 We tried the whole application again, and it failed on the same stage
 (but it got more tasks completed on that stage) with the same error.

 We then looked at executors stderr, and all show similar logs, on both
 runs (see below). As far as we can tell, executors and master have disk
 space left.

 *Any suggestion on where to look to understand why the communication
 with the MapOutputTracker fails?*

 Thanks
 Thomas
 
 In case it matters, our akka settings:
 spark.akka.frameSize 50
 spark.akka.threads 8
 // those below are 10* the default, to cope with large GCs
 spark.akka.timeout 1000
 spark.akka.heartbeat.pauses 60000
 spark.akka.failure-detector.threshold 3000.0
 spark.akka.heartbeat.interval 10000

 Appendix: executor logs, where it starts going awry

 15/03/04 11:45:00 INFO CoarseGrainedExecutorBackend: Got assigned task 
 298525
 15/03/04 11:45:00 INFO Executor: Running task 3083.0 in stage

Re: java.lang.OutOfMemoryError: unable to create new native thread

2015-03-24 Thread Thomas Gerber
So,

1. I reduced my -XX:ThreadStackSize to 5m (instead of 10m; the default is
1m), which is still OK for my needs.
2. I reduced the executor memory to 44GB for a 60GB machine (instead of
49GB).

This seems to have helped. Thanks to Matthew and Sean.
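
For a rough sense of scale (back-of-the-envelope, assuming the ~400 threads
per executor observed earlier): at -XX:ThreadStackSize=10m those threads
reserve about 4GB of virtual address space for stacks alone; at 5m that
drops to roughly 2GB, which together with the 5GB trimmed off the heap
(49GB -> 44GB) leaves noticeably more native headroom on a 60GB machine.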

Thomas

On Tue, Mar 24, 2015 at 3:49 PM, Matt Silvey matt.sil...@videoamp.com
wrote:

 My memory is hazy on this but aren't there hidden limitations to
 Linux-based threads?  I ran into some issues a couple of years ago where,
 and here is the fuzzy part, the kernel wants to reserve virtual memory per
 thread equal to the stack size.  When the total amount of reserved memory
 (not necessarily resident memory) exceeds the memory of the system it
 throws an OOM.  I'm looking for material to back this up.  Sorry for the
 initial vague response.

 Matthew

 On Tue, Mar 24, 2015 at 12:53 PM, Thomas Gerber thomas.ger...@radius.com
 wrote:

 Additional notes:
 I did not find anything wrong with the number of threads (ps -u USER -L |
 wc -l): around 780 on the master and 400 on executors. I am running on 100
 r3.2xlarge.

 On Tue, Mar 24, 2015 at 12:38 PM, Thomas Gerber thomas.ger...@radius.com
  wrote:

 Hello,

 I am seeing various crashes in spark on large jobs which all share a
 similar exception:

 java.lang.OutOfMemoryError: unable to create new native thread
 at java.lang.Thread.start0(Native Method)
 at java.lang.Thread.start(Thread.java:714)

 I increased nproc (i.e. ulimit -u) 10 fold, but it doesn't help.

 Does anyone know how to avoid those kinds of errors?

 Noteworthy: I added -XX:ThreadStackSize=10m on both driver and executor
 extra java options, which might have amplified the problem.

 Thanks for your help,
 Thomas






java.lang.OutOfMemoryError: unable to create new native thread

2015-03-24 Thread Thomas Gerber
Hello,

I am seeing various crashes in spark on large jobs which all share a
similar exception:

java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:714)

I increased nproc (i.e. ulimit -u) 10 fold, but it doesn't help.

Does anyone know how to avoid those kinds of errors?

Noteworthy: I added -XX:ThreadStackSize=10m on both driver and executor
extra java options, which might have amplified the problem.

Thanks for your help,
Thomas


Re: java.lang.OutOfMemoryError: unable to create new native thread

2015-03-24 Thread Thomas Gerber
Additional notes:
I did not find anything wrong with the number of threads (ps -u USER -L |
wc -l): around 780 on the master and 400 on executors. I am running on 100
r3.2xlarge.

On Tue, Mar 24, 2015 at 12:38 PM, Thomas Gerber thomas.ger...@radius.com
wrote:

 Hello,

 I am seeing various crashes in spark on large jobs which all share a
 similar exception:

 java.lang.OutOfMemoryError: unable to create new native thread
 at java.lang.Thread.start0(Native Method)
 at java.lang.Thread.start(Thread.java:714)

 I increased nproc (i.e. ulimit -u) 10 fold, but it doesn't help.

 Does anyone know how to avoid those kinds of errors?

 Noteworthy: I added -XX:ThreadStackSize=10m on both driver and executor
 extra java options, which might have amplified the problem.

 Thanks for your help,
 Thomas



Re: Driver disassociated

2015-03-05 Thread Thomas Gerber
Thanks.
I was already setting those (and I checked they were in use through the
environment tab in the UI).

They were set at 10 times their default values: 60000 and 10000 respectively.

I'll start poking at spark.shuffle.io.retryWait.
Thanks!

On Wed, Mar 4, 2015 at 7:02 PM, Ted Yu yuzhih...@gmail.com wrote:

 See this thread:
 https://groups.google.com/forum/#!topic/akka-user/X3xzpTCbEFs

 Here're the relevant config parameters in Spark:
  val akkaHeartBeatPauses = conf.getInt("spark.akka.heartbeat.pauses", 6000)
  val akkaHeartBeatInterval = conf.getInt("spark.akka.heartbeat.interval", 1000)

 Cheers

 On Wed, Mar 4, 2015 at 4:09 PM, Thomas Gerber thomas.ger...@radius.com
 wrote:

 Also,

 I was experiencing another problem which might be related:
 Error communicating with MapOutputTracker (see email in the ML today).

 I just thought I would mention it in case it is relevant.

 On Wed, Mar 4, 2015 at 4:07 PM, Thomas Gerber thomas.ger...@radius.com
 wrote:

 1.2.1

 Also, I was using the following parameters, which are 10 times the
 default ones:
 spark.akka.timeout 1000
 spark.akka.heartbeat.pauses 60000
 spark.akka.failure-detector.threshold 3000.0
 spark.akka.heartbeat.interval 10000

 which should have helped *avoid* the problem if I understand correctly.

 Thanks,
 Thomas

 On Wed, Mar 4, 2015 at 3:21 PM, Ted Yu yuzhih...@gmail.com wrote:

 What release are you using ?

 SPARK-3923 went into 1.2.0 release.

 Cheers

 On Wed, Mar 4, 2015 at 1:39 PM, Thomas Gerber thomas.ger...@radius.com
  wrote:

 Hello,

 sometimes, in the *middle* of a job, the job stops (status is then
 seen as FINISHED in the master).

 There isn't anything wrong in the shell/submit output.

 When looking at the executor logs, I see logs like this:

 15/03/04 21:24:51 INFO MapOutputTrackerWorker: Doing the fetch;
 tracker actor = Actor[akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal
 :40019/user/MapOutputTracker#893807065]
 15/03/04 21:24:51 INFO MapOutputTrackerWorker: Don't have map outputs
 for shuffle 38, fetching them
 15/03/04 21:24:55 ERROR CoarseGrainedExecutorBackend: Driver
 Disassociated [akka.tcp://sparkExecutor@ip-10-0-11-9.ec2.internal:54766]
 - [akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019]
 disassociated! Shutting down.
 15/03/04 21:24:55 WARN ReliableDeliverySupervisor: Association with
 remote system [akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019]
 has failed, address is now gated for [5000] ms. Reason is: 
 [Disassociated].

 How can I investigate further?
 Thanks








Error communicating with MapOutputTracker

2015-03-04 Thread Thomas Gerber
Hello,

We are using spark 1.2.1 on a very large cluster (100 c3.8xlarge workers).
We use spark-submit to start an application.

We got the following error which leads to a failed stage:

Job aborted due to stage failure: Task 3095 in stage 140.0 failed 4
times, most recent failure: Lost task 3095.3 in stage 140.0 (TID
308697, ip-10-0-12-88.ec2.internal): org.apache.spark.SparkException:
Error communicating with MapOutputTracker


We tried the whole application again, and it failed on the same stage (but
it got more tasks completed on that stage) with the same error.

We then looked at executors stderr, and all show similar logs, on both runs
(see below). As far as we can tell, executors and master have disk space
left.

*Any suggestion on where to look to understand why the communication with
the MapOutputTracker fails?*

Thanks
Thomas

In case it matters, our akka settings:
spark.akka.frameSize 50
spark.akka.threads 8
// those below are 10* the default, to cope with large GCs
spark.akka.timeout 1000
spark.akka.heartbeat.pauses 60000
spark.akka.failure-detector.threshold 3000.0
spark.akka.heartbeat.interval 10000

Appendix: executor logs, where it starts going awry

15/03/04 11:45:00 INFO CoarseGrainedExecutorBackend: Got assigned task 298525
15/03/04 11:45:00 INFO Executor: Running task 3083.0 in stage 140.0 (TID 298525)
15/03/04 11:45:00 INFO MemoryStore: ensureFreeSpace(1473) called with
curMem=5543008799, maxMem=18127202549
15/03/04 11:45:00 INFO MemoryStore: Block broadcast_339_piece0 stored
as bytes in memory (estimated size 1473.0 B, free 11.7 GB)
15/03/04 11:45:00 INFO BlockManagerMaster: Updated info of block
broadcast_339_piece0
15/03/04 11:45:00 INFO TorrentBroadcast: Reading broadcast variable
339 took 224 ms
15/03/04 11:45:00 INFO MemoryStore: ensureFreeSpace(2536) called with
curMem=5543010272, maxMem=18127202549
15/03/04 11:45:00 INFO MemoryStore: Block broadcast_339 stored as
values in memory (estimated size 2.5 KB, free 11.7 GB)
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Doing the fetch;
tracker actor =
Actor[akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:52380/user/MapOutputTracker#-2057016370]
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO 

Re: Error communicating with MapOutputTracker

2015-03-04 Thread Thomas Gerber
Follow up:
We re-retried, this time after *decreasing* spark.parallelism. It was set
to 16000 before (5 times the number of cores in our cluster). It is now
down to 6400 (2 times the number of cores).

And it got past the point where it failed before.

Does the MapOutputTracker have a limit on the number of tasks it can track?
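
For the record, the arithmetic behind those numbers (our cluster only,
adjust for yours):

// 100 c3.8xlarge workers * 32 cores = 3200 cores; 2 * total cores:
spark.default.parallelism 6400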


On Wed, Mar 4, 2015 at 8:15 AM, Thomas Gerber thomas.ger...@radius.com
wrote:

 Hello,

 We are using spark 1.2.1 on a very large cluster (100 c3.8xlarge workers).
 We use spark-submit to start an application.

 We got the following error which leads to a failed stage:

 Job aborted due to stage failure: Task 3095 in stage 140.0 failed 4 times, 
 most recent failure: Lost task 3095.3 in stage 140.0 (TID 308697, 
 ip-10-0-12-88.ec2.internal): org.apache.spark.SparkException: Error 
 communicating with MapOutputTracker


 We tried the whole application again, and it failed on the same stage (but
 it got more tasks completed on that stage) with the same error.

 We then looked at executors stderr, and all show similar logs, on both
 runs (see below). As far as we can tell, executors and master have disk
 space left.

 *Any suggestion on where to look to understand why the communication with
 the MapOutputTracker fails?*

 Thanks
 Thomas
 
 In case it matters, our akka settings:
 spark.akka.frameSize 50
 spark.akka.threads 8
 // those below are 10* the default, to cope with large GCs
 spark.akka.timeout 1000
 spark.akka.heartbeat.pauses 60000
 spark.akka.failure-detector.threshold 3000.0
 spark.akka.heartbeat.interval 10000

 Appendix: executor logs, where it starts going awry

 15/03/04 11:45:00 INFO CoarseGrainedExecutorBackend: Got assigned task 298525
 15/03/04 11:45:00 INFO Executor: Running task 3083.0 in stage 140.0 (TID 
 298525)
 15/03/04 11:45:00 INFO MemoryStore: ensureFreeSpace(1473) called with 
 curMem=5543008799, maxMem=18127202549
 15/03/04 11:45:00 INFO MemoryStore: Block broadcast_339_piece0 stored as 
 bytes in memory (estimated size 1473.0 B, free 11.7 GB)
 15/03/04 11:45:00 INFO BlockManagerMaster: Updated info of block 
 broadcast_339_piece0
 15/03/04 11:45:00 INFO TorrentBroadcast: Reading broadcast variable 339 took 
 224 ms
 15/03/04 11:45:00 INFO MemoryStore: ensureFreeSpace(2536) called with 
 curMem=5543010272, maxMem=18127202549
 15/03/04 11:45:00 INFO MemoryStore: Block broadcast_339 stored as values in 
 memory (estimated size 2.5 KB, free 11.7 GB)
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Doing the fetch; tracker actor 
 = 
 Actor[akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:52380/user/MapOutputTracker#-2057016370]
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them

Spark logs in standalone clusters

2015-03-04 Thread Thomas Gerber
Hello,

I was wondering where all the logs files were located on a standalone
cluster:

   1. the executor logs are in the work directory on each slave machine
   (stdout/stderr)
   - I've noticed that GC information is in stdout, and stage information
   is in stderr
  - *Could we get more information on what is written in stdout vs
  stderr?*
   2. the master log
  - The path to the log file is shown when you launch the master,
  like 
/mnt/var/log/apps/spark-hadoop-org.apache.spark.deploy.master.Master-MACHINENAME.out;
  - *Could we get more information on where this path is configured?*
   3. driver logs
   - It seems they are only in the console by default (although you can
   override that in the log4j.properties file; see the sketch at the end of
   this message).
   4. communication manager logs?
   - *Are there any logs for the communication manager (aka the
  MapOutputTracker?)?*
   5. Any other log file?

Thanks,
Thomas
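
On item 3, a minimal log4j.properties sketch for sending driver logs to a
file instead of the console (the path is a placeholder, and this assumes the
stock log4j 1.2 setup that Spark ships with):

log4j.rootCategory=INFO, file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.File=/var/log/spark/driver.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n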


Re: Error communicating with MapOutputTracker

2015-03-04 Thread Thomas Gerber
I meant spark.default.parallelism of course.

On Wed, Mar 4, 2015 at 10:24 AM, Thomas Gerber thomas.ger...@radius.com
wrote:

 Follow up:
 We re-retried, this time after *decreasing* spark.parallelism. It was set
 to 16000 before, (5 times the number of cores in our cluster). It is now
 down to 6400 (2 times the number of cores).

 And it got past the point where it failed before.

 Does the MapOutputTracker have a limit on the number of tasks it can track?


 On Wed, Mar 4, 2015 at 8:15 AM, Thomas Gerber thomas.ger...@radius.com
 wrote:

 Hello,

 We are using spark 1.2.1 on a very large cluster (100 c3.8xlarge
 workers). We use spark-submit to start an application.

 We got the following error which leads to a failed stage:

 Job aborted due to stage failure: Task 3095 in stage 140.0 failed 4 times, 
 most recent failure: Lost task 3095.3 in stage 140.0 (TID 308697, 
 ip-10-0-12-88.ec2.internal): org.apache.spark.SparkException: Error 
 communicating with MapOutputTracker


 We tried the whole application again, and it failed on the same stage
 (but it got more tasks completed on that stage) with the same error.

 We then looked at executors stderr, and all show similar logs, on both
 runs (see below). As far as we can tell, executors and master have disk
 space left.

 *Any suggestion on where to look to understand why the communication with
 the MapOutputTracker fails?*

 Thanks
 Thomas
 
 In case it matters, our akka settings:
 spark.akka.frameSize 50
 spark.akka.threads 8
 // those below are 10* the default, to cope with large GCs
 spark.akka.timeout 1000
 spark.akka.heartbeat.pauses 60000
 spark.akka.failure-detector.threshold 3000.0
 spark.akka.heartbeat.interval 10000

 Appendix: executor logs, where it starts going awry

 15/03/04 11:45:00 INFO CoarseGrainedExecutorBackend: Got assigned task 298525
 15/03/04 11:45:00 INFO Executor: Running task 3083.0 in stage 140.0 (TID 
 298525)
 15/03/04 11:45:00 INFO MemoryStore: ensureFreeSpace(1473) called with 
 curMem=5543008799, maxMem=18127202549
 15/03/04 11:45:00 INFO MemoryStore: Block broadcast_339_piece0 stored as 
 bytes in memory (estimated size 1473.0 B, free 11.7 GB)
 15/03/04 11:45:00 INFO BlockManagerMaster: Updated info of block 
 broadcast_339_piece0
 15/03/04 11:45:00 INFO TorrentBroadcast: Reading broadcast variable 339 took 
 224 ms
 15/03/04 11:45:00 INFO MemoryStore: ensureFreeSpace(2536) called with 
 curMem=5543010272, maxMem=18127202549
 15/03/04 11:45:00 INFO MemoryStore: Block broadcast_339 stored as values in 
 memory (estimated size 2.5 KB, free 11.7 GB)
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Doing the fetch; tracker 
 actor = 
 Actor[akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:52380/user/MapOutputTracker#-2057016370]
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs

Driver disassociated

2015-03-04 Thread Thomas Gerber
Hello,

sometimes, in the *middle* of a job, the job stops (status is then seen as
FINISHED in the master).

There isn't anything wrong in the shell/submit output.

When looking at the executor logs, I see logs like this:

15/03/04 21:24:51 INFO MapOutputTrackerWorker: Doing the fetch; tracker
actor = Actor[akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal
:40019/user/MapOutputTracker#893807065]
15/03/04 21:24:51 INFO MapOutputTrackerWorker: Don't have map outputs for
shuffle 38, fetching them
15/03/04 21:24:55 ERROR CoarseGrainedExecutorBackend: Driver Disassociated
[akka.tcp://sparkExecutor@ip-10-0-11-9.ec2.internal:54766] -
[akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019] disassociated!
Shutting down.
15/03/04 21:24:55 WARN ReliableDeliverySupervisor: Association with remote
system [akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019] has
failed, address is now gated for [5000] ms. Reason is: [Disassociated].

How can I investigate further?
Thanks


Re: Driver disassociated

2015-03-04 Thread Thomas Gerber
Also,

I was experiencing another problem which might be related:
Error communicating with MapOutputTracker (see email in the ML today).

I just thought I would mention it in case it is relevant.

On Wed, Mar 4, 2015 at 4:07 PM, Thomas Gerber thomas.ger...@radius.com
wrote:

 1.2.1

 Also, I was using the following parameters, which are 10 times the default
 ones:
 spark.akka.timeout 1000
 spark.akka.heartbeat.pauses 60000
 spark.akka.failure-detector.threshold 3000.0
 spark.akka.heartbeat.interval 10000

 which should have helped *avoid* the problem if I understand correctly.

 Thanks,
 Thomas

 On Wed, Mar 4, 2015 at 3:21 PM, Ted Yu yuzhih...@gmail.com wrote:

 What release are you using ?

 SPARK-3923 went into 1.2.0 release.

 Cheers

 On Wed, Mar 4, 2015 at 1:39 PM, Thomas Gerber thomas.ger...@radius.com
 wrote:

 Hello,

 sometimes, in the *middle* of a job, the job stops (status is then seen
 as FINISHED in the master).

 There isn't anything wrong in the shell/submit output.

 When looking at the executor logs, I see logs like this:

 15/03/04 21:24:51 INFO MapOutputTrackerWorker: Doing the fetch; tracker
 actor = Actor[akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal
 :40019/user/MapOutputTracker#893807065]
 15/03/04 21:24:51 INFO MapOutputTrackerWorker: Don't have map outputs
 for shuffle 38, fetching them
 15/03/04 21:24:55 ERROR CoarseGrainedExecutorBackend: Driver
 Disassociated [akka.tcp://sparkExecutor@ip-10-0-11-9.ec2.internal:54766]
 - [akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019]
 disassociated! Shutting down.
 15/03/04 21:24:55 WARN ReliableDeliverySupervisor: Association with
 remote system [akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019]
 has failed, address is now gated for [5000] ms. Reason is: [Disassociated].

 How can I investigate further?
 Thanks






Re: Driver disassociated

2015-03-04 Thread Thomas Gerber
1.2.1

Also, I was using the following parameters, which are 10 times the default
ones:
spark.akka.timeout 1000
spark.akka.heartbeat.pauses 60000
spark.akka.failure-detector.threshold 3000.0
spark.akka.heartbeat.interval 10000

which should have helped *avoid* the problem if I understand correctly.

Thanks,
Thomas

On Wed, Mar 4, 2015 at 3:21 PM, Ted Yu yuzhih...@gmail.com wrote:

 What release are you using ?

 SPARK-3923 went into 1.2.0 release.

 Cheers

 On Wed, Mar 4, 2015 at 1:39 PM, Thomas Gerber thomas.ger...@radius.com
 wrote:

 Hello,

 sometimes, in the *middle* of a job, the job stops (status is then seen
 as FINISHED in the master).

 There isn't anything wrong in the shell/submit output.

 When looking at the executor logs, I see logs like this:

 15/03/04 21:24:51 INFO MapOutputTrackerWorker: Doing the fetch; tracker
 actor = Actor[akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal
 :40019/user/MapOutputTracker#893807065]
 15/03/04 21:24:51 INFO MapOutputTrackerWorker: Don't have map outputs for
 shuffle 38, fetching them
 15/03/04 21:24:55 ERROR CoarseGrainedExecutorBackend: Driver
 Disassociated [akka.tcp://sparkExecutor@ip-10-0-11-9.ec2.internal:54766]
 - [akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019]
 disassociated! Shutting down.
 15/03/04 21:24:55 WARN ReliableDeliverySupervisor: Association with
 remote system [akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019]
 has failed, address is now gated for [5000] ms. Reason is: [Disassociated].

 How can I investigate further?
 Thanks





Re: Executors dropping all memory stored RDDs?

2015-02-24 Thread Thomas Gerber
I have a strong suspicion that it was caused by the executor's disk filling
up. I am not sure whether that is how the executor is supposed to recover
from it.

I cannot be sure about it; I should have had enough disk space, but I think
I had some data skew which could have led some executors to run out of
disk.

So, in case someone else notices a behavior like this, make sure you check
your cluster monitor (like ganglia).

On Wed, Jan 28, 2015 at 5:40 PM, Thomas Gerber thomas.ger...@radius.com
wrote:

 Hello,

 I am storing RDDs with the MEMORY_ONLY_SER Storage Level, during the run
 of a big job.

 At some point during the job, I went to the Executors page, and saw that
 80% of my executors did not have stored RDDs anymore (executors.png). On
 the storage page, everything seems there (storage.png).

 But if I look at a given RDD (RDD_83.png), although it tells me on top
 that all 100 partitions are cached, when I look at the details, only 17 are
 actually stored (RDD_83_partitions), all on the 20% of executors that still
 had stored RDDs based on the Executors page.

 So I wonder:
 1. Are those RDD still cached (in which case, we have a small reporting
 error), or not?
 2. If not, what could cause an executor to drop its memory-stored RDD
 blocks?

 I guess a restart of an executor? When I compare an executor that seems to
 have dropped blocks vs one that has not:
 *** their
 *spark-hadoop-org.apache.spark.deploy.worker.Worker-1-ip-XX-XX-XX-XX.ec2.internal.out*
 content look the same
 *** they both have the same etime in ps (so, I guess no restart?)
 *** didn't see anything in the app log in the work folder (but it is
 large, so I might have missed it)

 Also, I must mention that the cluster was doing a lot of GCs, which might
 be a cause of the trouble.

 I would appreciate any pointer.
 Thomas




Shuffle Spill

2015-02-20 Thread Thomas Gerber
Hello,

In a stage with lots of tasks, I have a few tasks that show a large amount
of shuffle spill.

I scouted the web to understand shuffle spill, and I did not find any
simple explanation of the spill mechanism. What I put together is:

1. shuffle spill can happen when the shuffle is written to disk (i.e.
by the last map stage, as opposed to when the shuffle is read by the
reduce stage)
2. it happens when a task has a lot of data to write to the shuffle; since
that data needs to be sorted by key, the spilling mechanism allows Spark to
sort it without holding everything in memory

I am unclear however whether a large task will systematically lead to shuffle
spill, or whether the number of keys (for the next reduce stage) that
particular task encounters also has an impact.

Concretely:
Let's say I have:
val ab: RDD[(A, B)] = ...
val ac: RDD[(A, C)] = ...
val bd: RDD[(B, D)] = ...

and I do:
val bc = ab.join(ac).values // we investigate this task, triggered by .values
val cd = bc.join(bd).values

The task we investigate reads from a previous shuffle, and will write to
another shuffle to prepare for the second join. I know that I have data
skew on one of the A keys, meaning a few tasks are expected to be large, and
I have stragglers.

Now, is that the cause of the shuffle spill, or is it because those
straggler tasks also happen to have in their midst a very large number of
distinct Bs?

Thanks
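
For reference, my current reading of the 1.x spill code paths (treat the
specifics as assumptions to verify): ExternalAppendOnlyMap and ExternalSorter
spill when a task's in-memory buffer outgrows its share of the shuffle memory
pool, so the trigger is the byte volume a task holds relative to that pool
rather than the key count alone. The relevant knobs:

spark.shuffle.spill true            // default; whether to spill to disk at all
spark.shuffle.memoryFraction 0.2    // default; fraction of the heap shared by running tasks for these buffers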