I think that the best option is to see whether Spark's DataFrame JSON reader works for your files or not.
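[Editor's note: a minimal sketch of that suggestion, assuming each .json.gz file holds one JSON object per line — the JSON Lines layout spark.read.json expects by default. Spark decompresses .gz input transparently; the glob reuses the hypothetical bucket from the thread below.]

```
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Reads every matching gzip file: .gz decompression happens transparently
# and each line is parsed as one JSON object.
df = spark.read.json("s3://bckt/file-*.json.gz")
df.printSchema()
```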
On Fri, Sep 29, 2017 at 3:53 PM, Alexander Czech <alexander.cz...@googlemail.com> wrote:

Does each gzip file look like this:

{json1}
{json2}
{json3}

meaning that each line is a separate JSON object?

I process a similar batch of large files, and what I do is this:

# input.txt: each line is a path to a gzip file, each containing one JSON object per line
my_rdd = sc.textFile("input.txt")             # creates an RDD with one file path per row
my_rdd = my_rdd.flatMap(open_files)           # opens the files and yields them line by line
my_rdd = my_rdd.map(do_something_with_files)  # now do something with each line

The important part, at least in Python, is the yield, because it makes the function memory efficient. You could even go further and only yield a JSON object if it matches the regex criteria, saving you the map(). Maybe yield a (path, json) pair to later reconstruct which line goes into which file. Reduce the RDD and write out the file.

If all the files in input.txt are too big to be processed at once, consider dividing input.txt into smaller chunks and processing each chunk individually.
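[Editor's note: a hypothetical sketch of the open_files generator described above, folding in the suggested regex filter and (path, json) pairs; the pattern itself is illustrative, not from the original mail.]

```
import gzip
import re

PATTERN = re.compile(r"some_pattern")  # hypothetical filter regex

def open_files(path):
    # Open one gzip file and yield matching lines one at a time; the
    # generator never holds more than a single line in memory.
    with gzip.open(path, "rt") as f:
        for line in f:
            if PATTERN.search(line):
                yield (path, line)  # keep the path to reconstruct output files
```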
On Fri, Sep 29, 2017 at 12:20 AM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:

I think that Vadim's response makes a lot of sense in terms of utilizing Spark. Why are you not using Spark's JSON reader? Your input has to follow a particular JSON style, but it would be interesting to know whether you have looked into it at all.

If you are going to read them only once, then there is really no need to convert them and then read them.

I will be really interested to hear whether you were able to use the JSON reader natively available in Spark.

Regards,
Gourav

On Thu, Sep 28, 2017 at 8:16 PM, Vadim Semenov <vadim.seme...@datadoghq.com> wrote:

Instead of having one job, you can try processing each file in a separate job, but run multiple jobs in parallel within one SparkContext. Something like this should work for you: it will submit N jobs from the driver, and the jobs will run independently, but executors will dynamically work on different jobs, so you will utilize the executors fully.

```
import org.apache.spark.sql.SparkSession

import scala.collection.parallel.ForkJoinTaskSupport

val spark: SparkSession
val files: Seq[String]

val filesParallelCollection = files.toParArray
val howManyFilesToProcessInParallel = math.min(50, files.length)

filesParallelCollection.tasksupport = new ForkJoinTaskSupport(
  new scala.concurrent.forkjoin.ForkJoinPool(howManyFilesToProcessInParallel)
)
filesParallelCollection.foreach(file => {
  spark.read.text(file).filter(…)…
})
```

On Thu, Sep 28, 2017 at 2:50 PM, Jeroen Miller <bluedasya...@gmail.com> wrote:

More details on what I want to achieve. Maybe someone can suggest a course of action.

My processing is extremely simple: reading <file_i>.json.gz text files, filtering each line according to a regex, and saving the surviving lines in a similarly named <result_i>.gz file.

Unfortunately, changing the data format is impossible (we are dealing with terabytes here), so I need to process .gz files.

Ideally, I would like to process a large number of such files in parallel, that is, using n_e executors which would each take care of a fraction 1/n_e of all the files. The trouble is that I don't know how to process files in parallel without loading them in the driver first, which would result in a major bottleneck.

Here was my naive approach, in Scala-like pseudo-code:

```
//
// This happens on the driver
//
val files = List("s3://bckt/file-1.json.gz", ..., "s3://bckt/file-N.json.gz")
val files_rdd = sc.parallelize(files, num_partitions)
//
// Now files_rdd (which only holds file names) is distributed across
// several executors and/or nodes
//
files_rdd.foreach(
  //
  // It is my understanding that what is performed within the foreach
  // method will be parallelized on several executors / nodes
  //
  file => {
    //
    // This would happen on a given executor: a given input file is
    // processed entirely on a given executor
    //
    val lines = sc.read.text(file)
    val filtered_lines = lines.filter( /* filter based on regex */ )
    filtered_lines.write.option("compression", "gzip").text("a_filename_tbd")
  }
)
```

Unfortunately, this is not possible since the Spark context sc is defined on the driver and cannot be shared.

My problem would be entirely solved if I could manage to read files not from the driver, but from a given executor.

Another possibility would be to read each .gz file on the driver (about 2 GB each), materializing the whole resulting RDD on the driver (around 12 GB), and then calling repartition on that RDD. But only the regex part would be parallelized, and the data shuffling would probably ruin the performance (see the sketch below).

Any idea?
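[Editor's note: for illustration, a minimal PySpark sketch of the driver-side alternative described above; the file names and regex are hypothetical, and the comments mark where the bottleneck sits.]

```
import gzip
import re

from pyspark import SparkContext

sc = SparkContext.getOrCreate()
pattern = re.compile(r"some_pattern")  # hypothetical filter regex

# Each ~2 GB .gz file is decompressed on the driver: this is the bottleneck.
lines = []
for path in ["file-1.json.gz", "file-2.json.gz"]:  # hypothetical local copies
    with gzip.open(path, "rt") as f:
        lines.extend(f)

# parallelize() then ships the ~12 GB of decompressed lines from the driver
# to the executors, so only the regex filter itself runs in parallel.
rdd = sc.parallelize(lines, 64)
filtered = rdd.filter(lambda line: pattern.search(line) is not None)
filtered.saveAsTextFile(
    "filtered-output",  # hypothetical output path
    compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec",
)
```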