On Fri, Jul 6, 2012 at 9:57 AM, Josh Wills <[email protected]> wrote:
> Not a problem man-- can't wait to play with it this weekend!
> (I really need to get a hobby of some kind.)
>
> On Fri, Jul 6, 2012 at 9:50 AM, Gabriel Reid <[email protected]> wrote:
>> Hi Josh,
>>
>> I've just committed the map side joins, after doing some more
>> extensive testing on it today. Unfortunately, I forgot to squash the
>> multiple commits into a single one (as I was planning on doing), but
>> it's all in there and working.
>>
>> - Gabriel
>>
>> On Fri, Jul 6, 2012 at 7:28 AM, Gabriel Reid <[email protected]> wrote:
>>> Thanks for making the JIRA issue.
>>>
>>> I was going to do some more testing on a "real" cluster later on today, and
>>> as long as that all checks out then this will be ready to go in as far as
>>> I'm concerned.
>>>
>>> I don't see any reason to hold back on the big renaming for this patch --
>>> if you're ready to do the package renaming, please go ahead.
>>>
>>> As long as my testing later today checks out and there aren't any glaring
>>> issues from anyone who has tried the patch out, I'll probably be checking
>>> the patch in later today (so it'll probably be in before you do the
>>> renaming anyhow).
>>>
>>> - Gabriel
>>>
>>> On Friday 6 July 2012 at 00:11, Josh Wills wrote:
>>>
>>>> FYI: https://issues.apache.org/jira/browse/CRUNCH-3 to track this.
>>>>
>>>> Gabriel, is your feeling that this is ready to go in? I'm debating whether
>>>> or not to check this in before/after I do the massive com.cloudera ->
>>>> org.apache renaming. What do you think?
>>>>
>>>> On Tue, Jul 3, 2012 at 1:25 PM, Gabriel Reid <[email protected]> wrote:
>>>>
>>>> > Hi Joe,
>>>> >
>>>> > Looking forward to hearing your feedback! This is still just a patch and
>>>> > not committed yet, and there's still definitely room for help (i.e. ideas
>>>> > on how to improve it, or do it totally differently), so certainly let me
>>>> > know if you've got some ideas.
>>>> >
>>>> > - Gabriel
>>>> >
>>>> > On Tuesday 3 July 2012 at 18:01, Joseph Adler wrote:
>>>> >
>>>> > > Awesome! Will give this a try soon... wish I could have helped with this one...
>>>> > >
>>>> > > On Tue, Jul 3, 2012 at 1:53 AM, Gabriel Reid <[email protected]> wrote:
>>>> > > > Thanks for pointing that out Chris. I'm guessing the mailing list is
>>>> > > > stripping out attachments(?). Once the JIRA is up and running then
>>>> > > > that will be taken care of I guess.
>>>> > > >
>>>> > > > The mime type on the last attempt was application/octet-stream, so
>>>> > > > I've renamed this to a .txt file to try to ensure that it'll get a
>>>> > > > text/plain mime type (although I don't know if that'll make a
>>>> > > > difference). I've also pasted it inline below, hopefully one of those
>>>> > > > solutions works.
>>>> > > >
>>>> > > > - Gabriel
>>>> > > >
>>>> > > >
>>>> > > > diff --git src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java
>>>> > > > index 420e8dc..c8ba596 100644
>>>> > > > --- src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java
>>>> > > > +++ src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java
>>>> > > > @@ -59,9 +59,9 @@ import com.google.common.collect.Sets;
>>>> > > >  public class MRPipeline implements Pipeline {
>>>> > > >
>>>> > > >    private static final Log LOG = LogFactory.getLog(MRPipeline.class);
>>>> > > > -
>>>> > > > +
>>>> > > >    private static final Random RANDOM = new Random();
>>>> > > > -
>>>> > > > +
>>>> > > >    private final Class<?> jarClass;
>>>> > > >    private final String name;
>>>> > > >    private final Map<PCollectionImpl<?>, Set<Target>> outputTargets;
>>>> > > > @@ -75,15 +75,15 @@ public class MRPipeline implements Pipeline {
>>>> > > >    public MRPipeline(Class<?> jarClass) throws IOException {
>>>> > > >      this(jarClass, new Configuration());
>>>> > > >    }
>>>> > > > -
>>>> > > > -  public MRPipeline(Class<?> jarClass, String name){
>>>> > > > +
>>>> > > > +  public MRPipeline(Class<?> jarClass, String name) {
>>>> > > >      this(jarClass, name, new Configuration());
>>>> > > >    }
>>>> > > > -
>>>> > > > +
>>>> > > >    public MRPipeline(Class<?> jarClass, Configuration conf) {
>>>> > > > -    this(jarClass, jarClass.getName(), conf);
>>>> > > > +    this(jarClass, jarClass.getName(), conf);
>>>> > > >    }
>>>> > > > -
>>>> > > > +
>>>> > > >    public MRPipeline(Class<?> jarClass, String name, Configuration conf) {
>>>> > > >      this.jarClass = jarClass;
>>>> > > >      this.name = name;
>>>> > > > @@ -102,9 +102,9 @@ public class MRPipeline implements Pipeline {
>>>> > > >
>>>> > > >    @Override
>>>> > > >    public void setConfiguration(Configuration conf) {
>>>> > > > -    this.conf = conf;
>>>> > > > +    this.conf = conf;
>>>> > > >    }
>>>> > > > -
>>>> > > > +
>>>> > > >    @Override
>>>> > > >    public PipelineResult run() {
>>>> > > >      MSCRPlanner planner = new MSCRPlanner(this, outputTargets);
>>>> > > > @@ -125,8 +125,8 @@ public class MRPipeline implements Pipeline {
>>>> > > >        boolean materialized = false;
>>>> > > >        for (Target t : outputTargets.get(c)) {
>>>> > > >          if (!materialized && t instanceof Source) {
>>>> > > > -          c.materializeAt((SourceTarget) t);
>>>> > > > -          materialized = true;
>>>> > > > +          c.materializeAt((SourceTarget) t);
>>>> > > > +          materialized = true;
>>>> > > >          }
>>>> > > >        }
>>>> > > >      }
>>>> > > > @@ -144,7 +144,7 @@ public class MRPipeline implements Pipeline {
>>>> > > >      cleanup();
>>>> > > >      return res;
>>>> > > >    }
>>>> > > > -
>>>> > > > +
>>>> > > >    public <S> PCollection<S> read(Source<S> source) {
>>>> > > >      return new InputCollection<S>(source, this);
>>>> > > >    }
>>>> > > > @@ -160,85 +160,120 @@ public class MRPipeline implements Pipeline {
>>>> > > >    @SuppressWarnings("unchecked")
>>>> > > >    public void write(PCollection<?> pcollection, Target target) {
>>>> > > >      if (pcollection instanceof PGroupedTableImpl) {
>>>> > > > -      pcollection = ((PGroupedTableImpl<?,?>) pcollection).ungroup();
>>>> > > > +      pcollection = ((PGroupedTableImpl<?, ?>) pcollection).ungroup();
>>>> > > >      } else if (pcollection instanceof UnionCollection || pcollection instanceof UnionTable) {
>>>> > > > -      pcollection = pcollection.parallelDo("UnionCollectionWrapper",
>>>> > > > -          (MapFn)IdentityFn.<Object>getInstance(), pcollection.getPType());
>>>> > > > +      pcollection = pcollection.parallelDo("UnionCollectionWrapper",
>>>> > > > +          (MapFn) IdentityFn.<Object> getInstance(), pcollection.getPType());
>>>> > > >      }
>>>> > > >      addOutput((PCollectionImpl<?>) pcollection, target);
>>>> > > >    }
>>>> > > >
>>>> > > >    private void addOutput(PCollectionImpl<?> impl, Target target) {
>>>> > > >      if (!outputTargets.containsKey(impl)) {
>>>> > > > -      outputTargets.put(impl, Sets.<Target>newHashSet());
>>>> > > > +      outputTargets.put(impl, Sets.<Target> newHashSet());
>>>> > > >      }
>>>> > > >      outputTargets.get(impl).add(target);
>>>> > > >    }
>>>> > > > -
>>>> > > > +
>>>> > > >    @Override
>>>> > > >    public <T> Iterable<T> materialize(PCollection<T> pcollection) {
>>>> > > > -
>>>> > > > -    if (pcollection instanceof UnionCollection) {
>>>> > > > -      pcollection = pcollection.parallelDo("UnionCollectionWrapper",
>>>> > > > -          (MapFn)IdentityFn.<Object>getInstance(), pcollection.getPType());
>>>> > > > -    }
>>>> > > > -    PCollectionImpl<T> impl = (PCollectionImpl<T>) pcollection;
>>>> > > > +
>>>> > > > +    PCollectionImpl<T> pcollectionImpl = toPcollectionImpl(pcollection);
>>>> > > > +    ReadableSourceTarget<T> srcTarget = getMaterializeSourceTarget(pcollectionImpl);
>>>> > > > +
>>>> > > > +    MaterializableIterable<T> c = new MaterializableIterable<T>(this, srcTarget);
>>>> > > > +    if (!outputTargetsToMaterialize.containsKey(pcollectionImpl)) {
>>>> > > > +      outputTargetsToMaterialize.put(pcollectionImpl, c);
>>>> > > > +    }
>>>> > > > +    return c;
>>>> > > > +  }
>>>> > > > +
>>>> > > > +  /**
>>>> > > > +   * Retrieve a ReadableSourceTarget that provides access to the contents of a
>>>> > > > +   * {@link PCollection}. This is primarily intended as a helper method to
>>>> > > > +   * {@link #materialize(PCollection)}. The underlying data of the
>>>> > > > +   * ReadableSourceTarget may not actually be present until the pipeline is run.
>>>> > > > +   *
>>>> > > > +   * @param pcollection
>>>> > > > +   *          The collection for which the ReadableSourceTarget is to be
>>>> > > > +   *          retrieved
>>>> > > > +   * @return The ReadableSourceTarget
>>>> > > > +   * @throws IllegalArgumentException
>>>> > > > +   *           If no ReadableSourceTarget can be retrieved for the given
>>>> > > > +   *           PCollection
>>>> > > > +   */
>>>> > > > +  public <T> ReadableSourceTarget<T> getMaterializeSourceTarget(PCollection<T> pcollection) {
>>>> > > > +    PCollectionImpl<T> impl = toPcollectionImpl(pcollection);
>>>> > > >      SourceTarget<T> matTarget = impl.getMaterializedAt();
>>>> > > >      if (matTarget != null && matTarget instanceof ReadableSourceTarget) {
>>>> > > > -      return new MaterializableIterable<T>(this, (ReadableSourceTarget<T>) matTarget);
>>>> > > > +      return (ReadableSourceTarget<T>) matTarget;
>>>> > > > +    }
>>>> > > > +
>>>> > > > +    ReadableSourceTarget<T> srcTarget = null;
>>>> > > > +    if (outputTargets.containsKey(pcollection)) {
>>>> > > > +      for (Target target : outputTargets.get(impl)) {
>>>> > > > +        if (target instanceof ReadableSourceTarget) {
>>>> > > > +          srcTarget = (ReadableSourceTarget<T>) target;
>>>> > > > +          break;
>>>> > > > +        }
>>>> > > > +      }
>>>> > > >      }
>>>> > > > -
>>>> > > > -    ReadableSourceTarget<T> srcTarget = null;
>>>> > > > -    if (outputTargets.containsKey(pcollection)) {
>>>> > > > -      for (Target target : outputTargets.get(impl)) {
>>>> > > > -        if (target instanceof ReadableSourceTarget) {
>>>> > > > -          srcTarget = (ReadableSourceTarget) target;
>>>> > > > -          break;
>>>> > > > -        }
>>>> > > > -      }
>>>> > > > -    }
>>>> > > > -
>>>> > > > -    if (srcTarget == null) {
>>>> > > > -      SourceTarget<T> st = createIntermediateOutput(pcollection.getPType());
>>>> > > > -      if (!(st instanceof ReadableSourceTarget)) {
>>>> > > > -        throw new IllegalArgumentException("The PType for the given PCollection is not readable"
>>>> > > > -            + " and cannot be materialized");
>>>> > > > -      } else {
>>>> > > > -        srcTarget = (ReadableSourceTarget) st;
>>>> > > > -        addOutput(impl, srcTarget);
>>>> > > > -      }
>>>> > > > -    }
>>>> > > > -
>>>> > > > -    MaterializableIterable<T> c = new MaterializableIterable<T>(this, srcTarget);
>>>> > > > -    outputTargetsToMaterialize.put(impl, c);
>>>> > > > -    return c;
>>>> > > > +
>>>> > > > +    if (srcTarget == null) {
>>>> > > > +      SourceTarget<T> st = createIntermediateOutput(pcollection.getPType());
>>>> > > > +      if (!(st instanceof ReadableSourceTarget)) {
>>>> > > > +        throw new IllegalArgumentException("The PType for the given PCollection is not readable"
>>>> > > > +            + " and cannot be materialized");
>>>> > > > +      } else {
>>>> > > > +        srcTarget = (ReadableSourceTarget<T>) st;
>>>> > > > +        addOutput(impl, srcTarget);
>>>> > > > +      }
>>>> > > > +    }
>>>> > > > +
>>>> > > > +    return srcTarget;
>>>> > > > +  }
>>>> > > > +
>>>> > > > +  /**
>>>> > > > +   * Safely cast a PCollection into a PCollectionImpl, including handling the case of UnionCollections.
>>>> > > > +   * @param pcollection The PCollection to be cast/transformed
>>>> > > > +   * @return The PCollectionImpl representation
>>>> > > > +   */
>>>> > > > +  private <T> PCollectionImpl<T> toPcollectionImpl(PCollection<T> pcollection) {
>>>> > > > +    PCollectionImpl<T> pcollectionImpl = null;
>>>> > > > +    if (pcollection instanceof UnionCollection) {
>>>> > > > +      pcollectionImpl = (PCollectionImpl<T>) pcollection.parallelDo("UnionCollectionWrapper",
>>>> > > > +          (MapFn) IdentityFn.<Object> getInstance(), pcollection.getPType());
>>>> > > > +    } else {
>>>> > > > +      pcollectionImpl = (PCollectionImpl<T>) pcollection;
>>>> > > > +    }
>>>> > > > +    return pcollectionImpl;
>>>> > > >    }
>>>> > > >
>>>> > > >    public <T> SourceTarget<T> createIntermediateOutput(PType<T> ptype) {
>>>> > > > -    return ptype.getDefaultFileSource(createTempPath());
>>>> > > > +    return ptype.getDefaultFileSource(createTempPath());
>>>> > > >    }
>>>> > > >
>>>> > > >    public Path createTempPath() {
>>>> > > >      tempFileIndex++;
>>>> > > >      return new Path(tempDirectory, "p" + tempFileIndex);
>>>> > > >    }
>>>> > > > -
>>>> > > > +
>>>> > > >    private static Path createTempDirectory(Configuration conf) {
>>>> > > >      Path dir = new Path("/tmp/crunch" + RANDOM.nextInt());
>>>> > > > -    try {
>>>> > > > -      FileSystem.get(conf).mkdirs(dir);
>>>> > > > -    } catch (IOException e) {
>>>> > > > -      LOG.error("Exception creating job output directory", e);
>>>> > > > -      throw new RuntimeException(e);
>>>> > > > -    }
>>>> > > > +    try {
>>>> > > > +      FileSystem.get(conf).mkdirs(dir);
>>>> > > > +    } catch (IOException e) {
>>>> > > > +      LOG.error("Exception creating job output directory", e);
>>>> > > > +      throw new RuntimeException(e);
>>>> > > > +    }
>>>> > > >      return dir;
>>>> > > >    }
>>>> > > > -
>>>> > > > +
>>>> > > >    @Override
>>>> > > >    public <T> void writeTextFile(PCollection<T> pcollection, String pathName) {
>>>> > > >      // Ensure that this is a writable pcollection instance.
>>>> > > > -    pcollection = pcollection.parallelDo("asText", IdentityFn.<T>getInstance(),
>>>> > > > -        WritableTypeFamily.getInstance().as(pcollection.getPType()));
>>>> > > > +    pcollection = pcollection.parallelDo("asText", IdentityFn.<T> getInstance(), WritableTypeFamily
>>>> > > > +        .getInstance().as(pcollection.getPType()));
>>>> > > >      write(pcollection, At.textFile(pathName));
>>>> > > >    }
>>>> > > >
>>>> > > > @@ -256,7 +291,7 @@ public class MRPipeline implements Pipeline {
>>>> > > >        LOG.info("Exception during cleanup", e);
>>>> > > >      }
>>>> > > >    }
>>>> > > > -
>>>> > > > +
>>>> > > >    public int getNextAnonymousStageId() {
>>>> > > >      return nextAnonymousStageId++;
>>>> > > >    }
>>>> > > > @@ -265,7 +300,7 @@ public class MRPipeline implements Pipeline {
>>>> > > >    public void enableDebug() {
>>>> > > >      // Turn on Crunch runtime error catching.
>>>> > > >      getConfiguration().setBoolean(RuntimeParameters.DEBUG, true);
>>>> > > > -
>>>> > > > +
>>>> > > >      // Write Hadoop's WARN logs to the console.
>>>> > > >      Logger crunchInfoLogger = LogManager.getLogger("com.cloudera.crunch");
>>>> > > >      Appender console = crunchInfoLogger.getAppender("A");
>>>> > > > @@ -277,9 +312,9 @@ public class MRPipeline implements Pipeline {
>>>> > > >        LOG.warn("Could not find console appender named 'A' for writing Hadoop warning logs");
>>>> > > >      }
>>>> > > >    }
>>>> > > > -
>>>> > > > +
>>>> > > >    @Override
>>>> > > >    public String getName() {
>>>> > > > -    return name;
>>>> > > > +    return name;
>>>> > > >    }
>>>> > > >  }
>>>> > > > diff --git src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java
>>>> > > > index d41a52e..68ef054 100644
>>>> > > > --- src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java
>>>> > > > +++ src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java
>>>> > > > @@ -12,6 +12,10 @@ public class CrunchRuntimeException extends RuntimeException {
>>>> > > >      super(e);
>>>> > > >    }
>>>> > > >
>>>> > > > +  public CrunchRuntimeException(String msg, Exception e) {
>>>> > > > +    super(msg, e);
>>>> > > > +  }
>>>> > > > +
>>>> > > >    public boolean wasLogged() {
>>>> > > >      return logged;
>>>> > > >    }
>>>> > > > diff --git src/main/java/com/cloudera/crunch/io/avro/AvroFileSource.java src/main/java/com/cloudera/crunch/io/avro/AvroFileSource.java
>>>> > > > index 1122d62..4debfeb 100644
>>>> > > > --- src/main/java/com/cloudera/crunch/io/avro/AvroFileSource.java
>>>> > > > +++ src/main/java/com/cloudera/crunch/io/avro/AvroFileSource.java
>>>> > > > @@ -45,7 +45,8 @@ public class AvroFileSource<T> extends FileSourceImpl<T> implements ReadableSour
>>>> > > >
>>>> > > >    @Override
>>>> > > >    public Iterable<T> read(Configuration conf) throws IOException {
>>>> > > > -    return CompositePathIterable.create(FileSystem.get(conf), path, new AvroFileReaderFactory<T>(
>>>> > > > +    FileSystem fs = FileSystem.get(path.toUri(), conf);
>>>> > > > +    return CompositePathIterable.create(fs, path, new AvroFileReaderFactory<T>(
>>>> > > >          (AvroType<T>) ptype, conf));
>>>> > > >    }
>>>> > > >  }
>>>> > > > diff --git src/main/java/com/cloudera/crunch/io/seq/SeqFileSource.java src/main/java/com/cloudera/crunch/io/seq/SeqFileSource.java
>>>> > > > index 24dec2d..462ef93 100644
>>>> > > > --- src/main/java/com/cloudera/crunch/io/seq/SeqFileSource.java
>>>> > > > +++ src/main/java/com/cloudera/crunch/io/seq/SeqFileSource.java
>>>> > > > @@ -26,18 +26,16 @@ import com.cloudera.crunch.io.ReadableSource;
>>>> > > >  import com.cloudera.crunch.io.impl.FileSourceImpl;
>>>> > > >  import com.cloudera.crunch.types.PType;
>>>> > > >
>>>> > > > -public class SeqFileSource<T> extends FileSourceImpl<T> implements
>>>> > > > -    ReadableSource<T> {
>>>> > > > +public class SeqFileSource<T> extends FileSourceImpl<T> implements ReadableSource<T> {
>>>> > > >
>>>> > > >    public SeqFileSource(Path path, PType<T> ptype) {
>>>> > > > -    super(path, ptype, SequenceFileInputFormat.class);
>>>> > > > +    super(path, ptype, SequenceFileInputFormat.class);
>>>> > > >    }
>>>> > > > -
>>>> > > > +
>>>> > > >    @Override
>>>> > > >    public Iterable<T> read(Configuration conf) throws IOException {
>>>> > > > -    FileSystem fs = FileSystem.get(conf);
>>>> > > > -    return CompositePathIterable.create(fs, path,
>>>> > > > -        new SeqFileReaderFactory<T>(ptype, conf));
>>>> > > > +    FileSystem fs = FileSystem.get(path.toUri(), conf);
>>>> > > > +    return CompositePathIterable.create(fs, path, new SeqFileReaderFactory<T>(ptype, conf));
>>>> > > >    }
>>>> > > >
>>>> > > >    @Override
>>>> > > > diff --git src/main/java/com/cloudera/crunch/io/seq/SeqFileTableSource.java src/main/java/com/cloudera/crunch/io/seq/SeqFileTableSource.java
>>>> > > > index 69ca12b..4db6658 100644
>>>> > > > --- src/main/java/com/cloudera/crunch/io/seq/SeqFileTableSource.java
>>>> > > > +++ src/main/java/com/cloudera/crunch/io/seq/SeqFileTableSource.java
>>>> > > > @@ -42,7 +42,7 @@ public class SeqFileTableSource<K, V> extends FileTableSourceImpl<K, V> implemen
>>>> > > >
>>>> > > >    @Override
>>>> > > >    public Iterable<Pair<K, V>> read(Configuration conf) throws IOException {
>>>> > > > -    FileSystem fs = FileSystem.get(conf);
>>>> > > > +    FileSystem fs = FileSystem.get(path.toUri(), conf);
>>>> > > >      return CompositePathIterable.create(fs, path,
>>>> > > >          new SeqFileTableReaderFactory<K, V>((PTableType<K, V>) ptype, conf));
>>>> > > >    }
>>>> > > > diff --git src/main/java/com/cloudera/crunch/io/text/TextFileSource.java src/main/java/com/cloudera/crunch/io/text/TextFileSource.java
>>>> > > > index a876843..e0dbe68 100644
>>>> > > > --- src/main/java/com/cloudera/crunch/io/text/TextFileSource.java
>>>> > > > +++ src/main/java/com/cloudera/crunch/io/text/TextFileSource.java
>>>> > > > @@ -67,7 +67,7 @@ public class TextFileSource<T> extends FileSourceImpl<T> implements
>>>> > > >
>>>> > > >    @Override
>>>> > > >    public Iterable<T> read(Configuration conf) throws IOException {
>>>> > > > -    return CompositePathIterable.create(FileSystem.get(conf), path,
>>>> > > > -        new TextFileReaderFactory<T>(ptype, conf));
>>>> > > > +    return CompositePathIterable.create(FileSystem.get(path.toUri(), conf), path,
>>>> > > > +        new TextFileReaderFactory<T>(ptype, conf));
>>>> > > >    }
>>>> > > >  }
>>>> > > > diff --git src/main/java/com/cloudera/crunch/lib/join/MapsideJoin.java src/main/java/com/cloudera/crunch/lib/join/MapsideJoin.java
>>>> > > > new file mode 100644
>>>> > > > index 0000000..8072e07
>>>> > > > --- /dev/null
>>>> > > > +++ src/main/java/com/cloudera/crunch/lib/join/MapsideJoin.java
>>>> > > > @@ -0,0 +1,143 @@
>>>> > > > +package com.cloudera.crunch.lib.join;
>>>> > > > +
>>>> > > > +import java.io.IOException;
>>>> > > > +
>>>> > > > +import org.apache.hadoop.filecache.DistributedCache;
>>>> > > > +import org.apache.hadoop.fs.FileSystem;
>>>> > > > +import org.apache.hadoop.fs.Path;
>>>> > > > +
>>>> > > > +import com.cloudera.crunch.DoFn;
>>>> > > > +import com.cloudera.crunch.Emitter;
>>>> > > > +import com.cloudera.crunch.PTable;
>>>> > > > +import com.cloudera.crunch.Pair;
>>>> > > > +import com.cloudera.crunch.impl.mr.MRPipeline;
>>>> > > > +import com.cloudera.crunch.impl.mr.run.CrunchRuntimeException;
>>>> > > > +import com.cloudera.crunch.io.ReadableSourceTarget;
>>>> > > > +import com.cloudera.crunch.io.impl.SourcePathTargetImpl;
>>>> > > > +import com.cloudera.crunch.types.PType;
>>>> > > > +import com.cloudera.crunch.types.PTypeFamily;
>>>> > > > +import com.google.common.collect.ArrayListMultimap;
>>>> > > > +import com.google.common.collect.Multimap;
>>>> > > > +
>>>> > > > +/**
>>>> > > > + * Utility for doing map side joins on a common key between two {@link PTable}s.
>>>> > > > + * <p>
>>>> > > > + * A map side join is an optimized join which doesn't use a reducer; instead,
>>>> > > > + * the right side of the join is loaded into memory and the join is performed in
>>>> > > > + * a mapper. This style of join has the important implication that the output of
>>>> > > > + * the join is not sorted, as it would be with a conventional (reducer-based)
>>>> > > > + * join.
>>>> > > > + * <p>
>>>> > > > + * <b>Note:</b> This utility is only supported when running with a
>>>> > > > + * {@link MRPipeline} as the pipeline.
>>>> > > > + */
>>>> > > > +public class MapsideJoin {
>>>> > > > +
>>>> > > > +  /**
>>>> > > > +   * Join two tables using a map side join. The right-side table will be loaded
>>>> > > > +   * fully in memory, so this method should only be used if the right side
>>>> > > > +   * table's contents can fit in the memory allocated to mappers. The join
>>>> > > > +   * performed by this method is an inner join.
>>>> > > > +   *
>>>> > > > +   * @param left
>>>> > > > +   *          The left-side table of the join
>>>> > > > +   * @param right
>>>> > > > +   *          The right-side table of the join, whose contents will be fully
>>>> > > > +   *          read into memory
>>>> > > > +   * @return A table keyed on the join key, containing pairs of joined values
>>>> > > > +   */
>>>> > > > +  public static <K, U, V> PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right) {
>>>> > > > +
>>>> > > > +    if (!(right.getPipeline() instanceof MRPipeline)) {
>>>> > > > +      throw new CrunchRuntimeException("Map-side join is only supported within a MapReduce context");
>>>> > > > +    }
>>>> > > > +
>>>> > > > +    MRPipeline pipeline = (MRPipeline) right.getPipeline();
>>>> > > > +    pipeline.materialize(right);
>>>> > > > +
>>>> > > > +    // TODO Move necessary logic to MRPipeline so that we can theoretically
>>>> > > > +    // optimize this by running the setup of multiple map-side joins concurrently
>>>> > > > +    pipeline.run();
>>>> > > > +
>>>> > > > +    ReadableSourceTarget<Pair<K, V>> readableSourceTarget = pipeline
>>>> > > > +        .getMaterializeSourceTarget(right);
>>>> > > > +    if (!(readableSourceTarget instanceof SourcePathTargetImpl)) {
>>>> > > > +      throw new CrunchRuntimeException("Right-side contents can't be read from a path");
>>>> > > > +    }
>>>> > > > +
>>>> > > > +    // Suppress warnings because we've just checked this cast via instanceof
>>>> > > > +    @SuppressWarnings("unchecked")
>>>> > > > +    SourcePathTargetImpl<Pair<K, V>> sourcePathTarget = (SourcePathTargetImpl<Pair<K, V>>) readableSourceTarget;
>>>> > > > +
>>>> > > > +    Path path = sourcePathTarget.getPath();
>>>> > > > +    DistributedCache.addCacheFile(path.toUri(), pipeline.getConfiguration());
>>>> > > > +
>>>> > > > +    MapsideJoinDoFn<K, U, V> mapJoinDoFn = new MapsideJoinDoFn<K, U, V>(path.toString(),
>>>> > > > +        right.getPType());
>>>> > > > +    PTypeFamily typeFamily = left.getTypeFamily();
>>>> > > > +    return left.parallelDo(
>>>> > > > +        "mapjoin",
>>>> > > > +        mapJoinDoFn,
>>>> > > > +        typeFamily.tableOf(left.getKeyType(),
>>>> > > > +            typeFamily.pairs(left.getValueType(), right.getValueType())));
>>>> > > > +
>>>> > > > +  }
>>>> > > > +
>>>> > > > +  static class MapsideJoinDoFn<K, U, V> extends DoFn<Pair<K, U>, Pair<K, Pair<U, V>>> {
>>>> > > > +
>>>> > > > +    private String inputPath;
>>>> > > > +    private PType<Pair<K, V>> ptype;
>>>> > > > +    private Multimap<K, V> joinMap;
>>>> > > > +
>>>> > > > +    public MapsideJoinDoFn(String inputPath, PType<Pair<K, V>> ptype) {
>>>> > > > +      this.inputPath = inputPath;
>>>> > > > +      this.ptype = ptype;
>>>> > > > +    }
>>>> > > > +
>>>> > > > +    private Path getCacheFilePath() {
>>>> > > > +      try {
>>>> > > > +        for (Path localPath : DistributedCache.getLocalCacheFiles(getConfiguration())) {
>>>> > > > +          if (localPath.toString().endsWith(inputPath)) {
>>>> > > > +            return localPath.makeQualified(FileSystem.getLocal(getConfiguration()));
>>>> > > > +          }
>>>> > > > +        }
>>>> > > > +      } catch (IOException e) {
>>>> > > > +        throw new CrunchRuntimeException(e);
>>>> > > > +      }
>>>> > > > +
>>>> > > > +      throw new CrunchRuntimeException("Can't find local cache file for '" + inputPath + "'");
>>>> > > > +    }
>>>> > > > +
>>>> > > > +    @Override
>>>> > > > +    public void initialize() {
>>>> > > > +      super.initialize();
>>>> > > > +
>>>> > > > +      ReadableSourceTarget<Pair<K, V>> sourceTarget = (ReadableSourceTarget<Pair<K, V>>) ptype
>>>> > > > +          .getDefaultFileSource(getCacheFilePath());
>>>> > > > +      Iterable<Pair<K, V>> iterable = null;
>>>> > > > +      try {
>>>> > > > +        iterable = sourceTarget.read(getConfiguration());
>>>> > > > +      } catch (IOException e) {
>>>> > > > +        throw new CrunchRuntimeException("Error reading right-side of map side join: ", e);
>>>> > > > +      }
>>>> > > > +
>>>> > > > +      joinMap = ArrayListMultimap.create();
>>>> > > > +      for (Pair<K, V> joinPair : iterable) {
>>>> > > > +        joinMap.put(joinPair.first(), joinPair.second());
>>>> > > > +      }
>>>> > > > +    }
>>>> > > > +
>>>> > > > +    @Override
>>>> > > > +    public void process(Pair<K, U> input, Emitter<Pair<K, Pair<U, V>>> emitter) {
>>>> > > > +      K key = input.first();
>>>> > > > +      U value = input.second();
>>>> > > > +      for (V joinValue : joinMap.get(key)) {
>>>> > > > +        Pair<U, V> valuePair = Pair.of(value, joinValue);
>>>> > > > +        emitter.emit(Pair.of(key, valuePair));
>>>> > > > +      }
>>>> > > > +    }
>>>> > > > +
>>>> > > > +  }
>>>> > > > +
>>>> > > > +}
>>>> > > > diff --git src/main/java/com/cloudera/crunch/types/PType.java src/main/java/com/cloudera/crunch/types/PType.java
>>>> > > > index af4ef1b..ae480aa 100644
>>>> > > > --- src/main/java/com/cloudera/crunch/types/PType.java
>>>> > > > +++ src/main/java/com/cloudera/crunch/types/PType.java
>>>> > > > @@ -15,6 +15,7 @@
>>>> > > >
>>>> > > >  package com.cloudera.crunch.types;
>>>> > > >
>>>> > > > +import java.io.Serializable;
>>>> > > >  import java.util.List;
>>>> > > >
>>>> > > >  import org.apache.hadoop.fs.Path;
>>>> > > > @@ -31,7 +32,7 @@ import com.cloudera.crunch.SourceTarget;
>>>> > > >   * {@code PCollection}.
>>>> > > >   *
>>>> > > >   */
>>>> > > > -public interface PType<T> {
>>>> > > > +public interface PType<T> extends Serializable {
>>>> > > >    /**
>>>> > > >     * Returns the Java type represented by this {@code PType}.
>>>> > > >     */
>>>> > > > diff --git src/main/java/com/cloudera/crunch/types/avro/AvroType.java src/main/java/com/cloudera/crunch/types/avro/AvroType.java
>>>> > > > index 3db00c0..29af9fb 100644
>>>> > > > --- src/main/java/com/cloudera/crunch/types/avro/AvroType.java
>>>> > > > +++ src/main/java/com/cloudera/crunch/types/avro/AvroType.java
>>>> > > > @@ -14,6 +14,7 @@
>>>> > > >   */
>>>> > > >  package com.cloudera.crunch.types.avro;
>>>> > > >
>>>> > > > +import java.io.Serializable;
>>>> > > >  import java.util.List;
>>>> > > >
>>>> > > >  import org.apache.avro.Schema;
>>>> > > > @@ -41,7 +42,8 @@ public class AvroType<T> implements PType<T> {
>>>> > > >    private static final Converter AVRO_CONVERTER = new AvroKeyConverter();
>>>> > > >
>>>> > > >    private final Class<T> typeClass;
>>>> > > > -  private final Schema schema;
>>>> > > > +  private final String schemaString;
>>>> > > > +  private transient Schema schema;
>>>> > > >    private final MapFn baseInputMapFn;
>>>> > > >    private final MapFn baseOutputMapFn;
>>>> > > >    private final List<PType> subTypes;
>>>> > > > @@ -55,6 +57,7 @@ public class AvroType<T> implements PType<T> {
>>>> > > >        MapFn outputMapFn, PType... ptypes) {
>>>> > > >      this.typeClass = typeClass;
>>>> > > >      this.schema = Preconditions.checkNotNull(schema);
>>>> > > > +    this.schemaString = schema.toString();
>>>> > > >      this.baseInputMapFn = inputMapFn;
>>>> > > >      this.baseOutputMapFn = outputMapFn;
>>>> > > >      this.subTypes = ImmutableList.<PType> builder().add(ptypes).build();
>>>> > > > @@ -76,6 +79,9 @@ public class AvroType<T> implements PType<T> {
>>>> > > >    }
>>>> > > >
>>>> > > >    public Schema getSchema() {
>>>> > > > +    if (schema == null){
>>>> > > > +      schema = new Schema.Parser().parse(schemaString);
>>>> > > > +    }
>>>> > > >      return schema;
>>>> > > >    }
>>>> > > >
>>>> > > > diff --git src/test/java/com/cloudera/crunch/impl/mr/MRPipelineTest.java src/test/java/com/cloudera/crunch/impl/mr/MRPipelineTest.java
>>>> > > > new file mode 100644
>>>> > > > index 0000000..f265460
>>>> > > > --- /dev/null
>>>> > > > +++ src/test/java/com/cloudera/crunch/impl/mr/MRPipelineTest.java
>>>> > > > @@ -0,0 +1,60 @@
>>>> > > > +package com.cloudera.crunch.impl.mr;
>>>> > > > +
>>>> > > > +import static org.junit.Assert.assertEquals;
>>>> > > > +import static org.mockito.Mockito.doReturn;
>>>> > > > +import static org.mockito.Mockito.mock;
>>>> > > > +import static org.mockito.Mockito.spy;
>>>> > > > +import static org.mockito.Mockito.when;
>>>> > > > +
>>>> > > > +import java.io.IOException;
>>>> > > > +
>>>> > > > +import org.junit.Before;
>>>> > > > +import org.junit.Test;
>>>> > > > +
>>>> > > > +import com.cloudera.crunch.SourceTarget;
>>>> > > > +import com.cloudera.crunch.impl.mr.collect.PCollectionImpl;
>>>> > > > +import com.cloudera.crunch.io.ReadableSourceTarget;
>>>> > > > +import com.cloudera.crunch.types.avro.Avros;
>>>> > > > +
>>>> > > > +public class MRPipelineTest {
>>>> > > > +
>>>> > > > +  private MRPipeline pipeline;
>>>> > > > +
>>>> > > > +  @Before
>>>> > > > +  public void setUp() throws IOException {
>>>> > > > +    pipeline = spy(new MRPipeline(MRPipelineTest.class));
>>>> > > > +  }
>>>> > > > +
>>>> > > > +  @Test
>>>> > > > +  public void testGetMaterializeSourceTarget_AlreadyMaterialized() {
>>>> > > > +    PCollectionImpl<String> materializedPcollection = mock(PCollectionImpl.class);
>>>> > > > +    ReadableSourceTarget<String> readableSourceTarget = mock(ReadableSourceTarget.class);
>>>> > > > +
>>>> > > > +    when(materializedPcollection.getMaterializedAt()).thenReturn(readableSourceTarget);
>>>> > > > +
>>>> > > > +    assertEquals(readableSourceTarget, pipeline.getMaterializeSourceTarget(materializedPcollection));
>>>> > > > +  }
>>>> > > > +
>>>> > > > +  @Test
>>>> > > > +  public void testGetMaterializeSourceTarget_NotMaterialized_HasOutput() {
>>>> > > > +
>>>> > > > +    PCollectionImpl<String> pcollection = mock(PCollectionImpl.class);
>>>> > > > +    ReadableSourceTarget<String> readableSourceTarget = mock(ReadableSourceTarget.class);
>>>> > > > +    when(pcollection.getPType()).thenReturn(Avros.strings());
>>>> > > > +    doReturn(readableSourceTarget).when(pipeline).createIntermediateOutput(Avros.strings());
>>>> > > > +    when(pcollection.getMaterializedAt()).thenReturn(null);
>>>> > > > +
>>>> > > > +    assertEquals(readableSourceTarget, pipeline.getMaterializeSourceTarget(pcollection));
>>>> > > > +  }
>>>> > > > +
>>>> > > > +  @Test(expected = IllegalArgumentException.class)
>>>> > > > +  public void testGetMaterializeSourceTarget_NotMaterialized_NotReadableSourceTarget() {
>>>> > > > +    PCollectionImpl<String> pcollection = mock(PCollectionImpl.class);
>>>> > > > +    SourceTarget<String> nonReadableSourceTarget = mock(SourceTarget.class);
>>>> > > > +    when(pcollection.getPType()).thenReturn(Avros.strings());
>>>> > > > +    doReturn(nonReadableSourceTarget).when(pipeline).createIntermediateOutput(Avros.strings());
>>>> > > > +    when(pcollection.getMaterializedAt()).thenReturn(null);
>>>> > > > +
>>>> > > > +    pipeline.getMaterializeSourceTarget(pcollection);
>>>> > > > +  }
>>>> > > > +
>>>> > > > +}
>>>> > > > diff --git src/test/java/com/cloudera/crunch/lib/join/MapsideJoinTest.java src/test/java/com/cloudera/crunch/lib/join/MapsideJoinTest.java
>>>> > > > new file mode 100644
>>>> > > > index 0000000..97e0c63
>>>> > > > --- /dev/null
>>>> > > > +++ src/test/java/com/cloudera/crunch/lib/join/MapsideJoinTest.java
>>>> > > > @@ -0,0 +1,102 @@
>>>> > > > +package com.cloudera.crunch.lib.join;
>>>> > > > +
>>>> > > > +import static org.junit.Assert.assertEquals;
>>>> > > > +import static org.junit.Assert.assertTrue;
>>>> > > > +
>>>> > > > +import java.io.IOException;
>>>> > > > +import java.util.Collections;
>>>> > > > +import java.util.List;
>>>> > > > +
>>>> > > > +import org.junit.Test;
>>>> > > > +
>>>> > > > +import com.cloudera.crunch.FilterFn;
>>>> > > > +import com.cloudera.crunch.MapFn;
>>>> > > > +import com.cloudera.crunch.PTable;
>>>> > > > +import com.cloudera.crunch.Pair;
>>>> > > > +import com.cloudera.crunch.Pipeline;
>>>> > > > +import com.cloudera.crunch.impl.mem.MemPipeline;
>>>> > > > +import com.cloudera.crunch.impl.mr.MRPipeline;
>>>> > > > +import com.cloudera.crunch.impl.mr.run.CrunchRuntimeException;
>>>> > > > +import com.cloudera.crunch.test.FileHelper;
>>>> > > > +import com.cloudera.crunch.types.writable.Writables;
>>>> > > > +import com.google.common.collect.Lists;
>>>> > > > +
>>>> > > > +public class MapsideJoinTest {
>>>> > > > +
>>>> > > > +  private static class LineSplitter extends MapFn<String, Pair<Integer, String>> {
>>>> > > > +
>>>> > > > +    @Override
>>>> > > > +    public Pair<Integer, String> map(String input) {
>>>> > > > +      String[] fields = input.split("\\|");
>>>> > > > +      return Pair.of(Integer.parseInt(fields[0]), fields[1]);
>>>> > > > +    }
>>>> > > > +
>>>> > > > +  }
>>>> > > > +
>>>> > > > +  private static class NegativeFilter extends FilterFn<Pair<Integer, String>> {
>>>> > > > +
>>>> > > > +    @Override
>>>> > > > +    public boolean accept(Pair<Integer, String> input) {
>>>> > > > +      return false;
>>>> > > > +    }
>>>> > > > +
>>>> > > > +  }
>>>> > > > +
>>>> > > > +  @Test(expected = CrunchRuntimeException.class)
>>>> > > > +  public void testNonMapReducePipeline() {
>>>> > > > +    runMapsideJoin(MemPipeline.getInstance());
>>>> > > > +  }
>>>> > > > +
>>>> > > > +  @Test
>>>> > > > +  public void testMapsideJoin_RightSideIsEmpty() throws IOException {
>>>> > > > +    MRPipeline pipeline = new MRPipeline(MapsideJoinTest.class);
>>>> > > > +    PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
>>>> > > > +    PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
>>>> > > > +
>>>> > > > +    PTable<Integer, String> filteredOrderTable = orderTable.parallelDo(new NegativeFilter(),
>>>> > > > +        orderTable.getPTableType());
>>>> > > > +
>>>> > > > +    PTable<Integer, Pair<String, String>> joined = MapsideJoin.join(customerTable,
>>>> > > > +        filteredOrderTable);
>>>> > > > +
>>>> > > > +    List<Pair<Integer, Pair<String, String>>> materializedJoin = Lists.newArrayList(joined
>>>> > > > +        .materialize());
>>>> > > > +
>>>> > > > +    assertTrue(materializedJoin.isEmpty());
>>>> > > > +
>>>> > > > +  }
>>>> > > > +
>>>> > > > +  @Test
>>>> > > > +  public void testMapsideJoin() throws IOException {
>>>> > > > +    runMapsideJoin(new MRPipeline(MapsideJoinTest.class));
>>>> > > > +  }
>>>> > > > +
>>>> > > > +  private void runMapsideJoin(Pipeline pipeline) {
>>>> > > > +    PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
>>>> > > > +    PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
>>>> > > > +
>>>> > > > +    PTable<Integer, Pair<String, String>> joined = MapsideJoin.join(customerTable, orderTable);
>>>> > > > +
>>>> > > > +    List<Pair<Integer, Pair<String, String>>> expectedJoinResult = Lists.newArrayList();
>>>> > > > +    expectedJoinResult.add(Pair.of(111, Pair.of("John Doe", "Corn flakes")));
>>>> > > > +    expectedJoinResult.add(Pair.of(222, Pair.of("Jane Doe", "Toilet paper")));
>>>> > > > +    expectedJoinResult.add(Pair.of(222, Pair.of("Jane Doe", "Toilet plunger")));
>>>> > > > +    expectedJoinResult.add(Pair.of(333, Pair.of("Someone Else", "Toilet brush")));
>>>> > > > +
>>>> > > > +    List<Pair<Integer, Pair<String, String>>> joinedResultList = Lists.newArrayList(joined
>>>> > > > +        .materialize());
>>>> > > > +    Collections.sort(joinedResultList);
>>>> > > > +
>>>> > > > +    assertEquals(expectedJoinResult, joinedResultList);
>>>> > > > +  }
>>>> > > > +
>>>> > > > +  private static PTable<Integer, String> readTable(Pipeline pipeline, String filename) {
>>>> > > > +    try {
>>>> > > > +      return pipeline.readTextFile(FileHelper.createTempCopyOf(filename)).parallelDo("asTable",
>>>> > > > +          new LineSplitter(), Writables.tableOf(Writables.ints(), Writables.strings()));
>>>> > > > +    } catch (IOException e) {
>>>> > > > +      throw new RuntimeException(e);
>>>> > > > +    }
>>>> > > > +  }
>>>> > > > +
>>>> > > > +}
>>>> > > > diff --git src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java
>>>> > > > index 74e2ad3..c6a0b46 100644
>>>> > > > --- src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java
>>>> > > > +++ src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java
>>>> > > > @@ -25,6 +25,7 @@ import org.apache.avro.Schema.Type;
>>>> > > >  import org.apache.avro.generic.GenericData;
>>>> > > >  import org.apache.avro.util.Utf8;
>>>> > > >  import org.apache.hadoop.io.LongWritable;
>>>> > > > +import org.junit.Ignore;
>>>> > > >  import org.junit.Test;
>>>> > > >
>>>> > > >  import com.cloudera.crunch.Pair;
>>>> > > > @@ -103,6 +104,7 @@ public class AvrosTest {
>>>> > > >    }
>>>> > > >
>>>> > > >    @Test
>>>> > > > +  @Ignore("This test creates an invalid schema that causes Schema#toString to fail")
>>>> > > >    public void testNestedTables() throws Exception {
>>>> > > >      PTableType<Long, Long> pll = Avros.tableOf(Avros.longs(), Avros.longs());
>>>> > > >      PTableType<Pair<Long, Long>, String> nest = Avros.tableOf(pll, Avros.strings());
>>>> > > > diff --git src/test/resources/customers.txt src/test/resources/customers.txt
>>>> > > > new file mode 100644
>>>> > > > index 0000000..98f3f3d
>>>> > > > --- /dev/null
>>>> > > > +++ src/test/resources/customers.txt
>>>> > > > @@ -0,0 +1,4 @@
>>>> > > > +111|John Doe
>>>> > > > +222|Jane Doe
>>>> > > > +333|Someone Else
>>>> > > > +444|Has No Orders
>>>> > > > \ No newline at end of file
>>>> > > > diff --git src/test/resources/orders.txt src/test/resources/orders.txt
>>>> > > > new file mode 100644
>>>> > > > index 0000000..2f1383f
>>>> > > > --- /dev/null
>>>> > > > +++ src/test/resources/orders.txt
>>>> > > > @@ -0,0 +1,4 @@
>>>> > > > +222|Toilet plunger
>>>> > > > +333|Toilet brush
>>>> > > > +222|Toilet paper
>>>> > > > +111|Corn flakes
>>>> > > > \ No newline at end of file
>>>> > > >
>>>> > > >
>>>> > > > On Tue, Jul 3, 2012 at 10:42 AM, Christian Tzolov <[email protected]> wrote:
>>>> > > > > Hi Gabriel,
>>>> > > > >
>>>> > > > > Seems like the attachment is missing.
>>>> > > > >
>>>> > > > > Cheers,
>>>> > > > > Chris
>>>> > > > >
>>>> > > > > On Tue, Jul 3, 2012 at 9:23 AM, Gabriel Reid <[email protected]> wrote:
>>>> > > > >
>>>> > > > > > Hi guys,
>>>> > > > > >
>>>> > > > > > Attached (hopefully) is a patch for an initial implementation of map
>>>> > > > > > side joins. It's currently implemented as a static method in a class
>>>> > > > > > called MapsideJoin, with the same interface as the existing Join class
>>>> > > > > > (with only inner joins being implemented for now). The way it works is
>>>> > > > > > that the right-side PTable of the join is put in the distributed cache
>>>> > > > > > and then read by the join function at runtime.
>>>> > > > > >
>>>> > > > > > There's one spot that I can see for a potentially interesting
>>>> > > > > > optimization -- MRPipeline#run is called once for each map side join
>>>> > > > > > that is set up, but if the setup of the joins was done within
>>>> > > > > > MRPipeline, then we could set up multiple map side joins in parallel
>>>> > > > > > with a single call to MRPipeline#run. OTOH, a whole bunch of map side
>>>> > > > > > joins in parallel probably isn't that common of an operation.
>>>> > > > > >
>>>> > > > > > If anyone feels like taking a look at the patch, any feedback is
>>>> > > > > > appreciated. If nobody sees something that needs serious changes in
>>>> > > > > > the patch, I'll commit it.
>>>> > > > > >
>>>> > > > > > - Gabriel
>>>> > > > > >
>>>> > > > > > On Thu, Jun 21, 2012 at 9:09 AM, Gabriel Reid <[email protected]> wrote:
>>>> > > > > > > Replying to all...
>>>> > > > > > >
>>>> > > > > > > On Thu, Jun 21, 2012 at 8:40 AM, Josh Wills <[email protected]> wrote:
>>>> > > > > > > >
>>>> > > > > > > > So there's a philosophical issue here: should Crunch ever make
>>>> > > > > > > > decisions about how to do something itself based on its estimates of
>>>> > > > > > > > the size of the data sets, or should it always do exactly what the
>>>> > > > > > > > developer indicates?
>>>> > > > > > > >
>>>> > > > > > > > I can make a case either way, but I think that no matter what, we
>>>> > > > > > > > would want to have explicit functions for performing a join that reads
>>>> > > > > > > > one data set into memory, so I think we can proceed w/the
>>>> > > > > > > > implementation while folks weigh in on what their preferences are for
>>>> > > > > > > > the default join() behavior (e.g., just do a reduce-side join, or try
>>>> > > > > > > > to figure out the best join given information about the input data and
>>>> > > > > > > > some configuration parameters.)
>>>> > > > > > >
>>>> > > > > > > I definitely agree on needing to have an explicit way to invoke one or
>>>> > > > > > > the other -- and in general I don't like having magic behind the
>>>> > > > > > > scenes to decide on behaviour (especially considering Crunch is
>>>> > > > > > > generally intended to be closer to the metal than Pig and Hive). I'm
>>>> > > > > > > not sure if the runtime decision is something specific to some of my
>>>> > > > > > > use cases or if it could be useful to a wider audience.
>>>> > > > > > >
>>>> > > > > > > The ability to dynamically decide at runtime whether a map side join
>>>> > > > > > > should be used can also easily be tacked on outside of Crunch, and
>>>> > > > > > > won't impact the underlying implementation (as you pointed out), so I
>>>> > > > > > > definitely also agree on focusing on the underlying implementation
>>>> > > > > > > first, and we can worry about the options used for exposing it later
>>>> > > > > > > on.
>>>> > > > > > >
>>>> > > > > > > - Gabriel
>>>>
>>>> --
>>>> Director of Data Science
>>>> Cloudera <http://www.cloudera.com>
>>>> Twitter: @josh_wills <http://twitter.com/josh_wills>
