Instead of a single job, you can try processing each file as a separate
job, running multiple jobs in parallel within one SparkContext.
Something like the following should work for you: it submits N jobs from
the driver, the jobs run independently, and the executors are dynamically
shared between them, so the executors stay fully utilized.

```
import org.apache.spark.sql.SparkSession
import scala.collection.parallel._

val spark: SparkSession = ???   // your existing SparkSession
val files: Seq[String] = ???    // the input file paths

// Process at most 50 files concurrently (fewer if there aren't that many files).
val howManyFilesToProcessInParallel = math.min(50, files.length)

val filesParallelCollection = files.toParArray
filesParallelCollection.tasksupport = new ForkJoinTaskSupport(
  new scala.concurrent.forkjoin.ForkJoinPool(howManyFilesToProcessInParallel)
)

// Each iteration submits its own Spark job; the jobs share the executors.
filesParallelCollection.foreach { file =>
  spark.read.text(file).filter(…)…
}
```
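
For the filter-and-write workflow you describe, the body of that foreach
could look roughly like this; the regex and the way the output path is
derived from the input path are just placeholders, and keep in mind that
Spark writes each output as a directory of gzipped part files rather than
a single .gz file:

```
import org.apache.spark.sql.functions.col

filesParallelCollection.foreach { file =>
  // Placeholder naming scheme: put the filtered result next to the input.
  val output = file.stripSuffix(".json.gz") + "-filtered"
  spark.read
    .text(file)                                   // single string column named "value"
    .filter(col("value").rlike("<your regex>"))   // keep only the lines matching the regex
    .write
    .option("compression", "gzip")
    .text(output)                                 // a directory of gzipped part files
}
```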

On Thu, Sep 28, 2017 at 2:50 PM, Jeroen Miller <bluedasya...@gmail.com>
wrote:

> More details on what I want to achieve. Maybe someone can suggest a
> course of action.
>
> My processing is extremely simple: reading <file_i>.json.gz text
> files, filtering each line according to a regex, and saving the
> surviving lines in a similarly named <result_i>.gz file.
>
> Unfortunately changing the data format is impossible (we are dealing
> with terabytes here), so I need to process .gz files.
>
> Ideally, I would like to process a large number of such files in
> parallel, that is, using n_e executors, each of which would take care
> of a fraction 1/n_e of all the files. The trouble is that I don't know
> how to process files in parallel without loading them in the driver
> first, which would result in a major bottleneck.
>
> Here was my naive approach in Scala-like pseudo-code:
>
> //
> // This happens on the driver
> //
> val files = List("s3://bckt/file-1.json.gz", ..., "s3://bckt/file-N.json.gz")
> val files_rdd = sc.parallelize(files, num_partitions)
> //
> // Now files_rdd (which only holds file names) is distributed across
> // several executors and/or nodes
> //
>
> files_rdd.foreach(
>     //
>     // It is my understanding that what is performed within the foreach
>     // method will be parallelized on several executors / nodes
>     //
>     file => {
>         //
>         // This would happen on a given executor: a given input file
>         // is processed entirely on a given executor
>         //
>         val lines = sc.read.text(file)
>         val filtered_lines = lines.filter( // filter based on regex // )
>         filtered_lines.write.option("compression", "gzip").text("a_filename_tbd")
>     }
> )
>
> Unfortunately this is not possible since the Spark context sc is
> defined in the driver and cannot be used from code running on the
> executors.
>
> My problem would be entirely solved if I could manage to read files
> not from the driver, but from a given executor.
>
> Another possibility would be to read each .gz file in the driver
> (about 2GB each), materializing the whole resulting RDD on the driver
> (around 12GB), and then calling repartition on that RDD, but then only
> the regex part would be parallelized, and the data shuffling would
> probably ruin the performance.
>
> Any idea?
>