Updated Branches: refs/heads/master af54c920e -> dd9456b92
CRUNCH-21: Make temporary dir configurable. Make temporary directory configurable via "crunch.tmp.dir". Fix logging anti-pattern: don't log and throw exception. Fix unit test that left data behind, cleaning up warnings, too. Signed-off-by: jwills <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/dd9456b9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/dd9456b9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/dd9456b9 Branch: refs/heads/master Commit: dd9456b924a1af8aed50039ef71b1afcf10cbd07 Parents: af54c92 Author: Matthias Friedrich <[email protected]> Authored: Sat Jul 21 11:01:34 2012 +0200 Committer: jwills <[email protected]> Committed: Sat Jul 21 13:01:03 2012 -0700 ---------------------------------------------------------------------- .../java/org/apache/crunch/impl/mr/MRPipeline.java | 10 +++- .../crunch/impl/mr/run/RuntimeParameters.java | 2 + .../org/apache/crunch/impl/mr/MRPipelineTest.java | 34 +++++++++----- 3 files changed, 31 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dd9456b9/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java index 37e2083..29a1963 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java @@ -264,16 +264,20 @@ public class MRPipeline implements Pipeline { } private static Path createTempDirectory(Configuration conf) { - Path dir = new Path("/tmp/crunch" + RANDOM.nextInt()); + Path dir = createTemporaryPath(conf); try { FileSystem.get(conf).mkdirs(dir); } catch (IOException e) { - LOG.error("Exception creating job output directory", e); - throw new RuntimeException(e); + throw new RuntimeException("Cannot create job output directory " + dir, e); } return dir; } + private static Path createTemporaryPath(Configuration conf) { + String baseDir = conf.get(RuntimeParameters.TMP_DIR, "/tmp"); + return new Path(baseDir, "crunch-" + (RANDOM.nextInt() & Integer.MAX_VALUE)); + } + @Override public <T> void writeTextFile(PCollection<T> pcollection, String pathName) { // Ensure that this is a writable pcollection instance. http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dd9456b9/crunch/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java index f16752f..1dcabb3 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java @@ -29,6 +29,8 @@ public class RuntimeParameters { public static final String DEBUG = "crunch.debug"; + public static final String TMP_DIR = "crunch.tmp.dir"; + // Not instantiated private RuntimeParameters() { } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dd9456b9/crunch/src/test/java/org/apache/crunch/impl/mr/MRPipelineTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/impl/mr/MRPipelineTest.java b/crunch/src/test/java/org/apache/crunch/impl/mr/MRPipelineTest.java index f010755..9ed7a46 100644 --- a/crunch/src/test/java/org/apache/crunch/impl/mr/MRPipelineTest.java +++ b/crunch/src/test/java/org/apache/crunch/impl/mr/MRPipelineTest.java @@ -19,7 +19,6 @@ package org.apache.crunch.impl.mr; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -27,34 +26,47 @@ import java.io.IOException; import org.apache.crunch.SourceTarget; import org.apache.crunch.impl.mr.collect.PCollectionImpl; +import org.apache.crunch.impl.mr.run.RuntimeParameters; import org.apache.crunch.io.ReadableSourceTarget; import org.apache.crunch.types.avro.Avros; +import org.apache.hadoop.conf.Configuration; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; -public class MRPipelineTest { +@RunWith(MockitoJUnitRunner.class) +public class MRPipelineTest { + @Rule + public TemporaryFolder tempDir = new TemporaryFolder(); + @Mock + private PCollectionImpl<String> pcollection; + @Mock + private ReadableSourceTarget<String> readableSourceTarget; + @Mock + private SourceTarget<String> nonReadableSourceTarget; private MRPipeline pipeline; @Before public void setUp() throws IOException { - pipeline = spy(new MRPipeline(MRPipelineTest.class)); + Configuration conf = new Configuration(); + conf.set(RuntimeParameters.TMP_DIR, tempDir.getRoot().getAbsolutePath()); + pipeline = spy(new MRPipeline(MRPipelineTest.class, conf)); } @Test public void testGetMaterializeSourceTarget_AlreadyMaterialized() { - PCollectionImpl<String> materializedPcollection = mock(PCollectionImpl.class); - ReadableSourceTarget<String> readableSourceTarget = mock(ReadableSourceTarget.class); - when(materializedPcollection.getMaterializedAt()).thenReturn(readableSourceTarget); + when(pcollection.getMaterializedAt()).thenReturn(readableSourceTarget); - assertEquals(readableSourceTarget, pipeline.getMaterializeSourceTarget(materializedPcollection)); + assertEquals(readableSourceTarget, pipeline.getMaterializeSourceTarget(pcollection)); } @Test public void testGetMaterializeSourceTarget_NotMaterialized_HasOutput() { - - PCollectionImpl<String> pcollection = mock(PCollectionImpl.class); - ReadableSourceTarget<String> readableSourceTarget = mock(ReadableSourceTarget.class); when(pcollection.getPType()).thenReturn(Avros.strings()); doReturn(readableSourceTarget).when(pipeline).createIntermediateOutput(Avros.strings()); when(pcollection.getMaterializedAt()).thenReturn(null); @@ -64,8 +76,6 @@ public class MRPipelineTest { @Test(expected = IllegalArgumentException.class) public void testGetMaterializeSourceTarget_NotMaterialized_NotReadableSourceTarget() { - PCollectionImpl<String> pcollection = mock(PCollectionImpl.class); - SourceTarget<String> nonReadableSourceTarget = mock(SourceTarget.class); when(pcollection.getPType()).thenReturn(Avros.strings()); doReturn(nonReadableSourceTarget).when(pipeline).createIntermediateOutput(Avros.strings()); when(pcollection.getMaterializedAt()).thenReturn(null);
