It turns out my assumption about load and unionAll being blocking is not
correct. They are transformations. So instead of running only the load and
unionAll in the run() methods, I think you will have to save the
intermediate dfInput[i] to temporary parquet files (possibly on an in-memory
DFS like http://tachyon-project.org/) in the run() methods. The second for
loop will then have to load from those intermediate parquet files. Finally,
save dfInput[0] to HDFS.
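
Something along these lines, as a rough sketch only: it reuses the executor,
futures and n from the pseudo code in my earlier mail below, tempDir is a
made-up location, and the load/save calls are the same ones from your snippet.

// rough sketch: each run() materializes its union to a temporary parquet directory
final String tempDir = "/tmp/merge-level1/";  // made-up path; could be Tachyon or HDFS
for (int i = 0; i < n / 2; i++) {
    final int idx = i;
    futures.add(executor.submit(new Runnable() {
        public void run() {
            DataFrame df = sqlContext.load(inputPaths.get(idx), "com.databricks.spark.avro")
                .unionAll(sqlContext.load(inputPaths.get(idx + n / 2), "com.databricks.spark.avro"));
            // save() is an action, so the union is actually computed here, in parallel with the others
            df.save(tempDir + idx, "parquet", SaveMode.Overwrite);
        }
    }));
}
// the next level of the tree then reads the intermediate results back, e.g.
// DataFrame left = sqlContext.load(tempDir + idx, "parquet");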

I think this way of parallelizing will force the cluster to utilize all the
available resources.

-Kiran

On Mon, Jun 8, 2015 at 12:30 PM, kiran lonikar <loni...@gmail.com> wrote:

> James,
>
> As I can see, there are three distinct parts to your program:
>
>    - for loop
>    - synchronized block
>    - final outputFrame.save statement
>
> Can you do a separate timing measurement by putting a simple
> System.currentTimeMillis() around these blocks, to see how long each one
> takes, and then try to optimize whichever takes longest? In the second
> block, you may want to measure the time for the two statements separately.
> Improving this boils down to playing with Spark settings.
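>
> For example (just illustrative, using the block names from your snippet):
>
> long t0 = System.currentTimeMillis();
> // ... the for loop building dfInput ...
> long t1 = System.currentTimeMillis();
> // ... the synchronized block doing the coalesce ...
> long t2 = System.currentTimeMillis();
> outputFrame.save(outputPath, "parquet", SaveMode.Overwrite);
> long t3 = System.currentTimeMillis();
> System.out.println("load/union: " + (t1 - t0) + " ms, coalesce: "
>     + (t2 - t1) + " ms, save: " + (t3 - t2) + " ms");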
>
> Now consider the first block: I think this is a classic case of merge sort
> or a reduce tree. You already tried to improve this by submitting jobs in
> parallel using an executor pool/Callable etc.
>
> To further improve the parallelization, I suggest you use a reduce-tree
> like approach. For example, let's say you want to compute the sum of all
> elements of an array in parallel. The way it's solved for a GPU-like
> platform is that you pair up the elements, compute those n/2 sums in
> parallel on separate threads, and save each result in the first of the two
> elements. In the next iteration, you compute n/4 sums in parallel from the
> earlier partial sums, and so on until you are left with only two elements
> whose sum gives you the final result.
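>
> For intuition, here is a tiny sketch of that reduction pattern on a plain
> int array (illustrative only; the array length is assumed to be a power of
> two, and plain threads stand in for an executor):
>
> // illustrative only: tree-reduce the sum of an int array in place
> static int parallelSum(final int[] a) throws InterruptedException {
>     for (int stride = a.length / 2; stride >= 1; stride /= 2) {
>         final int s = stride;
>         Thread[] workers = new Thread[s];
>         for (int i = 0; i < s; i++) {
>             final int idx = i;
>             workers[i] = new Thread(new Runnable() {
>                 public void run() { a[idx] += a[idx + s]; }  // pairwise partial sum
>             });
>             workers[i].start();
>         }
>         for (Thread t : workers) t.join();  // finish this level before halving the stride
>     }
>     return a[0];  // the total ends up in the first slot
> }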
>
> You are performing many sequential unionAll operations over inputs.size()
> Avro files. Assuming unionAll() on a DataFrame is blocking (and not a
> simple transformation like on RDDs) and actually performs the union
> operation, you will certainly benefit from parallelizing this loop. You
> could change the loop to something like below:
>
> // pseudo code only: a tree of unions; exception handling omitted and n is
> // assumed to be a power of two
> final int n = inputs.size();
> ExecutorService executor = Executors.newFixedThreadPool(n / 2);
> final DataFrame[] dfInput = new DataFrame[n / 2];
> List<Future<?>> futures = new ArrayList<>();
>
> // first level of the tree: union input i with input i + n/2
> for (int i = 0; i < n / 2; i++) {
>     final int idx = i;  // must be (effectively) final to be used inside the Runnable
>     futures.add(executor.submit(new Runnable() {
>         public void run() {
>             dfInput[idx] = sqlContext.load(inputPaths.get(idx), "com.databricks.spark.avro")
>                 .unionAll(sqlContext.load(inputPaths.get(idx + n / 2), "com.databricks.spark.avro"));
>         }
>     }));
> }
> for (Future<?> f : futures) f.get();  // wait for this level of the tree to finish
>
> // remaining levels: at step s there are n/2^s unions, each over a stride of n/2^s
> int steps = (int) (Math.log(n) / Math.log(2.0));
> for (int s = 2; s <= steps; s++) {
>     final int stride = n / (1 << s);  // n / 2^s
>     futures.clear();
>     for (int i = 0; i < stride; i++) {
>         final int idx = i;
>         futures.add(executor.submit(new Runnable() {
>             public void run() {
>                 dfInput[idx] = dfInput[idx].unionAll(dfInput[idx + stride]);
>             }
>         }));
>     }
>     for (Future<?> f : futures) f.get();  // wait before starting the next level
> }
> executor.shutdown();
> // dfInput[0] now holds the union of all the inputs
>
> Let me know if it helped.
>
> -Kiran
>
>
> On Thu, Jun 4, 2015 at 8:57 PM, James Aley <james.a...@swiftkey.com>
> wrote:
>
>> Thanks for the confirmation! We're quite new to Spark, so a little
>> reassurance is a good thing to have sometimes :-)
>>
>> The thing that's concerning me at the moment is that my job doesn't seem
>> to run any faster with more compute resources added to the cluster, and
>> this is proving a little tricky to debug. There are a lot of variables, so
>> here's what we've tried already and the apparent impact. If anyone has any
>> further suggestions, we'd love to hear!
>>
>> * Increase the "minimum" number of output files (targetPartitions above),
>> so that input groups smaller than our minimum chunk size can still be
>> worked on by more than one executor. This does measurably speed things up,
>> but obviously it's a trade-off, as the original goal for this job is to
>> merge our data into fewer, larger files.
>>
>> * Submit many jobs in parallel, by running the above code in a Callable,
>> on an executor pool. This seems to help, to some extent, but I'm not sure
>> what else needs to be configured alongside it -- driver threads, scheduling
>> policy, etc. We set scheduling to "FAIR" when doing this, as that seemed
>> like the right approach, but we're not 100% confident. It seemed to help
>> quite substantially anyway, so perhaps this just needs further tuning?
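>>
>> (For reference, a simplified sketch of that setup; the thread count, the
>> inputGroups collection and the mergeGroup() helper are placeholders, and
>> exception handling is omitted:)
>>
>> SparkConf sparkConf = new SparkConf().set("spark.scheduler.mode", "FAIR");
>> JavaSparkContext sc = new JavaSparkContext(sparkConf);
>>
>> ExecutorService pool = Executors.newFixedThreadPool(4);  // placeholder pool size
>> List<Future<Void>> jobs = new ArrayList<>();
>> for (final List<String> group : inputGroups) {  // one merge job per group of input paths
>>     jobs.add(pool.submit(new Callable<Void>() {
>>         public Void call() {
>>             mergeGroup(group);  // hypothetical helper wrapping the merge code from my earlier mail
>>             return null;
>>         }
>>     }));
>> }
>> for (Future<Void> f : jobs) f.get();  // wait for all merge jobs to finish
>> pool.shutdown();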
>>
>> * Increasing executors, RAM, etc. This doesn't make a difference by
>> itself for this job, so I'm thinking we're already not fully utilising the
>> resources we have in a smaller cluster.
>>
>> Again, any recommendations appreciated. Thanks for the help!
>>
>>
>> James.
>>
>> On 4 June 2015 at 15:00, Eugen Cepoi <cepoi.eu...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> 2015-06-04 15:29 GMT+02:00 James Aley <james.a...@swiftkey.com>:
>>>
>>>> Hi,
>>>>
>>>> We have a load of Avro data coming into our data systems in the form of
>>>> relatively small files, which we're merging into larger Parquet files with
>>>> Spark. I've been following the docs and the approach I'm taking seemed
>>>> fairly obvious, and pleasingly simple, but I'm wondering if perhaps it's
>>>> not the most optimal approach.
>>>>
>>>> I was wondering if anyone on this list might have some advice to make
>>>> this job as efficient as possible. Here's some code:
>>>>
>>>> DataFrame dfInput = sqlContext.load(inputPaths.get(0), "com.databricks.spark.avro");
>>>> long totalSize = getDirSize(inputPaths.get(0));
>>>>
>>>> for (int i = 1; i < inputs.size(); ++i) {
>>>>     dfInput = dfInput.unionAll(sqlContext.load(inputPaths.get(i), "com.databricks.spark.avro"));
>>>>     totalSize += getDirSize(inputPaths.get(i));
>>>> }
>>>>
>>>> int targetPartitions = (int) Math.max(2L, totalSize / TARGET_SIZE_BYTES);
>>>> DataFrame outputFrame;
>>>>
>>>> // Note: HADOOP-10456 impacts us, as we're stuck on 2.4.0 in EMR, hence
>>>> // the synchronized block below. Suggestions welcome here too! :-)
>>>> synchronized (this) {
>>>>     RDD<Row> inputRdd = dfInput.rdd().coalesce(targetPartitions, false, null);
>>>>     outputFrame = sqlContext.createDataFrame(inputRdd, dfInput.schema());
>>>> }
>>>>
>>>> outputFrame.save(outputPath, "parquet", SaveMode.Overwrite);
>>>>
>>>> Here are some things bothering me:
>>>>
>>>>    - Conversion to an RDD and back again so that we can use coalesce()
>>>>    to reduce the number of partitions. This is because we read that
>>>>    repartition() is not as efficient as coalesce(), and local micro
>>>>    benchmarks seemed to somewhat confirm that this was faster. Is this
>>>>    really a good idea though? Should we be doing something else?
>>>>
>>> Repartition uses coalesce but with a forced shuffle step. It's just a
>>> shortcut for coalesce(xxx, true).
>>> Doing a coalesce sounds correct, I'd do the same :) Note that if you add
>>> the shuffle step, then your partitions should be better balanced.
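>>>
>>> For example, on an RDD the two are equivalent (rough illustration,
>>> assuming a JavaRDD<Row> called rdd):
>>>
>>> // repartition(n) is shorthand for coalesce(n, true), i.e. coalesce with a shuffle
>>> JavaRDD<Row> viaRepartition = rdd.repartition(targetPartitions);
>>> JavaRDD<Row> viaCoalesce    = rdd.coalesce(targetPartitions, true);
>>> // without the shuffle, partitions are only merged locally and may end up unbalanced
>>> JavaRDD<Row> mergedOnly     = rdd.coalesce(targetPartitions, false);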
>>>
>>>>
>>>>    - Usage of unionAll() - this is the only way I could find to join
>>>>    the separate data sets into a single data frame to save as Parquet. Is
>>>>    there a better way?
>>>>
>>> When using the input formats directly you can call
>>> FileInputFormat.addInputPath for each path; it should perform at least as
>>> well as the union.
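>>>
>>> (A rough sketch of what that could look like with the new Hadoop API and
>>> avro-mapred; the class choices and variable names here are my assumptions,
>>> not something tested, and exception handling is omitted:)
>>>
>>> // sketch: register every small Avro file/directory as an input path of one job
>>> Job job = Job.getInstance(new Configuration());
>>> for (String p : inputPaths) {
>>>     FileInputFormat.addInputPath(job, new Path(p));
>>> }
>>> JavaPairRDD<AvroKey, NullWritable> records =
>>>     sc.newAPIHadoopRDD(job.getConfiguration(),
>>>                        AvroKeyInputFormat.class,
>>>                        AvroKey.class,
>>>                        NullWritable.class);
>>> // records can then be mapped to rows and written out as parquet downstream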
>>>
>>>>
>>>>    - Do I need to be using the DataFrame API at all? I'm not querying
>>>>    any data here, so the nice API for SQL-like transformations of the data
>>>>    isn't being used. The DataFrame API just seemed like the path of least
>>>>    resistance for working with Avro and Parquet. Would there be any
>>>>    advantage to using hadoopRDD() with the appropriate Input/Output formats?
>>>>
>>>>
>>>>
>>> Using the input/output formats directly sounds viable. But the snippet
>>> you show seems clean enough, and I am not sure there would be much value in
>>> making something (maybe) slightly faster but harder to understand.
>>>
>>>
>>> Eugen
>>>
>>>> Any advice or tips greatly appreciated!
>>>>
>>>>
>>>> James.
>>>>
>>>>
>>>>
>>>
>>
>
