Does each gzip file look like this:

{json1}
{json2}
{json3}

meaning that each line is a separate json object?

I process a similar large batch of files, and what I do is this:

```
# input.txt: each line represents a path to a gzip file, and each of those
# files contains one JSON object per line
with open("input.txt") as f:
    paths = [line.strip() for line in f if line.strip()]

my_rdd = sc.parallelize(paths)                # creates an RDD with one file path per row
my_rdd = my_rdd.flatMap(open_files)           # opens the files and yields them line by line
my_rdd = my_rdd.map(do_something_with_files)  # now do something with each line
```

The important part, at least in Python, is the yield: it makes the function
memory efficient, because lines are streamed one at a time instead of loading
a whole file into memory. You could go even further and only yield a JSON
line if it matches the regex criteria, saving you the map(). You could also
yield a (path, json) pair so you can later reconstruct which line goes into
which file. Then reduce the RDD and write out the files.
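
For illustration, here is a minimal sketch of what such an open_files
generator could look like, assuming the paths listed in input.txt are
readable from the executors (for S3 you would need an S3 client instead of
gzip.open); the regex and the (path, json) pairing are just the ideas
described above:

```
import gzip
import re

PATTERN = re.compile(r"some_regex")  # hypothetical filter criteria

def open_files(path):
    # Generator: yields one line at a time instead of loading the whole
    # gzip file into memory; this runs on the executors, not the driver.
    with gzip.open(path, "rt") as f:
        for line in f:
            line = line.rstrip("\n")
            if PATTERN.search(line):   # filtering here lets you skip the map()
                yield (path, line)     # keep the path to regroup lines per file later
```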

If all the files in input.txt are too big to be processed at once, consider
dividing input.txt into smaller chunks and processing each chunk individually.
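
A rough sketch of that chunking, reusing the paths list from above (the
chunk size is just a placeholder to tune for your cluster):

```
chunk_size = 1000  # hypothetical value
for i in range(0, len(paths), chunk_size):
    chunk_rdd = sc.parallelize(paths[i:i + chunk_size])
    lines = chunk_rdd.flatMap(open_files)
    # ... filter / reduce / write out this chunk before starting the next one
```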

On Fri, Sep 29, 2017 at 12:20 AM, Gourav Sengupta <gourav.sengu...@gmail.com
> wrote:

> I think that Vadim's response makes a lot of sense in terms of utilizing
> SPARK. Why are you not using the JSON reader of SPARK? Your input has to
> follow a particular JSON style, but then it would be interesting to know
> whether you have looked into it at all.
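>
> For example, a minimal pyspark sketch, assuming the files are line-delimited
> JSON and that the filter can be expressed on a parsed column (the column
> name and regex here are hypothetical):
>
> ```
> df = spark.read.json("s3://bckt/file-*.json.gz")  # Spark decompresses .gz transparently
> matching = df.filter(df["some_field"].rlike("some_regex"))
> matching.write.option("compression", "gzip").json("s3://bckt/output/")
> ```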
>
> If you are going to read them only once then there is really no need to
> convert them and then read them.
>
> I will be really interested to hear whether you were able to use the JSON
> reader natively available in SPARK.
>
>
> Regards,
> Gourav
>
> On Thu, Sep 28, 2017 at 8:16 PM, Vadim Semenov <
> vadim.seme...@datadoghq.com> wrote:
>
>> Instead of having one job, you can try processing each file in a separate
>> job, but run multiple jobs in parallel within one SparkContext.
>> Something like this should work for you: it'll submit N jobs from the
>> driver, the jobs will run independently, but the executors will dynamically
>> work on different jobs, so you'll utilize the executors fully.
>>
>> ```
>> import org.apache.spark.sql.SparkSession
>>
>> import scala.collection.parallel.ForkJoinTaskSupport
>>
>> val spark: SparkSession
>> val files: Seq[String]
>> val filesParallelCollection = files.toParArray
>> val howManyFilesToProcessInParallel = math.min(50, files.length)
>>
>> filesParallelCollection.tasksupport = new ForkJoinTaskSupport(
>>   new scala.concurrent.forkjoin.ForkJoinPool(howManyFilesToProcessInParallel)
>> )
>> filesParallelCollection.foreach(file => {
>>   spark.read.text(file).filter(…)…
>> })
>> ```
>>
>> On Thu, Sep 28, 2017 at 2:50 PM, Jeroen Miller <bluedasya...@gmail.com>
>> wrote:
>>
>>> More details on what I want to achieve. Maybe someone can suggest a
>>> course of action.
>>>
>>> My processing is extremely simple: reading <file_i>.json.gz text
>>> files, filtering each line according to a regex, and saving the surviving
>>> lines in a similarly named <result_i>.gz file.
>>>
>>> Unfortunately changing the data format is impossible (we are dealing
>>> with terabytes here) so I need to process .gz files.
>>>
>>> Ideally, I would like to process a large number of such files in
>>> parallel, that is using n_e executors which would each take care of a
>>> fraction 1/n_e of all the files. The trouble is that I don't know how
>>> to process files in parallel without loading them in the driver first,
>>> which would result in a major bottleneck.
>>>
>>> Here was my naive approach in Scala-like pseudo-code:
>>>
>>> //
>>> // This happens on the driver
>>> //
>>> val files = List("s3://bckt/file-1.json.gz", ..., "s3://bckt/file-N.json.gz")
>>> val files_rdd = sc.parallelize(files, num_partitions)
>>> //
>>> // Now files_rdd (which only holds file names) is distributed across
>>> // several executors and/or nodes
>>> //
>>>
>>> files_rdd.foreach(
>>>     //
>>>     // It is my understanding that what is performed within the foreach
>>>     // method will be parallelized on several executors / nodes
>>>     //
>>>     file => {
>>>         //
>>>         // This would happen on a given executor: a given input file
>>>         // is processed entirely on a given executor
>>>         //
>>>         val lines = sc.read.text(file)
>>>         val filtered_lines = lines.filter( // filter based on regex // )
>>>         filtered_lines.write.option("compression", "gzip").text("a_filename_tbd")
>>>     }
>>> )
>>>
>>> Unfortunately this is not possible since the Spark context sc is
>>> defined in the driver and cannot be shared.
>>>
>>> My problem would be entirely solved if I could manage to read files
>>> not from the driver, but from a given executor.
>>>
>>> Another possibility would be to read each .gz file in the driver
>>> (about 2GB each), materializing the whole resulting RDD on the driver
>>> (around 12GB) and then calling repartition on that RDD, but only the
>>> regex part would be parallelized, and the data shuffling will probably
>>> ruin the performance.
>>>
>>> Any idea?
>>>
>>>
>>>
>>
>
