Extract setup for materialize. Pull out the setup method for materializing a PCollection so that it can be re-used for in-memory map-side joins.
Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/6d701c13 Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/6d701c13 Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/6d701c13 Branch: refs/heads/master Commit: 6d701c13fde22a342508c11f0de351a689ea5752 Parents: 9367826 Author: Gabriel Reid <[email protected]> Authored: Sun Jun 24 20:31:02 2012 +0200 Committer: Gabriel Reid <[email protected]> Committed: Fri Jul 6 17:54:29 2012 +0200 ---------------------------------------------------------------------- .../com/cloudera/crunch/impl/mr/MRPipeline.java | 163 +++++++++------ .../cloudera/crunch/impl/mr/MRPipelineTest.java | 60 ++++++ 2 files changed, 159 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/6d701c13/src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java ---------------------------------------------------------------------- diff --git a/src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java b/src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java index 420e8dc..c8ba596 100644 --- a/src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java +++ b/src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java @@ -59,9 +59,9 @@ import com.google.common.collect.Sets; public class MRPipeline implements Pipeline { private static final Log LOG = LogFactory.getLog(MRPipeline.class); - + private static final Random RANDOM = new Random(); - + private final Class<?> jarClass; private final String name; private final Map<PCollectionImpl<?>, Set<Target>> outputTargets; @@ -75,15 +75,15 @@ public class MRPipeline implements Pipeline { public MRPipeline(Class<?> jarClass) throws IOException { this(jarClass, new Configuration()); } - - public MRPipeline(Class<?> jarClass, String name){ + + public 
MRPipeline(Class<?> jarClass, String 
name) { this(jarClass, name, new Configuration()); } - + public MRPipeline(Class<?> jarClass, Configuration conf) { - this(jarClass, jarClass.getName(), conf); + this(jarClass, jarClass.getName(), conf); } - + public MRPipeline(Class<?> jarClass, String name, Configuration conf) { this.jarClass = jarClass; this.name = name; @@ -102,9 +102,9 @@ public class MRPipeline implements Pipeline { @Override public void setConfiguration(Configuration conf) { - this.conf = conf; + this.conf = conf; } - + @Override public PipelineResult run() { MSCRPlanner planner = new MSCRPlanner(this, outputTargets); @@ -125,8 +125,8 @@ public class MRPipeline implements Pipeline { boolean materialized = false; for (Target t : outputTargets.get(c)) { if (!materialized && t instanceof Source) { - c.materializeAt((SourceTarget) t); - materialized = true; + c.materializeAt((SourceTarget) t); + materialized = true; } } } @@ -144,7 +144,7 @@ public class MRPipeline implements Pipeline { cleanup(); return res; } - + public <S> PCollection<S> read(Source<S> source) { return new InputCollection<S>(source, this); } @@ -160,85 +160,120 @@ public class MRPipeline implements Pipeline { @SuppressWarnings("unchecked") public void write(PCollection<?> pcollection, Target target) { if (pcollection instanceof PGroupedTableImpl) { - pcollection = ((PGroupedTableImpl<?,?>) pcollection).ungroup(); + pcollection = ((PGroupedTableImpl<?, ?>) pcollection).ungroup(); } else if (pcollection instanceof UnionCollection || pcollection instanceof UnionTable) { - pcollection = pcollection.parallelDo("UnionCollectionWrapper", - (MapFn)IdentityFn.<Object>getInstance(), pcollection.getPType()); + pcollection = pcollection.parallelDo("UnionCollectionWrapper", + (MapFn) IdentityFn.<Object> getInstance(), pcollection.getPType()); } addOutput((PCollectionImpl<?>) pcollection, target); } private void addOutput(PCollectionImpl<?> impl, Target target) { if (!outputTargets.containsKey(impl)) { - outputTargets.put(impl, 
Sets.<Target>newHashSet()); + outputTargets.put(impl, Sets.<Target> newHashSet()); } outputTargets.get(impl).add(target); } - + @Override public <T> Iterable<T> materialize(PCollection<T> pcollection) { - - if (pcollection instanceof UnionCollection) { - pcollection = pcollection.parallelDo("UnionCollectionWrapper", - (MapFn)IdentityFn.<Object>getInstance(), pcollection.getPType()); - } - PCollectionImpl<T> impl = (PCollectionImpl<T>) pcollection; + + PCollectionImpl<T> pcollectionImpl = toPcollectionImpl(pcollection); + ReadableSourceTarget<T> srcTarget = getMaterializeSourceTarget(pcollectionImpl); + + MaterializableIterable<T> c = new MaterializableIterable<T>(this, srcTarget); + if (!outputTargetsToMaterialize.containsKey(pcollectionImpl)) { + outputTargetsToMaterialize.put(pcollectionImpl, c); + } + return c; + } + + /** + * Retrieve a ReadableSourceTarget that provides access to the contents of a + * {@link PCollection}. This is primarily intended as a helper method to + * {@link #materialize(PCollection)}. The underlying data of the + * ReadableSourceTarget may not be actually present until the pipeline is run. 
+ * + * @param pcollection + * The collection for which the ReadableSourceTarget is to be + * retrieved + * @return The ReadableSourceTarget + * @throws IllegalArgumentException + * If no ReadableSourceTarget can be retrieved for the given + * PCollection + */ + public <T> ReadableSourceTarget<T> getMaterializeSourceTarget(PCollection<T> pcollection) { + PCollectionImpl<T> impl = toPcollectionImpl(pcollection); SourceTarget<T> matTarget = impl.getMaterializedAt(); if (matTarget != null && matTarget instanceof ReadableSourceTarget) { - return new MaterializableIterable<T>(this, (ReadableSourceTarget<T>) matTarget); + return (ReadableSourceTarget<T>) matTarget; + } + + ReadableSourceTarget<T> srcTarget = null; + if (outputTargets.containsKey(impl)) { // guard on the same normalized key that the lookup below uses + for (Target target : outputTargets.get(impl)) { + if (target instanceof ReadableSourceTarget) { + srcTarget = (ReadableSourceTarget<T>) target; + break; + } + } } - - ReadableSourceTarget<T> srcTarget = null; - if (outputTargets.containsKey(pcollection)) { - for (Target target : outputTargets.get(impl)) { - if (target instanceof ReadableSourceTarget) { - srcTarget = (ReadableSourceTarget) target; - break; - } - } - } - - if (srcTarget == null) { - SourceTarget<T> st = createIntermediateOutput(pcollection.getPType()); - if (!(st instanceof ReadableSourceTarget)) { - throw new IllegalArgumentException("The PType for the given PCollection is not readable" - + " and cannot be materialized"); - } else { - srcTarget = (ReadableSourceTarget) st; - addOutput(impl, srcTarget); - } - } - - MaterializableIterable<T> c = new MaterializableIterable<T>(this, srcTarget); - outputTargetsToMaterialize.put(impl, c); - return c; + + if (srcTarget == null) { + SourceTarget<T> st = createIntermediateOutput(impl.getPType()); + if (!(st instanceof ReadableSourceTarget)) { + throw new IllegalArgumentException("The PType for the given PCollection is not readable" + + " and cannot be materialized"); + } else { + 
srcTarget = 
(ReadableSourceTarget<T>) st; + addOutput(impl, srcTarget); + } + } + + return srcTarget; + } + + /** + * Safely cast a PCollection into a PCollectionImpl, including handling the case of UnionCollections. + * @param pcollection The PCollection to be cast/transformed + * @return The PCollectionImpl representation + */ + private <T> PCollectionImpl<T> toPcollectionImpl(PCollection<T> pcollection) { + PCollectionImpl<T> pcollectionImpl = null; + if (pcollection instanceof UnionCollection) { + pcollectionImpl = (PCollectionImpl<T>) pcollection.parallelDo("UnionCollectionWrapper", + (MapFn) IdentityFn.<Object> getInstance(), pcollection.getPType()); + } else { + pcollectionImpl = (PCollectionImpl<T>) pcollection; + } + return pcollectionImpl; } public <T> SourceTarget<T> createIntermediateOutput(PType<T> ptype) { - return ptype.getDefaultFileSource(createTempPath()); + return ptype.getDefaultFileSource(createTempPath()); } public Path createTempPath() { tempFileIndex++; return new Path(tempDirectory, "p" + tempFileIndex); } - + private static Path createTempDirectory(Configuration conf) { Path dir = new Path("/tmp/crunch" + RANDOM.nextInt()); - try { - FileSystem.get(conf).mkdirs(dir); - } catch (IOException e) { - LOG.error("Exception creating job output directory", e); - throw new RuntimeException(e); - } + try { + FileSystem.get(conf).mkdirs(dir); + } catch (IOException e) { + LOG.error("Exception creating job output directory", e); + throw new RuntimeException(e); + } return dir; } - + @Override public <T> void writeTextFile(PCollection<T> pcollection, String pathName) { // Ensure that this is a writable pcollection instance. 
- pcollection = pcollection.parallelDo("asText", IdentityFn.<T>getInstance(), - WritableTypeFamily.getInstance().as(pcollection.getPType())); + pcollection = pcollection.parallelDo("asText", IdentityFn.<T> getInstance(), WritableTypeFamily + .getInstance().as(pcollection.getPType())); write(pcollection, At.textFile(pathName)); } @@ -256,7 +291,7 @@ public class MRPipeline implements Pipeline { LOG.info("Exception during cleanup", e); } } - + public int getNextAnonymousStageId() { return nextAnonymousStageId++; } @@ -265,7 +300,7 @@ public class MRPipeline implements Pipeline { public void enableDebug() { // Turn on Crunch runtime error catching. getConfiguration().setBoolean(RuntimeParameters.DEBUG, true); - + // Write Hadoop's WARN logs to the console. Logger crunchInfoLogger = LogManager.getLogger("com.cloudera.crunch"); Appender console = crunchInfoLogger.getAppender("A"); @@ -277,9 +312,9 @@ public class MRPipeline implements Pipeline { LOG.warn("Could not find console appender named 'A' for writing Hadoop warning logs"); } } - + @Override public String getName() { - return name; + return name; } } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/6d701c13/src/test/java/com/cloudera/crunch/impl/mr/MRPipelineTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/com/cloudera/crunch/impl/mr/MRPipelineTest.java b/src/test/java/com/cloudera/crunch/impl/mr/MRPipelineTest.java new file mode 100644 index 0000000..f265460 --- /dev/null +++ b/src/test/java/com/cloudera/crunch/impl/mr/MRPipelineTest.java @@ -0,0 +1,60 @@ +package com.cloudera.crunch.impl.mr; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import java.io.IOException; + +import org.junit.Before; +import org.junit.Test; + +import com.cloudera.crunch.SourceTarget; +import 
com.cloudera.crunch.impl.mr.collect.PCollectionImpl; +import com.cloudera.crunch.io.ReadableSourceTarget; +import com.cloudera.crunch.types.avro.Avros; + +public class MRPipelineTest { + + private MRPipeline pipeline; + + @Before + public void setUp() throws IOException { + pipeline = spy(new MRPipeline(MRPipelineTest.class)); + } + + @Test + public void testGetMaterializeSourceTarget_AlreadyMaterialized() { + PCollectionImpl<String> materializedPcollection = mock(PCollectionImpl.class); + ReadableSourceTarget<String> readableSourceTarget = mock(ReadableSourceTarget.class); + when(materializedPcollection.getMaterializedAt()).thenReturn(readableSourceTarget); + + assertEquals(readableSourceTarget, pipeline.getMaterializeSourceTarget(materializedPcollection)); + } + + @Test + public void testGetMaterializeSourceTarget_NotMaterialized_CreatesIntermediateOutput() { + + PCollectionImpl<String> pcollection = mock(PCollectionImpl.class); + ReadableSourceTarget<String> readableSourceTarget = mock(ReadableSourceTarget.class); + when(pcollection.getPType()).thenReturn(Avros.strings()); + doReturn(readableSourceTarget).when(pipeline).createIntermediateOutput(Avros.strings()); + when(pcollection.getMaterializedAt()).thenReturn(null); + + assertEquals(readableSourceTarget, pipeline.getMaterializeSourceTarget(pcollection)); + } + + @Test(expected = IllegalArgumentException.class) + public void testGetMaterializeSourceTarget_NotMaterialized_NotReadableSourceTarget() { + PCollectionImpl<String> pcollection = mock(PCollectionImpl.class); + SourceTarget<String> nonReadableSourceTarget = mock(SourceTarget.class); + when(pcollection.getPType()).thenReturn(Avros.strings()); + doReturn(nonReadableSourceTarget).when(pipeline).createIntermediateOutput(Avros.strings()); + when(pcollection.getMaterializedAt()).thenReturn(null); + + pipeline.getMaterializeSourceTarget(pcollection); + } + +}
