Re: Parquet 'bucketBy' creates a ton of files

2019-07-10 Thread Silvio Fiorito
It really depends on the use case. Bucketing stores the data already hash-partitioned 
by the bucketing column(s), so if you frequently perform aggregations or joins on those 
columns it can save you a shuffle. Keep in mind that for a join to completely avoid a 
shuffle, both tables need to have the same bucketing.
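
For example, something along these lines (an untested sketch; the table names, paths, and the account_id column are just placeholders, not from this thread):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().appName("bucketed-join-sketch").getOrCreate();

// Placeholder inputs: two datasets that will later be joined on account_id.
Dataset<Row> orders = spark.read().parquet("s3://my-bucket/raw/orders");
Dataset<Row> accounts = spark.read().parquet("s3://my-bucket/raw/accounts");

// Write both sides with identical bucketing (same column, same bucket count).
orders.write().format("parquet")
    .bucketBy(200, "account_id")
    .mode(SaveMode.Overwrite)
    .saveAsTable("orders_bucketed");

accounts.write().format("parquet")
    .bucketBy(200, "account_id")
    .mode(SaveMode.Overwrite)
    .saveAsTable("accounts_bucketed");

// Joining the two bucketed tables on the bucketing column lets Spark skip the
// shuffle on both sides; explain() should show no Exchange feeding the join.
spark.table("orders_bucketed")
    .join(spark.table("accounts_bucketed"), "account_id")
    .explain();
```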

Sorting the data may help with filtering assuming you’re using a file format 
like Parquet (e.g. if you frequently filter by account id). If you look at 
slide 11 in this talk I gave at Summit you can see a simple example: 
https://www.slideshare.net/databricks/lessons-from-the-field-episode-ii-applying-best-practices-to-your-apache-spark-applications-with-silvio-fiorito
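
As a concrete sketch (again untested, using a made-up account_id column and reusing the `dataframe` variable from the question below):

```java
import static org.apache.spark.sql.functions.col;
import org.apache.spark.sql.SaveMode;

// Sketch: bucket by account_id and sort each bucket file by it. Sorted Parquet
// files keep tight row-group min/max statistics, so frequent filters on
// account_id can skip most of the data. (sortBy can only be used with bucketBy.)
dataframe.repartition(500, col("account_id"))
    .write()
    .format("parquet")
    .bucketBy(500, "account_id")
    .sortBy("account_id")
    .mode(SaveMode.Overwrite)
    .option("path", "s3://my-bucket/my_table_sorted")
    .saveAsTable("my_table_sorted");
```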



From: Gourav Sengupta 
Date: Wednesday, July 10, 2019 at 3:14 AM
To: Silvio Fiorito 
Cc: Arwin Tio, "user@spark.apache.org"
Subject: Re: Parquet 'bucketBy' creates a ton of files

Yeah, makes sense. Also, is there any massive performance improvement using 
bucketBy in comparison to sorting?

Regards,
Gourav

On Thu, Jul 4, 2019 at 1:34 PM Silvio Fiorito 
<silvio.fior...@granturing.com> wrote:
You need to first repartition (at a minimum by bucketColumn1) since each task 
will write out the buckets/files. If the bucket keys are distributed randomly 
across the RDD partitions, then you will get multiple files per bucket.

From: Arwin Tio <arwin@hotmail.com>
Date: Thursday, July 4, 2019 at 3:22 AM
To: "user@spark.apache.org" <user@spark.apache.org>
Subject: Parquet 'bucketBy' creates a ton of files

I am trying to use Spark's **bucketBy** feature on a pretty large dataset.

```java
dataframe.write()
    .format("parquet")
    .bucketBy(500, bucketColumn1, bucketColumn2)
    .mode(SaveMode.Overwrite)
    .option("path", "s3://my-bucket")
    .saveAsTable("my_table");
```

The problem is that my Spark cluster has about 500 partitions/tasks/executors 
(not sure the terminology), so I end up with files that look like:

```
part-00001-{UUID}_00001.c000.snappy.parquet
part-00001-{UUID}_00002.c000.snappy.parquet
...
part-00001-{UUID}_00500.c000.snappy.parquet

part-00002-{UUID}_00001.c000.snappy.parquet
part-00002-{UUID}_00002.c000.snappy.parquet
...
part-00002-{UUID}_00500.c000.snappy.parquet

part-00500-{UUID}_00001.c000.snappy.parquet
part-00500-{UUID}_00002.c000.snappy.parquet
...
part-00500-{UUID}_00500.c000.snappy.parquet
```

That's 500 x 500 = 250,000 bucketed parquet files! It takes forever for the 
`FileOutputCommitter` to commit that to S3.

Is there a way to generate **one file per bucket**, like in Hive? Or is there a 
better way to deal with this problem? As of now it seems like I have to choose 
between lowering the parallelism of my cluster (reduce number of writers) or 
reducing the parallelism of my parquet files (reduce number of buckets), which 
will lower the parallelism of my downstream jobs.

Thanks


Re: Parquet 'bucketBy' creates a ton of files

2019-07-10 Thread Gourav Sengupta
Yeah, makes sense. Also, is there any massive performance improvement using
bucketBy in comparison to sorting?

Regards,
Gourav

On Thu, Jul 4, 2019 at 1:34 PM Silvio Fiorito 
wrote:

> You need to first repartition (at a minimum by bucketColumn1) since each
> task will write out the buckets/files. If the bucket keys are distributed
> randomly across the RDD partitions, then you will get multiple files per
> bucket.
>
>
>
> From: Arwin Tio 
> Date: Thursday, July 4, 2019 at 3:22 AM
> To: "user@spark.apache.org" 
> Subject: Parquet 'bucketBy' creates a ton of files
>
>
>
> I am trying to use Spark's **bucketBy** feature on a pretty large dataset.
>
>
>
> ```java
> dataframe.write()
>     .format("parquet")
>     .bucketBy(500, bucketColumn1, bucketColumn2)
>     .mode(SaveMode.Overwrite)
>     .option("path", "s3://my-bucket")
>     .saveAsTable("my_table");
> ```
>
>
>
> The problem is that my Spark cluster has about 500
> partitions/tasks/executors (not sure the terminology), so I end up with
> files that look like:
>
>
>
> ```
> part-00001-{UUID}_00001.c000.snappy.parquet
> part-00001-{UUID}_00002.c000.snappy.parquet
> ...
> part-00001-{UUID}_00500.c000.snappy.parquet
>
> part-00002-{UUID}_00001.c000.snappy.parquet
> part-00002-{UUID}_00002.c000.snappy.parquet
> ...
> part-00002-{UUID}_00500.c000.snappy.parquet
>
> part-00500-{UUID}_00001.c000.snappy.parquet
> part-00500-{UUID}_00002.c000.snappy.parquet
> ...
> part-00500-{UUID}_00500.c000.snappy.parquet
>
> ```
>
>
>
> That's 500 x 500 = 250,000 bucketed parquet files! It takes forever for the
> `FileOutputCommitter` to commit that to S3.
>
>
>
> Is there a way to generate **one file per bucket**, like in Hive? Or is
> there a better way to deal with this problem? As of now it seems like I
> have to choose between lowering the parallelism of my cluster (reduce
> number of writers) or reducing the parallelism of my parquet files (reduce
> number of buckets), which will lower the parallelism of my downstream jobs.
>
>
>
> Thanks
>


Re: Parquet 'bucketBy' creates a ton of files

2019-07-04 Thread Silvio Fiorito
You need to first repartition (at a minimum by bucketColumn1) since each task 
will write out the buckets/files. If the bucket keys are distributed randomly 
across the RDD partitions, then you will get multiple files per bucket.
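
Something along these lines (an untested sketch, reusing the names from your snippet below; whether you end up with exactly one file per bucket depends on how the repartitioning lines up with the bucket hashing):

```java
import static org.apache.spark.sql.functions.col;
import org.apache.spark.sql.SaveMode;

// Sketch: repartition by the bucketing columns before writing, so all rows for a
// given bucket key land in the same task. Each task then writes far fewer files,
// ideally one per bucket instead of one per bucket per task.
dataframe.repartition(500, col(bucketColumn1), col(bucketColumn2))
    .write()
    .format("parquet")
    .bucketBy(500, bucketColumn1, bucketColumn2)
    .mode(SaveMode.Overwrite)
    .option("path", "s3://my-bucket")
    .saveAsTable("my_table");
```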

From: Arwin Tio 
Date: Thursday, July 4, 2019 at 3:22 AM
To: "user@spark.apache.org" 
Subject: Parquet 'bucketBy' creates a ton of files

I am trying to use Spark's **bucketBy** feature on a pretty large dataset.

```java
dataframe.write()
    .format("parquet")
    .bucketBy(500, bucketColumn1, bucketColumn2)
    .mode(SaveMode.Overwrite)
    .option("path", "s3://my-bucket")
    .saveAsTable("my_table");
```

The problem is that my Spark cluster has about 500 partitions/tasks/executors 
(not sure the terminology), so I end up with files that look like:

```
part-00001-{UUID}_00001.c000.snappy.parquet
part-00001-{UUID}_00002.c000.snappy.parquet
...
part-00001-{UUID}_00500.c000.snappy.parquet

part-00002-{UUID}_00001.c000.snappy.parquet
part-00002-{UUID}_00002.c000.snappy.parquet
...
part-00002-{UUID}_00500.c000.snappy.parquet

part-00500-{UUID}_00001.c000.snappy.parquet
part-00500-{UUID}_00002.c000.snappy.parquet
...
part-00500-{UUID}_00500.c000.snappy.parquet
```

That's 500 x 500 = 250,000 bucketed parquet files! It takes forever for the 
`FileOutputCommitter` to commit that to S3.

Is there a way to generate **one file per bucket**, like in Hive? Or is there a 
better way to deal with this problem? As of now it seems like I have to choose 
between lowering the parallelism of my cluster (reduce number of writers) or 
reducing the parallelism of my parquet files (reduce number of buckets), which 
will lower the parallelism of my downstream jobs.

Thanks


Re: Parquet 'bucketBy' creates a ton of files

2019-07-04 Thread Phillip Henry
Hi, Arwin.

If I understand you correctly, this is totally expected behaviour.

I don't know much about saving to S3 but maybe you could write to HDFS
first then copy everything to S3? I think the write to HDFS will probably
be much faster as Spark/HDFS will write locally or to a machine on the same
LAN. After writing to HDFS, you can then iterate over the resulting
sub-directories (representing each bucket) and coalesce the files in them.
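
For the HDFS part, a rough sketch might look like this (the HDFS path is made up, and the copy to S3 is only indicated as a comment, not a tested pipeline):

```java
import org.apache.spark.sql.SaveMode;

// Sketch: write the bucketed table to an HDFS path (hypothetical), where
// committing many small files is cheap, instead of writing directly to S3.
dataframe.write()
    .format("parquet")
    .bucketBy(500, bucketColumn1, bucketColumn2)
    .mode(SaveMode.Overwrite)
    .option("path", "hdfs:///warehouse/my_table")
    .saveAsTable("my_table");

// Then copy the finished output to S3 as a separate step, for example:
//   hadoop distcp hdfs:///warehouse/my_table s3a://my-bucket/my_table
// Any per-bucket coalescing of the files would be done on HDFS before the copy.
```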

Regards,

Phillip




On Thu, Jul 4, 2019 at 8:22 AM Arwin Tio  wrote:

> I am trying to use Spark's **bucketBy** feature on a pretty large dataset.
>
> ```java
> dataframe.write()
>     .format("parquet")
>     .bucketBy(500, bucketColumn1, bucketColumn2)
>     .mode(SaveMode.Overwrite)
>     .option("path", "s3://my-bucket")
>     .saveAsTable("my_table");
> ```
>
> The problem is that my Spark cluster has about 500
> partitions/tasks/executors (not sure the terminology), so I end up with
> files that look like:
>
> ```
> part-00001-{UUID}_00001.c000.snappy.parquet
> part-00001-{UUID}_00002.c000.snappy.parquet
> ...
> part-00001-{UUID}_00500.c000.snappy.parquet
>
> part-00002-{UUID}_00001.c000.snappy.parquet
> part-00002-{UUID}_00002.c000.snappy.parquet
> ...
> part-00002-{UUID}_00500.c000.snappy.parquet
>
> part-00500-{UUID}_00001.c000.snappy.parquet
> part-00500-{UUID}_00002.c000.snappy.parquet
> ...
> part-00500-{UUID}_00500.c000.snappy.parquet
> ```
>
> That's 500 x 500 = 250,000 bucketed parquet files! It takes forever for the
> `FileOutputCommitter` to commit that to S3.
>
> Is there a way to generate **one file per bucket**, like in Hive? Or is
> there a better way to deal with this problem? As of now it seems like I
> have to choose between lowering the parallelism of my cluster (reduce
> number of writers) or reducing the parallelism of my parquet files (reduce
> number of buckets), which will lower the parallelism of my downstream jobs.
>
> Thanks
>