Hey Kiran,

Thanks very much for the response. I left for vacation before I could try
this out, but I'll experiment once I get back and let you know how it goes.
Thanks!
James.

On 8 June 2015 at 12:34, kiran lonikar <loni...@gmail.com> wrote:

> It turns out my assumption that load and unionAll are blocking is not
> correct. They are transformations. So instead of running only the load
> and unionAll in the run() methods, I think you will have to save the
> intermediate dfInput[i] to temporary parquet files (possibly on an
> in-memory DFS like http://tachyon-project.org/) in the run() methods. The
> second for loop will then have to load from those intermediate parquet
> files, and finally you save dfInput[0] to HDFS. (A sketch of what such a
> run() body might look like follows at the end of this message.)
>
> I think this way of parallelizing will force the cluster to utilize all
> of its resources.
>
> -Kiran
>
> On Mon, Jun 8, 2015 at 12:30 PM, kiran lonikar <loni...@gmail.com> wrote:
>
>> James,
>>
>> As I can see, there are three distinct parts to your program:
>>
>> - the for loop
>> - the synchronized block
>> - the final outputFrame.save statement
>>
>> Can you do a separate timing measurement by putting a simple
>> System.currentTimeMillis() around these blocks to see how long each one
>> takes, and then try to optimize wherever it takes longest? In the second
>> block, you may want to time the two statements separately. Improving
>> that part boils down to playing with Spark settings.
>>
>> Now consider the first block: I think this is a classic case of merge
>> sort, or a reduce tree. You already tried to improve this by submitting
>> jobs in parallel using an executor pool/Callable etc.
>>
>> To further improve the parallelization, I suggest you use a reduce-tree
>> like approach. For example, let's say you want to compute the sum of all
>> elements of an array in parallel. The way it's solved on a GPU-like
>> platform is that you divide the input array into pairs, compute the n/2
>> pairwise sums in parallel on separate threads, and save each result in
>> the first of the two elements. In the next iteration you compute n/4
>> sums of the earlier sums in parallel, and so on, until you are left with
>> only two elements whose sum gives the final result.
>>
>> You are performing inputs.size() sequential unionAll operations, one per
>> avro input. Assuming unionAll() on DataFrame is blocking (and not a
>> simple transformation as on RDDs) and actually performs the union, you
>> will certainly benefit from parallelizing this loop. You could change
>> the loop to something like below:
>>
>> // pseudo-code only; assumes n is a power of two
>> final int n = inputs.size();
>> ExecutorService executor = Executors.newFixedThreadPool(n / 2);
>> final DataFrame[] dfInput = new DataFrame[n / 2];
>> List<Future<?>> futures = new ArrayList<>();
>>
>> // first pass: compute the n/2 pairwise unions on separate threads
>> for (int i = 0; i < n / 2; i++) {
>>     final int idx = i; // anonymous classes need a final copy
>>     futures.add(executor.submit(new Runnable() {
>>         public void run() {
>>             dfInput[idx] =
>>                 sqlContext.load(inputPaths.get(idx),
>>                         "com.databricks.spark.avro")
>>                     .unionAll(sqlContext.load(inputPaths.get(idx + n / 2),
>>                         "com.databricks.spark.avro"));
>>         }
>>     }));
>> }
>> for (Future<?> f : futures) f.get(); // wait for this pass to finish
>> futures.clear();
>>
>> // later passes: halve the number of partial results each iteration
>> int steps = (int) (Math.log(n) / Math.log(2));
>> for (int s = 2; s <= steps; s++) {
>>     final int stride = n / (1 << s); // n / 2^s
>>     for (int i = 0; i < stride; i++) {
>>         final int idx = i;
>>         futures.add(executor.submit(new Runnable() {
>>             public void run() {
>>                 dfInput[idx] = dfInput[idx].unionAll(dfInput[idx + stride]);
>>             }
>>         }));
>>     }
>>     for (Future<?> f : futures) f.get(); // wait before the next pass
>>     futures.clear();
>> }
>> executor.shutdown();
>>
>> Let me know if it helped.
>>
>> -Kiran
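For concreteness, here is a minimal sketch of what each first-pass run()
body might look like under Kiran's save-and-reload suggestion. It reuses
the variable names from his snippet (idx, n, inputPaths, dfInput,
sqlContext); stagingDir and the stage-path naming are hypothetical, while
the save/load calls follow the Spark 1.x DataFrame API used elsewhere in
this thread:

    // Sketch of a first-pass run() body. stagingDir is an illustrative
    // location; it could point at an in-memory DFS such as Tachyon to
    // keep the intermediate I/O cheap.
    public void run() {
        DataFrame left =
            sqlContext.load(inputPaths.get(idx), "com.databricks.spark.avro");
        DataFrame right =
            sqlContext.load(inputPaths.get(idx + n / 2),
                "com.databricks.spark.avro");
        String stagePath = stagingDir + "/pass0/part-" + idx;
        // Saving forces the union to execute now, rather than leaving it
        // as a lazy plan that only runs at the final save.
        left.unionAll(right).save(stagePath, "parquet", SaveMode.Overwrite);
        // Reload the materialized result for the next pass of the tree.
        dfInput[idx] = sqlContext.load(stagePath, "parquet");
    }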
>>
>> On Thu, Jun 4, 2015 at 8:57 PM, James Aley <james.a...@swiftkey.com>
>> wrote:
>>
>>> Thanks for the confirmation! We're quite new to Spark, so a little
>>> reassurance is a good thing to have sometimes :-)
>>>
>>> The thing that's concerning me at the moment is that my job doesn't
>>> seem to run any faster with more compute resources added to the
>>> cluster, and this is proving a little tricky to debug. There are a lot
>>> of variables, so here's what we've tried already and the apparent
>>> impact. If anyone has any further suggestions, we'd love to hear!
>>>
>>> * Increase the "minimum" number of output files (targetPartitions
>>> above), so that input groups smaller than our minimum chunk size can
>>> still be worked on by more than one executor. This does measurably
>>> speed things up, but obviously it's a trade-off, as the original goal
>>> for this job is to merge our data into fewer, larger files.
>>>
>>> * Submit many jobs in parallel, by running the above code in a
>>> Callable, on an executor pool. This seems to help, to some extent, but
>>> I'm not sure what else needs to be configured alongside it -- driver
>>> threads, scheduling policy, etc. We set scheduling to "FAIR" when doing
>>> this, as that seemed like the right approach, but we're not 100%
>>> confident. It seemed to help quite substantially anyway, so perhaps
>>> this just needs further tuning? (A config sketch follows this message.)
>>>
>>> * Increasing executors, RAM, etc. This doesn't make a difference by
>>> itself for this job, so I'm thinking we're already not fully utilising
>>> the resources we have in a smaller cluster.
>>>
>>> Again, any recommendations appreciated. Thanks for the help!
>>>
>>> James.
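A minimal sketch of the fair-scheduling setup mentioned above, assuming
Spark 1.x. The application and pool names are hypothetical; the properties
themselves are standard Spark settings, and the pool would be defined in a
fairscheduler.xml referenced via spark.scheduler.allocation.file:

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    // Enable the FAIR scheduler when the context is created.
    SparkConf conf = new SparkConf()
        .setAppName("avro-to-parquet-merge") // hypothetical name
        .set("spark.scheduler.mode", "FAIR");
    JavaSparkContext sc = new JavaSparkContext(conf);

    // Each driver thread can route the jobs it submits into a named pool;
    // "mergePool" is hypothetical and would be defined in fairscheduler.xml.
    sc.setLocalProperty("spark.scheduler.pool", "mergePool");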
>>>
>>> On 4 June 2015 at 15:00, Eugen Cepoi <cepoi.eu...@gmail.com> wrote:
>>>
>>>> Hi
>>>>
>>>> 2015-06-04 15:29 GMT+02:00 James Aley <james.a...@swiftkey.com>:
>>>>
>>>>> Hi,
>>>>>
>>>>> We have a load of Avro data coming into our data systems in the form
>>>>> of relatively small files, which we're merging into larger Parquet
>>>>> files with Spark. I've been following the docs, and the approach I'm
>>>>> taking seemed fairly obvious and pleasingly simple, but I'm wondering
>>>>> if perhaps it's not optimal.
>>>>>
>>>>> I was wondering if anyone on this list might have some advice on
>>>>> making this job as efficient as possible. Here's some code:
>>>>>
>>>>> DataFrame dfInput = sqlContext.load(inputPaths.get(0),
>>>>>     "com.databricks.spark.avro");
>>>>> long totalSize = getDirSize(inputPaths.get(0));
>>>>>
>>>>> for (int i = 1; i < inputs.size(); ++i) {
>>>>>     dfInput = dfInput.unionAll(sqlContext.load(inputPaths.get(i),
>>>>>         "com.databricks.spark.avro"));
>>>>>     totalSize += getDirSize(inputPaths.get(i));
>>>>> }
>>>>>
>>>>> int targetPartitions = (int) Math.max(2L, totalSize / TARGET_SIZE_BYTES);
>>>>> DataFrame outputFrame;
>>>>>
>>>>> // Note: HADOOP-10456 impacts us, as we're stuck on 2.4.0 in EMR,
>>>>> // hence the synchronized block below. Suggestions welcome here too! :-)
>>>>> synchronized (this) {
>>>>>     RDD<Row> inputRdd = dfInput.rdd().coalesce(targetPartitions,
>>>>>         false, null);
>>>>>     outputFrame = sqlContext.createDataFrame(inputRdd, dfInput.schema());
>>>>> }
>>>>>
>>>>> outputFrame.save(outputPath, "parquet", SaveMode.Overwrite);
>>>>>
>>>>> Here are some things bothering me:
>>>>>
>>>>> - Conversion to an RDD and back again so that we can use coalesce()
>>>>> to reduce the number of partitions. This is because we read that
>>>>> repartition() is not as efficient as coalesce(), and local
>>>>> micro-benchmarks seemed to somewhat confirm that this was faster. Is
>>>>> this really a good idea though? Should we be doing something else?
>>>>>
>>>> Repartition uses coalesce but with a forced shuffle step. It's just a
>>>> shortcut for coalesce(xxx, true). Doing a coalesce sounds correct, I'd
>>>> do the same :) Note that if you add the shuffle step, your partitions
>>>> should be better balanced. (A short sketch at the end of this thread
>>>> illustrates the difference.)
>>>>
>>>>> - Usage of unionAll() -- this is the only way I could find to join
>>>>> the separate data sets into a single data frame to save as Parquet.
>>>>> Is there a better way?
>>>>>
>>>> When using the input formats directly, you can do this with
>>>> FileInputFormat.addInputPath; it should perform at least as well as
>>>> the union.
>>>>
>>>>> - Do I need to be using the DataFrame API at all? I'm not querying
>>>>> any data here, so the nice API for SQL-like transformations of the
>>>>> data isn't being used. The DataFrame API just seemed like the path of
>>>>> least resistance for working with Avro and Parquet. Would there be
>>>>> any advantage to using hadoopRDD() with the appropriate Input/Output
>>>>> formats?
>>>>>
>>>> Using the input/output formats directly sounds viable. But the snippet
>>>> you show seems clean enough, and I'm not sure there would be much
>>>> value in making something (maybe) slightly faster but harder to
>>>> understand.
>>>>
>>>> Eugen
>>>>
>>>>> Any advice or tips greatly appreciated!
>>>>>
>>>>> James.
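To make Eugen's repartition/coalesce point concrete, a small sketch against
the Spark 1.x RDD API; dfInput and targetPartitions stand in for the values
from James's snippet, and the three-argument coalesce call matches the one
already used there:

    // repartition(n) is just coalesce(n, shuffle = true): it always
    // shuffles, which costs a full data exchange but balances the output
    // partitions evenly.
    RDD<Row> shuffled = dfInput.rdd().coalesce(targetPartitions, true, null);

    // coalesce(n, shuffle = false) merges existing partitions locally,
    // avoiding the shuffle at the risk of unevenly sized output partitions.
    RDD<Row> merged = dfInput.rdd().coalesce(targetPartitions, false, null);

If the merged output files come out badly skewed in size, the shuffled
variant is the usual fix, at the cost of that extra exchange.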