Your approach looks a little odd to me. First, gzip is not a splittable format, so each json.gz file can only be processed by a single task/thread. It is therefore good to have many files of around 128 MB to 512 MB each.
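You can check the one-task-per-file behaviour yourself; a minimal sketch (the bucket path is just the one from your example below):

    // With sc.textFile, every non-splittable .gz file becomes exactly one
    // partition, so the partition count equals the number of files.
    val rdd = sc.textFile("s3://bckt/")
    println(rdd.getNumPartitions)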

Second, what you do in your code is already done by the data source. There is no need to list the directory yourself and parallelize the file names. Just point the data source at the directory containing the files and Spark automatically distributes the reads across the executors.
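Something along these lines should cover your whole job; this is only a rough sketch, and the regex and output path are placeholders, not your actual values:

    // Read all .json.gz files in the bucket as plain text lines;
    // Spark schedules one task per (non-splittable) file.
    val lines = spark.read.textFile("s3://bckt/")
    // Keep only the lines matching your regex.
    val filtered = lines.filter(_.matches(".*some-pattern.*"))
    // Write the surviving lines back out, gzip-compressed.
    filtered.write.option("compression", "gzip").text("s3://bckt-filtered/")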

To improve write performance, check whether you can store the data in Avro (or Parquet or ORC) using their internal compression instead. Those formats are also splittable, so you can even have many tasks per file.
If you need to store the data as json.gz, make sure the files are between 128 and 512 MB.
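If converting the data is an option, a one-off job like the following would do it; again just a sketch with a made-up output path, and snappy is only one possible codec:

    // Read the json.gz files once and rewrite them as
    // snappy-compressed Parquet for all later processing.
    val df = spark.read.json("s3://bckt/")
    df.write.option("compression", "snappy").parquet("s3://bckt-parquet/")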

> On 28. Sep 2017, at 20:50, Jeroen Miller <bluedasya...@gmail.com> wrote:
> 
> More details on what I want to achieve. Maybe someone can suggest a
> course of action.
> 
> My processing is extremely simple: reading <file_i>.json.gz text
> files, filtering each line according to a regex, and saving the
> surviving lines in a similarly named <result_i>.gz file.
> 
> Unfortunately changing the data format is impossible (we are dealing
> with terabytes here) so I need to process .gz files.
> 
> Ideally, I would like to process a large number of such files in
> parallel, that is using n_e executors which would each take care of a
> fraction 1/n_e of all the files. The trouble is that I don't know how
> to process files in parallel without loading them in the driver first,
> which would result in a major bottleneck.
> 
> Here was my naive approach in Scala-like pseudo-code:
> 
> //
> // This happens on the driver
> //
> val files = List("s3://bckt/file-1.json.gz", ..., "s3://bckt/file-N.json.gz")
> val files_rdd = sc.parallelize(files, num_partitions)
> //
> // Now files_rdd (which only holds file names) is distributed across
> // several executors and/or nodes
> //
> 
> files_rdd.foreach(
>    //
>    // It is my understanding that what is performed within the foreach method
>    // will be parallelized on several executors / nodes
>    //
>    file => {
>        //
>        // This would happen on a given executor: a given input file is
>        // processed entirely on a given executor
>        //
>        val lines = sc.read.text(file)
>        val filtered_lines = lines.filter( // filter based on regex // )
>        filtered_lines.write.option("compression", "gzip").text("a_filename_tbd")
>    }
> )
> 
> Unfortunately this is not possible since the Spark context sc is
> defined in the driver and cannot be shared.
> 
> My problem would be entirely solved if I could manage to read files
> not from the driver, but from a given executor.
> 
> Another possibility would be to read each .gz file in the driver
> (about 2GB each), materialize the whole resulting RDD on the driver
> (around 12GB) and then call repartition on that RDD, but only the
> regex part would be parallelized, and the data shuffling would
> probably ruin the performance.
> 
> Any idea?
> 

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
