Pig has implemented map-side merge joins in this way. If the storage
mechanism contains an index (e.g. Zebra), it can use it.
Alan.
On Jul 21, 2010, at 5:22 PM, Deem, Mike wrote:
We are planning to use Hadoop to run a number of recurring jobs that
involve map side joins.
Rather than requiring that the joined datasets be partitioned into
separate part-* files, we are considering the following solution.
Our concerns with the partitioned approach include:
· All the inputs may not have been partitioned the same way. In
our environment we have a number of datasets that are produced and
consumed by a number of different jobs. We run production jobs and
experimental jobs that may or may not use exactly the same code,
etc. Generating a matched set of partitions before you can run a job
that involves joins requires extra work and is likely to be
error-prone.
· We run our jobs on clusters of varying sizes. We have
production and experimental clusters and make use of Amazon’s EMR
for ad-hoc job runs. Pre-partitioned data may not be well matched to
the size of the cluster on which a given job will be run.
In our proposed solution, each input dataset will have an associated
split index. The split index could be computed as the dataset is
written or could be generated on the fly and cached.
In either case, the split index is generated as outlined in the
following pseudocode:
previousSplitID = -1;
splitStartOffset = 0;
for each key in the input {
    hash = computeHash(key);
    splitID = hash % MAX_SPLITS;
    if (previousSplitID != splitID) {
        if (previousSplitID != -1) {
            index.write(previousSplitID, splitStartOffset,
                        getCurrentOffset(input) - splitStartOffset);
        }
        previousSplitID = splitID;
        splitStartOffset = getCurrentOffset(input);
    }
}
// After the loop, flush the index entry for the final run of keys.
if (previousSplitID != -1) {
    index.write(previousSplitID, splitStartOffset,
                getCurrentOffset(input) - splitStartOffset);
}
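To make the idea concrete, here is a minimal runnable sketch of that indexing pass in Python. The record iterator, the hash function (CRC32 here), and the in-memory index format are all stand-ins for whatever the real storage layer would provide; note that a split ID can appear in several (offset, length) entries if its keys are not contiguous in the file.

```python
import zlib

MAX_SPLITS = 16  # illustrative only; a real value would be much larger


def compute_hash(key):
    # Stand-in for the job's key hash function; CRC32 keeps it deterministic.
    return zlib.crc32(key.encode("utf-8"))


def build_split_index(records):
    """Build a split index from an iterable of (key, start_offset, end_offset)
    tuples given in file order.  Returns a list of (split_id, offset, length)
    entries; a split_id may occur more than once if its keys occur in
    multiple runs."""
    index = []
    previous_split_id = -1
    split_start = 0
    current_end = 0
    for key, start, end in records:
        split_id = compute_hash(key) % MAX_SPLITS
        if split_id != previous_split_id:
            if previous_split_id != -1:
                # Close out the previous run of same-bucket keys.
                index.append((previous_split_id, split_start, start - split_start))
            previous_split_id = split_id
            split_start = start
        current_end = end
    if previous_split_id != -1:
        # Flush the final run after the loop ends.
        index.append((previous_split_id, split_start, current_end - split_start))
    return index
```

The entries come out contiguous and in file order, so reading one split is a single sequential scan of its byte ranges.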
When generating the splits for the join, each input’s split index
would be read to determine the splits for that input, and the
corresponding splits would be processed together. Note that the
actual number of splits used for any given job doesn’t have to be
MAX_SPLITS: if MAX_SPLITS is large, multiple splits from the index
can easily be combined to produce a number of splits matched to the
cluster size.
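A small sketch of that combining step, under the assumption that any deterministic grouping of hash buckets works as long as every input uses the same one (here, bucket i goes to coarse split i % num_splits, which keeps co-partitioned keys together across inputs):

```python
def combine_splits(index, num_splits):
    """Group the MAX_SPLITS hash buckets recorded in a split index into
    num_splits coarser splits.  index is a list of (split_id, offset, length)
    entries; the result is a list of num_splits groups, each a list of
    (offset, length) byte ranges to read for that coarse split."""
    combined = [[] for _ in range(num_splits)]
    for split_id, offset, length in index:
        # Same bucket -> same coarse split on every input, so the join
        # property is preserved regardless of num_splits.
        combined[split_id % num_splits].append((offset, length))
    return combined
```

For example, with four buckets and num_splits=2, buckets 0 and 2 land in one coarse split and buckets 1 and 3 in the other, on every input dataset.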
I would like to hear any thoughts you may have about this approach.
Has something similar already been implemented? Does this approach
raise any concerns we may not have thought of?
Thanks,
== Mike ==
Mike Deem
md...@amazon.com