It definitely has a significant performance gain when dealing with big records that result in a slow shuffle.
By the way, I discovered (confirmed) an issue with the normal join implementation, where objects are held on to without using a deep copy. I was thinking of fixing that this evening -- are you planning on doing the big package rename today? On 06 Jul 2012, at 18:57, Josh Wills <[email protected]> wrote: > Not a problem man-- can't wait to play with it this weekend! > > On Fri, Jul 6, 2012 at 9:50 AM, Gabriel Reid <[email protected]> wrote: >> Hi Josh, >> >> I've just committed the map side joins, after doing some more >> extensive testing on it today. Unfortunately, I forgot to squash the >> multiple commits into a single one (as I was planning on doing), but >> it's all in there and working. >> >> - Gabriel >> >> On Fri, Jul 6, 2012 at 7:28 AM, Gabriel Reid <[email protected]> wrote: >>> Thanks for making the JIRA issue. >>> >>> I was going to do some more testing an a "real" cluster later on today, and >>> as long as that all checks out then this will be ready to go in as far as >>> I'm concerned. >>> >>> I don't see any reason to hold back on the big renaming for this patch -- >>> if you're ready to do the package renaming, please go ahead. >>> >>> As long as my testing later today checks out and there aren't any glaring >>> issues from anyone who has tried the patch out, I'll probably be checking >>> the patch in later today (so it'll probably be in before you do the >>> renaming anyhow). >>> >>> - Gabriel >>> >>> >>> On Friday 6 July 2012 at 00:11, Josh Wills wrote: >>> >>>> FYI: https://issues.apache.org/jira/browse/CRUNCH-3 to track this. >>>> >>>> Gabriel, is your feeling that this is ready to go in? I'm debating whether >>>> or not to check this in before/after I do the massive com.cloudera -> >>>> org.apache renaming. What do you think? >>>> >>>> On Tue, Jul 3, 2012 at 1:25 PM, Gabriel Reid <[email protected] >>>> (mailto:[email protected])> wrote: >>>> >>>>> Hi Joe, >>>>> >>>>> Looking forward to hearing your feedback! This is still just a patch and >>>>> not committed yet, and there's still definitely room for help (i.e. ideas >>>>> on how to improve it, or do it totally differently), so certainly let me >>>>> know if you've got some ideas. >>>>> >>>>> - Gabriel >>>>> >>>>> >>>>> On Tuesday 3 July 2012 at 18:01, Joseph Adler wrote: >>>>> >>>>>> Awesome! Will give this a try soon... wish I could have helped with this >>>>> one... >>>>>> >>>>>> On Tue, Jul 3, 2012 at 1:53 AM, Gabriel Reid <[email protected] >>>>>> (mailto:[email protected])(mailto: >>>>> [email protected] (mailto:[email protected]))> wrote: >>>>>>> Thanks for pointing that out Chris. I'm guessing the mailing list is >>>>>>> stripping out attachments(?). Once the JIRA is up and running then >>>>>>> that will be taken care of I guess. >>>>>>> >>>>>>> The mime type on the last attempt was application/octet-stream, so >>>>>>> I've renamed this to a .txt file to try to ensure that it'll get a >>>>>>> text/plain mime type (although I don't know if that'll make a >>>>>>> difference). I've also pasted it inline below, hopefully one of those >>>>>>> solutions works. >>>>>>> >>>>>>> - Gabriel >>>>>>> >>>>>>> >>>>>>> diff --git src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java >>>>>>> src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java >>>>>>> index 420e8dc..c8ba596 100644 >>>>>>> --- src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java >>>>>>> +++ src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java >>>>>>> @@ -59,9 +59,9 @@ import com.google.common.collect.Sets; >>>>>>> public class MRPipeline implements Pipeline { >>>>>>> >>>>>>> private static final Log LOG = LogFactory.getLog(MRPipeline.class); >>>>>>> - >>>>>>> + >>>>>>> private static final Random RANDOM = new Random(); >>>>>>> - >>>>>>> + >>>>>>> private final Class<?> jarClass; >>>>>>> private final String name; >>>>>>> private final Map<PCollectionImpl<?>, Set<Target>> outputTargets; >>>>>>> @@ -75,15 +75,15 @@ public class MRPipeline implements Pipeline { >>>>>>> public MRPipeline(Class<?> jarClass) throws IOException { >>>>>>> this(jarClass, new Configuration()); >>>>>>> } >>>>>>> - >>>>>>> - public MRPipeline(Class<?> jarClass, String name){ >>>>>>> + >>>>>>> + public MRPipeline(Class<?> jarClass, String name) { >>>>>>> this(jarClass, name, new Configuration()); >>>>>>> } >>>>>>> - >>>>>>> + >>>>>>> public MRPipeline(Class<?> jarClass, Configuration conf) { >>>>>>> - this(jarClass, jarClass.getName(), conf); >>>>>>> + this(jarClass, jarClass.getName(), conf); >>>>>>> } >>>>>>> - >>>>>>> + >>>>>>> public MRPipeline(Class<?> jarClass, String name, Configuration conf) { >>>>>>> this.jarClass = jarClass; >>>>>>> this.name (http://this.name) = name; >>>>>>> @@ -102,9 +102,9 @@ public class MRPipeline implements Pipeline { >>>>>>> >>>>>>> @Override >>>>>>> public void setConfiguration(Configuration conf) { >>>>>>> - this.conf = conf; >>>>>>> + this.conf = conf; >>>>>>> } >>>>>>> - >>>>>>> + >>>>>>> @Override >>>>>>> public PipelineResult run() { >>>>>>> MSCRPlanner planner = new MSCRPlanner(this, outputTargets); >>>>>>> @@ -125,8 +125,8 @@ public class MRPipeline implements Pipeline { >>>>>>> boolean materialized = false; >>>>>>> for (Target t : outputTargets.get(c)) { >>>>>>> if (!materialized && t instanceof Source) { >>>>>>> - c.materializeAt((SourceTarget) t); >>>>>>> - materialized = true; >>>>>>> + c.materializeAt((SourceTarget) t); >>>>>>> + materialized = true; >>>>>>> } >>>>>>> } >>>>>>> } >>>>>>> @@ -144,7 +144,7 @@ public class MRPipeline implements Pipeline { >>>>>>> cleanup(); >>>>>>> return res; >>>>>>> } >>>>>>> - >>>>>>> + >>>>>>> public <S> PCollection<S> read(Source<S> source) { >>>>>>> return new InputCollection<S>(source, this); >>>>>>> } >>>>>>> @@ -160,85 +160,120 @@ public class MRPipeline implements Pipeline { >>>>>>> @SuppressWarnings("unchecked") >>>>>>> public void write(PCollection<?> pcollection, Target target) { >>>>>>> if (pcollection instanceof PGroupedTableImpl) { >>>>>>> - pcollection = ((PGroupedTableImpl<?,?>) pcollection).ungroup(); >>>>>>> + pcollection = ((PGroupedTableImpl<?, ?>) pcollection).ungroup(); >>>>>>> } else if (pcollection instanceof UnionCollection || pcollection >>>>>>> instanceof UnionTable) { >>>>>>> - pcollection = pcollection.parallelDo("UnionCollectionWrapper", >>>>>>> - (MapFn)IdentityFn.<Object>getInstance(), pcollection.getPType()); >>>>>>> + pcollection = pcollection.parallelDo("UnionCollectionWrapper", >>>>>>> + (MapFn) IdentityFn.<Object> getInstance(), pcollection.getPType()); >>>>>>> } >>>>>>> addOutput((PCollectionImpl<?>) pcollection, target); >>>>>>> } >>>>>>> >>>>>>> private void addOutput(PCollectionImpl<?> impl, Target target) { >>>>>>> if (!outputTargets.containsKey(impl)) { >>>>>>> - outputTargets.put(impl, Sets.<Target>newHashSet()); >>>>>>> + outputTargets.put(impl, Sets.<Target> newHashSet()); >>>>>>> } >>>>>>> outputTargets.get(impl).add(target); >>>>>>> } >>>>>>> - >>>>>>> + >>>>>>> @Override >>>>>>> public <T> Iterable<T> materialize(PCollection<T> pcollection) { >>>>>>> - >>>>>>> - if (pcollection instanceof UnionCollection) { >>>>>>> - pcollection = pcollection.parallelDo("UnionCollectionWrapper", >>>>>>> - (MapFn)IdentityFn.<Object>getInstance(), pcollection.getPType()); >>>>>>> - } >>>>>>> - PCollectionImpl<T> impl = (PCollectionImpl<T>) pcollection; >>>>>>> + >>>>>>> + PCollectionImpl<T> pcollectionImpl = toPcollectionImpl(pcollection); >>>>>>> + ReadableSourceTarget<T> srcTarget = >>>>>>> getMaterializeSourceTarget(pcollectionImpl); >>>>>>> + >>>>>>> + MaterializableIterable<T> c = new MaterializableIterable<T>(this, >>>>>>> srcTarget); >>>>>>> + if (!outputTargetsToMaterialize.containsKey(pcollectionImpl)) { >>>>>>> + outputTargetsToMaterialize.put(pcollectionImpl, c); >>>>>>> + } >>>>>>> + return c; >>>>>>> + } >>>>>>> + >>>>>>> + /** >>>>>>> + * Retrieve a ReadableSourceTarget that provides access to the >>>>>> >>>>> >>>>> >>>>> contents of a >>>>>>> + * {@link PCollection}. This is primarily intended as a helper method >>>>>> >>>>> >>>>> >>>>> to >>>>>>> + * {@link #materialize(PCollection)}. The underlying data of the >>>>>>> + * ReadableSourceTarget may not be actually present until the >>>>>>> pipeline is run. >>>>>>> + * >>>>>>> + * @param pcollection >>>>>>> + * The collection for which the ReadableSourceTarget is to be >>>>>>> + * retrieved >>>>>>> + * @return The ReadableSourceTarget >>>>>>> + * @throws IllegalArgumentException >>>>>>> + * If no ReadableSourceTarget can be retrieved for the given >>>>>>> + * PCollection >>>>>>> + */ >>>>>>> + public <T> ReadableSourceTarget<T> >>>>>>> getMaterializeSourceTarget(PCollection<T> pcollection) { >>>>>>> + PCollectionImpl<T> impl = toPcollectionImpl(pcollection); >>>>>>> SourceTarget<T> matTarget = impl.getMaterializedAt(); >>>>>>> if (matTarget != null && matTarget instanceof ReadableSourceTarget) { >>>>>>> - return new MaterializableIterable<T>(this, >>>>>>> (ReadableSourceTarget<T>) matTarget); >>>>>>> + return (ReadableSourceTarget<T>) matTarget; >>>>>>> + } >>>>>>> + >>>>>>> + ReadableSourceTarget<T> srcTarget = null; >>>>>>> + if (outputTargets.containsKey(pcollection)) { >>>>>>> + for (Target target : outputTargets.get(impl)) { >>>>>>> + if (target instanceof ReadableSourceTarget) { >>>>>>> + srcTarget = (ReadableSourceTarget<T>) target; >>>>>>> + break; >>>>>>> + } >>>>>>> + } >>>>>>> } >>>>>>> - >>>>>>> - ReadableSourceTarget<T> srcTarget = null; >>>>>>> - if (outputTargets.containsKey(pcollection)) { >>>>>>> - for (Target target : outputTargets.get(impl)) { >>>>>>> - if (target instanceof ReadableSourceTarget) { >>>>>>> - srcTarget = (ReadableSourceTarget) target; >>>>>>> - break; >>>>>>> - } >>>>>>> - } >>>>>>> - } >>>>>>> - >>>>>>> - if (srcTarget == null) { >>>>>>> - SourceTarget<T> st = >>>>>> >>>>> >>>>> >>>>> createIntermediateOutput(pcollection.getPType()); >>>>>>> - if (!(st instanceof ReadableSourceTarget)) { >>>>>>> - throw new IllegalArgumentException("The PType for the given >>>>>>> PCollection is not readable" >>>>>>> - + " and cannot be materialized"); >>>>>>> - } else { >>>>>>> - srcTarget = (ReadableSourceTarget) st; >>>>>>> - addOutput(impl, srcTarget); >>>>>>> - } >>>>>>> - } >>>>>>> - >>>>>>> - MaterializableIterable<T> c = new MaterializableIterable<T>(this, >>>>>> >>>>> >>>>> >>>>> srcTarget); >>>>>>> - outputTargetsToMaterialize.put(impl, c); >>>>>>> - return c; >>>>>>> + >>>>>>> + if (srcTarget == null) { >>>>>>> + SourceTarget<T> st = >>>>>> >>>>> >>>>> >>>>> createIntermediateOutput(pcollection.getPType()); >>>>>>> + if (!(st instanceof ReadableSourceTarget)) { >>>>>>> + throw new IllegalArgumentException("The PType for the given >>>>>>> PCollection is not readable" >>>>>>> + + " and cannot be materialized"); >>>>>>> + } else { >>>>>>> + srcTarget = (ReadableSourceTarget<T>) st; >>>>>>> + addOutput(impl, srcTarget); >>>>>>> + } >>>>>>> + } >>>>>>> + >>>>>>> + return srcTarget; >>>>>>> + } >>>>>>> + >>>>>>> + /** >>>>>>> + * Safely cast a PCollection into a PCollectionImpl, including >>>>>>> handling the case of UnionCollections. >>>>>>> + * @param pcollection The PCollection to be cast/transformed >>>>>>> + * @return The PCollectionImpl representation >>>>>>> + */ >>>>>>> + private <T> PCollectionImpl<T> toPcollectionImpl(PCollection<T> >>>>>>> pcollection) { >>>>>>> + PCollectionImpl<T> pcollectionImpl = null; >>>>>>> + if (pcollection instanceof UnionCollection) { >>>>>>> + pcollectionImpl = (PCollectionImpl<T>) >>>>>>> pcollection.parallelDo("UnionCollectionWrapper", >>>>>>> + (MapFn) IdentityFn.<Object> getInstance(), pcollection.getPType()); >>>>>>> + } else { >>>>>>> + pcollectionImpl = (PCollectionImpl<T>) pcollection; >>>>>>> + } >>>>>>> + return pcollectionImpl; >>>>>>> } >>>>>>> >>>>>>> public <T> SourceTarget<T> createIntermediateOutput(PType<T> ptype) { >>>>>>> - return ptype.getDefaultFileSource(createTempPath()); >>>>>>> + return ptype.getDefaultFileSource(createTempPath()); >>>>>>> } >>>>>>> >>>>>>> public Path createTempPath() { >>>>>>> tempFileIndex++; >>>>>>> return new Path(tempDirectory, "p" + tempFileIndex); >>>>>>> } >>>>>>> - >>>>>>> + >>>>>>> private static Path createTempDirectory(Configuration conf) { >>>>>>> Path dir = new Path("/tmp/crunch" + RANDOM.nextInt()); >>>>>>> - try { >>>>>>> - FileSystem.get(conf).mkdirs(dir); >>>>>>> - } catch (IOException e) { >>>>>>> - LOG.error("Exception creating job output directory", e); >>>>>>> - throw new RuntimeException(e); >>>>>>> - } >>>>>>> + try { >>>>>>> + FileSystem.get(conf).mkdirs(dir); >>>>>>> + } catch (IOException e) { >>>>>>> + LOG.error("Exception creating job output directory", e); >>>>>>> + throw new RuntimeException(e); >>>>>>> + } >>>>>>> return dir; >>>>>>> } >>>>>>> - >>>>>>> + >>>>>>> @Override >>>>>>> public <T> void writeTextFile(PCollection<T> pcollection, String >>>>>> >>>>> >>>>> >>>>> pathName) { >>>>>>> // Ensure that this is a writable pcollection instance. >>>>>>> - pcollection = pcollection.parallelDo("asText", >>>>>> >>>>> >>>>> >>>>> IdentityFn.<T>getInstance(), >>>>>>> - WritableTypeFamily.getInstance().as(pcollection.getPType())); >>>>>>> + pcollection = pcollection.parallelDo("asText", IdentityFn.<T> >>>>>>> getInstance(), WritableTypeFamily >>>>>>> + .getInstance().as(pcollection.getPType())); >>>>>>> write(pcollection, At.textFile(pathName)); >>>>>>> } >>>>>>> >>>>>>> @@ -256,7 +291,7 @@ public class MRPipeline implements Pipeline { >>>>>>> LOG.info (http://LOG.info)("Exception during cleanup", e); >>>>>>> } >>>>>>> } >>>>>>> - >>>>>>> + >>>>>>> public int getNextAnonymousStageId() { >>>>>>> return nextAnonymousStageId++; >>>>>>> } >>>>>>> @@ -265,7 +300,7 @@ public class MRPipeline implements Pipeline { >>>>>>> public void enableDebug() { >>>>>>> // Turn on Crunch runtime error catching. >>>>>>> getConfiguration().setBoolean(RuntimeParameters.DEBUG, true); >>>>>>> - >>>>>>> + >>>>>>> // Write Hadoop's WARN logs to the console. >>>>>>> Logger crunchInfoLogger = LogManager.getLogger("com.cloudera.crunch"); >>>>>>> Appender console = crunchInfoLogger.getAppender("A"); >>>>>>> @@ -277,9 +312,9 @@ public class MRPipeline implements Pipeline { >>>>>>> LOG.warn("Could not find console appender named 'A' for writing >>>>>>> Hadoop warning logs"); >>>>>>> } >>>>>>> } >>>>>>> - >>>>>>> + >>>>>>> @Override >>>>>>> public String getName() { >>>>>>> - return name; >>>>>>> + return name; >>>>>>> } >>>>>>> } >>>>>>> diff --git >>>>>> >>>>> >>>>> >>>>> src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java >>>>>> >>>>> >>>>> >>>>> src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java >>>>>>> index d41a52e..68ef054 100644 >>>>>>> --- >>>>>> >>>>> >>>>> >>>>> src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java >>>>>>> +++ >>>>>> >>>>> >>>>> >>>>> src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java >>>>>>> @@ -12,6 +12,10 @@ public class CrunchRuntimeException extends >>>>>>> RuntimeException { >>>>>>> super(e); >>>>>>> } >>>>>>> >>>>>>> + public CrunchRuntimeException(String msg, Exception e) { >>>>>>> + super(msg, e); >>>>>>> + } >>>>>>> + >>>>>>> public boolean wasLogged() { >>>>>>> return logged; >>>>>>> } >>>>>>> diff --git >>>>>> >>>>> >>>>> >>>>> src/main/java/com/cloudera/crunch/io/avro/AvroFileSource.java >>>>>>> src/main/java/com/cloudera/crunch/io/avro/AvroFileSource.java >>>>>>> index 1122d62..4debfeb 100644 >>>>>>> --- src/main/java/com/cloudera/crunch/io/avro/AvroFileSource.java >>>>>>> +++ src/main/java/com/cloudera/crunch/io/avro/AvroFileSource.java >>>>>>> @@ -45,7 +45,8 @@ public class AvroFileSource<T> extends >>>>>>> FileSourceImpl<T> implements ReadableSour >>>>>>> >>>>>>> @Override >>>>>>> public Iterable<T> read(Configuration conf) throws IOException { >>>>>>> - return CompositePathIterable.create(FileSystem.get(conf), path, >>>>>>> new AvroFileReaderFactory<T>( >>>>>>> + FileSystem fs = FileSystem.get(path.toUri(), conf); >>>>>>> + return CompositePathIterable.create(fs, path, new >>>>>> >>>>> >>>>> >>>>> AvroFileReaderFactory<T>( >>>>>>> (AvroType<T>) ptype, conf)); >>>>>>> } >>>>>>> } >>>>>>> diff --git src/main/java/com/cloudera/crunch/io/seq/SeqFileSource.java >>>>>>> src/main/java/com/cloudera/crunch/io/seq/SeqFileSource.java >>>>>>> index 24dec2d..462ef93 100644 >>>>>>> --- src/main/java/com/cloudera/crunch/io/seq/SeqFileSource.java >>>>>>> +++ src/main/java/com/cloudera/crunch/io/seq/SeqFileSource.java >>>>>>> @@ -26,18 +26,16 @@ import com.cloudera.crunch.io.ReadableSource; >>>>>>> import com.cloudera.crunch.io.impl.FileSourceImpl; >>>>>>> import com.cloudera.crunch.types.PType; >>>>>>> >>>>>>> -public class SeqFileSource<T> extends FileSourceImpl<T> implements >>>>>>> - ReadableSource<T> { >>>>>>> +public class SeqFileSource<T> extends FileSourceImpl<T> implements >>>>>>> ReadableSource<T> { >>>>>>> >>>>>>> public SeqFileSource(Path path, PType<T> ptype) { >>>>>>> - super(path, ptype, SequenceFileInputFormat.class); >>>>>>> + super(path, ptype, SequenceFileInputFormat.class); >>>>>>> } >>>>>>> - >>>>>>> + >>>>>>> @Override >>>>>>> public Iterable<T> read(Configuration conf) throws IOException { >>>>>>> - FileSystem fs = FileSystem.get(conf); >>>>>>> - return CompositePathIterable.create(fs, path, >>>>>>> - new SeqFileReaderFactory<T>(ptype, conf)); >>>>>>> + FileSystem fs = FileSystem.get(path.toUri(), conf); >>>>>>> + return CompositePathIterable.create(fs, path, new >>>>>>> SeqFileReaderFactory<T>(ptype, conf)); >>>>>>> } >>>>>>> >>>>>>> @Override >>>>>>> diff --git >>>>>> >>>>> >>>>> >>>>> src/main/java/com/cloudera/crunch/io/seq/SeqFileTableSource.java >>>>>>> src/main/java/com/cloudera/crunch/io/seq/SeqFileTableSource.java >>>>>>> index 69ca12b..4db6658 100644 >>>>>>> --- src/main/java/com/cloudera/crunch/io/seq/SeqFileTableSource.java >>>>>>> +++ src/main/java/com/cloudera/crunch/io/seq/SeqFileTableSource.java >>>>>>> @@ -42,7 +42,7 @@ public class SeqFileTableSource<K, V> extends >>>>>>> FileTableSourceImpl<K, V> implemen >>>>>>> >>>>>>> @Override >>>>>>> public Iterable<Pair<K, V>> read(Configuration conf) throws >>>>>> >>>>> >>>>> >>>>> IOException { >>>>>>> - FileSystem fs = FileSystem.get(conf); >>>>>>> + FileSystem fs = FileSystem.get(path.toUri(), conf); >>>>>>> return CompositePathIterable.create(fs, path, >>>>>>> new SeqFileTableReaderFactory<K, V>((PTableType<K, V>) ptype, conf)); >>>>>>> } >>>>>>> diff --git >>>>>> >>>>> >>>>> >>>>> src/main/java/com/cloudera/crunch/io/text/TextFileSource.java >>>>>>> src/main/java/com/cloudera/crunch/io/text/TextFileSource.java >>>>>>> index a876843..e0dbe68 100644 >>>>>>> --- src/main/java/com/cloudera/crunch/io/text/TextFileSource.java >>>>>>> +++ src/main/java/com/cloudera/crunch/io/text/TextFileSource.java >>>>>>> @@ -67,7 +67,7 @@ public class TextFileSource<T> extends >>>>>>> FileSourceImpl<T> implements >>>>>>> >>>>>>> @Override >>>>>>> public Iterable<T> read(Configuration conf) throws IOException { >>>>>>> - return CompositePathIterable.create(FileSystem.get(conf), path, >>>>>>> - new TextFileReaderFactory<T>(ptype, conf)); >>>>>>> + return CompositePathIterable.create(FileSystem.get(path.toUri(), >>>>>>> conf), path, >>>>>>> + new TextFileReaderFactory<T>(ptype, conf)); >>>>>>> } >>>>>>> } >>>>>>> diff --git src/main/java/com/cloudera/crunch/lib/join/MapsideJoin.java >>>>>>> src/main/java/com/cloudera/crunch/lib/join/MapsideJoin.java >>>>>>> new file mode 100644 >>>>>>> index 0000000..8072e07 >>>>>>> --- /dev/null >>>>>>> +++ src/main/java/com/cloudera/crunch/lib/join/MapsideJoin.java >>>>>>> @@ -0,0 +1,143 @@ >>>>>>> +package com.cloudera.crunch.lib.join; >>>>>>> + >>>>>>> +import java.io.IOException; >>>>>>> + >>>>>>> +import org.apache.hadoop.filecache.DistributedCache; >>>>>>> +import org.apache.hadoop.fs.FileSystem; >>>>>>> +import org.apache.hadoop.fs.Path; >>>>>>> + >>>>>>> +import com.cloudera.crunch.DoFn; >>>>>>> +import com.cloudera.crunch.Emitter; >>>>>>> +import com.cloudera.crunch.PTable; >>>>>>> +import com.cloudera.crunch.Pair; >>>>>>> +import com.cloudera.crunch.impl.mr.MRPipeline; >>>>>>> +import com.cloudera.crunch.impl.mr.run.CrunchRuntimeException; >>>>>>> +import com.cloudera.crunch.io.ReadableSourceTarget; >>>>>>> +import com.cloudera.crunch.io.impl.SourcePathTargetImpl; >>>>>>> +import com.cloudera.crunch.types.PType; >>>>>>> +import com.cloudera.crunch.types.PTypeFamily; >>>>>>> +import com.google.common.collect.ArrayListMultimap; >>>>>>> +import com.google.common.collect.Multimap; >>>>>>> + >>>>>>> +/** >>>>>>> + * Utility for doing map side joins on a common key between two >>>>>>> {@link PTable}s. >>>>>>> + * <p> >>>>>>> + * A map side join is an optimized join which doesn't use a reducer; >>>>>> >>>>> >>>>> >>>>> instead, >>>>>>> + * the right side of the join is loaded into memory and the join is >>>>>>> performed in >>>>>>> + * a mapper. This style of join has the important implication that >>>>>>> the output of >>>>>>> + * the join is not sorted, which is the case with a conventional >>>>>>> (reducer-based) >>>>>>> + * join. >>>>>>> + * <p> >>>>>>> + * <b>Note:</b>This utility is only supported when running with a >>>>>>> + * {@link MRPipeline} as the pipeline. >>>>>>> + */ >>>>>>> +public class MapsideJoin { >>>>>>> + >>>>>>> + /** >>>>>>> + * Join two tables using a map side join. The right-side table will >>>>>> >>>>> >>>>> >>>>> be loaded >>>>>>> + * fully in memory, so this method should only be used if the right >>>>>> >>>>> >>>>> >>>>> side >>>>>>> + * table's contents can fit in the memory allocated to mappers. The >>>>>> >>>>> >>>>> >>>>> join >>>>>>> + * performed by this method is an inner join. >>>>>>> + * >>>>>>> + * @param left >>>>>>> + * The left-side table of the join >>>>>>> + * @param right >>>>>>> + * The right-side table of the join, whose contents will be fully >>>>>>> + * read into memory >>>>>>> + * @return A table keyed on the join key, containing pairs of joined >>>>>> >>>>> >>>>> >>>>> values >>>>>>> + */ >>>>>>> + public static <K, U, V> PTable<K, Pair<U, V>> join(PTable<K, U> >>>>>>> left, PTable<K, V> right) { >>>>>>> + >>>>>>> + if (!(right.getPipeline() instanceof MRPipeline)) { >>>>>>> + throw new CrunchRuntimeException("Map-side join is only >>>>>>> supported within a MapReduce context"); >>>>>>> + } >>>>>>> + >>>>>>> + MRPipeline pipeline = (MRPipeline) right.getPipeline(); >>>>>>> + pipeline.materialize(right); >>>>>>> + >>>>>>> + // TODO Move necessary logic to MRPipeline so that we can >>>>>> >>>>> >>>>> >>>>> theoretically >>>>>>> + // optimize his by running the setup of multiple map-side joins >>>>>>> concurrently >>>>>>> + pipeline.run(); >>>>>>> + >>>>>>> + ReadableSourceTarget<Pair<K, V>> readableSourceTarget = pipeline >>>>>>> + .getMaterializeSourceTarget(right); >>>>>>> + if (!(readableSourceTarget instanceof SourcePathTargetImpl)) { >>>>>>> + throw new CrunchRuntimeException("Right-side contents can't be >>>>>>> read from a path"); >>>>>>> + } >>>>>>> + >>>>>>> + // Suppress warnings because we've just checked this cast via >>>>>> >>>>> >>>>> >>>>> instanceof >>>>>>> + @SuppressWarnings("unchecked") >>>>>>> + SourcePathTargetImpl<Pair<K, V>> sourcePathTarget = >>>>>>> (SourcePathTargetImpl<Pair<K, V>>) readableSourceTarget; >>>>>>> + >>>>>>> + Path path = sourcePathTarget.getPath(); >>>>>>> + DistributedCache.addCacheFile(path.toUri(), >>>>>> >>>>> >>>>> >>>>> pipeline.getConfiguration()); >>>>>>> + >>>>>>> + MapsideJoinDoFn<K, U, V> mapJoinDoFn = new MapsideJoinDoFn<K, U, >>>>>>> V>(path.toString(), >>>>>>> + right.getPType()); >>>>>>> + PTypeFamily typeFamily = left.getTypeFamily(); >>>>>>> + return left.parallelDo( >>>>>>> + "mapjoin", >>>>>>> + mapJoinDoFn, >>>>>>> + typeFamily.tableOf(left.getKeyType(), >>>>>>> + typeFamily.pairs(left.getValueType(), right.getValueType()))); >>>>>>> + >>>>>>> + } >>>>>>> + >>>>>>> + static class MapsideJoinDoFn<K, U, V> extends DoFn<Pair<K, U>, >>>>>>> Pair<K, Pair<U, V>>> { >>>>>>> + >>>>>>> + private String inputPath; >>>>>>> + private PType<Pair<K, V>> ptype; >>>>>>> + private Multimap<K, V> joinMap; >>>>>>> + >>>>>>> + public MapsideJoinDoFn(String inputPath, PType<Pair<K, V>> ptype) { >>>>>>> + this.inputPath = inputPath; >>>>>>> + this.ptype = ptype; >>>>>>> + } >>>>>>> + >>>>>>> + private Path getCacheFilePath() { >>>>>>> + try { >>>>>>> + for (Path localPath : >>>>>>> DistributedCache.getLocalCacheFiles(getConfiguration())) { >>>>>>> + if (localPath.toString().endsWith(inputPath)) { >>>>>>> + return >>>>>>> localPath.makeQualified(FileSystem.getLocal(getConfiguration())); >>>>>>> + >>>>>>> + } >>>>>>> + } >>>>>>> + } catch (IOException e) { >>>>>>> + throw new CrunchRuntimeException(e); >>>>>>> + } >>>>>>> + >>>>>>> + throw new CrunchRuntimeException("Can't find local cache file >>>>>>> for '" + inputPath + "'"); >>>>>>> + } >>>>>>> + >>>>>>> + @Override >>>>>>> + public void initialize() { >>>>>>> + super.initialize(); >>>>>>> + >>>>>>> + ReadableSourceTarget<Pair<K, V>> sourceTarget = >>>>>>> (ReadableSourceTarget<Pair<K, V>>) ptype >>>>>>> + .getDefaultFileSource(getCacheFilePath()); >>>>>>> + Iterable<Pair<K, V>> iterable = null; >>>>>>> + try { >>>>>>> + iterable = sourceTarget.read(getConfiguration()); >>>>>>> + } catch (IOException e) { >>>>>>> + throw new CrunchRuntimeException("Error reading right-side of >>>>>>> map side join: ", e); >>>>>>> + } >>>>>>> + >>>>>>> + joinMap = ArrayListMultimap.create(); >>>>>>> + for (Pair<K, V> joinPair : iterable) { >>>>>>> + joinMap.put(joinPair.first(), joinPair.second()); >>>>>>> + } >>>>>>> + } >>>>>>> + >>>>>>> + @Override >>>>>>> + public void process(Pair<K, U> input, Emitter<Pair<K, Pair<U, >>>>>>> V>>> emitter) { >>>>>>> + K key = input.first(); >>>>>>> + U value = input.second(); >>>>>>> + for (V joinValue : joinMap.get(key)) { >>>>>>> + Pair<U, V> valuePair = Pair.of(value, joinValue); >>>>>>> + emitter.emit(Pair.of(key, valuePair)); >>>>>>> + } >>>>>>> + } >>>>>>> + >>>>>>> + } >>>>>>> + >>>>>>> +} >>>>>>> diff --git src/main/java/com/cloudera/crunch/types/PType.java >>>>>>> src/main/java/com/cloudera/crunch/types/PType.java >>>>>>> index af4ef1b..ae480aa 100644 >>>>>>> --- src/main/java/com/cloudera/crunch/types/PType.java >>>>>>> +++ src/main/java/com/cloudera/crunch/types/PType.java >>>>>>> @@ -15,6 +15,7 @@ >>>>>>> >>>>>>> package com.cloudera.crunch.types; >>>>>>> >>>>>>> +import java.io.Serializable; >>>>>>> import java.util.List; >>>>>>> >>>>>>> import org.apache.hadoop.fs.Path; >>>>>>> @@ -31,7 +32,7 @@ import com.cloudera.crunch.SourceTarget; >>>>>>> * {@code PCollection}. >>>>>>> * >>>>>>> */ >>>>>>> -public interface PType<T> { >>>>>>> +public interface PType<T> extends Serializable { >>>>>>> /** >>>>>>> * Returns the Java type represented by this {@code PType}. >>>>>>> */ >>>>>>> diff --git src/main/java/com/cloudera/crunch/types/avro/AvroType.java >>>>>>> src/main/java/com/cloudera/crunch/types/avro/AvroType.java >>>>>>> index 3db00c0..29af9fb 100644 >>>>>>> --- src/main/java/com/cloudera/crunch/types/avro/AvroType.java >>>>>>> +++ src/main/java/com/cloudera/crunch/types/avro/AvroType.java >>>>>>> @@ -14,6 +14,7 @@ >>>>>>> */ >>>>>>> package com.cloudera.crunch.types.avro; >>>>>>> >>>>>>> +import java.io.Serializable; >>>>>>> import java.util.List; >>>>>>> >>>>>>> import org.apache.avro.Schema; >>>>>>> @@ -41,7 +42,8 @@ public class AvroType<T> implements PType<T> { >>>>>>> private static final Converter AVRO_CONVERTER = new AvroKeyConverter(); >>>>>>> >>>>>>> private final Class<T> typeClass; >>>>>>> - private final Schema schema; >>>>>>> + private final String schemaString; >>>>>>> + private transient Schema schema; >>>>>>> private final MapFn baseInputMapFn; >>>>>>> private final MapFn baseOutputMapFn; >>>>>>> private final List<PType> subTypes; >>>>>>> @@ -55,6 +57,7 @@ public class AvroType<T> implements PType<T> { >>>>>>> MapFn outputMapFn, PType... ptypes) { >>>>>>> this.typeClass = typeClass; >>>>>>> this.schema = Preconditions.checkNotNull(schema); >>>>>>> + this.schemaString = schema.toString(); >>>>>>> this.baseInputMapFn = inputMapFn; >>>>>>> this.baseOutputMapFn = outputMapFn; >>>>>>> this.subTypes = ImmutableList.<PType> builder().add(ptypes).build(); >>>>>>> @@ -76,6 +79,9 @@ public class AvroType<T> implements PType<T> { >>>>>>> } >>>>>>> >>>>>>> public Schema getSchema() { >>>>>>> + if (schema == null){ >>>>>>> + schema = new Schema.Parser().parse(schemaString); >>>>>>> + } >>>>>>> return schema; >>>>>>> } >>>>>>> >>>>>>> diff --git >>>>> src/test/java/com/cloudera/crunch/impl/mr/MRPipelineTest.java >>>>>>> src/test/java/com/cloudera/crunch/impl/mr/MRPipelineTest.java >>>>>>> new file mode 100644 >>>>>>> index 0000000..f265460 >>>>>>> --- /dev/null >>>>>>> +++ src/test/java/com/cloudera/crunch/impl/mr/MRPipelineTest.java >>>>>>> @@ -0,0 +1,60 @@ >>>>>>> +package com.cloudera.crunch.impl.mr; >>>>>>> + >>>>>>> +import static org.junit.Assert.assertEquals; >>>>>>> +import static org.mockito.Mockito.doReturn; >>>>>>> +import static org.mockito.Mockito.mock; >>>>>>> +import static org.mockito.Mockito.spy; >>>>>>> +import static org.mockito.Mockito.when; >>>>>>> + >>>>>>> +import java.io.IOException; >>>>>>> + >>>>>>> +import org.junit.Before; >>>>>>> +import org.junit.Test; >>>>>>> + >>>>>>> +import com.cloudera.crunch.SourceTarget; >>>>>>> +import com.cloudera.crunch.impl.mr.collect.PCollectionImpl; >>>>>>> +import com.cloudera.crunch.io.ReadableSourceTarget; >>>>>>> +import com.cloudera.crunch.types.avro.Avros; >>>>>>> + >>>>>>> +public class MRPipelineTest { >>>>>>> + >>>>>>> + private MRPipeline pipeline; >>>>>>> + >>>>>>> + @Before >>>>>>> + public void setUp() throws IOException { >>>>>>> + pipeline = spy(new MRPipeline(MRPipelineTest.class)); >>>>>>> + } >>>>>>> + >>>>>>> + @Test >>>>>>> + public void testGetMaterializeSourceTarget_AlreadyMaterialized() { >>>>>>> + PCollectionImpl<String> materializedPcollection = >>>>>>> mock(PCollectionImpl.class); >>>>>>> + ReadableSourceTarget<String> readableSourceTarget = >>>>>>> mock(ReadableSourceTarget.class); >>>>>>> + >>>>>> >>>>> >>>>> >>>>> when(materializedPcollection.getMaterializedAt()).thenReturn(readableSourceTarget); >>>>>>> + >>>>>>> + assertEquals(readableSourceTarget, >>>>>>> pipeline.getMaterializeSourceTarget(materializedPcollection)); >>>>>>> + } >>>>>>> + >>>>>>> + @Test >>>>>>> + public void >>>>>> >>>>> >>>>> >>>>> testGetMaterializeSourceTarget_NotMaterialized_HasOutput() { >>>>>>> + >>>>>>> + PCollectionImpl<String> pcollection = mock(PCollectionImpl.class); >>>>>>> + ReadableSourceTarget<String> readableSourceTarget = >>>>>>> mock(ReadableSourceTarget.class); >>>>>>> + when(pcollection.getPType()).thenReturn(Avros.strings()); >>>>>>> + >>>>>> >>>>> >>>>> >>>>> doReturn(readableSourceTarget).when(pipeline).createIntermediateOutput(Avros.strings()); >>>>>>> + when(pcollection.getMaterializedAt()).thenReturn(null); >>>>>>> + >>>>>>> + assertEquals(readableSourceTarget, >>>>>>> pipeline.getMaterializeSourceTarget(pcollection)); >>>>>>> + } >>>>>>> + >>>>>>> + @Test(expected = IllegalArgumentException.class) >>>>>>> + public void >>>>>> >>>>> >>>>> >>>>> testGetMaterializeSourceTarget_NotMaterialized_NotReadableSourceTarget() >>>>>>> { >>>>>>> + PCollectionImpl<String> pcollection = mock(PCollectionImpl.class); >>>>>>> + SourceTarget<String> nonReadableSourceTarget = >>>>>> >>>>> >>>>> >>>>> mock(SourceTarget.class); >>>>>>> + when(pcollection.getPType()).thenReturn(Avros.strings()); >>>>>>> + >>>>>> >>>>> >>>>> >>>>> doReturn(nonReadableSourceTarget).when(pipeline).createIntermediateOutput(Avros.strings()); >>>>>>> + when(pcollection.getMaterializedAt()).thenReturn(null); >>>>>>> + >>>>>>> + pipeline.getMaterializeSourceTarget(pcollection); >>>>>>> + } >>>>>>> + >>>>>>> +} >>>>>>> diff --git >>>>>> >>>>> >>>>> >>>>> src/test/java/com/cloudera/crunch/lib/join/MapsideJoinTest.java >>>>>>> src/test/java/com/cloudera/crunch/lib/join/MapsideJoinTest.java >>>>>>> new file mode 100644 >>>>>>> index 0000000..97e0c63 >>>>>>> --- /dev/null >>>>>>> +++ src/test/java/com/cloudera/crunch/lib/join/MapsideJoinTest.java >>>>>>> @@ -0,0 +1,102 @@ >>>>>>> +package com.cloudera.crunch.lib.join; >>>>>>> + >>>>>>> +import static org.junit.Assert.assertEquals; >>>>>>> +import static org.junit.Assert.assertTrue; >>>>>>> + >>>>>>> +import java.io.IOException; >>>>>>> +import java.util.Collections; >>>>>>> +import java.util.List; >>>>>>> + >>>>>>> +import org.junit.Test; >>>>>>> + >>>>>>> +import com.cloudera.crunch.FilterFn; >>>>>>> +import com.cloudera.crunch.MapFn; >>>>>>> +import com.cloudera.crunch.PTable; >>>>>>> +import com.cloudera.crunch.Pair; >>>>>>> +import com.cloudera.crunch.Pipeline; >>>>>>> +import com.cloudera.crunch.impl.mem.MemPipeline; >>>>>>> +import com.cloudera.crunch.impl.mr.MRPipeline; >>>>>>> +import com.cloudera.crunch.impl.mr.run.CrunchRuntimeException; >>>>>>> +import com.cloudera.crunch.test.FileHelper; >>>>>>> +import com.cloudera.crunch.types.writable.Writables; >>>>>>> +import com.google.common.collect.Lists; >>>>>>> + >>>>>>> +public class MapsideJoinTest { >>>>>>> + >>>>>>> + private static class LineSplitter extends MapFn<String, >>>>>>> Pair<Integer, String>> { >>>>>>> + >>>>>>> + @Override >>>>>>> + public Pair<Integer, String> map(String input) { >>>>>>> + String[] fields = input.split("\\|"); >>>>>>> + return Pair.of(Integer.parseInt(fields[0]), fields[1]); >>>>>>> + } >>>>>>> + >>>>>>> + } >>>>>>> + >>>>>>> + private static class NegativeFilter extends FilterFn<Pair<Integer, >>>>>> >>>>> >>>>> >>>>> String>> { >>>>>>> + >>>>>>> + @Override >>>>>>> + public boolean accept(Pair<Integer, String> input) { >>>>>>> + return false; >>>>>>> + } >>>>>>> + >>>>>>> + } >>>>>>> + >>>>>>> + @Test(expected = CrunchRuntimeException.class) >>>>>>> + public void testNonMapReducePipeline() { >>>>>>> + runMapsideJoin(MemPipeline.getInstance()); >>>>>>> + } >>>>>>> + >>>>>>> + @Test >>>>>>> + public void testMapsideJoin_RightSideIsEmpty() throws IOException { >>>>>>> + MRPipeline pipeline = new MRPipeline(MapsideJoinTest.class); >>>>>>> + PTable<Integer, String> customerTable = readTable(pipeline, >>>>>>> "customers.txt"); >>>>>>> + PTable<Integer, String> orderTable = readTable(pipeline, >>>>>> >>>>> >>>>> >>>>> "orders.txt"); >>>>>>> + >>>>>>> + PTable<Integer, String> filteredOrderTable = >>>>>>> orderTable.parallelDo(new NegativeFilter(), >>>>>>> + orderTable.getPTableType()); >>>>>>> + >>>>>>> + PTable<Integer, Pair<String, String>> joined = >>>>>>> MapsideJoin.join(customerTable, >>>>>>> + filteredOrderTable); >>>>>>> + >>>>>>> + List<Pair<Integer, Pair<String, String>>> materializedJoin = >>>>>>> Lists.newArrayList(joined >>>>>>> + .materialize()); >>>>>>> + >>>>>>> + assertTrue(materializedJoin.isEmpty()); >>>>>>> + >>>>>>> + } >>>>>>> + >>>>>>> + @Test >>>>>>> + public void testMapsideJoin() throws IOException { >>>>>>> + runMapsideJoin(new MRPipeline(MapsideJoinTest.class)); >>>>>>> + } >>>>>>> + >>>>>>> + private void runMapsideJoin(Pipeline pipeline) { >>>>>>> + PTable<Integer, String> customerTable = readTable(pipeline, >>>>>>> "customers.txt"); >>>>>>> + PTable<Integer, String> orderTable = readTable(pipeline, >>>>>> >>>>> >>>>> >>>>> "orders.txt"); >>>>>>> + >>>>>>> + PTable<Integer, Pair<String, String>> joined = >>>>>>> MapsideJoin.join(customerTable, orderTable); >>>>>>> + >>>>>>> + List<Pair<Integer, Pair<String, String>>> expectedJoinResult = >>>>>>> Lists.newArrayList(); >>>>>>> + expectedJoinResult.add(Pair.of(111, Pair.of("John Doe", "Corn >>>>>> >>>>> >>>>> >>>>> flakes"))); >>>>>>> + expectedJoinResult.add(Pair.of(222, Pair.of("Jane Doe", "Toilet >>>>>> >>>>> >>>>> >>>>> paper"))); >>>>>>> + expectedJoinResult.add(Pair.of(222, Pair.of("Jane Doe", "Toilet >>>>>>> plunger"))); >>>>>>> + expectedJoinResult.add(Pair.of(333, Pair.of("Someone Else", >>>>>>> "Toilet brush"))); >>>>>>> + >>>>>>> + List<Pair<Integer, Pair<String, String>>> joinedResultList = >>>>>>> Lists.newArrayList(joined >>>>>>> + .materialize()); >>>>>>> + Collections.sort(joinedResultList); >>>>>>> + >>>>>>> + assertEquals(expectedJoinResult, joinedResultList); >>>>>>> + } >>>>>>> + >>>>>>> + private static PTable<Integer, String> readTable(Pipeline pipeline, >>>>>>> String filename) { >>>>>>> + try { >>>>>>> + return >>>>>> >>>>> >>>>> >>>>> pipeline.readTextFile(FileHelper.createTempCopyOf(filename)).parallelDo("asTable", >>>>>>> + new LineSplitter(), Writables.tableOf(Writables.ints(), >>>>>>> Writables.strings())); >>>>>>> + } catch (IOException e) { >>>>>>> + throw new RuntimeException(e); >>>>>>> + } >>>>>>> + } >>>>>>> + >>>>>>> +} >>>>>>> diff --git src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java >>>>>>> src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java >>>>>>> index 74e2ad3..c6a0b46 100644 >>>>>>> --- src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java >>>>>>> +++ src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java >>>>>>> @@ -25,6 +25,7 @@ import org.apache.avro.Schema.Type; >>>>>>> import org.apache.avro.generic.GenericData; >>>>>>> import org.apache.avro.util.Utf8; >>>>>>> import org.apache.hadoop.io.LongWritable; >>>>>>> +import org.junit.Ignore; >>>>>>> import org.junit.Test; >>>>>>> >>>>>>> import com.cloudera.crunch.Pair; >>>>>>> @@ -103,6 +104,7 @@ public class AvrosTest { >>>>>>> } >>>>>>> >>>>>>> @Test >>>>>>> + @Ignore("This test creates an invalid schema that causes >>>>>>> Schema#toString to fail") >>>>>>> public void testNestedTables() throws Exception { >>>>>>> PTableType<Long, Long> pll = Avros.tableOf(Avros.longs(), >>>>>> >>>>> >>>>> >>>>> Avros.longs()); >>>>>>> PTableType<Pair<Long, Long>, String> nest = Avros.tableOf(pll, >>>>>>> Avros.strings()); >>>>>>> diff --git src/test/resources/customers.txt >>>>>> >>>>> >>>>> >>>>> src/test/resources/customers.txt >>>>>>> new file mode 100644 >>>>>>> index 0000000..98f3f3d >>>>>>> --- /dev/null >>>>>>> +++ src/test/resources/customers.txt >>>>>>> @@ -0,0 +1,4 @@ >>>>>>> +111|John Doe >>>>>>> +222|Jane Doe >>>>>>> +333|Someone Else >>>>>>> +444|Has No Orders >>>>>>> \ No newline at end of file >>>>>>> diff --git src/test/resources/orders.txt src/test/resources/orders.txt >>>>>>> new file mode 100644 >>>>>>> index 0000000..2f1383f >>>>>>> --- /dev/null >>>>>>> +++ src/test/resources/orders.txt >>>>>>> @@ -0,0 +1,4 @@ >>>>>>> +222|Toilet plunger >>>>>>> +333|Toilet brush >>>>>>> +222|Toilet paper >>>>>>> +111|Corn flakes >>>>>>> \ No newline at end of file >>>>>>> >>>>>>> >>>>>>> >>>>>>> On Tue, Jul 3, 2012 at 10:42 AM, Christian Tzolov >>>>>>> <[email protected] (mailto:[email protected])> >>>>>> >>>>> >>>>> >>>>> wrote: >>>>>>>> Hi Gabriel, >>>>>>>> >>>>>>>> Seems like the attachment is missing. >>>>>>>> >>>>>>>> Cheers, >>>>>>>> Chris >>>>>>>> >>>>>>>> On Tue, Jul 3, 2012 at 9:23 AM, Gabriel Reid <[email protected] >>>>>>>> (mailto:[email protected])(mailto: >>>>> [email protected] (mailto:[email protected]))> wrote: >>>>>>>> >>>>>>>>> Hi guys, >>>>>>>>> >>>>>>>>> Attached (hopefully) is a patch for an initial implementation of >>>>> map >>>>>>>>> side joins. It's currently implemented as a static method in a >>>>>>>> >>>>>>> >>>>>> >>>>> >>>>> >>>>> class >>>>>>>>> called MapsideJoin, with the same interface as the existing Join >>>>>>>> >>>>>>> >>>>>> >>>>> >>>>> >>>>> class >>>>>>>>> (with only inner joins being implemented for now). The way it >>>>>>>> >>>>>>> >>>>>> >>>>> >>>>> >>>>> works is >>>>>>>>> that the right-side PTable of the join is put in the distributed >>>>>>>> >>>>>>> >>>>>> >>>>> >>>>> >>>>> cache >>>>>>>>> and then read by the join function at runtime. >>>>>>>>> >>>>>>>>> There's one spot that I can see for a potentially interesting >>>>>>>>> optimization -- MRPipeline#run is called once for each map side >>>>>>>> >>>>>>> >>>>>> >>>>> >>>>> >>>>> join >>>>>>>>> that is set up, but if the setup of the joins was done within >>>>>>>>> MRPipeline, then we could set up multiple map side joins in >>>>>>>> >>>>>>> >>>>>> >>>>> >>>>> >>>>> parallel >>>>>>>>> with a single call to MRPipeline#run. OTOH, a whole bunch of map >>>>>>>> >>>>>>> >>>>>> >>>>> >>>>> >>>>> side >>>>>>>>> joins in parallel probably isn't that common of an operation. >>>>>>>>> >>>>>>>>> If anyone feels like taking a look at the patch, any feedback is >>>>>>>>> appreciated. If nobody sees something that needs serious changes in >>>>>>>>> the patch, I'll commit it. >>>>>>>>> >>>>>>>>> - Gabriel >>>>>>>>> >>>>>>>>> >>>>>>>>> On Thu, Jun 21, 2012 at 9:09 AM, Gabriel Reid < >>>>> [email protected] (mailto:[email protected])> >>>>>>>>> wrote: >>>>>>>>>> Replying to all... >>>>>>>>>> >>>>>>>>>> On Thu, Jun 21, 2012 at 8:40 AM, Josh Wills <[email protected] >>>>>>>>>> (mailto:[email protected])(mailto: >>>>> [email protected] (mailto:[email protected]))> wrote: >>>>>>>>>>> >>>>>>>>>>> So there's a philosophical issue here: should Crunch ever make >>>>>>>>>>> decisions about how to do something itself based on its >>>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>> >>>>>> >>>>> >>>>> >>>>> estimates of >>>>>>>>>>> the size of the data sets, or should it always do exactly what >>>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>> >>>>>> >>>>> >>>>> >>>>> the >>>>>>>>>>> developer indicates? >>>>>>>>>>> >>>>>>>>>>> I can make a case either way, but I think that no matter what, >>>>> we >>>>>>>>>>> would want to have explicit functions for performing a join >>>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>> >>>>>> >>>>> >>>>> >>>>> that reads >>>>>>>>>>> one data set into memory, so I think we can proceed w/the >>>>>>>>>>> implementation while folks weigh in on what their preferences >>>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>> >>>>>> >>>>> >>>>> >>>>> are for >>>>>>>>>>> the default join() behavior (e.g., just do a reduce-side join, >>>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>> >>>>>> >>>>> >>>>> >>>>> or try >>>>>>>>>>> to figure out the best join given information about the input >>>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>> >>>>>> >>>>> >>>>> >>>>> data and >>>>>>>>>>> some configuration parameters.) >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> I definitely agree on needing to have an explicit way to invoke >>>>> one or >>>>>>>>>> the other -- and in general I don't like having magic behind the >>>>>>>>>> scenes to decide on behaviour (especially considering Crunch is >>>>>>>>>> generally intended to be closer to the metal than Pig and Hive). >>>>>>>>> >>>>>>>> >>>>>>> >>>>>> >>>>> >>>>> >>>>> I'm >>>>>>>>>> not sure if the runtime decision is something specific to some >>>>>>>>> >>>>>>>> >>>>>>> >>>>>> >>>>> >>>>> >>>>> of my >>>>>>>>>> use cases or if it could be useful to a wider audience. >>>>>>>>>> >>>>>>>>>> The ability to dynamically decide at runtime whether a map side >>>>> join >>>>>>>>>> should be used can also easily be tacked on outside of Crunch, >>>>>>>>> >>>>>>>> >>>>>>> >>>>>> >>>>> >>>>> >>>>> and >>>>>>>>>> won't impact the underlying implementation (as you pointed out), >>>>>>>>> >>>>>>>> >>>>>>> >>>>>> >>>>> >>>>> >>>>> so I >>>>>>>>>> definitely also agree on focusing on the underlying >>>>>>>>> >>>>>>>> >>>>>>> >>>>>> >>>>> >>>>> >>>>> implementation >>>>>>>>>> first, and we can worry about the options used for exposing it >>>>>>>>> >>>>>>>> >>>>>>> >>>>>> >>>>> >>>>> >>>>> later >>>>>>>>>> on. >>>>>>>>>> >>>>>>>>>> - Gabriel >>>> >>>> >>>> -- >>>> Director of Data Science >>>> Cloudera <http://www.cloudera.com> >>>> Twitter: @josh_wills <http://twitter.com/josh_wills> >>> >>> >>>
