It turns out my assumption that load and unionAll are blocking is not correct. They are transformations, not actions. So instead of running only the load and unionAll in the run() methods, I think you will have to save each intermediate dfInput[i] to temporary Parquet files (possibly on an in-memory DFS like http://tachyon-project.org/) inside run(). The second for loop will then have to load from those intermediate Parquet files, and at the end you save the final dfInput[0] to HDFS.
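One caveat about my earlier pseudo code quoted below: awaitTermination(0, TimeUnit.SECONDS) returns immediately and does not actually wait for the submitted tasks, so a real implementation should block on the Futures (or use invokeAll) between stages. Here is a rough, untested sketch of the whole idea, using the same sqlContext.load/save calls as in James's snippet. The method name mergeInStages, the tmpBase directory and the pool size of 8 are just placeholders of mine, and for brevity it refuses to run with an odd number of inputs at any stage (you would carry the leftover directory forward instead):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;

// Each stage pairs up the remaining directories, unions each pair and *saves* the
// result to a temporary Parquet directory, so the work actually executes in parallel.
// The next stage reloads those directories, until only one is left.
void mergeInStages(final SQLContext sqlContext, List<String> inputPaths,
                   final String tmpBase, String outputPath) throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(8); // pool size: tune for your driver
    List<String> current = inputPaths;
    String format = "com.databricks.spark.avro";                // stage 0 reads the Avro inputs
    int stage = 0;

    while (current.size() > 1) {
        if (current.size() % 2 != 0)
            throw new IllegalArgumentException("odd number of inputs; handle the leftover separately");

        final List<String> in = current;
        final String fmt = format;
        final int half = current.size() / 2;
        List<String> next = new ArrayList<String>();
        List<Future<?>> futures = new ArrayList<Future<?>>();

        for (int i = 0; i < half; i++) {
            final int idx = i;
            final String outDir = tmpBase + "/stage-" + stage + "/part-" + idx;
            next.add(outDir);
            futures.add(executor.submit(new Runnable() {
                public void run() {
                    DataFrame df = sqlContext.load(in.get(idx), fmt)
                        .unionAll(sqlContext.load(in.get(idx + half), fmt));
                    // save() is an action: it forces the union to run now
                    df.save(outDir, "parquet", SaveMode.Overwrite);
                }
            }));
        }
        for (Future<?> f : futures) f.get(); // block until this stage has finished

        current = next;
        format = "parquet";                  // later stages read the temp Parquet dirs
        stage++;
    }

    executor.shutdown();
    // one directory left: rewrite it to the final output location
    // (a rename/move would also do, and avoids one extra pass)
    sqlContext.load(current.get(0), format).save(outputPath, "parquet", SaveMode.Overwrite);
}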
I think this way of parallelizing will force the cluster to utilize all
the resources.

-Kiran

On Mon, Jun 8, 2015 at 12:30 PM, kiran lonikar <loni...@gmail.com> wrote:

> James,
>
> As I can see, there are three distinct parts to your program:
>
>    - the for loop
>    - the synchronized block
>    - the final outputFrame.save statement
>
> Can you do a separate timing measurement by putting a simple
> System.currentTimeMillis() around these blocks to know how much each is
> taking, and then try to optimize whichever takes longest? In the second
> block, you may want to measure the time for the two statements
> separately. Improving this boils down to playing with Spark settings.
>
> Now consider the first block: I think this is a classic case of merge
> sort, or a reduce tree. You already tried to improve this by submitting
> jobs in parallel using an executor pool/Callable etc.
>
> To further improve the parallelization, I suggest you use a reduce-tree
> like approach. For example, let's say you want to compute the sum of all
> elements of an array in parallel. The way it's solved on a GPU-like
> platform is that you initially divide your input array into chunks of 2,
> compute those n/2 sums in parallel on separate threads and save the
> result in the first of the two elements. In the next iteration, you
> compute n/4 sums in parallel from the earlier sums, and so on, till you
> are left with only two elements whose sum gives you the final sum.
>
> You are performing many sequential unionAll operations for inputs.size()
> avro files. Assuming the unionAll() on DataFrame is blocking (and not a
> simple transformation like on RDDs) and actually performs the union
> operation, you will certainly benefit by parallelizing this loop. You may
> change the loop to something like below:
>
> // pseudo code only
> int n = inputs.size()
> // initialize executor
> executor = new FixedThreadPoolExecutor(n/2)
> dfInput = new DataFrame[n/2]
> for (int i = 0; i < n/2; i++) {
>     executor.submit(new Runnable() {
>         public void run() {
>             // union of i and i+n/2
>             // showing [] only to bring out array access; replace with
>             // dfInput(i) in your code
>             dfInput[i] = sqlContext.load(inputPaths.get(i),
>                 "com.databricks.spark.avro")
>                 .unionAll(sqlContext.load(inputPaths.get(i + n/2),
>                     "com.databricks.spark.avro"))
>         }
>     });
> }
>
> executor.awaitTermination(0, TimeUnit.SECONDS)
>
> int steps = log(n)/log(2.0)
> for (s = 2; s <= steps; s++) {
>     int stride = n/(1 << s); // n/(2^s)
>     for (int i = 0; i < stride; i++) {
>         executor.submit(new Runnable() {
>             public void run() {
>                 // union of i and i+stride
>                 // showing [] only to bring out array access; replace with
>                 // dfInput(i) and dfInput(i+stride) in your code
>                 dfInput[i] = dfInput[i].unionAll(dfInput[i + stride])
>             }
>         });
>     }
>     executor.awaitTermination(0, TimeUnit.SECONDS)
> }
>
> Let me know if it helped.
>
> -Kiran
>
>
> On Thu, Jun 4, 2015 at 8:57 PM, James Aley <james.a...@swiftkey.com>
> wrote:
>
>> Thanks for the confirmation! We're quite new to Spark, so a little
>> reassurance is a good thing to have sometimes :-)
>>
>> The thing that's concerning me at the moment is that my job doesn't
>> seem to run any faster with more compute resources added to the cluster,
>> and this is proving a little tricky to debug. There are a lot of
>> variables, so here's what we've tried already and the apparent impact.
>> If anyone has any further suggestions, we'd love to hear!
>>
>> * Increase the "minimum" number of output files (targetPartitions
>>   above), so that input groups smaller than our minimum chunk size can
>>   still be worked on by more than one executor. This does measurably
>>   speed things up, but obviously it's a trade-off, as the original goal
>>   for this job is to merge our data into fewer, larger files.
>>
>> * Submit many jobs in parallel, by running the above code in a
>>   Callable, on an executor pool. This seems to help, to some extent, but
>>   I'm not sure what else needs to be configured alongside it -- driver
>>   threads, scheduling policy, etc. We set scheduling to "FAIR" when
>>   doing this, as that seemed like the right approach, but we're not 100%
>>   confident. It seemed to help quite substantially anyway, so perhaps
>>   this just needs further tuning?
>>
>> * Increasing executors, RAM, etc. This doesn't make a difference by
>>   itself for this job, so I'm thinking we're already not fully utilising
>>   the resources we have in a smaller cluster.
>>
>> Again, any recommendations appreciated. Thanks for the help!
>>
>> James.
>>
>> On 4 June 2015 at 15:00, Eugen Cepoi <cepoi.eu...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> 2015-06-04 15:29 GMT+02:00 James Aley <james.a...@swiftkey.com>:
>>>
>>>> Hi,
>>>>
>>>> We have a load of Avro data coming into our data systems in the form
>>>> of relatively small files, which we're merging into larger Parquet
>>>> files with Spark. I've been following the docs, and the approach I'm
>>>> taking seemed fairly obvious and pleasingly simple, but I'm wondering
>>>> if perhaps it's not the most optimal approach.
>>>>
>>>> I was wondering if anyone on this list might have some advice to make
>>>> this job as efficient as possible. Here's some code:
>>>>
>>>> DataFrame dfInput = sqlContext.load(inputPaths.get(0),
>>>>     "com.databricks.spark.avro");
>>>> long totalSize = getDirSize(inputPaths.get(0));
>>>>
>>>> for (int i = 1; i < inputs.size(); ++i) {
>>>>     dfInput = dfInput.unionAll(sqlContext.load(inputPaths.get(i),
>>>>         "com.databricks.spark.avro"));
>>>>     totalSize += getDirSize(inputPaths.get(i));
>>>> }
>>>>
>>>> int targetPartitions = (int) Math.max(2L, totalSize / TARGET_SIZE_BYTES);
>>>> DataFrame outputFrame;
>>>>
>>>> // Note: HADOOP-10456 impacts us, as we're stuck on 2.4.0 in EMR, hence
>>>> // the synchronized block below. Suggestions welcome here too! :-)
>>>> synchronized (this) {
>>>>     RDD<Row> inputRdd = dfInput.rdd().coalesce(targetPartitions, false, null);
>>>>     outputFrame = sqlContext.createDataFrame(inputRdd, dfInput.schema());
>>>> }
>>>>
>>>> outputFrame.save(outputPath, "parquet", SaveMode.Overwrite);
>>>>
>>>> Here are some things bothering me:
>>>>
>>>>    - Conversion to an RDD and back again so that we can use
>>>>    coalesce() to reduce the number of partitions. This is because we
>>>>    read that repartition() is not as efficient as coalesce(), and local
>>>>    micro benchmarks seemed to somewhat confirm that this was faster.
>>>>    Is this really a good idea though? Should we be doing something else?
>>>>
>>> Repartition uses coalesce but with a forced shuffle step. It's just a
>>> shortcut for coalesce(xxx, true).
>>> Doing a coalesce sounds correct, I'd do the same :) Note that if you
>>> add the shuffle step, then your partitions should be better balanced.
>>>
>>>>    - Usage of unionAll() - this is the only way I could find to join
>>>>    the separate data sets into a single data frame to save as Parquet.
>>>>    Is there a better way?
>>>>
>>> When using the input formats directly you can do this with
>>> FileInputFormat.addInputPath; it should perform at least as well as
>>> the union.
>>>
>>>>    - Do I need to be using the DataFrame API at all? I'm not querying
>>>>    any data here, so the nice API for SQL-like transformations of the
>>>>    data isn't being used. The DataFrame API just seemed like the path
>>>>    of least resistance for working with Avro and Parquet. Would there
>>>>    be any advantage to using hadoopRDD() with the appropriate
>>>>    Input/Output formats?
>>>>
>>> Using the input/output formats directly sounds viable. But the snippet
>>> you show seems clean enough, and I am not sure there would be much value
>>> in making something (maybe) slightly faster but harder to understand.
>>>
>>> Eugen
>>>
>>>> Any advice or tips greatly appreciated!
>>>>
>>>> James.
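To make Eugen's two suggestions concrete, here is a rough, untested sketch along the lines of James's original snippet: a single load that picks up all the input directories at once (instead of the unionAll chain), followed by a coalesce with the shuffle step enabled for better-balanced output files. The glob path is only a placeholder and assumes the inputs share a common parent directory (globs should be expanded by the underlying Hadoop input format, but check this against your spark-avro version); totalSize, TARGET_SIZE_BYTES and outputPath are as in the original code.

// Placeholder path -- adjust to however your input directories are laid out.
DataFrame all = sqlContext.load("hdfs:///incoming/avro/2015-06-*",
    "com.databricks.spark.avro");

int targetPartitions = (int) Math.max(2L, totalSize / TARGET_SIZE_BYTES);

// shuffle = true is what repartition() does: it costs a shuffle but gives
// evenly sized output files. Keep the synchronized workaround around this
// block if HADOOP-10456 still affects you.
RDD<Row> balanced = all.rdd().coalesce(targetPartitions, true, null);
DataFrame output = sqlContext.createDataFrame(balanced, all.schema());

output.save(outputPath, "parquet", SaveMode.Overwrite);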