Apparently the "withColumn" issue only applies for hundreds or thousands of
calls. That was not the case here (twenty calls).

On Fri, Dec 20, 2019 at 08:53:16AM +0100, Enrico Minack wrote:
> The issue is explained in depth here: https://medium.com/@manuzhang/the-hidden-cost-of-spark-withcolumn-8ffea517c015
> 
> On 19.12.19 at 23:33, Chris Teoh wrote:
> 
>     As far as I'm aware it isn't any better. The logic all gets processed by
>     the same engine, so to confirm, compare the DAGs generated from both
>     approaches and see if they're identical.
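
To make that comparison concrete, here is a minimal, self-contained sketch (toy
data and column names, not the actual job) that builds the same result once
with chained withColumn calls and once with a single select, then prints both
query plans side by side:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a"), (2, "b")).toDF("id", "name")

    // Variant 1: chained withColumn calls followed by a drop
    val viaWithColumn = df
      .withColumn("id_plus_one", col("id") + 1)
      .withColumn("name_upper", upper(col("name")))
      .drop("id", "name")

    // Variant 2: the same projection expressed as a single select
    val viaSelect = df.select(
      (col("id") + 1).as("id_plus_one"),
      upper(col("name")).as("name_upper")
    )

    // Print parsed, analyzed, optimized and physical plans for both variants;
    // if the optimized/physical plans match, Catalyst has collapsed the
    // withColumn chain into the same projection as the single select.
    viaWithColumn.explain(true)
    viaSelect.explain(true)

If the linked article's analysis applies, the extra cost of the withColumn
chain sits mainly in plan analysis (one extra Project per call), which is why
it tends to matter only for very long chains.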
> 
>     On Fri, 20 Dec 2019, 8:56 am ayan guha, <guha.a...@gmail.com> wrote:
> 
>         Quick question: why is it better to use one SQL statement vs multiple
>         withColumn calls? Isn't everything eventually rewritten by Catalyst?
> 
>         On Wed, 18 Dec 2019 at 9:14 pm, Enrico Minack <m...@enrico.minack.dev>
>         wrote:
> 
>             How many withColumn statements do you have? Note that it is better
>             to use a single select rather than lots of withColumn calls. This
>             also makes the drops redundant.
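
As an illustration of that suggestion (hypothetical column names and
transformations, not the actual Daria code), a single select lists only the
output expressions, so nothing is left to drop afterwards:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    // Hypothetical stand-in for the raw CSV data
    val raw = Seq(("/etc/hosts", "2019-12-01 10:00:00", 4096L))
      .toDF("path", "mtime", "size_bytes")

    // One select defines every output column in a single projection;
    // columns that are not listed simply do not appear in the result,
    // so no separate drop() is needed.
    val cleaned = raw.select(
      col("path"),
      to_timestamp(col("mtime")).as("modified_at"),
      (col("size_bytes") / 1024).as("size_kb")
    )

    cleaned.show()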
> 
>             Reading 25m CSV lines and writing to Parquet in 5 minutes on 32
>             cores is really slow. Can you try this on a single machine, i.e.
>             run with "local[*]"?
> 
>             Can you rule out the writing part by counting the rows? I presume
>             this all happens in a single stage.
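
A rough sketch of both suggestions combined (hypothetical file path; the real
job reads from CephFS): run the read on a single machine with local[*] and
replace the Parquet write with a count, so that only decompression and CSV
parsing are measured:

    import org.apache.spark.sql.SparkSession

    // Single-machine run: all cores of one node, no cluster involved
    val spark = SparkSession.builder()
      .appName("csv-read-only-benchmark")
      .master("local[*]")
      .getOrCreate()

    // Hypothetical path to the same test file
    val df = spark.read
      .option("header", "true")
      .csv("/tmp/fs-dump.csv.bz2")

    // count() forces a full scan (decompression + parsing) without writing
    // any Parquet, which isolates the read side of the job
    println(s"rows: ${df.count()}")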
> 
>             Enrico
> 
> 
>             On 18.12.19 at 10:56, Antoine DUBOIS wrote:
> 
>                 Hello
> 
>                 I'm working on an ETL that reads CSV files describing file
>                 systems and transforms them into Parquet, so I can work on them
>                 easily to extract information.
>                 I'm using Mr. Powers' Daria framework to do so. I have quite
>                 different inputs and a lot of transformations, and the framework
>                 helps organize the code.
>                 I have a stand-alone Spark cluster (v2.3.2) composed of 4 nodes
>                 with 8 cores and 32 GB of memory each.
>                 The storage is handled by a CephFS volume mounted on all nodes.
>                 First, a small description of my algorithm (it's quite simple;
>                 see the sketch after this list):
> 
> 
>                     Use SparkContext to load the csv.bz2 file,
>                     Chain a lot of withColumn() statement,
>                     Drop all unnecessary columns,
>                     Write parquet file to CephFS
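
A skeleton of those four steps might look roughly like this (hypothetical
paths, column names and transformations; the real job uses spark-daria helpers
and about twenty withColumn calls, and is shown here with the
SparkSession/DataFrame API):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._

    val spark = SparkSession.builder().appName("fs-csv-to-parquet").getOrCreate()

    // 1. Load the bz2-compressed CSV (decompressed on the fly by Hadoop's codec)
    val raw = spark.read
      .option("header", "true")
      .csv("file:///mnt/cephfs/fs-dump.csv.bz2")

    // 2. Chain of withColumn transformations
    val transformed = raw
      .withColumn("modified_at", to_timestamp(col("mtime")))
      .withColumn("size_kb", col("size_bytes") / 1024)

    // 3. Drop the raw columns that are no longer needed
    val cleaned = transformed.drop("mtime", "size_bytes")

    // 4. Write the result as Parquet back to CephFS
    cleaned.write.mode("overwrite").parquet("file:///mnt/cephfs/fs-dump.parquet")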
> 
> 
>                 This treatment can take several hours depending on how many
>                 lines the CSV has, and I wanted to identify whether bz2 or the
>                 network could be an issue,
>                 so I ran the following tests (several times, with consistent
>                 results), using 20 cores and 2 cores per task:
>                   ■ Read the csv.bz2 from CephFS, with a 1 Gb/s connection for
>                     each node: ~5 minutes.
>                   ■ Read the csv.bz2 from TMPFS (set up to look like a shared
>                     storage space): ~5 minutes.
>                   ■ From the two previous tests I concluded that uncompressing
>                     the file was part of the bottleneck, so I decided to
>                     uncompress the file and store it in TMPFS as well; result:
>                     ~5.9 minutes.
>                 The test file has 25'833'369 lines and is 370 MB compressed and
>                 3700 MB uncompressed. These results have been reproduced several
>                 times each.
>                 My question here is: what is the bottleneck in this case?
> 
>                 I thought that the uncompressed file in RAM would be the
>                 fastest. Is it possible that my program reads the CSV
>                 suboptimally?
>                 In the execution logs on the cluster I see at most 5 to 10
>                 seconds of GC time, and the timeline shows mainly CPU time (no
>                 shuffling, and no randomization overhead either).
>                 I also noticed that memory storage is never used during the
>                 execution. I know from several hours of research that bz2 is the
>                 only common compression codec Spark can split for parallel
>                 reads.
> 
>                 Do you have any idea why it behaves this way, and any idea how
>                 to improve this processing?
> 
>                 Cheers
> 
>                 Antoine
> 
> 
> 
>         --
>         Best Regards,
>         Ayan Guha
> 
> 

-- 
nicolas

