> It turns out my assumption that load and unionAll are blocking is not
> correct. They are transformations. So instead of running only the load
> and unionAll in the run() methods, I think you will have to save the
> intermediate dfInput[i] to temporary (Parquet) files (possibly on an in-memory
> DFS like http://tachyon-project.org/) in the run() methods. The second for
> loop will also have to load from the intermediate Parquet files. Then
> finally save the final dfInput[0] to HDFS.
> I think this way of parallelizing will force the cluster to utilize
> all the resources.
> -Kiran
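The difference between a transformation and an action can be illustrated without Spark. The sketch below uses java.util.stream as a stand-in: like DataFrame.unionAll(), Stream.concat() only composes a pipeline, and nothing is read until a terminal operation (playing the role of save()) runs. The load() helper and the row counter are hypothetical stand-ins, not Spark APIs, and this is only an analogy for Spark's lazy evaluation, not a model of how Spark schedules work.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class LazyUnionDemo {
    // Counts rows that have actually been produced ("read from disk").
    static final AtomicInteger reads = new AtomicInteger();

    // Hypothetical stand-in for sqlContext.load(...): returns a lazy pipeline
    // of `rows` rows tagged with `name`; no row is produced yet.
    static Stream<String> load(String name, int rows) {
        return Stream.iterate(0, i -> i + 1)
                .limit(rows)
                .peek(i -> reads.incrementAndGet())
                .map(i -> name + "-" + i);
    }

    public static void main(String[] args) {
        // Like unionAll(): only composes the two pipelines.
        Stream<String> union = Stream.concat(load("a", 3), load("b", 2));
        System.out.println("rows read after concat: " + reads.get());
        // Like save(): the terminal operation triggers all of the work.
        List<String> rows = union.collect(Collectors.toList());
        System.out.println("rows read after collect: " + reads.get());
        System.out.println(rows);
    }
}
```

Running main reports 0 rows read after the concat and 5 after the collect. By analogy, running only load/unionAll on separate threads returns almost immediately without doing the expensive work, which is why materializing the intermediate results is needed for the threads to help.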
>> James,
>> As far as I can see, there are three distinct parts to your program:
>>    - for loop
>>    - synchronized block
>>    - final outputFrame.save statement
>> Can you measure each of these separately by wrapping them with simple
>> System.currentTimeMillis() calls, to see how long each one takes, and then
>> optimize wherever most of the time goes? In the second block, you may want
>> to time the two statements individually. Improving this boils down to
>> tuning Spark settings.
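The timing suggestion can be packaged as a small helper so that each of the three parts is measured the same way. This is a sketch: timeMillis and sleepQuietly are hypothetical helpers, and the sleeps in main are arbitrary stand-ins for the real union loop, coalesce block and save. It uses System.nanoTime(), which is intended for measuring elapsed time, instead of System.currentTimeMillis(). Since load() and unionAll() are lazy (see the follow-up at the top of this thread), it would not be surprising for most of the time to show up around save().

```java
import java.util.concurrent.TimeUnit;

final class BlockTimer {
    // Runs a block, prints how long it took and returns the elapsed milliseconds.
    static long timeMillis(String label, Runnable block) {
        long start = System.nanoTime();
        block.run();
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.printf("%s took %d ms%n", label, elapsedMs);
        return elapsedMs;
    }

    // Sleeps without forcing callers to handle InterruptedException.
    static void sleepQuietly(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // Hypothetical stand-ins (arbitrary durations) for the three parts of the job.
        timeMillis("union loop", () -> sleepQuietly(5));
        timeMillis("coalesce block", () -> sleepQuietly(5));
        timeMillis("save", () -> sleepQuietly(200));
    }
}
```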
>> Now consider the first block: I think this is a classic case for merge
>> sort or a reduce tree. You already tried to improve this by submitting jobs
>> in parallel using an executor pool/Callable etc.
>> To further improve the parallelization, I suggest you use a reduce-tree
>> approach. For example, let's say you want to compute the sum of all
>> elements of an array in parallel. The way it's solved on a GPU-like
>> platform is to divide the input array initially into chunks of 2, compute
>> those n/2 sums in parallel on separate threads, and save each result in the
>> first of the two elements. In the next iteration, you compute n/4 sums of
>> the earlier sums in parallel, and so on until you are left with only two
>> elements whose sum gives you the final sum.
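The array-sum example can be written as a short, Spark-free Java sketch. Each round adds element i + stride into element i (the first of the pair), the stride doubles every round, so n values need about log2(n) rounds, and the additions within a round run in parallel on an ExecutorService. treeSum is a hypothetical name. Unlike the loop below, which pairs i with i + n/2, this version pairs adjacent elements as in the description; both are reduction trees with the same number of rounds, and this one also handles n that is not a power of two.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class TreeSum {
    /**
     * Sums `values` with a pairwise reduction tree, overwriting the array.
     * Each round adds element i + stride into element i for every i that is a
     * multiple of 2 * stride; stride doubles each round until index 0 holds
     * the total. Pairs within a round touch disjoint slots, so they run in
     * parallel; the rounds themselves run one after another.
     */
    static long treeSum(long[] values, ExecutorService pool) {
        int n = values.length;
        if (n == 0) {
            return 0L;
        }
        for (int stride = 1; stride < n; stride *= 2) {
            List<Callable<Void>> round = new ArrayList<>();
            for (int i = 0; i + stride < n; i += 2 * stride) {
                final int left = i;
                final int right = i + stride;
                round.add(() -> {
                    values[left] += values[right];
                    return null;
                });
            }
            try {
                pool.invokeAll(round); // returns once every pair in this round is done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted during reduction", e);
            }
        }
        return values[0];
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            long[] data = {1, 2, 3, 4, 5, 6, 7, 8};
            System.out.println(treeSum(data, pool)); // 36
        } finally {
            pool.shutdown();
        }
    }
}
```

invokeAll acts as the barrier between rounds: a round only starts once every sum it depends on has been written.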
>> You are performing many sequential unionAll operations for inputs.size()
>> Avro files. Assuming that unionAll() on a DataFrame is blocking (and not a
>> simple transformation as on RDDs) and actually performs the union
>> operation, you will certainly benefit from parallelizing this loop. You could
>> change the loop to something like the following:
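>> // Sketch only: assumes sqlContext and inputPaths (a List<String>) are
>> // final or fields, that n = inputPaths.size() is a power of two (n >= 2),
>> // and that the enclosing method declares "throws Exception", since
>> // Future.get() throws checked exceptions. Needs java.util.* and
>> // java.util.concurrent.* imports.
>> final int n = inputPaths.size();
>> final DataFrame[] dfInput = new DataFrame[n / 2];
>> ExecutorService executor = Executors.newFixedThreadPool(n / 2);
>> List<Future<?>> pending = new ArrayList<Future<?>>();
>> // First round: union of input i and input i + n/2
>> for (int i = 0; i < n / 2; i++) {
>>     final int idx = i;
>>     pending.add(executor.submit(new Runnable() {
>>         public void run() {
>>             dfInput[idx] = sqlContext.load(inputPaths.get(idx), "com.databricks.spark.avro")
>>                     .unionAll(sqlContext.load(inputPaths.get(idx + n / 2),
>>                             "com.databricks.spark.avro"));
>>         }
>>     }));
>> }
>> // awaitTermination(0, ...) returns immediately, so wait on each Future instead
>> for (Future<?> f : pending) f.get();
>> int steps = Integer.numberOfTrailingZeros(n); // log2(n) when n is a power of two
>> for (int s = 2; s <= steps; s++) {
>>     final int stride = n >> s; // n / 2^s
>>     pending.clear();
>>     for (int i = 0; i < stride; i++) {
>>         final int idx = i;
>>         pending.add(executor.submit(new Runnable() {
>>             public void run() {
>>                 // union of idx and idx + stride
>>                 dfInput[idx] = dfInput[idx].unionAll(dfInput[idx + stride]);
>>             }
>>         }));
>>     }
>>     for (Future<?> f : pending) f.get();
>> }
>> executor.shutdown();
>> // dfInput[0] now holds the union of all n inputs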
>> Let me know if it helped.
>> -Kiran
>>> Thanks for the confirmation! We're quite new to Spark, so a little
>>> reassurance is a good thing to have sometimes :-)
>>> The thing that's concerning me at the moment is that my job doesn't seem
>>> to run any faster with more compute resources added to the cluster, and
>>> this is proving a little tricky to debug. There are a lot of variables, so
>>> here's what we've tried already and the apparent impact. If anyone has any
>>> further suggestions, we'd love to hear!
>>> * Increase the "minimum" number of output files (targetPartitions
>>> above), so that input groups smaller than our minimum chunk size can still
>>> be worked on by more than one executor. This does measurably speed things
>>> up, but obviously it's a trade-off, as the original goal for this job is to
>>> merge our data into fewer, larger files.
>>> * Submit many jobs in parallel, by running the above code in a Callable,
>>> on an executor pool. This seems to help, to some extent, but I'm not sure
>>> what else needs to be configured alongside it -- driver threads, scheduling
>>> policy, etc. We set scheduling to "FAIR" when doing this, as that seemed
>>> like the right approach, but we're not 100% confident. It seemed to help
>>> quite substantially anyway, so perhaps this just needs further tuning?
>>> * Increase the number of executors, RAM, etc. This doesn't make a difference by
>>> itself for this job, so I'm thinking we're already not fully utilising the
>>> resources we have in a smaller cluster.
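On the driver side, submitting many jobs in parallel amounts to a fixed-size thread pool in which each task triggers one merge job. In the sketch below, runAll is a hypothetical helper and the caller-supplied mergeGroup function stands in for the load/unionAll/coalesce/save code; it shows that the pool size caps how many jobs are in flight at once, so the number of driver threads is worth tuning alongside the scheduler settings. It does not configure Spark itself; the FAIR mode (the spark.scheduler.mode setting) is set separately.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

final class ParallelMerges {
    // Tracks how many merges are running right now, and the peak seen so far.
    static final AtomicInteger inFlight = new AtomicInteger();
    static final AtomicInteger maxInFlight = new AtomicInteger();

    /**
     * Runs mergeGroup once per input group on a pool of driverThreads threads
     * and returns the results in input order. If each task submits one Spark
     * job, the pool size caps how many jobs are in flight at the same time.
     */
    static <R> List<R> runAll(List<String> groups, int driverThreads,
                              Function<String, R> mergeGroup) {
        ExecutorService pool = Executors.newFixedThreadPool(driverThreads);
        try {
            List<Callable<R>> tasks = new ArrayList<>();
            for (String group : groups) {
                tasks.add(() -> {
                    maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                    try {
                        return mergeGroup.apply(group);
                    } finally {
                        inFlight.decrementAndGet();
                    }
                });
            }
            List<R> results = new ArrayList<>();
            for (Future<R> f : pool.invokeAll(tasks)) {
                results.add(f.get()); // rethrows a failed merge as ExecutionException
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for merges", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a merge failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Integer> sizes = runAll(Arrays.asList("a", "bb", "ccc", "dddd"), 2, group -> {
            try {
                Thread.sleep(50); // pretend this is a Spark job
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return group.length();
        });
        System.out.println(sizes + ", max in flight: " + maxInFlight.get());
    }
}
```

Calling get() on every Future also matters: without it, a failed merge would be silently swallowed by the pool.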
>>> Again, any recommendations appreciated. Thanks for the help!
>>> James.
>>>> Hi
>>>>> Hi,
>>>>> We have a load of Avro data coming into our data systems in the form
>>>>> of relatively small files, which we're merging into larger Parquet files
>>>>> with Spark. I've been following the docs, and the approach I'm taking
>>>>> seemed fairly obvious and pleasingly simple, but I'm wondering if perhaps
>>>>> it's not the optimal approach.
>>>>> I was wondering if anyone on this list might have some advice on how
>>>>> to make this job as efficient as possible. Here's some code:
>>>>> DataFrame dfInput = sqlContext.load(inputPaths.get(0),
>>>>>         "com.databricks.spark.avro");
>>>>> long totalSize = getDirSize(inputPaths.get(0));
>>>>> for (int i = 1; i < inputs.size(); ++i) {
>>>>>     dfInput = dfInput.unionAll(sqlContext.load(inputPaths.get(i),
>>>>>             "com.databricks.spark.avro"));
>>>>>     totalSize += getDirSize(inputPaths.get(i));
>>>>> }
>>>>> int targetPartitions = (int) Math.max(2L, totalSize /
>>>>> DataFrame outputFrame;
>>>>> // Note: HADOOP-10456 impacts us, as we're stuck on 2.4.0 in EMR, hence
>>>>> // the synchronized block below. Suggestions welcome here too! :-)
>>>>> synchronized (this) {
>>>>>     RDD<Row> inputRdd = dfInput.rdd().coalesce(targetPartitions,
>>>>>             false, null);
>>>>>     outputFrame = sqlContext.createDataFrame(inputRdd,
>>>>>             dfInput.schema());
>>>>> }
>>>>> outputFrame.save(outputPath, "parquet", SaveMode.Overwrite);
>>>>> Here are some things bothering me:
>>>>>    - Conversion to an RDD and back again so that we can use
>>>>>    coalesce() to reduce the number of partitions. This is because we read
>>>>>    that repartition() is not as efficient as coalesce(), and local micro
>>>>>    benchmarks seemed to somewhat confirm that this was faster. Is this
>>>>>    really a good idea though? Should we be doing something else?
>>>> Repartition uses coalesce but with a forced shuffle step. It's just a
>>>> shortcut for coalesce(xxx, true).
>>>> Doing a coalesce sounds correct, I'd do the same :) Note that if you
>>>> add the shuffle step, then your partitions should be better balanced.
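The point about balance can be seen with a deliberately simplified model of partition sizes. mergeWithoutShuffle and redistributeWithShuffle are hypothetical names: the first merges contiguous input partitions whole (Spark's real coalescer also takes data locality into account), and the second spreads rows perfectly evenly, whereas a real shuffle only gets the sizes roughly even.

```java
import java.util.Arrays;

final class CoalesceToy {
    /**
     * Toy model of coalesce(k) without a shuffle: each input partition is
     * assigned whole to one of k contiguous groups, so an output partition's
     * size is simply the sum of the input partitions it received.
     */
    static long[] mergeWithoutShuffle(long[] partitionSizes, int k) {
        long[] out = new long[k];
        int n = partitionSizes.length;
        for (int p = 0; p < n; p++) {
            out[(int) ((long) p * k / n)] += partitionSizes[p];
        }
        return out;
    }

    /**
     * Idealized model of coalesce(k, true), i.e. repartition(k): rows are
     * dealt out one by one, so output sizes differ by at most one.
     */
    static long[] redistributeWithShuffle(long[] partitionSizes, int k) {
        long total = Arrays.stream(partitionSizes).sum();
        long[] out = new long[k];
        for (int i = 0; i < k; i++) {
            out[i] = total / k + (i < total % k ? 1 : 0);
        }
        return out;
    }

    public static void main(String[] args) {
        long[] sizes = {100, 100, 1, 1}; // two large and two tiny input partitions
        System.out.println("without shuffle: " + Arrays.toString(mergeWithoutShuffle(sizes, 2)));
        System.out.println("with shuffle:    " + Arrays.toString(redistributeWithShuffle(sizes, 2)));
    }
}
```

With these inputs the no-shuffle model yields [200, 2] and the shuffle model yields [101, 101]; the shuffle buys balance at the cost of moving every row across the network.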
>>>>>    - Usage of unionAll() - this is the only way I could find to join
>>>>>    the separate data sets into a single data frame to save as Parquet. Is
>>>>>    there a better way?
>>>> When using the input formats directly, you can do this with
>>>> FileInputFormat.addInputPath; it should perform at least as well as union.
>>>>>    - Do I need to be using the DataFrame API at all? I'm not querying
>>>>>    any data here, so the nice API for SQL-like transformations of the data
>>>>>    isn't being used. The DataFrame API just seemed like the path of least
>>>>>    resistance for working with Avro and Parquet. Would there be any
>>>>>    advantage to using hadoopRDD() with the appropriate Input/Output formats?
>>>> Using the input/output formats directly sounds viable. But the snippet
>>>> you show seems clean enough, and I am not sure there would be much value in
>>>> making something (maybe) slightly faster but harder to understand.
>>>> Eugen
>>>>> Any advice or tips greatly appreciated!
>>>>> James.

