Instead of having one job, you can try processing each file as a separate job, but run multiple jobs in parallel within one SparkContext. Something like this should work for you: it will submit N jobs from the driver, the jobs will run independently, and the executors will dynamically pick up work from different jobs, so you will keep the executors fully utilized.
```
import org.apache.spark.sql.SparkSession
import scala.collection.parallel.ForkJoinTaskSupport

val spark: SparkSession     // your existing SparkSession
val files: Seq[String]      // the list of input file paths

val filesParallelCollection = files.toParArray
val howManyFilesToProcessInParallel = math.min(50, files.length)

filesParallelCollection.tasksupport = new ForkJoinTaskSupport(
  new scala.concurrent.forkjoin.ForkJoinPool(howManyFilesToProcessInParallel)
)

filesParallelCollection.foreach(file => {
  spark.read.text(file).filter(…)…
})
```

On Thu, Sep 28, 2017 at 2:50 PM, Jeroen Miller <bluedasya...@gmail.com> wrote:
> More details on what I want to achieve. Maybe someone can suggest a
> course of action.
>
> My processing is extremely simple: reading <file_i>.json.gz text
> files, filtering each line according to a regex, and saving the
> surviving lines in a similarly named <result_i>.gz file.
>
> Unfortunately, changing the data format is impossible (we are dealing
> with terabytes here), so I need to process .gz files.
>
> Ideally, I would like to process a large number of such files in
> parallel, that is, using n_e executors which would each take care of a
> fraction 1/n_e of all the files. The trouble is that I don't know how
> to process files in parallel without loading them in the driver first,
> which would result in a major bottleneck.
>
> Here was my naive approach in Scala-like pseudo-code:
>
> //
> // This happens on the driver
> //
> val files = List("s3://bckt/file-1.json.gz", ...,
>                  "s3://bckt/file-N.json.gz")
> val files_rdd = sc.parallelize(files, num_partitions)
> //
> // Now files_rdd (which only holds file names) is distributed across
> // several executors and/or nodes
> //
> files_rdd.foreach(
>   //
>   // It is my understanding that what is performed within the foreach
>   // method will be parallelized on several executors / nodes
>   //
>   file => {
>     //
>     // This would happen on a given executor: a given input file is
>     // processed entirely on a given executor
>     //
>     val lines = sc.read.text(file)
>     val filtered_lines = lines.filter( /* filter based on regex */ )
>     filtered_lines.write.option("compression", "gzip").text("a_filename_tbd")
>   }
> )
>
> Unfortunately this is not possible since the Spark context sc is
> defined in the driver and cannot be shared.
>
> My problem would be entirely solved if I could manage to read files
> not from the driver, but from a given executor.
>
> Another possibility would be to read each .gz file in the driver
> (about 2GB each), materializing the whole resulting RDD on the driver
> (around 12GB), and then calling repartition on that RDD, but only the
> regex part would be parallelized, and the data shuffling would probably
> ruin the performance.
>
> Any idea?
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
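P.S. In case it helps, here is a minimal sketch of what the elided body of that foreach could look like for your read / filter / write use case. The regex, the output naming, and the processOneFile helper are placeholders made up for illustration, not something from your message, so adapt them:

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Hypothetical placeholders -- substitute your real regex and naming scheme.
val pattern = ".*some-regex.*"
def outputPathFor(inputPath: String): String =
  inputPath.stripSuffix(".json.gz") + "-filtered"

def processOneFile(inputPath: String): Unit = {
  // A gzipped text file is read as a single partition (gzip is not
  // splittable), so the filtered result is written back as one part file.
  spark.read.textFile(inputPath)
    .filter(_.matches(pattern))
    .write
    .option("compression", "gzip")
    .text(outputPathFor(inputPath))
}
```

Calling processOneFile(file) inside the parallel foreach above submits one independent job per file, and those jobs share the executors of the single SparkContext.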