Updated Branches: refs/heads/master e74fc23e1 -> 4b45e134f
Have MemPipeline write outputs to directories to be consistent with MRPipeline Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/4b45e134 Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/4b45e134 Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/4b45e134 Branch: refs/heads/master Commit: 4b45e134fc28d3d2a83bb1db7c00487f724d54dc Parents: e74fc23 Author: Josh Wills <[email protected]> Authored: Thu Jun 21 08:31:01 2012 -0700 Committer: Josh Wills <[email protected]> Committed: Thu Jun 21 08:31:01 2012 -0700 ---------------------------------------------------------------------- .../com/cloudera/crunch/impl/mem/MemPipeline.java | 3 +- .../impl/mem/MemPipelineFileWritingTest.java | 50 +++++++++++++++ 2 files changed, 52 insertions(+), 1 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/4b45e134/src/main/java/com/cloudera/crunch/impl/mem/MemPipeline.java ---------------------------------------------------------------------- diff --git a/src/main/java/com/cloudera/crunch/impl/mem/MemPipeline.java b/src/main/java/com/cloudera/crunch/impl/mem/MemPipeline.java index 7242ca5..2b21efc 100644 --- a/src/main/java/com/cloudera/crunch/impl/mem/MemPipeline.java +++ b/src/main/java/com/cloudera/crunch/impl/mem/MemPipeline.java @@ -21,6 +21,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -145,7 +146,7 @@ public class MemPipeline implements Pipeline { Path path = ((PathTarget) target).getPath(); try { FileSystem fs = FileSystem.get(conf); - FSDataOutputStream os = fs.create(path); + FSDataOutputStream os = fs.create(new 
Path(path, "out")); if (collection instanceof PTable) { for (Object o : collection.materialize()) { Pair p = (Pair) o; http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/4b45e134/src/test/java/com/cloudera/crunch/impl/mem/MemPipelineFileWritingTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/com/cloudera/crunch/impl/mem/MemPipelineFileWritingTest.java b/src/test/java/com/cloudera/crunch/impl/mem/MemPipelineFileWritingTest.java new file mode 100644 index 0000000..9051fc1 --- /dev/null +++ b/src/test/java/com/cloudera/crunch/impl/mem/MemPipelineFileWritingTest.java @@ -0,0 +1,50 @@ +/** + * Copyright (c) 2012, Cloudera, Inc. All Rights Reserved. + * + * Cloudera, Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"). You may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for + * the specific language governing permissions and limitations under the + * License. 
*/
package com.cloudera.crunch.impl.mem;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.util.List;

import com.cloudera.crunch.PCollection;
import com.cloudera.crunch.Pipeline;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList;
import com.google.common.io.Files;

import org.junit.Test;

/**
 * Checks that {@link MemPipeline#writeTextFile} writes its output into a directory at the target
 * path, so that the on-disk layout matches what MRPipeline produces.
 */
public class MemPipelineFileWritingTest {

  @Test
  public void testMemPipelineFileWriter() throws Exception {
    File tmpDir = Files.createTempDir();
    // Remove the freshly created directory so the pipeline itself has to create the output
    // directory; fail loudly if that premise cannot be established.
    assertTrue("could not delete " + tmpDir, tmpDir.delete());
    try {
      Pipeline p = MemPipeline.getInstance();
      PCollection<String> lines = MemPipeline.collectionOf("hello", "world");
      p.writeTextFile(lines, tmpDir.getAbsolutePath());
      p.done();

      // The target path must now be a directory, not a plain file.
      assertTrue(tmpDir.isDirectory());
      File[] files = tmpDir.listFiles();
      assertTrue(files != null && files.length > 0);

      int dataFiles = 0;
      for (File f : files) {
        // Skip hidden files (presumably Hadoop checksum sidecars such as ".out.crc" -- confirm).
        if (!f.getName().startsWith(".")) {
          List<String> txt = Files.readLines(f, Charsets.UTF_8);
          assertEquals(ImmutableList.of("hello", "world"), txt);
          dataFiles++;
        }
      }
      // Guard against a vacuous pass when only hidden files were written.
      assertTrue("no data files written to " + tmpDir, dataFiles > 0);
    } finally {
      deleteRecursively(tmpDir);
    }
  }

  /**
   * Deletes a file or directory tree on a best-effort basis. Failures are ignored because this is
   * only test cleanup.
   *
   * @param f the file or directory to remove
   */
  private static void deleteRecursively(File f) {
    File[] children = f.listFiles();
    if (children != null) {
      for (File child : children) {
        deleteRecursively(child);
      }
    }
    f.delete();
  }
}
