Pig has implemented map-side merge joins in this way. If the storage mechanism contains an index (e.g. Zebra), it can use it.

Alan.

On Jul 21, 2010, at 5:22 PM, Deem, Mike wrote:

We are planning to use Hadoop to run a number of recurring jobs that involve map-side joins.

Rather than requiring that the joined datasets be partitioned into matching sets of part-* files, we are considering the solution described below. Our concerns with the partitioned approach include:

· All of the inputs may not have been partitioned the same way. In our environment we have a number of datasets that are produced and consumed by a number of different jobs, and we run production and experimental jobs that may or may not use exactly the same code. Generating a matched set of partitions before a job that involves joins can run requires extra work and is likely to be error prone.

· We run our jobs on clusters of varying sizes. We have production and experimental clusters and make use of Amazon’s EMR for ad-hoc job runs. Pre-partitioned data may not be well matched to the size of the cluster on which a given job will be run.

In our proposed solution, each input dataset will have an associated split index. The split index could be computed as the dataset is written or could be generated on the fly and cached.

In either case, the split index is generated as outlined in the following pseudocode:

    previousSplitID = -1;
    splitStartOffset = 0;
    for each key in the input {
        hash = computeHash(key);
        splitID = hash % MAX_SPLITS;
        if (previousSplitID != splitID) {
            if (previousSplitID != -1) {
                index.write(previousSplitID, splitStartOffset,
                            getCurrentOffset(input) - splitStartOffset);
            }
            previousSplitID = splitID;
            splitStartOffset = getCurrentOffset(input);
        }
    }
    // Write the entry for the final run of keys after the loop ends.
    if (previousSplitID != -1) {
        index.write(previousSplitID, splitStartOffset,
                    getCurrentOffset(input) - splitStartOffset);
    }
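
To make the bookkeeping concrete, here is a minimal, self-contained Java sketch of the same algorithm. The KeyAndOffset record, the in-memory input list, and the in-memory index are illustration-only assumptions; in practice the offsets would come from the underlying file reader and the index entries would be written to a side file.

    import java.util.ArrayList;
    import java.util.List;

    public class SplitIndexBuilder {
        static final int MAX_SPLITS = 4096; // assumed value, for illustration

        // One key together with the byte offset at which it begins (assumed input shape).
        record KeyAndOffset(String key, long offset) {}

        // One index entry: the byte range of the input covered by one run of a hash bucket.
        record IndexEntry(int splitId, long offset, long length) {}

        // Builds the split index for one input; endOffset is the input's total length.
        static List<IndexEntry> build(List<KeyAndOffset> input, long endOffset) {
            List<IndexEntry> index = new ArrayList<>();
            int previousSplitId = -1;
            long splitStartOffset = 0;
            for (KeyAndOffset k : input) {
                // floorMod keeps the bucket non-negative even for negative hash codes.
                int splitId = Math.floorMod(k.key().hashCode(), MAX_SPLITS);
                if (splitId != previousSplitId) {
                    if (previousSplitId != -1) {
                        index.add(new IndexEntry(previousSplitId, splitStartOffset,
                                                 k.offset() - splitStartOffset));
                    }
                    previousSplitId = splitId;
                    splitStartOffset = k.offset();
                }
            }
            // Flush the final run once the input is exhausted.
            if (previousSplitId != -1) {
                index.add(new IndexEntry(previousSplitId, splitStartOffset,
                                         endOffset - splitStartOffset));
            }
            return index;
        }
    }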

When generating the splits for the join, each input's split index would be read to determine that input's splits, and the corresponding splits from each input would be processed together. Note that the actual number of splits used for any given job doesn't have to be MAX_SPLITS: if MAX_SPLITS is large, multiple entries from the index can easily be combined to produce a number of splits matched to the cluster size.
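
As a sketch of that combination step (reusing the IndexEntry shape from above; the modulo grouping is just one possible mapping), the key constraint is that the group assignment must depend only on the split ID, so that applying it to every input's index keeps matching hash buckets in the same map task:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    public class SplitCombiner {
        record IndexEntry(int splitId, long offset, long length) {}

        // Groups MAX_SPLITS fine-grained index entries into numSplits coarse splits.
        static Map<Integer, List<IndexEntry>> combine(List<IndexEntry> index, int numSplits) {
            Map<Integer, List<IndexEntry>> groups = new TreeMap<>();
            for (IndexEntry e : index) {
                int group = e.splitId() % numSplits; // deterministic, input-independent
                groups.computeIfAbsent(group, g -> new ArrayList<>()).add(e);
            }
            return groups;
        }
    }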

I would like to hear any thoughts you may have about this approach. Has something similar already been implemented? Does this approach raise any concerns we may not have thought of?

Thanks,

  == Mike ==

Mike Deem
md...@amazon.com

