It definitely has a significant performance gain when dealing with big records 
that result in a slow shuffle. 

By the way, I discovered (confirmed) an issue with the normal join 
implementation, where objects are held on to without using a deep copy. I was 
thinking of fixing that this evening -- are you planning on doing the big 
package rename today?

On 06 Jul 2012, at 18:57, Josh Wills <[email protected]> wrote:

> Not a problem man-- can't wait to play with it this weekend!
> 
> On Fri, Jul 6, 2012 at 9:50 AM, Gabriel Reid <[email protected]> wrote:
>> Hi Josh,
>> 
>> I've just committed the map side joins, after doing some more
>> extensive testing on it today. Unfortunately, I forgot to squash the
>> multiple commits into a single one (as I was planning on doing), but
>> it's all in there and working.
>> 
>> - Gabriel
>> 
>> On Fri, Jul 6, 2012 at 7:28 AM, Gabriel Reid <[email protected]> wrote:
>>> Thanks for making the JIRA issue.
>>> 
>>> I was going to do some more testing an a "real" cluster later on today, and 
>>> as long as that all checks out then this will be ready to go in as far as 
>>> I'm concerned.
>>> 
>>> I don't see any reason to hold back on the big renaming for this patch -- 
>>> if you're ready to do the package renaming, please go ahead.
>>> 
>>> As long as my testing later today checks out and there aren't any glaring 
>>> issues from anyone who has tried the patch out, I'll probably be checking 
>>> the patch in later today (so it'll probably be in before you do the 
>>> renaming anyhow).
>>> 
>>> - Gabriel
>>> 
>>> 
>>> On Friday 6 July 2012 at 00:11, Josh Wills wrote:
>>> 
>>>> FYI: https://issues.apache.org/jira/browse/CRUNCH-3 to track this.
>>>> 
>>>> Gabriel, is your feeling that this is ready to go in? I'm debating whether
>>>> or not to check this in before/after I do the massive com.cloudera ->
>>>> org.apache renaming. What do you think?
>>>> 
>>>> On Tue, Jul 3, 2012 at 1:25 PM, Gabriel Reid <[email protected] 
>>>> (mailto:[email protected])> wrote:
>>>> 
>>>>> Hi Joe,
>>>>> 
>>>>> Looking forward to hearing your feedback! This is still just a patch and
>>>>> not committed yet, and there's still definitely room for help (i.e. ideas
>>>>> on how to improve it, or do it totally differently), so certainly let me
>>>>> know if you've got some ideas.
>>>>> 
>>>>> - Gabriel
>>>>> 
>>>>> 
>>>>> On Tuesday 3 July 2012 at 18:01, Joseph Adler wrote:
>>>>> 
>>>>>> Awesome! Will give this a try soon... wish I could have helped with this
>>>>> one...
>>>>>> 
>>>>>> On Tue, Jul 3, 2012 at 1:53 AM, Gabriel Reid <[email protected] 
>>>>>> (mailto:[email protected])(mailto:
>>>>> [email protected] (mailto:[email protected]))> wrote:
>>>>>>> Thanks for pointing that out Chris. I'm guessing the mailing list is
>>>>>>> stripping out attachments(?). Once the JIRA is up and running then
>>>>>>> that will be taken care of I guess.
>>>>>>> 
>>>>>>> The mime type on the last attempt was application/octet-stream, so
>>>>>>> I've renamed this to a .txt file to try to ensure that it'll get a
>>>>>>> text/plain mime type (although I don't know if that'll make a
>>>>>>> difference). I've also pasted it inline below, hopefully one of those
>>>>>>> solutions works.
>>>>>>> 
>>>>>>> - Gabriel
>>>>>>> 
>>>>>>> 
>>>>>>> diff --git src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java
>>>>>>> src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java
>>>>>>> index 420e8dc..c8ba596 100644
>>>>>>> --- src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java
>>>>>>> +++ src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java
>>>>>>> @@ -59,9 +59,9 @@ import com.google.common.collect.Sets;
>>>>>>> public class MRPipeline implements Pipeline {
>>>>>>> 
>>>>>>> private static final Log LOG = LogFactory.getLog(MRPipeline.class);
>>>>>>> -
>>>>>>> +
>>>>>>> private static final Random RANDOM = new Random();
>>>>>>> -
>>>>>>> +
>>>>>>> private final Class<?> jarClass;
>>>>>>> private final String name;
>>>>>>> private final Map<PCollectionImpl<?>, Set<Target>> outputTargets;
>>>>>>> @@ -75,15 +75,15 @@ public class MRPipeline implements Pipeline {
>>>>>>> public MRPipeline(Class<?> jarClass) throws IOException {
>>>>>>> this(jarClass, new Configuration());
>>>>>>> }
>>>>>>> -
>>>>>>> - public MRPipeline(Class<?> jarClass, String name){
>>>>>>> +
>>>>>>> + public MRPipeline(Class<?> jarClass, String name) {
>>>>>>> this(jarClass, name, new Configuration());
>>>>>>> }
>>>>>>> -
>>>>>>> +
>>>>>>> public MRPipeline(Class<?> jarClass, Configuration conf) {
>>>>>>> - this(jarClass, jarClass.getName(), conf);
>>>>>>> + this(jarClass, jarClass.getName(), conf);
>>>>>>> }
>>>>>>> -
>>>>>>> +
>>>>>>> public MRPipeline(Class<?> jarClass, String name, Configuration conf) {
>>>>>>> this.jarClass = jarClass;
>>>>>>> this.name (http://this.name) = name;
>>>>>>> @@ -102,9 +102,9 @@ public class MRPipeline implements Pipeline {
>>>>>>> 
>>>>>>> @Override
>>>>>>> public void setConfiguration(Configuration conf) {
>>>>>>> - this.conf = conf;
>>>>>>> + this.conf = conf;
>>>>>>> }
>>>>>>> -
>>>>>>> +
>>>>>>> @Override
>>>>>>> public PipelineResult run() {
>>>>>>> MSCRPlanner planner = new MSCRPlanner(this, outputTargets);
>>>>>>> @@ -125,8 +125,8 @@ public class MRPipeline implements Pipeline {
>>>>>>> boolean materialized = false;
>>>>>>> for (Target t : outputTargets.get(c)) {
>>>>>>> if (!materialized && t instanceof Source) {
>>>>>>> - c.materializeAt((SourceTarget) t);
>>>>>>> - materialized = true;
>>>>>>> + c.materializeAt((SourceTarget) t);
>>>>>>> + materialized = true;
>>>>>>> }
>>>>>>> }
>>>>>>> }
>>>>>>> @@ -144,7 +144,7 @@ public class MRPipeline implements Pipeline {
>>>>>>> cleanup();
>>>>>>> return res;
>>>>>>> }
>>>>>>> -
>>>>>>> +
>>>>>>> public <S> PCollection<S> read(Source<S> source) {
>>>>>>> return new InputCollection<S>(source, this);
>>>>>>> }
>>>>>>> @@ -160,85 +160,120 @@ public class MRPipeline implements Pipeline {
>>>>>>> @SuppressWarnings("unchecked")
>>>>>>> public void write(PCollection<?> pcollection, Target target) {
>>>>>>> if (pcollection instanceof PGroupedTableImpl) {
>>>>>>> - pcollection = ((PGroupedTableImpl<?,?>) pcollection).ungroup();
>>>>>>> + pcollection = ((PGroupedTableImpl<?, ?>) pcollection).ungroup();
>>>>>>> } else if (pcollection instanceof UnionCollection || pcollection
>>>>>>> instanceof UnionTable) {
>>>>>>> - pcollection = pcollection.parallelDo("UnionCollectionWrapper",
>>>>>>> - (MapFn)IdentityFn.<Object>getInstance(), pcollection.getPType());
>>>>>>> + pcollection = pcollection.parallelDo("UnionCollectionWrapper",
>>>>>>> + (MapFn) IdentityFn.<Object> getInstance(), pcollection.getPType());
>>>>>>> }
>>>>>>> addOutput((PCollectionImpl<?>) pcollection, target);
>>>>>>> }
>>>>>>> 
>>>>>>> private void addOutput(PCollectionImpl<?> impl, Target target) {
>>>>>>> if (!outputTargets.containsKey(impl)) {
>>>>>>> - outputTargets.put(impl, Sets.<Target>newHashSet());
>>>>>>> + outputTargets.put(impl, Sets.<Target> newHashSet());
>>>>>>> }
>>>>>>> outputTargets.get(impl).add(target);
>>>>>>> }
>>>>>>> -
>>>>>>> +
>>>>>>> @Override
>>>>>>> public <T> Iterable<T> materialize(PCollection<T> pcollection) {
>>>>>>> -
>>>>>>> - if (pcollection instanceof UnionCollection) {
>>>>>>> - pcollection = pcollection.parallelDo("UnionCollectionWrapper",
>>>>>>> - (MapFn)IdentityFn.<Object>getInstance(), pcollection.getPType());
>>>>>>> - }
>>>>>>> - PCollectionImpl<T> impl = (PCollectionImpl<T>) pcollection;
>>>>>>> +
>>>>>>> + PCollectionImpl<T> pcollectionImpl = toPcollectionImpl(pcollection);
>>>>>>> + ReadableSourceTarget<T> srcTarget =
>>>>>>> getMaterializeSourceTarget(pcollectionImpl);
>>>>>>> +
>>>>>>> + MaterializableIterable<T> c = new MaterializableIterable<T>(this,
>>>>>>> srcTarget);
>>>>>>> + if (!outputTargetsToMaterialize.containsKey(pcollectionImpl)) {
>>>>>>> + outputTargetsToMaterialize.put(pcollectionImpl, c);
>>>>>>> + }
>>>>>>> + return c;
>>>>>>> + }
>>>>>>> +
>>>>>>> + /**
>>>>>>> + * Retrieve a ReadableSourceTarget that provides access to the
>>>>>> 
>>>>> 
>>>>> 
>>>>> contents of a
>>>>>>> + * {@link PCollection}. This is primarily intended as a helper method
>>>>>> 
>>>>> 
>>>>> 
>>>>> to
>>>>>>> + * {@link #materialize(PCollection)}. The underlying data of the
>>>>>>> + * ReadableSourceTarget may not be actually present until the
>>>>>>> pipeline is run.
>>>>>>> + *
>>>>>>> + * @param pcollection
>>>>>>> + * The collection for which the ReadableSourceTarget is to be
>>>>>>> + * retrieved
>>>>>>> + * @return The ReadableSourceTarget
>>>>>>> + * @throws IllegalArgumentException
>>>>>>> + * If no ReadableSourceTarget can be retrieved for the given
>>>>>>> + * PCollection
>>>>>>> + */
>>>>>>> + public <T> ReadableSourceTarget<T>
>>>>>>> getMaterializeSourceTarget(PCollection<T> pcollection) {
>>>>>>> + PCollectionImpl<T> impl = toPcollectionImpl(pcollection);
>>>>>>> SourceTarget<T> matTarget = impl.getMaterializedAt();
>>>>>>> if (matTarget != null && matTarget instanceof ReadableSourceTarget) {
>>>>>>> - return new MaterializableIterable<T>(this,
>>>>>>> (ReadableSourceTarget<T>) matTarget);
>>>>>>> + return (ReadableSourceTarget<T>) matTarget;
>>>>>>> + }
>>>>>>> +
>>>>>>> + ReadableSourceTarget<T> srcTarget = null;
>>>>>>> + if (outputTargets.containsKey(pcollection)) {
>>>>>>> + for (Target target : outputTargets.get(impl)) {
>>>>>>> + if (target instanceof ReadableSourceTarget) {
>>>>>>> + srcTarget = (ReadableSourceTarget<T>) target;
>>>>>>> + break;
>>>>>>> + }
>>>>>>> + }
>>>>>>> }
>>>>>>> -
>>>>>>> - ReadableSourceTarget<T> srcTarget = null;
>>>>>>> - if (outputTargets.containsKey(pcollection)) {
>>>>>>> - for (Target target : outputTargets.get(impl)) {
>>>>>>> - if (target instanceof ReadableSourceTarget) {
>>>>>>> - srcTarget = (ReadableSourceTarget) target;
>>>>>>> - break;
>>>>>>> - }
>>>>>>> - }
>>>>>>> - }
>>>>>>> -
>>>>>>> - if (srcTarget == null) {
>>>>>>> - SourceTarget<T> st =
>>>>>> 
>>>>> 
>>>>> 
>>>>> createIntermediateOutput(pcollection.getPType());
>>>>>>> - if (!(st instanceof ReadableSourceTarget)) {
>>>>>>> - throw new IllegalArgumentException("The PType for the given
>>>>>>> PCollection is not readable"
>>>>>>> - + " and cannot be materialized");
>>>>>>> - } else {
>>>>>>> - srcTarget = (ReadableSourceTarget) st;
>>>>>>> - addOutput(impl, srcTarget);
>>>>>>> - }
>>>>>>> - }
>>>>>>> -
>>>>>>> - MaterializableIterable<T> c = new MaterializableIterable<T>(this,
>>>>>> 
>>>>> 
>>>>> 
>>>>> srcTarget);
>>>>>>> - outputTargetsToMaterialize.put(impl, c);
>>>>>>> - return c;
>>>>>>> +
>>>>>>> + if (srcTarget == null) {
>>>>>>> + SourceTarget<T> st =
>>>>>> 
>>>>> 
>>>>> 
>>>>> createIntermediateOutput(pcollection.getPType());
>>>>>>> + if (!(st instanceof ReadableSourceTarget)) {
>>>>>>> + throw new IllegalArgumentException("The PType for the given
>>>>>>> PCollection is not readable"
>>>>>>> + + " and cannot be materialized");
>>>>>>> + } else {
>>>>>>> + srcTarget = (ReadableSourceTarget<T>) st;
>>>>>>> + addOutput(impl, srcTarget);
>>>>>>> + }
>>>>>>> + }
>>>>>>> +
>>>>>>> + return srcTarget;
>>>>>>> + }
>>>>>>> +
>>>>>>> + /**
>>>>>>> + * Safely cast a PCollection into a PCollectionImpl, including
>>>>>>> handling the case of UnionCollections.
>>>>>>> + * @param pcollection The PCollection to be cast/transformed
>>>>>>> + * @return The PCollectionImpl representation
>>>>>>> + */
>>>>>>> + private <T> PCollectionImpl<T> toPcollectionImpl(PCollection<T>
>>>>>>> pcollection) {
>>>>>>> + PCollectionImpl<T> pcollectionImpl = null;
>>>>>>> + if (pcollection instanceof UnionCollection) {
>>>>>>> + pcollectionImpl = (PCollectionImpl<T>)
>>>>>>> pcollection.parallelDo("UnionCollectionWrapper",
>>>>>>> + (MapFn) IdentityFn.<Object> getInstance(), pcollection.getPType());
>>>>>>> + } else {
>>>>>>> + pcollectionImpl = (PCollectionImpl<T>) pcollection;
>>>>>>> + }
>>>>>>> + return pcollectionImpl;
>>>>>>> }
>>>>>>> 
>>>>>>> public <T> SourceTarget<T> createIntermediateOutput(PType<T> ptype) {
>>>>>>> - return ptype.getDefaultFileSource(createTempPath());
>>>>>>> + return ptype.getDefaultFileSource(createTempPath());
>>>>>>> }
>>>>>>> 
>>>>>>> public Path createTempPath() {
>>>>>>> tempFileIndex++;
>>>>>>> return new Path(tempDirectory, "p" + tempFileIndex);
>>>>>>> }
>>>>>>> -
>>>>>>> +
>>>>>>> private static Path createTempDirectory(Configuration conf) {
>>>>>>> Path dir = new Path("/tmp/crunch" + RANDOM.nextInt());
>>>>>>> - try {
>>>>>>> - FileSystem.get(conf).mkdirs(dir);
>>>>>>> - } catch (IOException e) {
>>>>>>> - LOG.error("Exception creating job output directory", e);
>>>>>>> - throw new RuntimeException(e);
>>>>>>> - }
>>>>>>> + try {
>>>>>>> + FileSystem.get(conf).mkdirs(dir);
>>>>>>> + } catch (IOException e) {
>>>>>>> + LOG.error("Exception creating job output directory", e);
>>>>>>> + throw new RuntimeException(e);
>>>>>>> + }
>>>>>>> return dir;
>>>>>>> }
>>>>>>> -
>>>>>>> +
>>>>>>> @Override
>>>>>>> public <T> void writeTextFile(PCollection<T> pcollection, String
>>>>>> 
>>>>> 
>>>>> 
>>>>> pathName) {
>>>>>>> // Ensure that this is a writable pcollection instance.
>>>>>>> - pcollection = pcollection.parallelDo("asText",
>>>>>> 
>>>>> 
>>>>> 
>>>>> IdentityFn.<T>getInstance(),
>>>>>>> - WritableTypeFamily.getInstance().as(pcollection.getPType()));
>>>>>>> + pcollection = pcollection.parallelDo("asText", IdentityFn.<T>
>>>>>>> getInstance(), WritableTypeFamily
>>>>>>> + .getInstance().as(pcollection.getPType()));
>>>>>>> write(pcollection, At.textFile(pathName));
>>>>>>> }
>>>>>>> 
>>>>>>> @@ -256,7 +291,7 @@ public class MRPipeline implements Pipeline {
>>>>>>> LOG.info (http://LOG.info)("Exception during cleanup", e);
>>>>>>> }
>>>>>>> }
>>>>>>> -
>>>>>>> +
>>>>>>> public int getNextAnonymousStageId() {
>>>>>>> return nextAnonymousStageId++;
>>>>>>> }
>>>>>>> @@ -265,7 +300,7 @@ public class MRPipeline implements Pipeline {
>>>>>>> public void enableDebug() {
>>>>>>> // Turn on Crunch runtime error catching.
>>>>>>> getConfiguration().setBoolean(RuntimeParameters.DEBUG, true);
>>>>>>> -
>>>>>>> +
>>>>>>> // Write Hadoop's WARN logs to the console.
>>>>>>> Logger crunchInfoLogger = LogManager.getLogger("com.cloudera.crunch");
>>>>>>> Appender console = crunchInfoLogger.getAppender("A");
>>>>>>> @@ -277,9 +312,9 @@ public class MRPipeline implements Pipeline {
>>>>>>> LOG.warn("Could not find console appender named 'A' for writing
>>>>>>> Hadoop warning logs");
>>>>>>> }
>>>>>>> }
>>>>>>> -
>>>>>>> +
>>>>>>> @Override
>>>>>>> public String getName() {
>>>>>>> - return name;
>>>>>>> + return name;
>>>>>>> }
>>>>>>> }
>>>>>>> diff --git
>>>>>> 
>>>>> 
>>>>> 
>>>>> src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java
>>>>>> 
>>>>> 
>>>>> 
>>>>> src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java
>>>>>>> index d41a52e..68ef054 100644
>>>>>>> ---
>>>>>> 
>>>>> 
>>>>> 
>>>>> src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java
>>>>>>> +++
>>>>>> 
>>>>> 
>>>>> 
>>>>> src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java
>>>>>>> @@ -12,6 +12,10 @@ public class CrunchRuntimeException extends
>>>>>>> RuntimeException {
>>>>>>> super(e);
>>>>>>> }
>>>>>>> 
>>>>>>> + public CrunchRuntimeException(String msg, Exception e) {
>>>>>>> + super(msg, e);
>>>>>>> + }
>>>>>>> +
>>>>>>> public boolean wasLogged() {
>>>>>>> return logged;
>>>>>>> }
>>>>>>> diff --git
>>>>>> 
>>>>> 
>>>>> 
>>>>> src/main/java/com/cloudera/crunch/io/avro/AvroFileSource.java
>>>>>>> src/main/java/com/cloudera/crunch/io/avro/AvroFileSource.java
>>>>>>> index 1122d62..4debfeb 100644
>>>>>>> --- src/main/java/com/cloudera/crunch/io/avro/AvroFileSource.java
>>>>>>> +++ src/main/java/com/cloudera/crunch/io/avro/AvroFileSource.java
>>>>>>> @@ -45,7 +45,8 @@ public class AvroFileSource<T> extends
>>>>>>> FileSourceImpl<T> implements ReadableSour
>>>>>>> 
>>>>>>> @Override
>>>>>>> public Iterable<T> read(Configuration conf) throws IOException {
>>>>>>> - return CompositePathIterable.create(FileSystem.get(conf), path,
>>>>>>> new AvroFileReaderFactory<T>(
>>>>>>> + FileSystem fs = FileSystem.get(path.toUri(), conf);
>>>>>>> + return CompositePathIterable.create(fs, path, new
>>>>>> 
>>>>> 
>>>>> 
>>>>> AvroFileReaderFactory<T>(
>>>>>>> (AvroType<T>) ptype, conf));
>>>>>>> }
>>>>>>> }
>>>>>>> diff --git src/main/java/com/cloudera/crunch/io/seq/SeqFileSource.java
>>>>>>> src/main/java/com/cloudera/crunch/io/seq/SeqFileSource.java
>>>>>>> index 24dec2d..462ef93 100644
>>>>>>> --- src/main/java/com/cloudera/crunch/io/seq/SeqFileSource.java
>>>>>>> +++ src/main/java/com/cloudera/crunch/io/seq/SeqFileSource.java
>>>>>>> @@ -26,18 +26,16 @@ import com.cloudera.crunch.io.ReadableSource;
>>>>>>> import com.cloudera.crunch.io.impl.FileSourceImpl;
>>>>>>> import com.cloudera.crunch.types.PType;
>>>>>>> 
>>>>>>> -public class SeqFileSource<T> extends FileSourceImpl<T> implements
>>>>>>> - ReadableSource<T> {
>>>>>>> +public class SeqFileSource<T> extends FileSourceImpl<T> implements
>>>>>>> ReadableSource<T> {
>>>>>>> 
>>>>>>> public SeqFileSource(Path path, PType<T> ptype) {
>>>>>>> - super(path, ptype, SequenceFileInputFormat.class);
>>>>>>> + super(path, ptype, SequenceFileInputFormat.class);
>>>>>>> }
>>>>>>> -
>>>>>>> +
>>>>>>> @Override
>>>>>>> public Iterable<T> read(Configuration conf) throws IOException {
>>>>>>> - FileSystem fs = FileSystem.get(conf);
>>>>>>> - return CompositePathIterable.create(fs, path,
>>>>>>> - new SeqFileReaderFactory<T>(ptype, conf));
>>>>>>> + FileSystem fs = FileSystem.get(path.toUri(), conf);
>>>>>>> + return CompositePathIterable.create(fs, path, new
>>>>>>> SeqFileReaderFactory<T>(ptype, conf));
>>>>>>> }
>>>>>>> 
>>>>>>> @Override
>>>>>>> diff --git
>>>>>> 
>>>>> 
>>>>> 
>>>>> src/main/java/com/cloudera/crunch/io/seq/SeqFileTableSource.java
>>>>>>> src/main/java/com/cloudera/crunch/io/seq/SeqFileTableSource.java
>>>>>>> index 69ca12b..4db6658 100644
>>>>>>> --- src/main/java/com/cloudera/crunch/io/seq/SeqFileTableSource.java
>>>>>>> +++ src/main/java/com/cloudera/crunch/io/seq/SeqFileTableSource.java
>>>>>>> @@ -42,7 +42,7 @@ public class SeqFileTableSource<K, V> extends
>>>>>>> FileTableSourceImpl<K, V> implemen
>>>>>>> 
>>>>>>> @Override
>>>>>>> public Iterable<Pair<K, V>> read(Configuration conf) throws
>>>>>> 
>>>>> 
>>>>> 
>>>>> IOException {
>>>>>>> - FileSystem fs = FileSystem.get(conf);
>>>>>>> + FileSystem fs = FileSystem.get(path.toUri(), conf);
>>>>>>> return CompositePathIterable.create(fs, path,
>>>>>>> new SeqFileTableReaderFactory<K, V>((PTableType<K, V>) ptype, conf));
>>>>>>> }
>>>>>>> diff --git
>>>>>> 
>>>>> 
>>>>> 
>>>>> src/main/java/com/cloudera/crunch/io/text/TextFileSource.java
>>>>>>> src/main/java/com/cloudera/crunch/io/text/TextFileSource.java
>>>>>>> index a876843..e0dbe68 100644
>>>>>>> --- src/main/java/com/cloudera/crunch/io/text/TextFileSource.java
>>>>>>> +++ src/main/java/com/cloudera/crunch/io/text/TextFileSource.java
>>>>>>> @@ -67,7 +67,7 @@ public class TextFileSource<T> extends
>>>>>>> FileSourceImpl<T> implements
>>>>>>> 
>>>>>>> @Override
>>>>>>> public Iterable<T> read(Configuration conf) throws IOException {
>>>>>>> - return CompositePathIterable.create(FileSystem.get(conf), path,
>>>>>>> - new TextFileReaderFactory<T>(ptype, conf));
>>>>>>> + return CompositePathIterable.create(FileSystem.get(path.toUri(),
>>>>>>> conf), path,
>>>>>>> + new TextFileReaderFactory<T>(ptype, conf));
>>>>>>> }
>>>>>>> }
>>>>>>> diff --git src/main/java/com/cloudera/crunch/lib/join/MapsideJoin.java
>>>>>>> src/main/java/com/cloudera/crunch/lib/join/MapsideJoin.java
>>>>>>> new file mode 100644
>>>>>>> index 0000000..8072e07
>>>>>>> --- /dev/null
>>>>>>> +++ src/main/java/com/cloudera/crunch/lib/join/MapsideJoin.java
>>>>>>> @@ -0,0 +1,143 @@
>>>>>>> +package com.cloudera.crunch.lib.join;
>>>>>>> +
>>>>>>> +import java.io.IOException;
>>>>>>> +
>>>>>>> +import org.apache.hadoop.filecache.DistributedCache;
>>>>>>> +import org.apache.hadoop.fs.FileSystem;
>>>>>>> +import org.apache.hadoop.fs.Path;
>>>>>>> +
>>>>>>> +import com.cloudera.crunch.DoFn;
>>>>>>> +import com.cloudera.crunch.Emitter;
>>>>>>> +import com.cloudera.crunch.PTable;
>>>>>>> +import com.cloudera.crunch.Pair;
>>>>>>> +import com.cloudera.crunch.impl.mr.MRPipeline;
>>>>>>> +import com.cloudera.crunch.impl.mr.run.CrunchRuntimeException;
>>>>>>> +import com.cloudera.crunch.io.ReadableSourceTarget;
>>>>>>> +import com.cloudera.crunch.io.impl.SourcePathTargetImpl;
>>>>>>> +import com.cloudera.crunch.types.PType;
>>>>>>> +import com.cloudera.crunch.types.PTypeFamily;
>>>>>>> +import com.google.common.collect.ArrayListMultimap;
>>>>>>> +import com.google.common.collect.Multimap;
>>>>>>> +
>>>>>>> +/**
>>>>>>> + * Utility for doing map side joins on a common key between two
>>>>>>> {@link PTable}s.
>>>>>>> + * <p>
>>>>>>> + * A map side join is an optimized join which doesn't use a reducer;
>>>>>> 
>>>>> 
>>>>> 
>>>>> instead,
>>>>>>> + * the right side of the join is loaded into memory and the join is
>>>>>>> performed in
>>>>>>> + * a mapper. This style of join has the important implication that
>>>>>>> the output of
>>>>>>> + * the join is not sorted, which is the case with a conventional
>>>>>>> (reducer-based)
>>>>>>> + * join.
>>>>>>> + * <p>
>>>>>>> + * <b>Note:</b>This utility is only supported when running with a
>>>>>>> + * {@link MRPipeline} as the pipeline.
>>>>>>> + */
>>>>>>> +public class MapsideJoin {
>>>>>>> +
>>>>>>> + /**
>>>>>>> + * Join two tables using a map side join. The right-side table will
>>>>>> 
>>>>> 
>>>>> 
>>>>> be loaded
>>>>>>> + * fully in memory, so this method should only be used if the right
>>>>>> 
>>>>> 
>>>>> 
>>>>> side
>>>>>>> + * table's contents can fit in the memory allocated to mappers. The
>>>>>> 
>>>>> 
>>>>> 
>>>>> join
>>>>>>> + * performed by this method is an inner join.
>>>>>>> + *
>>>>>>> + * @param left
>>>>>>> + * The left-side table of the join
>>>>>>> + * @param right
>>>>>>> + * The right-side table of the join, whose contents will be fully
>>>>>>> + * read into memory
>>>>>>> + * @return A table keyed on the join key, containing pairs of joined
>>>>>> 
>>>>> 
>>>>> 
>>>>> values
>>>>>>> + */
>>>>>>> + public static <K, U, V> PTable<K, Pair<U, V>> join(PTable<K, U>
>>>>>>> left, PTable<K, V> right) {
>>>>>>> +
>>>>>>> + if (!(right.getPipeline() instanceof MRPipeline)) {
>>>>>>> + throw new CrunchRuntimeException("Map-side join is only
>>>>>>> supported within a MapReduce context");
>>>>>>> + }
>>>>>>> +
>>>>>>> + MRPipeline pipeline = (MRPipeline) right.getPipeline();
>>>>>>> + pipeline.materialize(right);
>>>>>>> +
>>>>>>> + // TODO Move necessary logic to MRPipeline so that we can
>>>>>> 
>>>>> 
>>>>> 
>>>>> theoretically
>>>>>>> + // optimize his by running the setup of multiple map-side joins
>>>>>>> concurrently
>>>>>>> + pipeline.run();
>>>>>>> +
>>>>>>> + ReadableSourceTarget<Pair<K, V>> readableSourceTarget = pipeline
>>>>>>> + .getMaterializeSourceTarget(right);
>>>>>>> + if (!(readableSourceTarget instanceof SourcePathTargetImpl)) {
>>>>>>> + throw new CrunchRuntimeException("Right-side contents can't be
>>>>>>> read from a path");
>>>>>>> + }
>>>>>>> +
>>>>>>> + // Suppress warnings because we've just checked this cast via
>>>>>> 
>>>>> 
>>>>> 
>>>>> instanceof
>>>>>>> + @SuppressWarnings("unchecked")
>>>>>>> + SourcePathTargetImpl<Pair<K, V>> sourcePathTarget =
>>>>>>> (SourcePathTargetImpl<Pair<K, V>>) readableSourceTarget;
>>>>>>> +
>>>>>>> + Path path = sourcePathTarget.getPath();
>>>>>>> + DistributedCache.addCacheFile(path.toUri(),
>>>>>> 
>>>>> 
>>>>> 
>>>>> pipeline.getConfiguration());
>>>>>>> +
>>>>>>> + MapsideJoinDoFn<K, U, V> mapJoinDoFn = new MapsideJoinDoFn<K, U,
>>>>>>> V>(path.toString(),
>>>>>>> + right.getPType());
>>>>>>> + PTypeFamily typeFamily = left.getTypeFamily();
>>>>>>> + return left.parallelDo(
>>>>>>> + "mapjoin",
>>>>>>> + mapJoinDoFn,
>>>>>>> + typeFamily.tableOf(left.getKeyType(),
>>>>>>> + typeFamily.pairs(left.getValueType(), right.getValueType())));
>>>>>>> +
>>>>>>> + }
>>>>>>> +
>>>>>>> + static class MapsideJoinDoFn<K, U, V> extends DoFn<Pair<K, U>,
>>>>>>> Pair<K, Pair<U, V>>> {
>>>>>>> +
>>>>>>> + private String inputPath;
>>>>>>> + private PType<Pair<K, V>> ptype;
>>>>>>> + private Multimap<K, V> joinMap;
>>>>>>> +
>>>>>>> + public MapsideJoinDoFn(String inputPath, PType<Pair<K, V>> ptype) {
>>>>>>> + this.inputPath = inputPath;
>>>>>>> + this.ptype = ptype;
>>>>>>> + }
>>>>>>> +
>>>>>>> + private Path getCacheFilePath() {
>>>>>>> + try {
>>>>>>> + for (Path localPath :
>>>>>>> DistributedCache.getLocalCacheFiles(getConfiguration())) {
>>>>>>> + if (localPath.toString().endsWith(inputPath)) {
>>>>>>> + return
>>>>>>> localPath.makeQualified(FileSystem.getLocal(getConfiguration()));
>>>>>>> +
>>>>>>> + }
>>>>>>> + }
>>>>>>> + } catch (IOException e) {
>>>>>>> + throw new CrunchRuntimeException(e);
>>>>>>> + }
>>>>>>> +
>>>>>>> + throw new CrunchRuntimeException("Can't find local cache file
>>>>>>> for '" + inputPath + "'");
>>>>>>> + }
>>>>>>> +
>>>>>>> + @Override
>>>>>>> + public void initialize() {
>>>>>>> + super.initialize();
>>>>>>> +
>>>>>>> + ReadableSourceTarget<Pair<K, V>> sourceTarget =
>>>>>>> (ReadableSourceTarget<Pair<K, V>>) ptype
>>>>>>> + .getDefaultFileSource(getCacheFilePath());
>>>>>>> + Iterable<Pair<K, V>> iterable = null;
>>>>>>> + try {
>>>>>>> + iterable = sourceTarget.read(getConfiguration());
>>>>>>> + } catch (IOException e) {
>>>>>>> + throw new CrunchRuntimeException("Error reading right-side of
>>>>>>> map side join: ", e);
>>>>>>> + }
>>>>>>> +
>>>>>>> + joinMap = ArrayListMultimap.create();
>>>>>>> + for (Pair<K, V> joinPair : iterable) {
>>>>>>> + joinMap.put(joinPair.first(), joinPair.second());
>>>>>>> + }
>>>>>>> + }
>>>>>>> +
>>>>>>> + @Override
>>>>>>> + public void process(Pair<K, U> input, Emitter<Pair<K, Pair<U,
>>>>>>> V>>> emitter) {
>>>>>>> + K key = input.first();
>>>>>>> + U value = input.second();
>>>>>>> + for (V joinValue : joinMap.get(key)) {
>>>>>>> + Pair<U, V> valuePair = Pair.of(value, joinValue);
>>>>>>> + emitter.emit(Pair.of(key, valuePair));
>>>>>>> + }
>>>>>>> + }
>>>>>>> +
>>>>>>> + }
>>>>>>> +
>>>>>>> +}
>>>>>>> diff --git src/main/java/com/cloudera/crunch/types/PType.java
>>>>>>> src/main/java/com/cloudera/crunch/types/PType.java
>>>>>>> index af4ef1b..ae480aa 100644
>>>>>>> --- src/main/java/com/cloudera/crunch/types/PType.java
>>>>>>> +++ src/main/java/com/cloudera/crunch/types/PType.java
>>>>>>> @@ -15,6 +15,7 @@
>>>>>>> 
>>>>>>> package com.cloudera.crunch.types;
>>>>>>> 
>>>>>>> +import java.io.Serializable;
>>>>>>> import java.util.List;
>>>>>>> 
>>>>>>> import org.apache.hadoop.fs.Path;
>>>>>>> @@ -31,7 +32,7 @@ import com.cloudera.crunch.SourceTarget;
>>>>>>> * {@code PCollection}.
>>>>>>> *
>>>>>>> */
>>>>>>> -public interface PType<T> {
>>>>>>> +public interface PType<T> extends Serializable {
>>>>>>> /**
>>>>>>> * Returns the Java type represented by this {@code PType}.
>>>>>>> */
>>>>>>> diff --git src/main/java/com/cloudera/crunch/types/avro/AvroType.java
>>>>>>> src/main/java/com/cloudera/crunch/types/avro/AvroType.java
>>>>>>> index 3db00c0..29af9fb 100644
>>>>>>> --- src/main/java/com/cloudera/crunch/types/avro/AvroType.java
>>>>>>> +++ src/main/java/com/cloudera/crunch/types/avro/AvroType.java
>>>>>>> @@ -14,6 +14,7 @@
>>>>>>> */
>>>>>>> package com.cloudera.crunch.types.avro;
>>>>>>> 
>>>>>>> +import java.io.Serializable;
>>>>>>> import java.util.List;
>>>>>>> 
>>>>>>> import org.apache.avro.Schema;
>>>>>>> @@ -41,7 +42,8 @@ public class AvroType<T> implements PType<T> {
>>>>>>> private static final Converter AVRO_CONVERTER = new AvroKeyConverter();
>>>>>>> 
>>>>>>> private final Class<T> typeClass;
>>>>>>> - private final Schema schema;
>>>>>>> + private final String schemaString;
>>>>>>> + private transient Schema schema;
>>>>>>> private final MapFn baseInputMapFn;
>>>>>>> private final MapFn baseOutputMapFn;
>>>>>>> private final List<PType> subTypes;
>>>>>>> @@ -55,6 +57,7 @@ public class AvroType<T> implements PType<T> {
>>>>>>> MapFn outputMapFn, PType... ptypes) {
>>>>>>> this.typeClass = typeClass;
>>>>>>> this.schema = Preconditions.checkNotNull(schema);
>>>>>>> + this.schemaString = schema.toString();
>>>>>>> this.baseInputMapFn = inputMapFn;
>>>>>>> this.baseOutputMapFn = outputMapFn;
>>>>>>> this.subTypes = ImmutableList.<PType> builder().add(ptypes).build();
>>>>>>> @@ -76,6 +79,9 @@ public class AvroType<T> implements PType<T> {
>>>>>>> }
>>>>>>> 
>>>>>>> public Schema getSchema() {
>>>>>>> + if (schema == null){
>>>>>>> + schema = new Schema.Parser().parse(schemaString);
>>>>>>> + }
>>>>>>> return schema;
>>>>>>> }
>>>>>>> 
>>>>>>> diff --git
>>>>> src/test/java/com/cloudera/crunch/impl/mr/MRPipelineTest.java
>>>>>>> src/test/java/com/cloudera/crunch/impl/mr/MRPipelineTest.java
>>>>>>> new file mode 100644
>>>>>>> index 0000000..f265460
>>>>>>> --- /dev/null
>>>>>>> +++ src/test/java/com/cloudera/crunch/impl/mr/MRPipelineTest.java
>>>>>>> @@ -0,0 +1,60 @@
>>>>>>> +package com.cloudera.crunch.impl.mr;
>>>>>>> +
>>>>>>> +import static org.junit.Assert.assertEquals;
>>>>>>> +import static org.mockito.Mockito.doReturn;
>>>>>>> +import static org.mockito.Mockito.mock;
>>>>>>> +import static org.mockito.Mockito.spy;
>>>>>>> +import static org.mockito.Mockito.when;
>>>>>>> +
>>>>>>> +import java.io.IOException;
>>>>>>> +
>>>>>>> +import org.junit.Before;
>>>>>>> +import org.junit.Test;
>>>>>>> +
>>>>>>> +import com.cloudera.crunch.SourceTarget;
>>>>>>> +import com.cloudera.crunch.impl.mr.collect.PCollectionImpl;
>>>>>>> +import com.cloudera.crunch.io.ReadableSourceTarget;
>>>>>>> +import com.cloudera.crunch.types.avro.Avros;
>>>>>>> +
>>>>>>> +public class MRPipelineTest {
>>>>>>> +
>>>>>>> + private MRPipeline pipeline;
>>>>>>> +
>>>>>>> + @Before
>>>>>>> + public void setUp() throws IOException {
>>>>>>> + pipeline = spy(new MRPipeline(MRPipelineTest.class));
>>>>>>> + }
>>>>>>> +
>>>>>>> + @Test
>>>>>>> + public void testGetMaterializeSourceTarget_AlreadyMaterialized() {
>>>>>>> + PCollectionImpl<String> materializedPcollection =
>>>>>>> mock(PCollectionImpl.class);
>>>>>>> + ReadableSourceTarget<String> readableSourceTarget =
>>>>>>> mock(ReadableSourceTarget.class);
>>>>>>> +
>>>>>> 
>>>>> 
>>>>> 
>>>>> when(materializedPcollection.getMaterializedAt()).thenReturn(readableSourceTarget);
>>>>>>> +
>>>>>>> + assertEquals(readableSourceTarget,
>>>>>>> pipeline.getMaterializeSourceTarget(materializedPcollection));
>>>>>>> + }
>>>>>>> +
>>>>>>> + @Test
>>>>>>> + public void
>>>>>> 
>>>>> 
>>>>> 
>>>>> testGetMaterializeSourceTarget_NotMaterialized_HasOutput() {
>>>>>>> +
>>>>>>> + PCollectionImpl<String> pcollection = mock(PCollectionImpl.class);
>>>>>>> + ReadableSourceTarget<String> readableSourceTarget =
>>>>>>> mock(ReadableSourceTarget.class);
>>>>>>> + when(pcollection.getPType()).thenReturn(Avros.strings());
>>>>>>> +
>>>>>> 
>>>>> 
>>>>> 
>>>>> doReturn(readableSourceTarget).when(pipeline).createIntermediateOutput(Avros.strings());
>>>>>>> + when(pcollection.getMaterializedAt()).thenReturn(null);
>>>>>>> +
>>>>>>> + assertEquals(readableSourceTarget,
>>>>>>> pipeline.getMaterializeSourceTarget(pcollection));
>>>>>>> + }
>>>>>>> +
>>>>>>> + @Test(expected = IllegalArgumentException.class)
>>>>>>> + public void
>>>>>> 
>>>>> 
>>>>> 
>>>>> testGetMaterializeSourceTarget_NotMaterialized_NotReadableSourceTarget()
>>>>>>> {
>>>>>>> + PCollectionImpl<String> pcollection = mock(PCollectionImpl.class);
>>>>>>> + SourceTarget<String> nonReadableSourceTarget =
>>>>>> 
>>>>> 
>>>>> 
>>>>> mock(SourceTarget.class);
>>>>>>> + when(pcollection.getPType()).thenReturn(Avros.strings());
>>>>>>> +
>>>>>> 
>>>>> 
>>>>> 
>>>>> doReturn(nonReadableSourceTarget).when(pipeline).createIntermediateOutput(Avros.strings());
>>>>>>> + when(pcollection.getMaterializedAt()).thenReturn(null);
>>>>>>> +
>>>>>>> + pipeline.getMaterializeSourceTarget(pcollection);
>>>>>>> + }
>>>>>>> +
>>>>>>> +}
>>>>>>> diff --git
>>>>>> 
>>>>> 
>>>>> 
>>>>> src/test/java/com/cloudera/crunch/lib/join/MapsideJoinTest.java
>>>>>>> src/test/java/com/cloudera/crunch/lib/join/MapsideJoinTest.java
>>>>>>> new file mode 100644
>>>>>>> index 0000000..97e0c63
>>>>>>> --- /dev/null
>>>>>>> +++ src/test/java/com/cloudera/crunch/lib/join/MapsideJoinTest.java
>>>>>>> @@ -0,0 +1,102 @@
>>>>>>> +package com.cloudera.crunch.lib.join;
>>>>>>> +
>>>>>>> +import static org.junit.Assert.assertEquals;
>>>>>>> +import static org.junit.Assert.assertTrue;
>>>>>>> +
>>>>>>> +import java.io.IOException;
>>>>>>> +import java.util.Collections;
>>>>>>> +import java.util.List;
>>>>>>> +
>>>>>>> +import org.junit.Test;
>>>>>>> +
>>>>>>> +import com.cloudera.crunch.FilterFn;
>>>>>>> +import com.cloudera.crunch.MapFn;
>>>>>>> +import com.cloudera.crunch.PTable;
>>>>>>> +import com.cloudera.crunch.Pair;
>>>>>>> +import com.cloudera.crunch.Pipeline;
>>>>>>> +import com.cloudera.crunch.impl.mem.MemPipeline;
>>>>>>> +import com.cloudera.crunch.impl.mr.MRPipeline;
>>>>>>> +import com.cloudera.crunch.impl.mr.run.CrunchRuntimeException;
>>>>>>> +import com.cloudera.crunch.test.FileHelper;
>>>>>>> +import com.cloudera.crunch.types.writable.Writables;
>>>>>>> +import com.google.common.collect.Lists;
>>>>>>> +
>>>>>>> +public class MapsideJoinTest {
>>>>>>> +
>>>>>>> + private static class LineSplitter extends MapFn<String,
>>>>>>> Pair<Integer, String>> {
>>>>>>> +
>>>>>>> + @Override
>>>>>>> + public Pair<Integer, String> map(String input) {
>>>>>>> + String[] fields = input.split("\\|");
>>>>>>> + return Pair.of(Integer.parseInt(fields[0]), fields[1]);
>>>>>>> + }
>>>>>>> +
>>>>>>> + }
>>>>>>> +
>>>>>>> + private static class NegativeFilter extends FilterFn<Pair<Integer,
>>>>>> 
>>>>> 
>>>>> 
>>>>> String>> {
>>>>>>> +
>>>>>>> + @Override
>>>>>>> + public boolean accept(Pair<Integer, String> input) {
>>>>>>> + return false;
>>>>>>> + }
>>>>>>> +
>>>>>>> + }
>>>>>>> +
>>>>>>> + @Test(expected = CrunchRuntimeException.class)
>>>>>>> + public void testNonMapReducePipeline() {
>>>>>>> + runMapsideJoin(MemPipeline.getInstance());
>>>>>>> + }
>>>>>>> +
>>>>>>> + @Test
>>>>>>> + public void testMapsideJoin_RightSideIsEmpty() throws IOException {
>>>>>>> + MRPipeline pipeline = new MRPipeline(MapsideJoinTest.class);
>>>>>>> + PTable<Integer, String> customerTable = readTable(pipeline,
>>>>>>> "customers.txt");
>>>>>>> + PTable<Integer, String> orderTable = readTable(pipeline,
>>>>>> 
>>>>> 
>>>>> 
>>>>> "orders.txt");
>>>>>>> +
>>>>>>> + PTable<Integer, String> filteredOrderTable =
>>>>>>> orderTable.parallelDo(new NegativeFilter(),
>>>>>>> + orderTable.getPTableType());
>>>>>>> +
>>>>>>> + PTable<Integer, Pair<String, String>> joined =
>>>>>>> MapsideJoin.join(customerTable,
>>>>>>> + filteredOrderTable);
>>>>>>> +
>>>>>>> + List<Pair<Integer, Pair<String, String>>> materializedJoin =
>>>>>>> Lists.newArrayList(joined
>>>>>>> + .materialize());
>>>>>>> +
>>>>>>> + assertTrue(materializedJoin.isEmpty());
>>>>>>> +
>>>>>>> + }
>>>>>>> +
>>>>>>> + @Test
>>>>>>> + public void testMapsideJoin() throws IOException {
>>>>>>> + runMapsideJoin(new MRPipeline(MapsideJoinTest.class));
>>>>>>> + }
>>>>>>> +
>>>>>>> + private void runMapsideJoin(Pipeline pipeline) {
>>>>>>> + PTable<Integer, String> customerTable = readTable(pipeline,
>>>>>>> "customers.txt");
>>>>>>> + PTable<Integer, String> orderTable = readTable(pipeline,
>>>>>> 
>>>>> 
>>>>> 
>>>>> "orders.txt");
>>>>>>> +
>>>>>>> + PTable<Integer, Pair<String, String>> joined =
>>>>>>> MapsideJoin.join(customerTable, orderTable);
>>>>>>> +
>>>>>>> + List<Pair<Integer, Pair<String, String>>> expectedJoinResult =
>>>>>>> Lists.newArrayList();
>>>>>>> + expectedJoinResult.add(Pair.of(111, Pair.of("John Doe", "Corn
>>>>>> 
>>>>> 
>>>>> 
>>>>> flakes")));
>>>>>>> + expectedJoinResult.add(Pair.of(222, Pair.of("Jane Doe", "Toilet
>>>>>> 
>>>>> 
>>>>> 
>>>>> paper")));
>>>>>>> + expectedJoinResult.add(Pair.of(222, Pair.of("Jane Doe", "Toilet
>>>>>>> plunger")));
>>>>>>> + expectedJoinResult.add(Pair.of(333, Pair.of("Someone Else",
>>>>>>> "Toilet brush")));
>>>>>>> +
>>>>>>> + List<Pair<Integer, Pair<String, String>>> joinedResultList =
>>>>>>> Lists.newArrayList(joined
>>>>>>> + .materialize());
>>>>>>> + Collections.sort(joinedResultList);
>>>>>>> +
>>>>>>> + assertEquals(expectedJoinResult, joinedResultList);
>>>>>>> + }
>>>>>>> +
>>>>>>> + private static PTable<Integer, String> readTable(Pipeline pipeline,
>>>>>>> String filename) {
>>>>>>> + try {
>>>>>>> + return
>>>>>> 
>>>>> 
>>>>> 
>>>>> pipeline.readTextFile(FileHelper.createTempCopyOf(filename)).parallelDo("asTable",
>>>>>>> + new LineSplitter(), Writables.tableOf(Writables.ints(),
>>>>>>> Writables.strings()));
>>>>>>> + } catch (IOException e) {
>>>>>>> + throw new RuntimeException(e);
>>>>>>> + }
>>>>>>> + }
>>>>>>> +
>>>>>>> +}
>>>>>>> diff --git src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java
>>>>>>> src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java
>>>>>>> index 74e2ad3..c6a0b46 100644
>>>>>>> --- src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java
>>>>>>> +++ src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java
>>>>>>> @@ -25,6 +25,7 @@ import org.apache.avro.Schema.Type;
>>>>>>> import org.apache.avro.generic.GenericData;
>>>>>>> import org.apache.avro.util.Utf8;
>>>>>>> import org.apache.hadoop.io.LongWritable;
>>>>>>> +import org.junit.Ignore;
>>>>>>> import org.junit.Test;
>>>>>>> 
>>>>>>> import com.cloudera.crunch.Pair;
>>>>>>> @@ -103,6 +104,7 @@ public class AvrosTest {
>>>>>>> }
>>>>>>> 
>>>>>>> @Test
>>>>>>> + @Ignore("This test creates an invalid schema that causes
>>>>>>> Schema#toString to fail")
>>>>>>> public void testNestedTables() throws Exception {
>>>>>>> PTableType<Long, Long> pll = Avros.tableOf(Avros.longs(),
>>>>>> 
>>>>> 
>>>>> 
>>>>> Avros.longs());
>>>>>>> PTableType<Pair<Long, Long>, String> nest = Avros.tableOf(pll,
>>>>>>> Avros.strings());
>>>>>>> diff --git src/test/resources/customers.txt
>>>>>> 
>>>>> 
>>>>> 
>>>>> src/test/resources/customers.txt
>>>>>>> new file mode 100644
>>>>>>> index 0000000..98f3f3d
>>>>>>> --- /dev/null
>>>>>>> +++ src/test/resources/customers.txt
>>>>>>> @@ -0,0 +1,4 @@
>>>>>>> +111|John Doe
>>>>>>> +222|Jane Doe
>>>>>>> +333|Someone Else
>>>>>>> +444|Has No Orders
>>>>>>> \ No newline at end of file
>>>>>>> diff --git src/test/resources/orders.txt src/test/resources/orders.txt
>>>>>>> new file mode 100644
>>>>>>> index 0000000..2f1383f
>>>>>>> --- /dev/null
>>>>>>> +++ src/test/resources/orders.txt
>>>>>>> @@ -0,0 +1,4 @@
>>>>>>> +222|Toilet plunger
>>>>>>> +333|Toilet brush
>>>>>>> +222|Toilet paper
>>>>>>> +111|Corn flakes
>>>>>>> \ No newline at end of file
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> On Tue, Jul 3, 2012 at 10:42 AM, Christian Tzolov
>>>>>>> <[email protected] (mailto:[email protected])>
>>>>>> 
>>>>> 
>>>>> 
>>>>> wrote:
>>>>>>>> Hi Gabriel,
>>>>>>>> 
>>>>>>>> Seems like the attachment is missing.
>>>>>>>> 
>>>>>>>> Cheers,
>>>>>>>> Chris
>>>>>>>> 
>>>>>>>> On Tue, Jul 3, 2012 at 9:23 AM, Gabriel Reid <[email protected] 
>>>>>>>> (mailto:[email protected])(mailto:
>>>>> [email protected] (mailto:[email protected]))> wrote:
>>>>>>>> 
>>>>>>>>> Hi guys,
>>>>>>>>> 
>>>>>>>>> Attached (hopefully) is a patch for an initial implementation of
>>>>> map
>>>>>>>>> side joins. It's currently implemented as a static method in a
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>>> class
>>>>>>>>> called MapsideJoin, with the same interface as the existing Join
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>>> class
>>>>>>>>> (with only inner joins being implemented for now). The way it
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>>> works is
>>>>>>>>> that the right-side PTable of the join is put in the distributed
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>>> cache
>>>>>>>>> and then read by the join function at runtime.
>>>>>>>>> 
>>>>>>>>> There's one spot that I can see for a potentially interesting
>>>>>>>>> optimization -- MRPipeline#run is called once for each map side
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>>> join
>>>>>>>>> that is set up, but if the setup of the joins was done within
>>>>>>>>> MRPipeline, then we could set up multiple map side joins in
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>>> parallel
>>>>>>>>> with a single call to MRPipeline#run. OTOH, a whole bunch of map
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>>> side
>>>>>>>>> joins in parallel probably isn't that common of an operation.
>>>>>>>>> 
>>>>>>>>> If anyone feels like taking a look at the patch, any feedback is
>>>>>>>>> appreciated. If nobody sees something that needs serious changes in
>>>>>>>>> the patch, I'll commit it.
>>>>>>>>> 
>>>>>>>>> - Gabriel
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On Thu, Jun 21, 2012 at 9:09 AM, Gabriel Reid <
>>>>> [email protected] (mailto:[email protected])>
>>>>>>>>> wrote:
>>>>>>>>>> Replying to all...
>>>>>>>>>> 
>>>>>>>>>> On Thu, Jun 21, 2012 at 8:40 AM, Josh Wills <[email protected] 
>>>>>>>>>> (mailto:[email protected])(mailto:
>>>>> [email protected] (mailto:[email protected]))> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> So there's a philosophical issue here: should Crunch ever make
>>>>>>>>>>> decisions about how to do something itself based on its
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>>> estimates of
>>>>>>>>>>> the size of the data sets, or should it always do exactly what
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>>> the
>>>>>>>>>>> developer indicates?
>>>>>>>>>>> 
>>>>>>>>>>> I can make a case either way, but I think that no matter what,
>>>>> we
>>>>>>>>>>> would want to have explicit functions for performing a join
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>>> that reads
>>>>>>>>>>> one data set into memory, so I think we can proceed w/the
>>>>>>>>>>> implementation while folks weigh in on what their preferences
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>>> are for
>>>>>>>>>>> the default join() behavior (e.g., just do a reduce-side join,
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>>> or try
>>>>>>>>>>> to figure out the best join given information about the input
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>>> data and
>>>>>>>>>>> some configuration parameters.)
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> I definitely agree on needing to have an explicit way to invoke
>>>>> one or
>>>>>>>>>> the other -- and in general I don't like having magic behind the
>>>>>>>>>> scenes to decide on behaviour (especially considering Crunch is
>>>>>>>>>> generally intended to be closer to the metal than Pig and Hive).
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>>> I'm
>>>>>>>>>> not sure if the runtime decision is something specific to some
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>>> of my
>>>>>>>>>> use cases or if it could be useful to a wider audience.
>>>>>>>>>> 
>>>>>>>>>> The ability to dynamically decide at runtime whether a map side
>>>>> join
>>>>>>>>>> should be used can also easily be tacked on outside of Crunch,
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>>> and
>>>>>>>>>> won't impact the underlying implementation (as you pointed out),
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>>> so I
>>>>>>>>>> definitely also agree on focusing on the underlying
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>>> implementation
>>>>>>>>>> first, and we can worry about the options used for exposing it
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>>> later
>>>>>>>>>> on.
>>>>>>>>>> 
>>>>>>>>>> - Gabriel
>>>> 
>>>> 
>>>> --
>>>> Director of Data Science
>>>> Cloudera <http://www.cloudera.com>
>>>> Twitter: @josh_wills <http://twitter.com/josh_wills>
>>> 
>>> 
>>> 

Reply via email to