Since it's pyspark it's just using the default hash partitioning, I believe. Trying a prime number (71, so that there are enough for the CPUs) doesn't seem to change anything. Out of curiosity, why did you suggest that? Googling "spark coalesce prime" doesn't give me any clue :-)
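
For reference, a quick way to check this (just a sketch; 'df' is our DataFrame):

    # The RDD behind a DataFrame read straight from files reports no explicit
    # partitioner here, and coalescing to a prime doesn't seem to help:
    print(df.rdd.partitioner)                      # None for us
    print(df.coalesce(71).rdd.getNumPartitions())  # 71, but one task is still huge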

Adrian

On 14/12/2016 13:58, Dirceu Semighini Filho wrote:
Hi Adrian,
Which kind of partitioning are you using?
Have you already tried to coalesce it to a prime number?


2016-12-14 11:56 GMT-02:00 Adrian Bridgett <adr...@opensignal.com <mailto:adr...@opensignal.com>>:

    I realise that coalesce() isn't guaranteed to be balanced, and
    adding a repartition() does indeed fix this (at the cost of a
    large shuffle).
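
    For anyone following along, the two variants look roughly like this
    (a sketch; the output path is illustrative):

        # coalesce() only merges existing partitions (no shuffle), so uneven
        # inputs can stay uneven; repartition() does a full shuffle but
        # balances the output.
        df.coalesce(72).write.parquet("s3://example-output/prq/")      # cheap, can skew
        df.repartition(72).write.parquet("s3://example-output/prq/")   # shuffle, balanced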

    I'm trying to understand _why_ it's so uneven (hopefully it helps
    someone else too).   This is using spark v2.0.2 (pyspark).

    Essentially we're just reading CSVs into a DataFrame (which we
    persist serialised for some calculations), then writing it back
    out as Parquet (PRQ).  To avoid too many PRQ files I've set a
    coalesce of 72 (9 boxes, 8 CPUs each).
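
    Roughly what the job does (a sketch - paths and options on the
    write side are illustrative; Spark 2.0.2 / pyspark):

        from pyspark.sql import SparkSession

        spark = SparkSession.builder.getOrCreate()

        # read the gzipped, tab-separated CSVs (format described below)
        df = (spark.read
              .option("sep", "\t")
              .csv("s3://example-rawdata-prod/data/2016-12-13/v3.19.0/"))

        df.persist()   # persisted (serialised) for the calculations

        # ... calculations ...

        # 72 output files: 9 boxes x 8 CPUs
        df.coalesce(72).write.parquet("s3://example-output/prq/")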

    The writers end up with about 700-900MB each (not bad), except for
    one which was at 6GB before I killed it.

    Input data is 12000 gzipped CSV files in S3 (approx 30GB), named
    like this, almost all about 2MB each:
    
s3://example-rawdata-prod/data/2016-12-13/v3.19.0/1481587209-i-da71c942-389.gz
    
s3://example-rawdata-prod/data/2016-12-13/v3.19.0/1481587529-i-01d3dab021b760d29-334.gz

    (we're aware that this isn't an ideal naming convention from an S3
    performance PoV).

    The actual CSV file format is:
    UUID\tINT\tINT\t...  (wide rows - about 300 columns)

    e.g.:
    17f9c2a7-ddf6-42d3-bada-63b845cb33a5    1481587198750  11213....
    1d723493-5341-450d-a506-5c96ce0697f0    1481587198751  11212 ...
    64cec96f-732c-44b8-a02e-098d5b63ad77    1481587198752  11211 ...

    The DataFrame seems to be stored evenly on all the nodes
    (according to the storage tab) and all the blocks are the same
    size.   Most of the tasks are executed at NODE_LOCAL locality
    (although there are a few ANY).  The oversized task is NODE_LOCAL
    though.

    The reading and calculations all seem evenly spread, so I'm
    confused why the writes aren't - I'd expect the input partitions
    to be even.  What's causing this, and what can we do about it?
    Maybe coalesce() could be a bit smarter about which partitions it
    coalesces - balancing the size of the final partitions rather than
    the number of source partitions in each final partition.
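
    In case it's useful to reproduce, one way to see the per-partition
    row counts just before the write (a sketch):

        from pyspark.sql.functions import spark_partition_id

        # row count per coalesced partition; the oversized one stands out
        (df.coalesce(72)
           .withColumn("pid", spark_partition_id())
           .groupBy("pid")
           .count()
           .orderBy("count", ascending=False)
           .show(10))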

    Thanks for any light you can shine!

    Adrian

    ---------------------------------------------------------------------
    To unsubscribe e-mail: user-unsubscr...@spark.apache.org
    <mailto:user-unsubscr...@spark.apache.org>



--
*Adrian Bridgett* | Sysadmin Engineer, OpenSignal <http://www.opensignal.com>
_____________________________________________________
Office: 3rd Floor, The Angel Office, 2 Angel Square, London, EC1V 1NY
Phone #: +44 777-377-8251
Skype: abridgett |@adrianbridgett <http://twitter.com/adrianbridgett>| LinkedIn link <https://uk.linkedin.com/in/abridgett>
_____________________________________________________
