Hi all,

We're using Spark 1.3.0 on a small YARN cluster to do some log processing.
The jobs are pretty simple: for each of a number of customers and a number
of days, fetch some event log data, build aggregates, and store those
aggregates in a data store.

Our script is currently written as something akin to:

from pyspark import SparkContext

with SparkContext() as sc:
    for customer in customers:
        for day in days:
            logs = sc.textFile(get_logs(customer, day))
            # make_aggregate contains the action saveAsNewAPIHadoopFile,
            # which triggers the save (one Spark job per customer/day)
            aggregate = make_aggregate(logs)

So we have one Spark job per customer per day, run sequentially.
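Flattened, the nested loop is just one job per (customer, day) pair, so the total number of jobs is len(customers) * len(days). A minimal sketch using itertools.product to enumerate those work items as one flat list, which is convenient to hand to a thread pool or to separate spark-submit calls; the customer names and dates below are made-up example values, not real data:

```python
from itertools import product

# Hypothetical example inputs; in the real script these come from elsewhere
customers = ["acme", "globex", "initech"]
days = ["2015-04-01", "2015-04-02"]

# One (customer, day) work item per Spark job, in the same order as the
# nested for-loops: every day for the first customer, then the next customer
work_items = list(product(customers, days))

print(len(work_items))  # 6
print(work_items[0])    # ('acme', '2015-04-01')
```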

I tried doing some parallel job submission with something similar to:

from concurrent import futures
from pyspark import SparkContext

def make_and_save_aggregate(customer, day, spark_context):
    # Without a separate threading.Lock() here, or better yet one guarding
    # the SparkContext, transformations and actions for multiple
    # customer/day pairs could be interleaved
    sc = spark_context
    logs = sc.textFile(get_logs(customer, day))
    aggregate = make_aggregate(logs)

with SparkContext() as sc, futures.ThreadPoolExecutor(4) as executor:
    for customer in customers:
        for day in days:
            executor.submit(make_and_save_aggregate, customer, day, sc)

The problem is that, since a SparkContext has no locks except during
initialization, operations on the context could (if I understand correctly)
be interleaved, leading to a DAG that contains transformations out of order
and from different customer/day periods.
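If a lock does turn out to be necessary, one option is to hold it only while the lineage is built on the shared context and release it before the save action, so the expensive actions can still run concurrently. A minimal standard-library sketch under that assumption: FakeContext, get_logs, build_aggregate and save_aggregate are hypothetical stand-ins (not Spark APIs) so it runs without a cluster. It also keeps the futures and calls result() on each one; the threaded loop in the post never does, so an exception raised in a worker thread would go unnoticed.

```python
import threading
from concurrent import futures


class FakeContext:
    """Hypothetical stand-in for SparkContext so the sketch runs without Spark."""

    def textFile(self, path):
        return [path]  # pretend "RDD": just a list holding the input path


def get_logs(customer, day):
    # Hypothetical helper: input path for one customer/day
    return "logs/%s/%s" % (customer, day)


def build_aggregate(logs):
    # Hypothetical stand-in for the lazy transformations
    return ("aggregate", logs[0])


def save_aggregate(aggregate):
    # Hypothetical stand-in for the save action (saveAsNewAPIHadoopFile)
    return aggregate


context_lock = threading.Lock()


def make_and_save_aggregate(customer, day, sc):
    # Hold the lock only while building the lineage on the shared context
    with context_lock:
        logs = sc.textFile(get_logs(customer, day))
        aggregate = build_aggregate(logs)
    # The action runs outside the lock, so different customer/day pairs
    # can still be saved in parallel
    return save_aggregate(aggregate)


sc = FakeContext()
customers, days = ["acme", "globex"], ["2015-04-01", "2015-04-02"]
results = {}
with futures.ThreadPoolExecutor(4) as executor:
    pending = {executor.submit(make_and_save_aggregate, c, d, sc): (c, d)
               for c in customers for d in days}
    for fut in futures.as_completed(pending):
        # result() re-raises any exception from the worker thread
        results[pending[fut]] = fut.result()

print(results[("acme", "2015-04-02")])  # ('aggregate', 'logs/acme/2015-04-02')
```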

One solution is instead to launch multiple Spark applications via
spark-submit and let YARN/Spark's dynamic executor allocation take care of
fair scheduling. In practice, this doesn't seem to yield very fast
computation, perhaps due to some additional overhead from YARN.
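For comparison, the spark-submit route means one separate application per (customer, day) pair. A minimal sketch that only builds the command lines: aggregate_job.py is a hypothetical script that would read the customer and day from its arguments. spark.dynamicAllocation.enabled and spark.shuffle.service.enabled are the documented settings for dynamic allocation on YARN (the external shuffle service must also be running on the NodeManagers), though other settings may be needed depending on the Spark version.

```python
from itertools import product


def build_submit_command(customer, day, script="aggregate_job.py"):
    """Build a spark-submit argument list for one customer/day application.

    "aggregate_job.py" is a hypothetical script that reads customer and day
    from sys.argv.
    """
    return [
        "spark-submit",
        "--master", "yarn-cluster",
        # Dynamic allocation on YARN also needs the external shuffle service
        "--conf", "spark.dynamicAllocation.enabled=true",
        "--conf", "spark.shuffle.service.enabled=true",
        script, customer, day,
    ]


customers = ["acme", "globex"]
days = ["2015-04-01"]
commands = [build_submit_command(c, d) for c, d in product(customers, days)]

# Each command could be launched with subprocess.Popen(cmd); here they are
# only printed so the sketch runs without a Spark installation
for cmd in commands:
    print(" ".join(cmd))
```

Each of these pays its own application startup (driver, YARN ApplicationMaster, executors), which is one plausible source of the overhead mentioned above.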

Is there any safe way to launch concurrent jobs like this using a single
PySpark context?

