Re: coalesce ending up very unbalanced - but why?

2016-12-16 Thread vaquar khan
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-rdd-partitions.html
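
(For anyone skimming the thread, a minimal PySpark sketch of the trade-off discussed
below; the output path is illustrative, not from the thread. coalesce(72) only merges
existing partitions, so output file sizes follow whatever the inputs happened to be,
while repartition(72) adds a full shuffle but balances them:)

from pyspark import StorageLevel
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("csv-to-parquet-sketch").getOrCreate()

# Read the ~12000 gzipped, tab-separated CSVs described in the quoted message.
df = spark.read.csv("s3://example-rawdata-prod/data/2016-12-13/v3.19.0/",
                    sep="\t", header=False)
df.persist(StorageLevel.MEMORY_AND_DISK)  # pyspark always stores serialised

# ... calculations on df ...

# No shuffle: 72 output files, but one can end up far larger than the rest.
df.coalesce(72).write.parquet("s3://example-output/parquet/")

# Full shuffle: evenly sized output files, at the cost the thread mentions.
# df.repartition(72).write.parquet("s3://example-output/parquet/")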

Regards,
vaquar khan

On Wed, Dec 14, 2016 at 12:15 PM, Vaibhav Sinha wrote:

> Hi,
> I see similar behaviour in an almost identical scenario at my deployment
> as well. I am using Scala, so the behaviour is not limited to PySpark.
> In my case, 9 out of 10 partitions are of similar size (~38 GB each) and
> the final one is significantly larger (~59 GB).
> A prime number of partitions is an interesting approach; I will try that out.
>
> Best,
> Vaibhav.
>
> On 14 Dec 2016, 10:18 PM +0530, Dirceu Semighini Filho <dirceu.semigh...@gmail.com>, wrote:
>
> Hello,
> We have done some tests here, and it seems that when we use a prime number
> of partitions the data is spread more evenly.
> This has to do with hash partitioning and the Java hash algorithm.
> I don't know what your data looks like or how this works in Python, but if
> you can implement a partitioner, or change it from the default, you will
> get a better result.
>
> Dirceu
>
> 2016-12-14 12:41 GMT-02:00 Adrian Bridgett :
>
>> Since it's pyspark it's just using the default hash partitioning, I
>> believe.  Trying a prime number (71, so that there are enough CPUs) doesn't
>> seem to change anything.  Out of curiosity, why did you suggest that?
>> Googling "spark coalesce prime" doesn't give me any clue :-)
>> Adrian
>>
>>
>> On 14/12/2016 13:58, Dirceu Semighini Filho wrote:
>>
>> Hi Adrian,
>> Which kind of partitioning are you using?
>> Have you already tried to coalesce it to a prime number?
>>
>>
>> 2016-12-14 11:56 GMT-02:00 Adrian Bridgett :
>>
>>> I realise that coalesce() isn't guaranteed to be balanced and adding a
>>> repartition() does indeed fix this (at the cost of a large shuffle).
>>>
>>> I'm trying to understand _why_ it's so uneven (hopefully it helps
>>> someone else too).   This is using spark v2.0.2 (pyspark).
>>>
>>> Essentially we're just reading CSVs into a DataFrame (which we persist
>>> serialised for some calculations), then writing it back out as PRQ.  To
>>> avoid too many PRQ files I've set a coalesce of 72 (9 boxes, 8 CPUs each).
>>>
>>> The writers end up with about 700-900MB each (not bad).  Except for one
>>> which is at 6GB before I killed it.
>>>
>>> Input data is 12000 gzipped CSV files in S3 (approx 30GB), named like
>>> this, almost all about 2MB each:
>>> s3://example-rawdata-prod/data/2016-12-13/v3.19.0/1481587209-i-da71c942-389.gz
>>> s3://example-rawdata-prod/data/2016-12-13/v3.19.0/1481587529-i-01d3dab021b760d29-334.gz
>>>
>>> (we're aware that this isn't an ideal naming convention from an S3
>>> performance PoV).
>>>
>>> The actual CSV file format is:
>>> UUID\tINT\tINT\t...  (wide rows - about 300 columns)
>>>
>>> e.g.:
>>> 17f9c2a7-ddf6-42d3-bada-63b845cb33a5  1481587198750  11213
>>> 1d723493-5341-450d-a506-5c96ce0697f0  1481587198751  11212 ...
>>> 64cec96f-732c-44b8-a02e-098d5b63ad77  1481587198752  11211 ...
>>>
>>> The dataframe seems to be stored evenly on all the nodes (according to
>>> the storage tab) and all the blocks are the same size.   Most of the tasks
>>> are executed at NODE_LOCAL locality (although there are a few ANY).  The
>>> oversized task is NODE_LOCAL though.
>>>
>>> The reading and calculations all seem evenly spread, so I'm confused why
>>> the writes aren't, as I'd expect the input partitions to be even.  What's
>>> causing this, and what can we do?  Maybe it's possible for coalesce() to
>>> be a bit smarter in terms of which partitions it coalesces - balancing the
>>> size of the final partitions rather than the number of source partitions
>>> in each final partition.
>>>
>>> Thanks for any light you can shine!
>>>
>>> Adrian
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>
>> --
>> *Adrian Bridgett* |  Sysadmin Engineer, OpenSignal
>> _
>> Office: 3rd Floor, The Angel Office, 2 Angel Square, London, EC1V 1NY
>> Phone #: +44 777-377-8251
>> Skype: abridgett  |  @adrianbridgett  |  LinkedIn link
>> _
>>
>
>


-- 
Regards,
Vaquar Khan
+1 -224-436-0783

IT Architect / Lead Consultant
Greater Chicago


Re: coalesce ending up very unbalanced - but why?

2016-12-14 Thread Vaibhav Sinha
Hi,
I see similar behaviour in an almost identical scenario at my deployment as
well. I am using Scala, so the behaviour is not limited to PySpark.
In my case, 9 out of 10 partitions are of similar size (~38 GB each) and the
final one is significantly larger (~59 GB).
A prime number of partitions is an interesting approach; I will try that out.
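
(If it helps to see the spread directly, here is a minimal PySpark sketch that
counts rows per partition of the coalesced DataFrame, assumed to be called df;
the same idea works from Scala via mapPartitionsWithIndex. Row counts are only
a proxy for size, but the rows here are roughly uniform in width:)

# Count rows per partition to see how evenly coalesce() spread the data.
counts = (df.rdd
            .mapPartitionsWithIndex(lambda idx, it: [(idx, sum(1 for _ in it))])
            .collect())

for idx, n in sorted(counts):
    print("partition %d: %d rows" % (idx, n))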

Best,
Vaibhav.

On 14 Dec 2016, 10:18 PM +0530, Dirceu Semighini Filho wrote:
> Hello,
> We have done some tests here, and it seems that when we use a prime number of
> partitions the data is spread more evenly.
> This has to do with hash partitioning and the Java hash algorithm.
> I don't know what your data looks like or how this works in Python, but if you
> can implement a partitioner, or change it from the default, you will get a
> better result.
>
> Dirceu
>
> 2016-12-14 12:41 GMT-02:00 Adrian Bridgett (mailto:adr...@opensignal.com):
> >
> > Since it's pyspark it's just using the default hash partitioning, I believe.
> > Trying a prime number (71, so that there are enough CPUs) doesn't seem to
> > change anything. Out of curiosity, why did you suggest that? Googling
> > "spark coalesce prime" doesn't give me any clue :-)
> >
> >
> > Adrian
> >
> >
> > On 14/12/2016 13:58, Dirceu Semighini Filho wrote:
> > > Hi Adrian,
> > > Which kind of partitioning are you using?
> > > Have you already tried to coalesce it to a prime number?
> > >
> > >
> > > 2016-12-14 11:56 GMT-02:00 Adrian Bridgett (mailto:adr...@opensignal.com):
> > > > I realise that coalesce() isn't guaranteed to be balanced and adding a
> > > > repartition() does indeed fix this (at the cost of a large shuffle).
> > > >
> > > > I'm trying to understand _why_ it's so uneven (hopefully it helps 
> > > > someone else too). This is using spark v2.0.2 (pyspark).
> > > >
> > > > Essentially we're just reading CSVs into a DataFrame (which we persist 
> > > > serialised for some calculations), then writing it back out as PRQ. To 
> > > > avoid too many PRQ files I've set a coalesce of 72 (9 boxes, 8 CPUs 
> > > > each).
> > > >
> > > > The writers end up with about 700-900MB each (not bad). Except for one 
> > > > which is at 6GB before I killed it.
> > > >
> > > > Input data is 12000 gzipped CSV files in S3 (approx 30GB), named like 
> > > > this, almost all about 2MB each:
> > > > s3://example-rawdata-prod/data/2016-12-13/v3.19.0/1481587209-i-da71c942-389.gz
> > > > s3://example-rawdata-prod/data/2016-12-13/v3.19.0/1481587529-i-01d3dab021b760d29-334.gz
> > > >
> > > > (we're aware that this isn't an ideal naming convention from an S3 
> > > > performance PoV).
> > > >
> > > > The actual CSV file format is:
> > > > UUID\tINT\tINT\t...  (wide rows - about 300 columns)
> > > >
> > > > e.g.:
> > > > 17f9c2a7-ddf6-42d3-bada-63b845cb33a5  1481587198750  11213
> > > > 1d723493-5341-450d-a506-5c96ce0697f0  1481587198751  11212 ...
> > > > 64cec96f-732c-44b8-a02e-098d5b63ad77  1481587198752  11211 ...
> > > >
> > > > The dataframe seems to be stored evenly on all the nodes (according to 
> > > > the storage tab) and all the blocks are the same size. Most of the 
> > > > tasks are executed at NODE_LOCAL locality (although there are a few 
> > > > ANY). The oversized task is NODE_LOCAL though.
> > > >
> > > > The reading and calculations all seem evenly spread, so I'm confused
> > > > why the writes aren't, as I'd expect the input partitions to be even.
> > > > What's causing this, and what can we do? Maybe it's possible for
> > > > coalesce() to be a bit smarter in terms of which partitions it
> > > > coalesces - balancing the size of the final partitions rather than the
> > > > number of source partitions in each final partition.
> > > >
> > > > Thanks for any light you can shine!
> > > >
> > > > Adrian
> > > >
> > > > -
> > > > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> > > >
> > >
> >
> > --
> > Adrian Bridgett | Sysadmin Engineer, OpenSignal (http://www.opensignal.com)
> > _
> >
> > Office: 3rd Floor, The Angel Office, 2 Angel Square, London, EC1V 1NY
> > Phone #: +44 777-377-8251
> >
> > Skype: abridgett | @adrianbridgett (http://twitter.com/adrianbridgett) | 
> > LinkedIn link (https://uk.linkedin.com/in/abridgett)
> >
> > _
> >
>
>
>


Re: coalesce ending up very unbalanced - but why?

2016-12-14 Thread Dirceu Semighini Filho
Hello,
We have done some tests here, and it seems that when we use a prime number
of partitions the data is spread more evenly.
This has to do with hash partitioning and the Java hash algorithm.
I don't know what your data looks like or how this works in Python, but if
you can implement a partitioner, or change it from the default, you will
get a better result.
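
(For example, a minimal PySpark sketch of both ideas; the column index,
partition count and partition function are illustrative assumptions, not
something from this thread:)

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("partitioning-sketch").getOrCreate()
df = spark.read.csv("s3://example-rawdata-prod/data/2016-12-13/", sep="\t")

# Option 1: shuffle and hash-partition on the UUID column (_c0) with a prime
# partition count, so the modulo shares no factor with the hash values.
balanced = df.repartition(71, df["_c0"])

# Option 2: drop to the RDD and replace the default hash partitioner with a
# custom partition function, e.g. the leading hex digits of the UUID.
pairs = df.rdd.keyBy(lambda row: row[0])
custom = pairs.partitionBy(71, partitionFunc=lambda uuid: int(uuid[:8], 16))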

Dirceu

2016-12-14 12:41 GMT-02:00 Adrian Bridgett :

> Since it's pyspark it's just using the default hash partitioning, I
> believe.  Trying a prime number (71, so that there are enough CPUs) doesn't
> seem to change anything.  Out of curiosity, why did you suggest that?
> Googling "spark coalesce prime" doesn't give me any clue :-)
> Adrian
>
>
> On 14/12/2016 13:58, Dirceu Semighini Filho wrote:
>
> Hi Adrian,
> Which kind of partitioning are you using?
> Have you already tried to coalesce it to a prime number?
>
>
> 2016-12-14 11:56 GMT-02:00 Adrian Bridgett :
>
>> I realise that coalesce() isn't guaranteed to be balanced and adding a
>> repartition() does indeed fix this (at the cost of a large shuffle).
>>
>> I'm trying to understand _why_ it's so uneven (hopefully it helps someone
>> else too).   This is using spark v2.0.2 (pyspark).
>>
>> Essentially we're just reading CSVs into a DataFrame (which we persist
>> serialised for some calculations), then writing it back out as PRQ.  To
>> avoid too many PRQ files I've set a coalesce of 72 (9 boxes, 8 CPUs each).
>>
>> The writers end up with about 700-900MB each (not bad).  Except for one
>> which is at 6GB before I killed it.
>>
>> Input data is 12000 gzipped CSV files in S3 (approx 30GB), named like
>> this, almost all about 2MB each:
>> s3://example-rawdata-prod/data/2016-12-13/v3.19.0/1481587209-i-da71c942-389.gz
>> s3://example-rawdata-prod/data/2016-12-13/v3.19.0/1481587529-i-01d3dab021b760d29-334.gz
>>
>> (we're aware that this isn't an ideal naming convention from an S3
>> performance PoV).
>>
>> The actual CSV file format is:
>> UUID\tINT\tINT\t...  (wide rows - about 300 columns)
>>
>> e.g.:
>> 17f9c2a7-ddf6-42d3-bada-63b845cb33a5  1481587198750  11213
>> 1d723493-5341-450d-a506-5c96ce0697f0  1481587198751  11212 ...
>> 64cec96f-732c-44b8-a02e-098d5b63ad77  1481587198752  11211 ...
>>
>> The dataframe seems to be stored evenly on all the nodes (according to
>> the storage tab) and all the blocks are the same size.   Most of the tasks
>> are executed at NODE_LOCAL locality (although there are a few ANY).  The
>> oversized task is NODE_LOCAL though.
>>
>> The reading and calculations all seem evenly spread, so I'm confused why
>> the writes aren't, as I'd expect the input partitions to be even.  What's
>> causing this, and what can we do?  Maybe it's possible for coalesce() to be
>> a bit smarter in terms of which partitions it coalesces - balancing the
>> size of the final partitions rather than the number of source partitions
>> in each final partition.
>>
>> Thanks for any light you can shine!
>>
>> Adrian
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>
> --
> *Adrian Bridgett* |  Sysadmin Engineer, OpenSignal
> _
> Office: 3rd Floor, The Angel Office, 2 Angel Square, London, EC1V 1NY
> Phone #: +44 777-377-8251
> Skype: abridgett  |  @adrianbridgett  |  LinkedIn link
> _
>


Re: coalesce ending up very unbalanced - but why?

2016-12-14 Thread Adrian Bridgett
Since it's pyspark it's just using the default hash partitioning, I
believe.  Trying a prime number (71, so that there are enough CPUs) doesn't
seem to change anything.  Out of curiosity, why did you suggest that?
Googling "spark coalesce prime" doesn't give me any clue :-)
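
(A toy illustration of the reasoning usually given for the prime suggestion -
plain Python, nothing Spark-specific, and assuming the skew comes from key
hashes that share a common factor with the partition count:)

from collections import Counter

hashes = [i * 8 for i in range(10000)]          # hypothetical skewed hash values

used_72 = len(Counter(h % 72 for h in hashes))  # 72 shares the factor 8
used_71 = len(Counter(h % 71 for h in hashes))  # 71 is prime

print("%d of 72 partitions used" % used_72)     # 9: keys pile onto a few buckets
print("%d of 71 partitions used" % used_71)     # 71: keys spread over all buckets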


Adrian

On 14/12/2016 13:58, Dirceu Semighini Filho wrote:

Hi Adrian,
Which kind of partitioning are you using?
Have you already tried to coalesce it to a prime number?


2016-12-14 11:56 GMT-02:00 Adrian Bridgett:


I realise that coalesce() isn't guaranteed to be balanced and
adding a repartition() does indeed fix this (at the cost of a
large shuffle).

I'm trying to understand _why_ it's so uneven (hopefully it helps
someone else too).   This is using spark v2.0.2 (pyspark).

Essentially we're just reading CSVs into a DataFrame (which we
persist serialised for some calculations), then writing it back
out as PRQ.  To avoid too many PRQ files I've set a coalesce of 72
(9 boxes, 8 CPUs each).

The writers end up with about 700-900MB each (not bad). Except for
one which is at 6GB before I killed it.

Input data is 12000 gzipped CSV files in S3 (approx 30GB), named
like this, almost all about 2MB each:

s3://example-rawdata-prod/data/2016-12-13/v3.19.0/1481587209-i-da71c942-389.gz

s3://example-rawdata-prod/data/2016-12-13/v3.19.0/1481587529-i-01d3dab021b760d29-334.gz

(we're aware that this isn't an ideal naming convention from an S3
performance PoV).

The actual CSV file format is:
UUID\tINT\tINT\t...  (wide rows - about 300 columns)

e.g.:
17f9c2a7-ddf6-42d3-bada-63b845cb33a5  1481587198750  11213
1d723493-5341-450d-a506-5c96ce0697f0  1481587198751  11212 ...
64cec96f-732c-44b8-a02e-098d5b63ad77  1481587198752  11211 ...

The dataframe seems to be stored evenly on all the nodes
(according to the storage tab) and all the blocks are the same
size.   Most of the tasks are executed at NODE_LOCAL locality
(although there are a few ANY).  The oversized task is NODE_LOCAL
though.

The reading and calculations all seem evenly spread, so I'm
confused why the writes aren't, as I'd expect the input partitions
to be even. What's causing this, and what can we do? Maybe it's
possible for coalesce() to be a bit smarter in terms of which
partitions it coalesces - balancing the size of the final
partitions rather than the number of source partitions in each
final partition.

Thanks for any light you can shine!

Adrian

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org





--
*Adrian Bridgett* |  Sysadmin Engineer, OpenSignal

_
Office: 3rd Floor, The Angel Office, 2 Angel Square, London, EC1V 1NY
Phone #: +44 777-377-8251
Skype: abridgett  |  @adrianbridgett  |  LinkedIn link

_


Re: coalesce ending up very unbalanced - but why?

2016-12-14 Thread Dirceu Semighini Filho
Hi Adrian,
Which kind of partitioning are you using?
Have you already tried to coalesce it to a prime number?
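
(Concretely, something as simple as this one-liner, with df and the output
path assumed for illustration:)

df.coalesce(71).write.parquet("s3://example-output/parquet/")  # 71 is prime, vs. 72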


2016-12-14 11:56 GMT-02:00 Adrian Bridgett :

> I realise that coalesce() isn't guaranteed to be balanced and adding a
> repartition() does indeed fix this (at the cost of a large shuffle).
>
> I'm trying to understand _why_ it's so uneven (hopefully it helps someone
> else too).   This is using spark v2.0.2 (pyspark).
>
> Essentially we're just reading CSVs into a DataFrame (which we persist
> serialised for some calculations), then writing it back out as PRQ.  To
> avoid too many PRQ files I've set a coalesce of 72 (9 boxes, 8 CPUs each).
>
> The writers end up with about 700-900MB each (not bad).  Except for one
> which is at 6GB before I killed it.
>
> Input data is 12000 gzipped CSV files in S3 (approx 30GB), named like
> this, almost all about 2MB each:
> s3://example-rawdata-prod/data/2016-12-13/v3.19.0/1481587209-i-da71c942-389.gz
> s3://example-rawdata-prod/data/2016-12-13/v3.19.0/1481587529-i-01d3dab021b760d29-334.gz
>
> (we're aware that this isn't an ideal naming convention from an S3
> performance PoV).
>
> The actual CSV file format is:
> UUID\tINT\tINT\t...  (wide rows - about 300 columns)
>
> e.g.:
> 17f9c2a7-ddf6-42d3-bada-63b845cb33a5  1481587198750  11213
> 1d723493-5341-450d-a506-5c96ce0697f0  1481587198751  11212 ...
> 64cec96f-732c-44b8-a02e-098d5b63ad77  1481587198752  11211 ...
>
> The dataframe seems to be stored evenly on all the nodes (according to the
> storage tab) and all the blocks are the same size.   Most of the tasks are
> executed at NODE_LOCAL locality (although there are a few ANY).  The
> oversized task is NODE_LOCAL though.
>
> The reading and calculations all seem evenly spread, so I'm confused why the
> writes aren't, as I'd expect the input partitions to be even.  What's causing
> this, and what can we do?  Maybe it's possible for coalesce() to be a bit
> smarter in terms of which partitions it coalesces - balancing the size of the
> final partitions rather than the number of source partitions in each final
> partition.
>
> Thanks for any light you can shine!
>
> Adrian
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>