Vadim's "scheduling within an application" approach turned out to be
excellent, at least on a single node with the CPU usage reaching about
90%. I directly implemented the code template that Vadim kindly
provided:

    parallel_collection_paths.foreach(
        path => {
            val lines = spark.read.textFile(path)
            // Declared inside the closure; see the serialization remark below.
            val pattern = "my_simple_regex".r
            // Keep only the lines matching the regex.
            val filtered_lines = lines.filter(
                line => pattern.findFirstMatchIn(line).isDefined
            )
            val output_dir = get_output_dir(path)
            filtered_lines.write.option("compression", "gzip").text(output_dir)
        }
    )

For ForkJoinPool(parallelism), I simply used a parallelism value
equal to the number of executors (roughly as in the sketch below).
Each input JSON file was processed in a single partition, and one
part-xxxx output file was generated per input JSON file.
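
Concretely, the setup looked roughly like the sketch below. Take it
as a sketch only: json_paths is a hypothetical list of input paths,
and reading the executor count from the status tracker is my own
assumption, not something from Vadim's template.

    import scala.collection.parallel.ForkJoinTaskSupport
    import scala.concurrent.forkjoin.ForkJoinPool

    // Hypothetical list of input JSON file paths.
    val json_paths: Seq[String] = Seq("/data/in/a.json.gz", "/data/in/b.json.gz")

    // getExecutorInfos also reports the driver, hence the -1. This is
    // only a rough way to get the executor count (assumes no dynamic
    // allocation).
    val num_executors =
        spark.sparkContext.statusTracker.getExecutorInfos.length - 1

    val parallel_collection_paths = json_paths.par
    parallel_collection_paths.tasksupport =
        new ForkJoinTaskSupport(new ForkJoinPool(num_executors))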

For some reason that I still have to investigate, this does not scale
as well when using multiple instances, though the performance is
still acceptable. Any idea why?

Another approach was to simply repartition the Dataset before filtering:

    val lines = spark.read.textFile(path).repartition(n)
    val filtered_lines = lines.filter(...)

That helped a lot (at least when running on a single node) but not as
much as Vadim's approach.
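
For completeness, a minimal sketch of that variant, reusing path, the
regex pattern and the gzip write from the first snippet (n is a
hypothetical partition count, only for illustration):

    // Spread the file's lines over n partitions so the filter runs on
    // more than one core.
    val n = 64
    val pattern = "my_simple_regex".r
    val lines = spark.read.textFile(path).repartition(n)
    val filtered_lines = lines.filter(
        line => pattern.findFirstMatchIn(line).isDefined
    )
    filtered_lines.write
        .option("compression", "gzip")
        .text(get_output_dir(path))

Note that with repartition(n) each input file produces n part files
instead of a single one.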

A minor remark: I'm filtering the JSON lines with a simple regex.
However, I had to declare the regex inside the parallel collection's
foreach(), otherwise the Spark task would fail in the Spark shell with:

    org.apache.spark.SparkException: Task not serializable
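
For reference, roughly the shape that failed for me (a minimal
sketch); the working version declares the pattern inside the closure,
as in the first snippet:

    // Pattern declared at the top level of the Spark shell session
    // instead of inside the closure: this variant failed with the
    // "Task not serializable" error above.
    val pattern = "my_simple_regex".r

    parallel_collection_paths.foreach(
        path => {
            val lines = spark.read.textFile(path)
            val filtered_lines = lines.filter(
                line => pattern.findFirstMatchIn(line).isDefined
            )
            filtered_lines.write
                .option("compression", "gzip")
                .text(get_output_dir(path))
        }
    )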

Why can't the scala.util.matching.Regex be serialized?

Jeroen

On Thu, Sep 28, 2017 at 9:16 PM, Vadim Semenov
<vadim.seme...@datadoghq.com> wrote:
> Instead of having one job, you can try processing each
> file in a separate job, but run multiple jobs in parallel
> within one SparkContext. Something like this should work for
> you: it'll submit N jobs from the driver, the jobs will run
> independently, but executors will dynamically work on
> different jobs, so you'll utilize the executors fully.
>
> ```
> import org.apache.spark.sql.SparkSession
> import scala.collection.parallel.ForkJoinTaskSupport
>
> val spark: SparkSession
> val files: Seq[String]
> val filesParallelCollection = files.toParArray
> val howManyFilesToProcessInParallel = math.min(50, files.length)
>
> filesParallelCollection.tasksupport = new ForkJoinTaskSupport(
>   new scala.concurrent.forkjoin.ForkJoinPool(howManyFilesToProcessInParallel)
> )
> filesParallelCollection.foreach(file => {
>   spark.read.text(file).filter(…)…
> })
> ```
