Re: How to avoid shuffle errors for a large join ?

2015-09-16 Thread Reynold Xin
Only SQL and DataFrame for now.

We are thinking about how to apply that to a more general distributed
collection based API, but it's not in 1.5.
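
For illustration only (a minimal sketch, not from the thread): since the 1.5
improvements apply to SQL and DataFrames, one option for RDD users is to move
the join onto the DataFrame API. `sc`, `leftRdd` and `rightRdd` below are
hypothetical placeholders.

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// leftRdd: RDD[(Long, Float)], rightRdd: RDD[(Long, Int)] -- illustrative types only
val left  = leftRdd.toDF("key", "leftValue")
val right = rightRdd.toDF("key", "rightValue")

// The DataFrame join goes through the SQL planner, so it picks up the 1.5 work;
// call .rdd on the result if downstream code still expects an RDD.
val joined = left.join(right, "key")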

On Sat, Sep 5, 2015 at 11:56 AM, Gurvinder Singh  wrote:

> On 09/05/2015 11:22 AM, Reynold Xin wrote:
> > Try increasing the shuffle memory fraction (by default it is only 16%).
> > Again, if you run Spark 1.5, this will probably run a lot faster,
> > especially if you increase the shuffle memory fraction ...
> Hi Reynold,
>
> Does 1.5 have better join/cogroup performance for the RDD case too, or
> only for SQL?
>
> - Gurvinder
> >
> > On Tue, Sep 1, 2015 at 8:13 AM, Thomas Dudziak wrote:
> >
> > While it works with sort-merge-join, it takes about 12h to finish
> > (with 1 shuffle partitions). My hunch is that the reason for
> > that is this:
> >
> > INFO ExternalSorter: Thread 3733 spilling in-memory map of 174.9 MB
> > to disk (62 times so far)
> >
> > (and lots more where this comes from).
> >
> > On Sat, Aug 29, 2015 at 7:17 PM, Reynold Xin wrote:
> >
> > Can you try 1.5? This should work much, much better in 1.5 out
> > of the box.
> >
> > For 1.4, I think you'd want to turn on sort-merge-join, which is
> > off by default. However, the sort-merge join in 1.4 can still
> > trigger a lot of garbage, making it slower. SMJ performance is
> > probably 5x - 1000x better in 1.5 for your case.
> >
> >
> > On Thu, Aug 27, 2015 at 6:03 PM, Thomas Dudziak wrote:
> >
> > I'm getting errors like "Removing executor with no recent
> > heartbeats" & "Missing an output location for shuffle"
> > errors for a large SparkSql join (1bn rows/2.5TB joined with
> > 1bn rows/30GB) and I'm not sure how to configure the job to
> > avoid them.
> >
> > The initial stage completes fine with some 30k tasks on a
> > cluster with 70 machines/10TB memory, generating about 6.5TB
> > of shuffle writes, but then the shuffle stage first waits
> > 30min in the scheduling phase according to the UI, and then
> > dies with the mentioned errors.
> >
> > I can see in the GC logs that the executors reach their
> > memory limits (32g per executor, 2 workers per machine) and
> > can't allocate any more stuff in the heap. Fwiw, the top 10
> > in the memory use histogram are:
> >
> > num #instances #bytes  class name
> > --
> >1: 249139595 11958700560
> >  scala.collection.immutable.HashMap$HashMap1
> >2: 251085327 8034730464 
> >  scala.Tuple2
> >3: 243694737 5848673688  java.lang.Float
> >4: 231198778 5548770672  java.lang.Integer
> >5:  72191585 4298521576
> >  [Lscala.collection.immutable.HashMap;
> >6:  72191582 2310130624
> >  scala.collection.immutable.HashMap$HashTrieMap
> >7:  74114058 1778737392  java.lang.Long
> >8:   6059103  779203840  [Ljava.lang.Object;
> >9:   5461096  174755072
> >  scala.collection.mutable.ArrayBuffer
> >   10: 34749   70122104  [B
> >
> > Relevant settings are (Spark 1.4.1, Java 8 with G1 GC):
> >
> > spark.core.connection.ack.wait.timeout 600
> > spark.executor.heartbeatInterval   60s
> > spark.executor.memory  32g
> > spark.mesos.coarse false
> > spark.network.timeout  600s
> > spark.shuffle.blockTransferService netty
> > spark.shuffle.consolidateFiles true
> > spark.shuffle.file.buffer  1m
> > spark.shuffle.io.maxRetries6
> > spark.shuffle.manager  sort
> >
> > The join is currently configured with
> > spark.sql.shuffle.partitions=1000 but that doesn't seem to
> > help. Would increasing the partitions help ? Is there a
> > formula to determine an approximate partitions number value
> > for a join ?
> > Any help with this job would be appreciated !
> >
> > cheers,
> > Tom
> >
> >
> >
> >
>
>


Re: How to avoid shuffle errors for a large join ?

2015-09-05 Thread Reynold Xin
Try increasing the shuffle memory fraction (by default it is only 16%).
Again, if you run Spark 1.5, this will probably run a lot faster,
especially if you increase the shuffle memory fraction ...
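
As a hedged example of that suggestion (the values are illustrative, not a
recommendation from this thread), the fraction can be raised through SparkConf
on 1.4.x/1.5.x:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // default is 0.2, which lands at roughly 16% of the heap after the safety fraction
  .set("spark.shuffle.memoryFraction", "0.4")
  // if the job caches little data, the storage fraction can be shrunk to compensate
  .set("spark.storage.memoryFraction", "0.4")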

On Tue, Sep 1, 2015 at 8:13 AM, Thomas Dudziak  wrote:

> While it works with sort-merge-join, it takes about 12h to finish (with
> 1 shuffle partitions). My hunch is that the reason for that is this:
>
> INFO ExternalSorter: Thread 3733 spilling in-memory map of 174.9 MB to
> disk (62 times so far)
>
> (and lots more where this comes from).
>
> On Sat, Aug 29, 2015 at 7:17 PM, Reynold Xin  wrote:
>
>> Can you try 1.5? This should work much, much better in 1.5 out of the box.
>>
>> For 1.4, I think you'd want to turn on sort-merge-join, which is off by
>> default. However, the sort-merge join in 1.4 can still trigger a lot of
>> garbage, making it slower. SMJ performance is probably 5x - 1000x better in
>> 1.5 for your case.
>>
>>
>> On Thu, Aug 27, 2015 at 6:03 PM, Thomas Dudziak  wrote:
>>
>>> I'm getting errors like "Removing executor with no recent heartbeats" &
>>> "Missing an output location for shuffle" errors for a large SparkSql join
>>> (1bn rows/2.5TB joined with 1bn rows/30GB) and I'm not sure how to
>>> configure the job to avoid them.
>>>
>>> The initial stage completes fine with some 30k tasks on a cluster with
>>> 70 machines/10TB memory, generating about 6.5TB of shuffle writes, but then
>>> the shuffle stage first waits 30min in the scheduling phase according to
>>> the UI, and then dies with the mentioned errors.
>>>
>>> I can see in the GC logs that the executors reach their memory limits
>>> (32g per executor, 2 workers per machine) and can't allocate any more stuff
>>> in the heap. Fwiw, the top 10 in the memory use histogram are:
>>>
>>> num #instances #bytes  class name
>>> --
>>>1: 249139595 11958700560
>>>  scala.collection.immutable.HashMap$HashMap1
>>>2: 251085327 8034730464  scala.Tuple2
>>>3: 243694737 5848673688  java.lang.Float
>>>4: 231198778 5548770672  java.lang.Integer
>>>5:  72191585 4298521576  [Lscala.collection.immutable.HashMap;
>>>6:  72191582 2310130624
>>>  scala.collection.immutable.HashMap$HashTrieMap
>>>7:  74114058 1778737392  java.lang.Long
>>>8:   6059103  779203840  [Ljava.lang.Object;
>>>9:   5461096  174755072  scala.collection.mutable.ArrayBuffer
>>>   10: 34749   70122104  [B
>>>
>>> Relevant settings are (Spark 1.4.1, Java 8 with G1 GC):
>>>
>>> spark.core.connection.ack.wait.timeout 600
>>> spark.executor.heartbeatInterval   60s
>>> spark.executor.memory  32g
>>> spark.mesos.coarse false
>>> spark.network.timeout  600s
>>> spark.shuffle.blockTransferService netty
>>> spark.shuffle.consolidateFiles true
>>> spark.shuffle.file.buffer  1m
>>> spark.shuffle.io.maxRetries6
>>> spark.shuffle.manager  sort
>>>
>>> The join is currently configured with spark.sql.shuffle.partitions=1000
>>> but that doesn't seem to help. Would increasing the partitions help ? Is
>>> there a formula to determine an approximate partitions number value for a
>>> join ?
>>> Any help with this job would be appreciated !
>>>
>>> cheers,
>>> Tom
>>>
>>
>>
>


Re: How to avoid shuffle errors for a large join ?

2015-09-05 Thread Gurvinder Singh
On 09/05/2015 11:22 AM, Reynold Xin wrote:
> Try increasing the shuffle memory fraction (by default it is only 16%).
> Again, if you run Spark 1.5, this will probably run a lot faster,
> especially if you increase the shuffle memory fraction ...
Hi Reynold,

Does 1.5 have better join/cogroup performance for the RDD case too, or
only for SQL?

- Gurvinder
> 
> On Tue, Sep 1, 2015 at 8:13 AM, Thomas Dudziak wrote:
> 
> While it works with sort-merge-join, it takes about 12h to finish
> (with 1 shuffle partitions). My hunch is that the reason for
> that is this:
> 
> INFO ExternalSorter: Thread 3733 spilling in-memory map of 174.9 MB
> to disk (62 times so far)
> 
> (and lots more where this comes from).
> 
> On Sat, Aug 29, 2015 at 7:17 PM, Reynold Xin wrote:
> 
> Can you try 1.5? This should work much, much better in 1.5 out
> of the box.
> 
> For 1.4, I think you'd want to turn on sort-merge-join, which is
> off by default. However, the sort-merge join in 1.4 can still
> trigger a lot of garbage, making it slower. SMJ performance is
> probably 5x - 1000x better in 1.5 for your case.
> 
> 
> On Thu, Aug 27, 2015 at 6:03 PM, Thomas Dudziak wrote:
> 
> I'm getting errors like "Removing executor with no recent
> heartbeats" & "Missing an output location for shuffle"
> errors for a large SparkSql join (1bn rows/2.5TB joined with
> 1bn rows/30GB) and I'm not sure how to configure the job to
> avoid them.
> 
> The initial stage completes fine with some 30k tasks on a
> cluster with 70 machines/10TB memory, generating about 6.5TB
> of shuffle writes, but then the shuffle stage first waits
> 30min in the scheduling phase according to the UI, and then
> dies with the mentioned errors.
> 
> I can see in the GC logs that the executors reach their
> memory limits (32g per executor, 2 workers per machine) and
> can't allocate any more stuff in the heap. Fwiw, the top 10
> in the memory use histogram are:
> 
> num #instances #bytes  class name
> --
>1: 249139595 11958700560
>  scala.collection.immutable.HashMap$HashMap1
>2: 251085327 8034730464 
>  scala.Tuple2
>3: 243694737 5848673688  java.lang.Float
>4: 231198778 5548770672  java.lang.Integer
>5:  72191585 4298521576
>  [Lscala.collection.immutable.HashMap;
>6:  72191582 2310130624
>  scala.collection.immutable.HashMap$HashTrieMap
>7:  74114058 1778737392  java.lang.Long
>8:   6059103  779203840  [Ljava.lang.Object;
>9:   5461096  174755072
>  scala.collection.mutable.ArrayBuffer
>   10: 34749   70122104  [B
> 
> Relevant settings are (Spark 1.4.1, Java 8 with G1 GC):
> 
> spark.core.connection.ack.wait.timeout 600
> spark.executor.heartbeatInterval   60s
> spark.executor.memory  32g
> spark.mesos.coarse false
> spark.network.timeout  600s
> spark.shuffle.blockTransferService netty
> spark.shuffle.consolidateFiles true
> spark.shuffle.file.buffer  1m
> spark.shuffle.io.maxRetries6
> spark.shuffle.manager  sort
> 
> The join is currently configured with
> spark.sql.shuffle.partitions=1000 but that doesn't seem to
> help. Would increasing the partitions help ? Is there a
> formula to determine an approximate partitions number value
> for a join ?
> Any help with this job would be appreciated !
> 
> cheers,
> Tom
> 
> 
> 
> 





Re: How to avoid shuffle errors for a large join ?

2015-09-01 Thread Thomas Dudziak
While it works with sort-merge-join, it takes about 12h to finish (with
1 shuffle partitions). My hunch is that the reason for that is this:

INFO ExternalSorter: Thread 3733 spilling in-memory map of 174.9 MB to disk
(62 times so far)

(and lots more where this comes from).

On Sat, Aug 29, 2015 at 7:17 PM, Reynold Xin  wrote:

> Can you try 1.5? This should work much, much better in 1.5 out of the box.
>
> For 1.4, I think you'd want to turn on sort-merge-join, which is off by
> default. However, the sort-merge join in 1.4 can still trigger a lot of
> garbage, making it slower. SMJ performance is probably 5x - 1000x better in
> 1.5 for your case.
>
>
> On Thu, Aug 27, 2015 at 6:03 PM, Thomas Dudziak  wrote:
>
>> I'm getting errors like "Removing executor with no recent heartbeats" &
>> "Missing an output location for shuffle" errors for a large SparkSql join
>> (1bn rows/2.5TB joined with 1bn rows/30GB) and I'm not sure how to
>> configure the job to avoid them.
>>
>> The initial stage completes fine with some 30k tasks on a cluster with 70
>> machines/10TB memory, generating about 6.5TB of shuffle writes, but then
>> the shuffle stage first waits 30min in the scheduling phase according to
>> the UI, and then dies with the mentioned errors.
>>
>> I can see in the GC logs that the executors reach their memory limits
>> (32g per executor, 2 workers per machine) and can't allocate any more stuff
>> in the heap. Fwiw, the top 10 in the memory use histogram are:
>>
>> num #instances #bytes  class name
>> --
>>1: 249139595 11958700560
>>  scala.collection.immutable.HashMap$HashMap1
>>2: 251085327 8034730464  scala.Tuple2
>>3: 243694737 5848673688  java.lang.Float
>>4: 231198778 5548770672  java.lang.Integer
>>5:  72191585 4298521576  [Lscala.collection.immutable.HashMap;
>>6:  72191582 2310130624
>>  scala.collection.immutable.HashMap$HashTrieMap
>>7:  74114058 1778737392  java.lang.Long
>>8:   6059103  779203840  [Ljava.lang.Object;
>>9:   5461096  174755072  scala.collection.mutable.ArrayBuffer
>>   10: 34749   70122104  [B
>>
>> Relevant settings are (Spark 1.4.1, Java 8 with G1 GC):
>>
>> spark.core.connection.ack.wait.timeout 600
>> spark.executor.heartbeatInterval   60s
>> spark.executor.memory  32g
>> spark.mesos.coarse false
>> spark.network.timeout  600s
>> spark.shuffle.blockTransferService netty
>> spark.shuffle.consolidateFiles true
>> spark.shuffle.file.buffer  1m
>> spark.shuffle.io.maxRetries6
>> spark.shuffle.manager  sort
>>
>> The join is currently configured with spark.sql.shuffle.partitions=1000
>> but that doesn't seem to help. Would increasing the partitions help ? Is
>> there a formula to determine an approximate partitions number value for a
>> join ?
>> Any help with this job would be appreciated !
>>
>> cheers,
>> Tom
>>
>
>


Re: How to avoid shuffle errors for a large join ?

2015-08-29 Thread Reynold Xin
Can you try 1.5? This should work much, much better in 1.5 out of the box.

For 1.4, I think you'd want to turn on sort-merge-join, which is off by
default. However, the sort-merge join in 1.4 can still trigger a lot of
garbage, making it slower. SMJ performance is probably 5x - 1000x better in
1.5 for your case.
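
A hedged sketch of turning sort-merge join on for 1.4.x (the flag name below is
taken from the 1.4 configuration and is worth double-checking against your build):

// Spark 1.4.x: sort-merge join is controlled by this SQLConf flag and is off by default.
// (1.5 enables it out of the box, per the message above.)
sqlContext.setConf("spark.sql.planner.sortMergeJoin", "true")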


On Thu, Aug 27, 2015 at 6:03 PM, Thomas Dudziak tom...@gmail.com wrote:

 I'm getting errors like "Removing executor with no recent heartbeats" &
 "Missing an output location for shuffle" errors for a large SparkSql join
 (1bn rows/2.5TB joined with 1bn rows/30GB) and I'm not sure how to
 configure the job to avoid them.

 The initial stage completes fine with some 30k tasks on a cluster with 70
 machines/10TB memory, generating about 6.5TB of shuffle writes, but then
 the shuffle stage first waits 30min in the scheduling phase according to
 the UI, and then dies with the mentioned errors.

 I can see in the GC logs that the executors reach their memory limits (32g
 per executor, 2 workers per machine) and can't allocate any more stuff in
 the heap. Fwiw, the top 10 in the memory use histogram are:

 num #instances #bytes  class name
 --
   1: 249139595 11958700560
  scala.collection.immutable.HashMap$HashMap1
2: 251085327 8034730464  scala.Tuple2
3: 243694737 5848673688  java.lang.Float
4: 231198778 5548770672  java.lang.Integer
5:  72191585 4298521576  [Lscala.collection.immutable.HashMap;
6:  72191582 2310130624
  scala.collection.immutable.HashMap$HashTrieMap
7:  74114058 1778737392  java.lang.Long
8:   6059103  779203840  [Ljava.lang.Object;
9:   5461096  174755072  scala.collection.mutable.ArrayBuffer
   10: 34749   70122104  [B

 Relevant settings are (Spark 1.4.1, Java 8 with G1 GC):

 spark.core.connection.ack.wait.timeout 600
 spark.executor.heartbeatInterval   60s
 spark.executor.memory  32g
 spark.mesos.coarse false
 spark.network.timeout  600s
 spark.shuffle.blockTransferService netty
 spark.shuffle.consolidateFiles true
 spark.shuffle.file.buffer  1m
 spark.shuffle.io.maxRetries6
 spark.shuffle.manager  sort

 The join is currently configured with spark.sql.shuffle.partitions=1000
 but that doesn't seem to help. Would increasing the partitions help ? Is
 there a formula to determine an approximate partitions number value for a
 join ?
 Any help with this job would be appreciated !

 cheers,
 Tom



RE: How to avoid shuffle errors for a large join ?

2015-08-28 Thread java8964
There are several possibilities here.
1) Keep in mind that 7GB of data will need far more than 7GB of heap, because
deserialized Java objects take much more space than the raw data. A rough rule of
thumb is 6 to 8 times, so 7GB of data needs roughly 50GB of heap space.
2) You should monitor the Spark UI to check how many records each task processes,
and whether the failed tasks handle more data than the rest. Even when some tasks
fail, others will have succeeded. Compare them: do the failed tasks process far
more records than the succeeded ones? If so, it indicates a data skew problem.
3) If the failed tasks were allocated a similar number of records as the succeeded
ones, then just add more partitions so that each task processes less data. You
should always monitor the GC output in these cases.
4) If most of your tasks failed due to memory, then your settings are too small
for your data; add partitions or memory.

Yong

From: tom...@gmail.com
Date: Fri, 28 Aug 2015 13:55:52 -0700
Subject: Re: How to avoid shuffle errors for a large join ?
To: ja...@jasonknight.us
CC: user@spark.apache.org

Yeah, I tried with 10k and 30k and these still failed, will try with more then. 
Though that is a little disappointing, it only writes ~7TB of shuffle data 
which shouldn't in theory require more than 1000 reducers on my 10TB memory 
cluster (~7GB of spill per reducer).
I'm now wondering if my shuffle partitions are uneven and I should use a custom 
partitioner, is there a way to get stats on the partition sizes from Spark ?
On Fri, Aug 28, 2015 at 12:46 PM, Jason ja...@jasonknight.us wrote:
I had similar problems to this (reduce side failures for large joins (25bn rows 
with 9bn)), and found the answer was to further up the 
spark.sql.shuffle.partitions=1000. In my case, 16k partitions worked for me, 
but your tables look a little denser, so you may want to go even higher.
On Thu, Aug 27, 2015 at 6:04 PM Thomas Dudziak tom...@gmail.com wrote:
I'm getting errors like "Removing executor with no recent heartbeats" &
"Missing an output location for shuffle" errors for a large SparkSql join (1bn
rows/2.5TB joined with 1bn rows/30GB) and I'm not sure how to configure the job 
to avoid them.

The initial stage completes fine with some 30k tasks on a cluster with 70 
machines/10TB memory, generating about 6.5TB of shuffle writes, but then the 
shuffle stage first waits 30min in the scheduling phase according to the UI, 
and then dies with the mentioned errors.

I can see in the GC logs that the executors reach their memory limits (32g per 
executor, 2 workers per machine) and can't allocate any more stuff in the heap. 
Fwiw, the top 10 in the memory use histogram are:

num #instances #bytes  class name
--
   1: 249139595 11958700560  scala.collection.immutable.HashMap$HashMap1
   2: 251085327 8034730464  scala.Tuple2
   3: 243694737 5848673688  java.lang.Float
   4: 231198778 5548770672  java.lang.Integer
   5:  72191585 4298521576  [Lscala.collection.immutable.HashMap;
   6:  72191582 2310130624  scala.collection.immutable.HashMap$HashTrieMap
   7:  74114058 1778737392  java.lang.Long
   8:   6059103  779203840  [Ljava.lang.Object;
   9:   5461096  174755072  scala.collection.mutable.ArrayBuffer
  10: 34749   70122104  [B
Relevant settings are (Spark 1.4.1, Java 8 with G1 GC):

spark.core.connection.ack.wait.timeout 600
spark.executor.heartbeatInterval   60s
spark.executor.memory  32g
spark.mesos.coarse false
spark.network.timeout  600s
spark.shuffle.blockTransferService netty
spark.shuffle.consolidateFiles true
spark.shuffle.file.buffer  1m
spark.shuffle.io.maxRetries6
spark.shuffle.manager  sort
The join is currently configured with spark.sql.shuffle.partitions=1000 but 
that doesn't seem to help. Would increasing the partitions help ? Is there a 
formula to determine an approximate partitions number value for a join ?
Any help with this job would be appreciated !
cheers,
Tom


  

Re: How to avoid shuffle errors for a large join ?

2015-08-28 Thread Thomas Dudziak
I'm curious where the factor of 6-8 comes from ? Is this assuming snappy
(or lzf) compression ? The sizes I mentioned are what the Spark UI reports,
not sure if those are before or after compression (for the shuffle
read/write).

On Fri, Aug 28, 2015 at 4:41 PM, java8964 java8...@hotmail.com wrote:

 There are several possibilities here.

 1) Keep in mind that 7GB of data will need far more than 7GB of heap, because
 deserialized Java objects take much more space than the raw data. A rough rule
 of thumb is 6 to 8 times, so 7GB of data needs roughly 50GB of heap space.
 2) You should monitor the Spark UI to check how many records each task
 processes, and whether the failed tasks handle more data than the rest. Even
 when some tasks fail, others will have succeeded. Compare them: do the failed
 tasks process far more records than the succeeded ones? If so, it indicates a
 data skew problem.
 3) If the failed tasks were allocated a similar number of records as the
 succeeded ones, then just add more partitions so that each task processes less
 data. You should always monitor the GC output in these cases.
 4) If most of your tasks failed due to memory, then your settings are too
 small for your data; add partitions or memory.


 Yong

 --
 From: tom...@gmail.com
 Date: Fri, 28 Aug 2015 13:55:52 -0700
 Subject: Re: How to avoid shuffle errors for a large join ?
 To: ja...@jasonknight.us
 CC: user@spark.apache.org


 Yeah, I tried with 10k and 30k and these still failed, will try with more
 then. Though that is a little disappointing, it only writes ~7TB of shuffle
 data which shouldn't in theory require more than 1000 reducers on my 10TB
 memory cluster (~7GB of spill per reducer).
 I'm now wondering if my shuffle partitions are uneven and I should use a
 custom partitioner, is there a way to get stats on the partition sizes from
 Spark ?

 On Fri, Aug 28, 2015 at 12:46 PM, Jason ja...@jasonknight.us wrote:

 I had similar problems to this (reduce side failures for large joins (25bn
 rows with 9bn)), and found the answer was to further up the
 spark.sql.shuffle.partitions=1000. In my case, 16k partitions worked for
 me, but your tables look a little denser, so you may want to go even higher.

 On Thu, Aug 27, 2015 at 6:04 PM Thomas Dudziak tom...@gmail.com wrote:

 I'm getting errors like "Removing executor with no recent heartbeats" &
 "Missing an output location for shuffle" errors for a large SparkSql join
 (1bn rows/2.5TB joined with 1bn rows/30GB) and I'm not sure how to
 configure the job to avoid them.

 The initial stage completes fine with some 30k tasks on a cluster with 70
 machines/10TB memory, generating about 6.5TB of shuffle writes, but then
 the shuffle stage first waits 30min in the scheduling phase according to
 the UI, and then dies with the mentioned errors.

 I can see in the GC logs that the executors reach their memory limits (32g
 per executor, 2 workers per machine) and can't allocate any more stuff in
 the heap. Fwiw, the top 10 in the memory use histogram are:

 num #instances #bytes  class name
 --
   1: 249139595 11958700560
  scala.collection.immutable.HashMap$HashMap1
2: 251085327 8034730464  scala.Tuple2
3: 243694737 5848673688  java.lang.Float
4: 231198778 5548770672  java.lang.Integer
5:  72191585 4298521576  [Lscala.collection.immutable.HashMap;
6:  72191582 2310130624
  scala.collection.immutable.HashMap$HashTrieMap
7:  74114058 1778737392  java.lang.Long
8:   6059103  779203840  [Ljava.lang.Object;
9:   5461096  174755072  scala.collection.mutable.ArrayBuffer
   10: 34749   70122104  [B

 Relevant settings are (Spark 1.4.1, Java 8 with G1 GC):

 spark.core.connection.ack.wait.timeout 600
 spark.executor.heartbeatInterval   60s
 spark.executor.memory  32g
 spark.mesos.coarse false
 spark.network.timeout  600s
 spark.shuffle.blockTransferService netty
 spark.shuffle.consolidateFiles true
 spark.shuffle.file.buffer  1m
 spark.shuffle.io.maxRetries6
 spark.shuffle.manager  sort

 The join is currently configured with spark.sql.shuffle.partitions=1000
 but that doesn't seem to help. Would increasing the partitions help ? Is
 there a formula to determine an approximate partitions number value for a
 join ?
 Any help with this job would be appreciated !

 cheers,
 Tom





Re: How to avoid shuffle errors for a large join ?

2015-08-28 Thread Jason
Ahh yes, thanks for mentioning data skew, I've run into that before as
well. The best way there is to get statistics on the distribution of your
join key. If there are a few values with drastically larger number of
values, then a reducer task will always be swamped no matter how many
reducer side partitions you use.
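
A hedged sketch of getting those statistics with the DataFrame API (`bigTable`
and the column name "joinkey" are placeholders for the larger table and its key):

import org.apache.spark.sql.functions.desc

// Count rows per join key and look at the heaviest ones; a handful of keys
// carrying a large share of the rows is the classic skew signature.
bigTable.groupBy("joinkey")
  .count()
  .orderBy(desc("count"))
  .limit(20)
  .show()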

If this is the problem, then one solution I have used is to do a skew join
manually. Something like:

SELECT * FROM (SELECT * from table WHERE joinkey <> 'commonval') t1 JOIN t2
ON t1.joinkey == t2.joinkey
UNION ALL
SELECT * FROM (SELECT * from table WHERE joinkey = 'commonval') t1 JOIN t2
ON t1.joinkey == t2.joinkey


On Fri, Aug 28, 2015 at 1:56 PM Thomas Dudziak tom...@gmail.com wrote:

 Yeah, I tried with 10k and 30k and these still failed, will try with more
 then. Though that is a little disappointing, it only writes ~7TB of shuffle
 data which shouldn't in theory require more than 1000 reducers on my 10TB
 memory cluster (~7GB of spill per reducer).
 I'm now wondering if my shuffle partitions are uneven and I should use a
 custom partitioner, is there a way to get stats on the partition sizes from
 Spark ?

 On Fri, Aug 28, 2015 at 12:46 PM, Jason ja...@jasonknight.us wrote:

 I had similar problems to this (reduce side failures for large joins
 (25bn rows with 9bn)), and found the answer was to further up the
 spark.sql.shuffle.partitions=1000. In my case, 16k partitions worked for
 me, but your tables look a little denser, so you may want to go even higher.

 On Thu, Aug 27, 2015 at 6:04 PM Thomas Dudziak tom...@gmail.com wrote:

 I'm getting errors like "Removing executor with no recent heartbeats" &
 "Missing an output location for shuffle" errors for a large SparkSql join
 (1bn rows/2.5TB joined with 1bn rows/30GB) and I'm not sure how to
 configure the job to avoid them.

 The initial stage completes fine with some 30k tasks on a cluster with
 70 machines/10TB memory, generating about 6.5TB of shuffle writes, but then
 the shuffle stage first waits 30min in the scheduling phase according to
 the UI, and then dies with the mentioned errors.

 I can see in the GC logs that the executors reach their memory limits
 (32g per executor, 2 workers per machine) and can't allocate any more stuff
 in the heap. Fwiw, the top 10 in the memory use histogram are:

 num #instances #bytes  class name
 --
   1: 249139595 11958700560
  scala.collection.immutable.HashMap$HashMap1
2: 251085327 8034730464  scala.Tuple2
3: 243694737 5848673688  java.lang.Float
4: 231198778 5548770672  java.lang.Integer
5:  72191585 4298521576  [Lscala.collection.immutable.HashMap;
6:  72191582 2310130624
  scala.collection.immutable.HashMap$HashTrieMap
7:  74114058 1778737392  java.lang.Long
8:   6059103  779203840  [Ljava.lang.Object;
9:   5461096  174755072  scala.collection.mutable.ArrayBuffer
   10: 34749   70122104  [B

 Relevant settings are (Spark 1.4.1, Java 8 with G1 GC):

 spark.core.connection.ack.wait.timeout 600
 spark.executor.heartbeatInterval   60s
 spark.executor.memory  32g
 spark.mesos.coarse false
 spark.network.timeout  600s
 spark.shuffle.blockTransferService netty
 spark.shuffle.consolidateFiles true
 spark.shuffle.file.buffer  1m
 spark.shuffle.io.maxRetries6
 spark.shuffle.manager  sort

 The join is currently configured with spark.sql.shuffle.partitions=1000
 but that doesn't seem to help. Would increasing the partitions help ? Is
 there a formula to determine an approximate partitions number value for a
 join ?
 Any help with this job would be appreciated !

 cheers,
 Tom





Re: How to avoid shuffle errors for a large join ?

2015-08-28 Thread Thomas Dudziak
Yeah, I tried with 10k and 30k and these still failed, will try with more
then. Though that is a little disappointing, it only writes ~7TB of shuffle
data which shouldn't in theory require more than 1000 reducers on my 10TB
memory cluster (~7GB of spill per reducer).
I'm now wondering if my shuffle partitions are uneven and I should use a
custom partitioner, is there a way to get stats on the partition sizes from
Spark ?
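
One hedged way to get rough per-partition sizes (the RDD `pairs` and the
partition count below are placeholders, not code from this thread):

import org.apache.spark.HashPartitioner

// Partition the keyed data the same way the join would, then count records per partition.
val partitioned = pairs.partitionBy(new HashPartitioner(1000))
val sizes = partitioned
  .mapPartitionsWithIndex { (idx, it) => Iterator((idx, it.size)) }
  .collect()

// Print the 20 largest partitions; a few huge outliers point to uneven keys.
sizes.sortBy(-_._2).take(20).foreach { case (idx, n) =>
  println(s"partition $idx: $n records")
}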

On Fri, Aug 28, 2015 at 12:46 PM, Jason ja...@jasonknight.us wrote:

 I had similar problems to this (reduce side failures for large joins (25bn
 rows with 9bn)), and found the answer was to further up the
 spark.sql.shuffle.partitions=1000. In my case, 16k partitions worked for
 me, but your tables look a little denser, so you may want to go even higher.

 On Thu, Aug 27, 2015 at 6:04 PM Thomas Dudziak tom...@gmail.com wrote:

 I'm getting errors like "Removing executor with no recent heartbeats" &
 "Missing an output location for shuffle" errors for a large SparkSql join
 (1bn rows/2.5TB joined with 1bn rows/30GB) and I'm not sure how to
 configure the job to avoid them.

 The initial stage completes fine with some 30k tasks on a cluster with 70
 machines/10TB memory, generating about 6.5TB of shuffle writes, but then
 the shuffle stage first waits 30min in the scheduling phase according to
 the UI, and then dies with the mentioned errors.

 I can see in the GC logs that the executors reach their memory limits
 (32g per executor, 2 workers per machine) and can't allocate any more stuff
 in the heap. Fwiw, the top 10 in the memory use histogram are:

 num #instances #bytes  class name
 --
   1: 249139595 11958700560
  scala.collection.immutable.HashMap$HashMap1
2: 251085327 8034730464  scala.Tuple2
3: 243694737 5848673688  java.lang.Float
4: 231198778 5548770672  java.lang.Integer
5:  72191585 4298521576  [Lscala.collection.immutable.HashMap;
6:  72191582 2310130624
  scala.collection.immutable.HashMap$HashTrieMap
7:  74114058 1778737392  java.lang.Long
8:   6059103  779203840  [Ljava.lang.Object;
9:   5461096  174755072  scala.collection.mutable.ArrayBuffer
   10: 34749   70122104  [B

 Relevant settings are (Spark 1.4.1, Java 8 with G1 GC):

 spark.core.connection.ack.wait.timeout 600
 spark.executor.heartbeatInterval   60s
 spark.executor.memory  32g
 spark.mesos.coarse false
 spark.network.timeout  600s
 spark.shuffle.blockTransferService netty
 spark.shuffle.consolidateFiles true
 spark.shuffle.file.buffer  1m
 spark.shuffle.io.maxRetries6
 spark.shuffle.manager  sort

 The join is currently configured with spark.sql.shuffle.partitions=1000
 but that doesn't seem to help. Would increasing the partitions help ? Is
 there a formula to determine an approximate partitions number value for a
 join ?
 Any help with this job would be appreciated !

 cheers,
 Tom




Re: How to avoid shuffle errors for a large join ?

2015-08-28 Thread Jason
I had similar problems to this (reduce-side failures for large joins (25bn
rows with 9bn)), and found the answer was to raise spark.sql.shuffle.partitions
well above 1000. In my case, 16k partitions worked for me, but your tables look
a little denser, so you may want to go even higher.
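
For reference, a hedged example of applying that setting (16000 mirrors the
value that worked above; it is not a recommendation for every workload):

// Set before running the join; this controls the number of reduce-side
// partitions used by SQL/DataFrame shuffles (joins, aggregations).
sqlContext.setConf("spark.sql.shuffle.partitions", "16000")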

On Thu, Aug 27, 2015 at 6:04 PM Thomas Dudziak tom...@gmail.com wrote:

 I'm getting errors like "Removing executor with no recent heartbeats" &
 "Missing an output location for shuffle" errors for a large SparkSql join
 (1bn rows/2.5TB joined with 1bn rows/30GB) and I'm not sure how to
 configure the job to avoid them.

 The initial stage completes fine with some 30k tasks on a cluster with 70
 machines/10TB memory, generating about 6.5TB of shuffle writes, but then
 the shuffle stage first waits 30min in the scheduling phase according to
 the UI, and then dies with the mentioned errors.

 I can see in the GC logs that the executors reach their memory limits (32g
 per executor, 2 workers per machine) and can't allocate any more stuff in
 the heap. Fwiw, the top 10 in the memory use histogram are:

 num #instances #bytes  class name
 --
   1: 249139595 11958700560
  scala.collection.immutable.HashMap$HashMap1
2: 251085327 8034730464  scala.Tuple2
3: 243694737 5848673688  java.lang.Float
4: 231198778 5548770672  java.lang.Integer
5:  72191585 4298521576  [Lscala.collection.immutable.HashMap;
6:  72191582 2310130624
  scala.collection.immutable.HashMap$HashTrieMap
7:  74114058 1778737392  java.lang.Long
8:   6059103  779203840  [Ljava.lang.Object;
9:   5461096  174755072  scala.collection.mutable.ArrayBuffer
   10: 34749   70122104  [B

 Relevant settings are (Spark 1.4.1, Java 8 with G1 GC):

 spark.core.connection.ack.wait.timeout 600
 spark.executor.heartbeatInterval   60s
 spark.executor.memory  32g
 spark.mesos.coarse false
 spark.network.timeout  600s
 spark.shuffle.blockTransferService netty
 spark.shuffle.consolidateFiles true
 spark.shuffle.file.buffer  1m
 spark.shuffle.io.maxRetries6
 spark.shuffle.manager  sort

 The join is currently configured with spark.sql.shuffle.partitions=1000
 but that doesn't seem to help. Would increasing the partitions help ? Is
 there a formula to determine an approximate partitions number value for a
 join ?
 Any help with this job would be appreciated !

 cheers,
 Tom



How to avoid shuffle errors for a large join ?

2015-08-27 Thread Thomas Dudziak
I'm getting errors like "Removing executor with no recent heartbeats" &
"Missing an output location for shuffle" errors for a large SparkSql join
(1bn rows/2.5TB joined with 1bn rows/30GB) and I'm not sure how to
configure the job to avoid them.

The initial stage completes fine with some 30k tasks on a cluster with 70
machines/10TB memory, generating about 6.5TB of shuffle writes, but then
the shuffle stage first waits 30min in the scheduling phase according to
the UI, and then dies with the mentioned errors.

I can see in the GC logs that the executors reach their memory limits (32g
per executor, 2 workers per machine) and can't allocate any more stuff in
the heap. Fwiw, the top 10 in the memory use histogram are:

num #instances #bytes  class name
--
   1: 249139595 11958700560
 scala.collection.immutable.HashMap$HashMap1
   2: 251085327 8034730464  scala.Tuple2
   3: 243694737 5848673688  java.lang.Float
   4: 231198778 5548770672  java.lang.Integer
   5:  72191585 4298521576  [Lscala.collection.immutable.HashMap;
   6:  72191582 2310130624
 scala.collection.immutable.HashMap$HashTrieMap
   7:  74114058 1778737392  java.lang.Long
   8:   6059103  779203840  [Ljava.lang.Object;
   9:   5461096  174755072  scala.collection.mutable.ArrayBuffer
  10: 34749   70122104  [B

Relevant settings are (Spark 1.4.1, Java 8 with G1 GC):

spark.core.connection.ack.wait.timeout 600
spark.executor.heartbeatInterval   60s
spark.executor.memory  32g
spark.mesos.coarse false
spark.network.timeout  600s
spark.shuffle.blockTransferService netty
spark.shuffle.consolidateFiles true
spark.shuffle.file.buffer  1m
spark.shuffle.io.maxRetries6
spark.shuffle.manager  sort

The join is currently configured with spark.sql.shuffle.partitions=1000 but
that doesn't seem to help. Would increasing the partitions help ? Is there
a formula to determine an approximate partitions number value for a join ?
Any help with this job would be appreciated !

cheers,
Tom
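
There is no exact formula, but a hedged back-of-envelope based on the numbers
in this thread is to divide the total shuffle write size by a target
per-partition size:

// Rough sizing sketch (an assumption, not Spark guidance): keep each shuffle
// partition to a few hundred MB so a task's data fits in its shuffle memory.
val shuffleWriteBytes = 6.5e12                 // ~6.5 TB of shuffle writes, from the UI
val targetPartitionBytes = 256L * 1024 * 1024  // ~256 MB per partition, illustrative
val suggestedPartitions = math.ceil(shuffleWriteBytes / targetPartitionBytes).toInt
// roughly 24000 here -- far above the 1000 currently configured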