Apparently the "withColumn" issue only applies for hundreds or thousands of calls. That was not the case here (twenty calls).
On Fri, Dec 20, 2019 at 08:53:16AM +0100, Enrico Minack wrote:
> The issue is explained in depth here:
> https://medium.com/@manuzhang/the-hidden-cost-of-spark-withcolumn-8ffea517c015
>
> On 19.12.19 at 23:33, Chris Teoh wrote:
> > As far as I'm aware it isn't any better. The logic all gets processed by
> > the same engine, so to confirm, compare the DAGs generated from both
> > approaches and see if they're identical.
> >
> > On Fri, 20 Dec 2019, 8:56 am ayan guha, <guha.a...@gmail.com> wrote:
> > > Quick question: why is it better to use one sql vs multiple withColumn?
> > > Isn't everything eventually rewritten by Catalyst?
> > >
> > > On Wed, 18 Dec 2019 at 9:14 pm, Enrico Minack <m...@enrico.minack.dev> wrote:
> > > > How many withColumn statements do you have? Note that it is better
> > > > to use a single select rather than lots of withColumn. This also
> > > > makes drops redundant.
> > > >
> > > > Reading 25M CSV lines and writing to Parquet in 5 minutes on 32
> > > > cores is really slow. Can you try this on a single machine, i.e.
> > > > run with "local[*]"?
> > > >
> > > > Can you rule out the writing part by counting the rows? I presume
> > > > this all happens in a single stage.
> > > >
> > > > Enrico
> > > >
> > > > On 18.12.19 at 10:56, Antoine DUBOIS wrote:
> > > > > Hello
> > > > >
> > > > > I'm working on an ETL based on CSVs describing file systems,
> > > > > transforming them into Parquet so I can work on them easily to
> > > > > extract information.
> > > > > I'm using Mr. Powers' framework Daria to do so. I have quite
> > > > > different inputs and a lot of transformations, and the framework
> > > > > helps organize the code.
> > > > > I have a stand-alone cluster v2.3.2 composed of 4 nodes with 8
> > > > > cores and 32GB of memory each.
> > > > > The storage is handled by a CephFS volume mounted on all nodes.
> > > > > First, a small description of my algorithm (it's quite simple):
> > > > >
> > > > > * Use SparkContext to load the csv.bz2 file,
> > > > > * Chain a lot of withColumn() statements,
> > > > > * Drop all unnecessary columns,
> > > > > * Write the Parquet file to CephFS.
> > > > >
> > > > > This treatment can take several hours depending on how many lines
> > > > > the CSV has, and I wanted to identify whether bz2 or the network
> > > > > could be an issue, so I ran the following tests (several times,
> > > > > with consistent results), using 20 cores and 2 cores per task:
> > > > >
> > > > > * Read the csv.bz2 from CephFS, with a 1Gb/s connection for
> > > > >   each node: ~5 minutes.
> > > > > * Read the csv.bz2 from TMPFS (set up to look like a shared
> > > > >   storage space): ~5 minutes.
> > > > > * From the 2 previous tests I concluded that uncompressing the
> > > > >   file was part of the bottleneck, so I decided to uncompress
> > > > >   the file and store it in TMPFS as well; result: ~5.9 minutes.
> > > > >
> > > > > The test file has 25'833'369 lines and is 370MB compressed and
> > > > > 3700MB uncompressed. These results have been reproduced several
> > > > > times each.
> > > > > My question here is: what am I bottlenecked by in this case?
> > > > >
> > > > > I thought that the uncompressed file in RAM would be the fastest.
> > > > > Is it possible that my program is suboptimal at reading the CSV?
> > > > > In the execution logs on the cluster I have 5 to 10 seconds GC
> > > > > time max, and the timeline shows mainly CPU time (no shuffling,
> > > > > no randomization overhead either).
> > > > > I also noticed that memory storage is never used during the
> > > > > execution. I know from several hours of research that bz2 is the
> > > > > only real compression algorithm usable as an input in Spark, for
> > > > > parallelization reasons.
> > > > >
> > > > > Do you have any idea why this behaviour occurs, and any idea how
> > > > > to improve this treatment?
> > > > >
> > > > > Cheers
> > > > >
> > > > > Antoine
> > >
> > > --
> > > Best Regards,
> > > Ayan Guha

--
nicolas

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org