I think that the best option is to see whether the DataFrame option for
reading JSON files works or not.
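
For example, something like this might be enough (a minimal sketch only: it
assumes each line is a separate JSON object and an existing SparkSession
`spark`; the field name, regex and output path below are just placeholders):

```
df = spark.read.json("s3://bckt/file-1.json.gz")          # gzip is decompressed transparently
kept = df.filter(df["some_field"].rlike("some_regex"))    # hypothetical field and pattern
kept.write.option("compression", "gzip").json("s3://bckt/file-1-filtered")
```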



On Fri, Sep 29, 2017 at 3:53 PM, Alexander Czech <
alexander.cz...@googlemail.com> wrote:

> Does each gzip file look like this:
>
> {json1}
> {json2}
> {json3}
>
> meaning that each line is a separate json object?
>
> I process a similarly large batch of files, and what I do is this:
>
> # input.txt: each line is a path to a gzip file containing one JSON object per line
> paths = [line.strip() for line in open("input.txt")]  # read the list of paths on the driver
> my_rdd = sc.parallelize(paths)                        # creates an RDD with each file path as a row
> my_rdd = my_rdd.flatMap(open_files)                   # opens each file and yields it line by line
> my_rdd = my_rdd.map(do_something_with_files)          # now do something with each line
>
> The important part, at least in Python, is the yield: it keeps the function
> memory efficient, because lines are streamed instead of being loaded all at
> once. You could even go further and only yield a JSON line if it matches the
> regex criteria, saving you the map(). Maybe yield a (path, json) pair to
> later reconstruct which line goes into which file. Then reduce the RDD and
> write out the files.
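>
> A rough sketch of what open_files could look like (the regex and the
> (path, line) pairing are only illustrative, and it assumes the paths are
> readable from the executors, e.g. local or mounted storage; for s3://
> paths you would need something like boto3 or s3fs instead of gzip.open):
>
> import gzip
> import re
>
> def open_files(path):
>     pattern = re.compile(r"some_regex")    # placeholder pattern
>     # stream the gzip file line by line instead of loading it whole
>     with gzip.open(path, "rt") as f:
>         for line in f:
>             if pattern.search(line):       # filtering here makes the map() unnecessary
>                 yield (path, line)         # keep the source path to regroup lines per file later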
>
> If all files in input.txt are too big to be processed at once, consider
> dividing input.txt into smaller chunks and processing each chunk individually.
>
> On Fri, Sep 29, 2017 at 12:20 AM, Gourav Sengupta <
> gourav.sengu...@gmail.com> wrote:
>
>> I think that Vadim's response makes a lot of sense in terms of utilizing
>> SPARK. Why are you not using the JSON reader of SPARK? Your input has to
>> follow a particular JSON style, but it would be interesting to know whether
>> you have looked into it at all.
>>
>> If you are going to read them only once, then there is really no need to
>> convert them first and then read them.
>>
>> I will be really interested to hear whether you were able to use the JSON
>> reader natively available in SPARK.
>>
>>
>> Regards,
>> Gourav
>>
>> On Thu, Sep 28, 2017 at 8:16 PM, Vadim Semenov <
>> vadim.seme...@datadoghq.com> wrote:
>>
>>> Instead of having one job, you can try processing each file in a
>>> separate job, but run multiple jobs in parallel within one SparkContext.
>>> Something like this should work for you: it'll submit N jobs from the
>>> driver, the jobs will run independently, and the executors will dynamically
>>> work on different jobs, so you'll utilize the executors fully.
>>>
>>> ```
>>> import org.apache.spark.sql.SparkSession
>>>
>>> import scala.collection.parallel.ForkJoinTaskSupport
>>>
>>> val spark: SparkSession  // assumed to be already created
>>> val files: Seq[String]   // the list of input file paths
>>> val filesParallelCollection = files.toParArray
>>> val howManyFilesToProcessInParallel = math.min(50, files.length)
>>>
>>> filesParallelCollection.tasksupport = new ForkJoinTaskSupport(
>>>   new scala.concurrent.forkjoin.ForkJoinPool(howManyFilesToProcessInParallel)
>>> )
>>> filesParallelCollection.foreach(file => {
>>>   spark.read.text(file).filter(…)…
>>> })
>>> ```
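>>>
>>> (A rough PySpark equivalent of the snippet above, in case that is easier
>>> to adapt: submit the per-file jobs from a driver-side thread pool. It
>>> assumes `spark`, a list `files` of input paths, and a regex string
>>> `pattern` already exist; the output path is just a placeholder.)
>>>
>>> ```
>>> from concurrent.futures import ThreadPoolExecutor
>>>
>>> from pyspark.sql import functions as F
>>>
>>> def process_one(path):
>>>     # each call submits an independent Spark job; executors are shared across jobs
>>>     (spark.read.text(path)
>>>           .filter(F.col("value").rlike(pattern))
>>>           .write.option("compression", "gzip")
>>>           .text(path + ".filtered"))   # placeholder output location
>>>
>>> with ThreadPoolExecutor(max_workers=min(50, len(files))) as pool:
>>>     list(pool.map(process_one, files))
>>> ```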
>>>
>>> On Thu, Sep 28, 2017 at 2:50 PM, Jeroen Miller <bluedasya...@gmail.com>
>>> wrote:
>>>
>>>> More details on what I want to achieve. Maybe someone can suggest a
>>>> course of action.
>>>>
>>>> My processing is extremely simple: reading <file_i>.json.gz text
>>>> files, filtering each line according to a regex, and saving the surviving
>>>> lines in a similarly named <result_i>.gz file.
>>>>
>>>> Unfortunately changing the data format is impossible (we are dealing
>>>> with terabytes here) so I need to process .gz files.
>>>>
>>>> Ideally, I would like to process a large number of such files in
>>>> parallel, that is, using n_e executors, each of which would take care of a
>>>> fraction 1/n_e of all the files. The trouble is that I don't know how
>>>> to process files in parallel without loading them in the driver first,
>>>> which would result in a major bottleneck.
>>>>
>>>> Here was my naive approach in Scala-like pseudo-code:
>>>>
>>>> //
>>>> // This happens on the driver
>>>> //
>>>> val files = List("s3://bckt/file-1.json.gz", ..., "s3://bckt/file-N.json.gz")
>>>> val files_rdd = sc.parallelize(files, num_partitions)
>>>> //
>>>> // Now files_rdd (which only holds file names) is distributed across
>>>> // several executors and/or nodes
>>>> //
>>>>
>>>> files_rdd.foreach(
>>>>     //
>>>>     // It is my understanding that what is performed within the foreach
>>>>     // method will be parallelized on several executors / nodes
>>>>     //
>>>>     file => {
>>>>         //
>>>>         // This would happen on a given executor: a given input file
>>>>         // is processed entirely on a given executor
>>>>         //
>>>>         val lines = sc.read.text(file)
>>>>         val filtered_lines = lines.filter( /* filter based on regex */ )
>>>>         filtered_lines.write.option("compression", "gzip").text("a_filename_tbd")
>>>>     }
>>>> )
>>>>
>>>> Unfortunately this is not possible since the Spark context sc is
>>>> defined in the driver and cannot be shared.
>>>>
>>>> My problem would be entirely solved if I could manage to read files
>>>> not from the driver, but from a given executor.
>>>>
>>>> Another possibility would be to read each .gz file in the driver
>>>> (about 2 GB each), materializing the whole resulting RDD on the driver
>>>> (around 12 GB), and then calling repartition on that RDD, but then only
>>>> the regex part would be parallelized, and the data shuffling would
>>>> probably ruin the performance.
>>>>
>>>> Any idea?
>>>>
>>>>
>>>>
>>>
>>
>
