Re: Out of memory HDFS Multiple Cluster Write

2019-12-20 Thread Chris Teoh
If you're using Spark SQL, that setting causes a shuffle when the number of
input partitions going into the write is larger than the configured value.

Is there anything in the executor logs or the Spark UI DAG that indicates a
shuffle? I don't expect a shuffle if it is a straight write. What's the
input partition size?
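One way to check both of those from a spark-shell session (a minimal sketch; the
path and cluster URI are illustrative assumptions, not taken from the pipeline):

import org.apache.hadoop.fs.{FileSystem, Path}
import java.net.URI

// Hypothetical location of the intermediate parquet output on the source cluster.
val src = "hdfs://cluster-main/pipeline/output"

val df = spark.read.parquet(src)

// Partition count Spark will use for the write if no shuffle is introduced.
println(s"input partitions: ${df.rdd.getNumPartitions}")

// Rough per-partition size: total directory size divided by partition count.
val fs    = FileSystem.get(new URI(src), spark.sparkContext.hadoopConfiguration)
val bytes = fs.getContentSummary(new Path(new URI(src).getPath)).getLength
println(f"approx MB per partition: ${bytes.toDouble / df.rdd.getNumPartitions / 1024 / 1024}%.1f")

// If the physical plan prints an Exchange operator, a shuffle is being added before the write.
df.explain()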

On Sat, 21 Dec 2019, 10:24 am Ruijing Li,  wrote:

> Could you explain why shuffle partitions might be a good starting point?
>
> Some more details: when I write the output the first time after the logic is
> complete, I repartition the files to 20 (having set
> spark.sql.shuffle.partitions = 2000) so we don't have too many small files.
> The data is small, about 130MB per file. When Spark reads it back, it reads in
> 40 partitions and tries to output that to the different cluster.
> Unfortunately, during that read-and-write stage the executors drop off.
>
> We keep the HDFS block size at 128MB.
>
> On Fri, Dec 20, 2019 at 3:01 PM Chris Teoh  wrote:
>
>> spark.sql.shuffle.partitions might be a start.
>>
>> Is there a difference between the number of partitions when the parquet is
>> read and spark.sql.shuffle.partitions? Is it much higher than
>> spark.sql.shuffle.partitions?
>>
>> On Fri, 20 Dec 2019, 7:34 pm Ruijing Li,  wrote:
>>
>>> Hi all,
>>>
>>> I have encountered a strange executor OOM error. I have a data pipeline
>>> using Spark 2.3 Scala 2.11.12. This pipeline writes the output to one HDFS
>>> location as parquet then reads the files back in and writes to multiple
>>> Hadoop clusters (all co-located in the same datacenter). It should be a
>>> very simple task, but executors are being killed off for exceeding container
>>> thresholds. From the logs, they are exceeding their allotted memory (we use
>>> Mesos as the cluster manager).
>>>
>>> The ETL process works perfectly fine with the given resources, doing
>>> joins and adding columns. The output is written successfully the first
>>> time. *Only when the pipeline at the end reads the output from HDFS and
>>> writes it to different HDFS cluster paths does it fail.* (It does a
>>> spark.read.parquet(source).write.parquet(dest))
>>>
>>> This doesn't really make sense and I'm wondering what configurations I
>>> should start looking at.
>>>
>>> --
>>> Cheers,
>>> Ruijing Li
>>>
>> --
> Cheers,
> Ruijing Li
>


Re: Out of memory HDFS Multiple Cluster Write

2019-12-20 Thread Ruijing Li
Could you explain why shuffle partitions might be a good starting point?

Some more details: when I write the output the first time after the logic is
complete, I repartition the files to 20 (having set
spark.sql.shuffle.partitions = 2000) so we don't have too many small files.
The data is small, about 130MB per file. When Spark reads it back, it reads in
40 partitions and tries to output that to the different cluster.
Unfortunately, during that read-and-write stage the executors drop off.

We keep the HDFS block size at 128MB.
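For what it's worth, the copy step described above would look roughly like the
sketch below (paths are illustrative, not the pipeline's actual code). coalesce
keeps the copy at ~20 output files without triggering the shuffle that
repartition would:

// Hedged sketch of the final cross-cluster copy, with illustrative paths.
val src = "hdfs://cluster-main/pipeline/output"
val dst = "hdfs://cluster-other/pipeline/output"

val df = spark.read.parquet(src)   // currently read back as ~40 partitions

// coalesce narrows the 40 read partitions down to 20 write tasks without a
// shuffle; repartition(20) would give the same file count via a full shuffle.
df.coalesce(20)
  .write
  .mode("overwrite")
  .parquet(dst)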

On Fri, Dec 20, 2019 at 3:01 PM Chris Teoh  wrote:

> spark.sql.shuffle.partitions might be a start.
>
> Is there a difference between the number of partitions when the parquet is
> read and spark.sql.shuffle.partitions? Is it much higher than
> spark.sql.shuffle.partitions?
>
> On Fri, 20 Dec 2019, 7:34 pm Ruijing Li,  wrote:
>
>> Hi all,
>>
>> I have encountered a strange executor OOM error. I have a data pipeline
>> using Spark 2.3 Scala 2.11.12. This pipeline writes the output to one HDFS
>> location as parquet then reads the files back in and writes to multiple
>> Hadoop clusters (all co-located in the same datacenter). It should be a
>> very simple task, but executors are being killed off for exceeding container
>> thresholds. From the logs, they are exceeding their allotted memory (we use
>> Mesos as the cluster manager).
>>
>> The ETL process works perfectly fine with the given resources, doing
>> joins and adding columns. The output is written successfully the first
>> time. *Only when the pipeline at the end reads the output from HDFS and
>> writes it to different HDFS cluster paths does it fail.* (It does a
>> spark.read.parquet(source).write.parquet(dest))
>>
>> This doesn't really make sense and I'm wondering what configurations I
>> should start looking at.
>>
>> --
>> Cheers,
>> Ruijing Li
>>
> --
Cheers,
Ruijing Li


Re: Out of memory HDFS Multiple Cluster Write

2019-12-20 Thread Chris Teoh
spark.sql.shuffle.partitions might be a start.

Is there a difference between the number of partitions when the parquet is
read and spark.sql.shuffle.partitions? Is it much higher than
spark.sql.shuffle.partitions?

On Fri, 20 Dec 2019, 7:34 pm Ruijing Li,  wrote:

> Hi all,
>
> I have encountered a strange executor OOM error. I have a data pipeline
> using Spark 2.3 Scala 2.11.12. This pipeline writes the output to one HDFS
> location as parquet then reads the files back in and writes to multiple
> Hadoop clusters (all co-located in the same datacenter). It should be a
> very simple task, but executors are being killed off for exceeding container
> thresholds. From the logs, they are exceeding their allotted memory (we use
> Mesos as the cluster manager).
>
> The ETL process works perfectly fine with the given resources, doing joins
> and adding columns. The output is written successfully the first time. *Only
> when the pipeline at the end reads the output from HDFS and writes it to
> different HDFS cluster paths does it fail.* (It does a
> spark.read.parquet(source).write.parquet(dest))
>
> This doesn't really make sense and I'm wondering what configurations I
> should start looking at.
>
> --
> Cheers,
> Ruijing Li
>


Re: Identify bottleneck

2019-12-20 Thread Nicolas Paris
Apparently the "withColumn" issue only applies to hundreds or thousands of
calls. That was not the case here (twenty calls).
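For readers of the archive, a small sketch of the two styles being compared
(the toy DataFrame and column names are made up for illustration; this is not
the code from the job under discussion):

import org.apache.spark.sql.functions._
import spark.implicits._

// Toy input standing in for the file-system listing.
val df = Seq(("/data/a.csv", 250L * 1024 * 1024), ("/data/b.txt", 10L * 1024 * 1024))
  .toDF("path", "size_bytes")

// Style 1: chained withColumn -- each call adds a Project node to the logical
// plan, which is the overhead the linked article measures at hundreds of calls.
val chained = df
  .withColumn("size_mb", col("size_bytes") / (1024 * 1024))
  .withColumn("ext", lower(regexp_extract(col("path"), "\\.([^.]+)$", 1)))
  .withColumn("is_large", col("size_bytes") > 100L * 1024 * 1024)

// Style 2: a single select producing the same columns in one Project.
val single = df.select(
  col("*"),
  (col("size_bytes") / (1024 * 1024)).as("size_mb"),
  lower(regexp_extract(col("path"), "\\.([^.]+)$", 1)).as("ext"),
  (col("size_bytes") > 100L * 1024 * 1024).as("is_large")
)

Comparing chained.explain(true) with single.explain(true) shows whether Catalyst
collapses the extra Projects, which is essentially the DAG comparison Chris
suggests in the quoted reply below.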

On Fri, Dec 20, 2019 at 08:53:16AM +0100, Enrico Minack wrote:
> The issue is explained in depth here:
> https://medium.com/@manuzhang/the-hidden-cost-of-spark-withcolumn-8ffea517c015
> 
> Am 19.12.19 um 23:33 schrieb Chris Teoh:
> 
> As far as I'm aware it isn't any better. The logic all gets processed by
> the same engine so to confirm, compare the DAGs generated from both
> approaches and see if they're identical.
> 
> On Fri, 20 Dec 2019, 8:56 am ayan guha,  wrote:
> 
> Quick question: why is it better to use one SQL select vs multiple
> withColumn? Isn't everything eventually rewritten by Catalyst?
> 
> On Wed, 18 Dec 2019 at 9:14 pm, Enrico Minack 
> wrote:
> 
> How many withColumn statements do you have? Note that it is better
> to use a single select, rather than lots of withColumn. This also
> makes drops redundant.
> 
> Reading 25m CSV lines and writing to Parquet in 5 minutes on 32
> cores is really slow. Can you try this on a single machine, i.e.
> run with "local[*]"?
> 
> Can you rule out the writing part by counting the rows? I presume
> this all happens in a single stage.
> 
> Enrico
> 
> 
> Am 18.12.19 um 10:56 schrieb Antoine DUBOIS:
> 
> Hello
> 
> I'm working on an ETL based on CSVs describing file systems, transforming
> them into parquet so I can work on them easily to extract information.
> I'm using Mr. Powers' framework Daria to do so. I have quite different
> inputs and a lot of transformations, and the framework helps organize the
> code.
> I have a stand-alone cluster v2.3.2 composed of 4 nodes with 8 cores and
> 32GB of memory each.
> The storage is handled by a CephFS volume mounted on all nodes.
> First, a small description of my algorithm (it's quite simple):
> 
> 
> Use SparkContext to load the csv.bz2 file,
> Chain a lot of withColumn() statements,
> Drop all unnecessary columns,
> Write parquet file to CephFS
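A minimal sketch of those four steps, with illustrative paths, columns and
transforms (not the actual job; CephFS is assumed to be mounted at /mnt/cephfs):

import org.apache.spark.sql.functions._

val raw = spark.read
  .option("header", "true")
  .csv("file:///mnt/cephfs/ingest/fs_listing.csv.bz2")   // step 1: load the csv.bz2

val transformed = raw                                     // step 2: chained transforms
  .withColumn("size_mb", col("size_bytes").cast("long") / (1024 * 1024))
  .withColumn("owner", lower(col("owner")))

transformed
  .drop("raw_line", "checksum")                           // step 3: drop unneeded columns
  .write
  .mode("overwrite")
  .parquet("file:///mnt/cephfs/warehouse/fs_listing")     // step 4: write parquet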
> 
> 
> This treatment can take several hours depending on how many lines the CSV
> has, and I wanted to identify whether bz2 or the network could be an issue,
> so I ran the following tests (several times, with consistent results).
> I tried the following scenarios with 20 cores and 2 cores per task:
>   ■ Read the csv.bz2 from CephFS with a 1Gb/s connection for each node:
>     ~5 minutes.
>   ■ Read the csv.bz2 from TMPFS (set up to look like a shared storage
>     space): ~5 minutes.
>   ■ From the two previous tests I concluded that uncompressing the file was
>     part of the bottleneck, so I uncompressed the file and stored it in
>     TMPFS as well; result: ~5.9 minutes.
> The test file has 25'833'369 lines and is 370MB compressed and 3700MB
> uncompressed. These results have been reproduced several times each.
> My question here is: what is the bottleneck in this case?
> 
> I thought that the uncompressed file in RAM would be the fastest. Is it
> possible that my program is reading the CSV suboptimally?
> In the execution logs on the cluster I have 5 to 10 seconds GC
> time max, and timeline shows mainly CPU time (no shuffling, no
> randomization overload either).
> I also noticed that memory storage is never used during the
> execution. I know from several hours of research that bz2 is
> the only real compression algorithm usable as an input in 
> spark
> for parallelization reasons.
> 
> Do you have any idea why it behaves this way,
> and any idea how to improve this processing?
> 
> Cheers
> 
> Antoine
> 
> 
> 
> --
> Best Regards,
> Ayan Guha
> 
> 

-- 
nicolas

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Solved: Identify bottleneck

2019-12-20 Thread Antoine DUBOIS
Thank you very much for your help and your input.
I learned a few things, and I finally understood my issue.
In this case my main problem was a virtualization one: my VMs were running on a
single small hypervisor. I split them across multiple hypervisors, and the
application now scales properly with the number of cores; processing
uncompressed data is indeed faster.
My bottleneck seems to be the compression.

Thank you all, and have a merry Christmas.
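For anyone who lands on this thread with a similar symptom, a minimal sketch of
the count-only comparison Enrico suggested, which separates decompression cost
from the write (paths and CSV options are illustrative; run with
--master local[*] to take the cluster out of the picture):

// Time a count-only read of each input so the write path is ruled out.
def timeCount(path: String): Unit = {
  val start = System.nanoTime()
  val rows = spark.read.option("header", "true").csv(path).count()
  val secs = (System.nanoTime() - start) / 1e9
  println(f"$path%-40s rows=$rows%,d  took $secs%.1f s")
}

timeCount("file:///tmp/fs_listing.csv.bz2") // compressed input
timeCount("file:///tmp/fs_listing.csv")     // same data, uncompressed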



De: "ayan guha"  
À: "Enrico Minack"  
Cc: "Antoine DUBOIS" , "Chris Teoh" 
, user@spark.apache.org 
Envoyé: Vendredi 20 Décembre 2019 09:39:49 
Objet: Re: Identify bottleneck 

Cool, thanks! Very helpful 

On Fri, 20 Dec 2019 at 6:53 pm, Enrico Minack <m...@enrico.minack.dev> wrote:



The issue is explained in depth here:
https://medium.com/@manuzhang/the-hidden-cost-of-spark-withcolumn-8ffea517c015

Am 19.12.19 um 23:33 schrieb Chris Teoh: 


As far as I'm aware it isn't any better. The logic all gets processed by the 
same engine so to confirm, compare the DAGs generated from both approaches and 
see if they're identical. 

On Fri, 20 Dec 2019, 8:56 am ayan guha, <guha.a...@gmail.com> wrote:


Quick question: why is it better to use one SQL select vs multiple withColumn?
Isn't everything eventually rewritten by Catalyst?

On Wed, 18 Dec 2019 at 9:14 pm, Enrico Minack <m...@enrico.minack.dev> wrote:


How many withColumn statements do you have? Note that it is better to use a 
single select, rather than lots of withColumn. This also makes drops redundant. 

Reading 25m CSV lines and writing to Parquet in 5 minutes on 32 cores is really
slow. Can you try this on a single machine, i.e. run with "local[*]"?

Can you rule out the writing part by counting the rows? I presume this all 
happens in a single stage. 

Enrico 


Am 18.12.19 um 10:56 schrieb Antoine DUBOIS: 


Hello 

I'm working on an ETL based on CSVs describing file systems, transforming them
into parquet so I can work on them easily to extract information.
I'm using Mr. Powers' framework Daria to do so. I have quite different inputs
and a lot of transformations, and the framework helps organize the code.
I have a stand-alone cluster v2.3.2 composed of 4 nodes with 8 cores and 32GB
of memory each.
The storage is handled by a CephFS volume mounted on all nodes.
First, a small description of my algorithm (it's quite simple):



Use SparkContext to load the csv.bz2 file, 
Chain a lot of withColumn() statements,
Drop all unnecessary columns, 
Write parquet file to CephFS 




This treatment can take several hours depending on how many lines the CSV has,
and I wanted to identify whether bz2 or the network could be an issue, so I ran
the following tests (several times, with consistent results).
I tried the following scenarios with 20 cores and 2 cores per task:

    * Read the csv.bz2 from CephFS with a 1Gb/s connection for each node:
      ~5 minutes.
    * Read the csv.bz2 from TMPFS (set up to look like a shared storage space):
      ~5 minutes.
    * From the two previous tests I concluded that uncompressing the file was
      part of the bottleneck, so I uncompressed the file and stored it in TMPFS
      as well; result: ~5.9 minutes.

The test file has 25'833'369 lines and is 370MB compressed and 3700MB
uncompressed. These results have been reproduced several times each.
My question here is: what is the bottleneck in this case?

I thought that the uncompressed file in RAM would be the fastest. Is it
possible that my program is reading the CSV suboptimally?
In the execution logs on the cluster I have 5 to 10 seconds GC time max, and 
timeline shows mainly CPU time (no shuffling, no randomization overload 
either). 
I also noticed that memory storage is never used during the execution. I know 
from several hours of research that bz2 is the only real compression algorithm 
usable as an input in spark for parallelization reasons. 

Do you have any idea why it behaves this way,
and any idea how to improve this processing?

Cheers 

Antoine 


-- 
Best Regards, 
Ayan Guha 


-- 
Best Regards, 
Ayan Guha 





Re: Identify bottleneck

2019-12-20 Thread ayan guha
Cool, thanks! Very helpful

On Fri, 20 Dec 2019 at 6:53 pm, Enrico Minack 
wrote:

> The issue is explained in depth here:
> https://medium.com/@manuzhang/the-hidden-cost-of-spark-withcolumn-8ffea517c015
>
> Am 19.12.19 um 23:33 schrieb Chris Teoh:
>
> As far as I'm aware it isn't any better. The logic all gets processed by
> the same engine so to confirm, compare the DAGs generated from both
> approaches and see if they're identical.
>
> On Fri, 20 Dec 2019, 8:56 am ayan guha,  wrote:
>
>> Quick question: why is it better to use one SQL select vs multiple
>> withColumn? Isn't everything eventually rewritten by Catalyst?
>>
>> On Wed, 18 Dec 2019 at 9:14 pm, Enrico Minack 
>> wrote:
>>
>>> How many withColumn statements do you have? Note that it is better to
>>> use a single select, rather than lots of withColumn. This also makes drops
>>> redundant.
>>>
>>> Reading 25m CSV lines and writing to Parquet in 5 minutes on 32 cores is
>>> really slow. Can you try this on a single machine, i.e. run with "local[*]"?
>>>
>>> Can you rule out the writing part by counting the rows? I presume this
>>> all happens in a single stage.
>>>
>>> Enrico
>>>
>>>
>>> Am 18.12.19 um 10:56 schrieb Antoine DUBOIS:
>>>
>>> Hello
>>>
>>> I'm working on an ETL based on CSVs describing file systems, transforming
>>> them into parquet so I can work on them easily to extract information.
>>> I'm using Mr. Powers' framework Daria to do so. I have quite different
>>> inputs and a lot of transformations, and the framework helps organize the
>>> code.
>>> I have a stand-alone cluster v2.3.2 composed of 4 nodes with 8 cores and
>>> 32GB of memory each.
>>> The storage is handled by a CephFS volume mounted on all nodes.
>>> First, a small description of my algorithm (it's quite simple):
>>>
>>> Use SparkContext to load the csv.bz2 file,
>>> Chain a lot of withColumn() statements,
>>> Drop all unnecessary columns,
>>> Write parquet file to CephFS
>>>
>>> This treatment can take several hours depending on how many lines the
>>> CSV has, and I wanted to identify whether bz2 or the network could be an
>>> issue, so I ran the following tests (several times, with consistent results).
>>> I tried the following scenarios with 20 cores and 2 cores per task:
>>>
>>>    - Read the csv.bz2 from CephFS with a 1Gb/s connection for each
>>>    node: ~5 minutes.
>>>    - Read the csv.bz2 from TMPFS (set up to look like a shared storage
>>>    space): ~5 minutes.
>>>    - From the two previous tests I concluded that uncompressing the file
>>>    was part of the bottleneck, so I uncompressed the file and stored it
>>>    in TMPFS as well; result: ~5.9 minutes.
>>>
>>> The test file has 25'833'369 lines and is 370MB compressed and 3700MB
>>> uncompressed. These results have been reproduced several times each.
>>> My question here is: what is the bottleneck in this case?
>>>
>>> I thought that the uncompressed file in RAM would be the fastest. Is it
>>> possible that my program is reading the CSV suboptimally?
>>> In the execution logs on the cluster I have 5 to 10 seconds GC time max,
>>> and timeline shows mainly CPU time (no shuffling, no randomization overload
>>> either).
>>> I also noticed that memory storage is never used during the execution. I
>>> know from several hours of research that bz2 is the only real compression
>>> algorithm usable as an input in spark for parallelization reasons.
>>>
>>> Do you have any idea why it behaves this way,
>>> and any idea how to improve this processing?
>>>
>>> Cheers
>>>
>>> Antoine
>>>
>>>
>>> --
>> Best Regards,
>> Ayan Guha
>>
>
> --
Best Regards,
Ayan Guha


Out of memory HDFS Multiple Cluster Write

2019-12-20 Thread Ruijing Li
Hi all,

I have encountered a strange executor OOM error. I have a data pipeline
using Spark 2.3 Scala 2.11.12. This pipeline writes the output to one HDFS
location as parquet then reads the files back in and writes to multiple
Hadoop clusters (all co-located in the same datacenter). It should be a
very simple task, but executors are being killed off for exceeding container
thresholds. From the logs, they are exceeding their allotted memory (we use
Mesos as the cluster manager).

The ETL process works perfectly fine with the given resources, doing joins
and adding columns. The output is written successfully the first time. *Only
when the pipeline at the end reads the output from HDFS and writes it to
different HDFS cluster paths does it fail.* (It does a
spark.read.parquet(source).write.parquet(dest))

This doesn't really make sense and I'm wondering what configurations I
should start looking at.
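For context, the fan-out step described above presumably looks something like
this sketch (cluster URIs and paths are illustrative placeholders; the data is
read once and written to each destination in turn):

// Read the intermediate parquet output once, then write it to each destination
// cluster by fully qualified HDFS URI.
val source = "hdfs://cluster-main/pipeline/output"
val destinations = Seq(
  "hdfs://cluster-a/pipeline/output",
  "hdfs://cluster-b/pipeline/output"
)

val df = spark.read.parquet(source)

destinations.foreach { dest =>
  df.write.mode("overwrite").parquet(dest)
}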

-- 
Cheers,
Ruijing Li