Updated Branches: refs/heads/master b3d023871 -> f1640ceb2
CRUNCH-147: Have writeTextFile force all inputs to be Strings and adjust tests for this Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/f1640ceb Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/f1640ceb Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/f1640ceb Branch: refs/heads/master Commit: f1640ceb2e5e56593dd17ce4cc110c284785ca61 Parents: b3d0238 Author: Josh Wills <[email protected]> Authored: Tue Jan 22 11:06:29 2013 -0800 Committer: Josh Wills <[email protected]> Committed: Wed Jan 23 07:58:31 2013 -0800 ---------------------------------------------------------------------- crunch/src/it/java/org/apache/crunch/TfIdfIT.java | 4 +- .../src/it/java/org/apache/crunch/WordCountIT.java | 2 +- .../it/java/org/apache/crunch/lib/CogroupIT.java | 3 +- .../java/org/apache/crunch/impl/mr/MRPipeline.java | 20 +++++++++----- 4 files changed, 17 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1640ceb/crunch/src/it/java/org/apache/crunch/TfIdfIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/TfIdfIT.java b/crunch/src/it/java/org/apache/crunch/TfIdfIT.java index 0c39c4b..218f538 100644 --- a/crunch/src/it/java/org/apache/crunch/TfIdfIT.java +++ b/crunch/src/it/java/org/apache/crunch/TfIdfIT.java @@ -202,7 +202,7 @@ public class TfIdfIT implements Serializable { List<String> lines = Files.readLines(outputFile, Charset.defaultCharset()); boolean passed = false; for (String line : lines) { - if (line.startsWith("the") && line.contains("B,0.6931471805599453")) { + if (line.startsWith("[the") && line.contains("B,0.6931471805599453")) { passed = true; break; } @@ -214,7 +214,7 @@ public class TfIdfIT implements Serializable { lines = Files.readLines(outputFile, 
Charset.defaultCharset()); passed = false; for (String line : lines) { - if (line.startsWith("THE") && line.contains("B,0.6931471805599453")) { + if (line.startsWith("[THE") && line.contains("B,0.6931471805599453")) { passed = true; break; } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1640ceb/crunch/src/it/java/org/apache/crunch/WordCountIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/WordCountIT.java b/crunch/src/it/java/org/apache/crunch/WordCountIT.java index f46a1ee..c646663 100644 --- a/crunch/src/it/java/org/apache/crunch/WordCountIT.java +++ b/crunch/src/it/java/org/apache/crunch/WordCountIT.java @@ -161,7 +161,7 @@ public class WordCountIT { List<String> lines = Files.readLines(outputFile, Charset.defaultCharset()); boolean passed = false; for (String line : lines) { - if (line.startsWith("Macbeth\t28")) { + if (line.startsWith("Macbeth\t28") || line.startsWith("[Macbeth,28]")) { passed = true; break; } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1640ceb/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java b/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java index 2bdc9ef..99950a4 100644 --- a/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java +++ b/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java @@ -25,7 +25,6 @@ import java.nio.charset.Charset; import java.util.Collection; import java.util.List; -import org.apache.crunch.CombineFn; import org.apache.crunch.DoFn; import org.apache.crunch.Emitter; import org.apache.crunch.PCollection; @@ -117,7 +116,7 @@ public class CogroupIT { List<String> lines = Files.readLines(outputFile, Charset.defaultCharset()); boolean passed = false; for (String line : lines) { - if (line.equals("j\t705")) { + if (line.equals("[j,705]")) { passed = true; 
break; } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1640ceb/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java index 4d8fa84..d9545f8 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java @@ -44,11 +44,12 @@ import org.apache.crunch.impl.mr.collect.UnionTable; import org.apache.crunch.impl.mr.exec.MRExecutor; import org.apache.crunch.impl.mr.plan.MSCRPlanner; import org.apache.crunch.impl.mr.run.RuntimeParameters; -import org.apache.crunch.io.At; +import org.apache.crunch.io.From; import org.apache.crunch.io.ReadableSourceTarget; +import org.apache.crunch.io.To; import org.apache.crunch.materialize.MaterializableIterable; import org.apache.crunch.types.PType; -import org.apache.crunch.types.writable.WritableTypeFamily; +import org.apache.crunch.types.writable.Writables; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -191,7 +192,7 @@ public class MRPipeline implements Pipeline { } public PCollection<String> readTextFile(String pathName) { - return read(At.textFile(pathName)); + return read(From.textFile(pathName)); } @SuppressWarnings("unchecked") @@ -311,12 +312,17 @@ public class MRPipeline implements Pipeline { @Override public <T> void writeTextFile(PCollection<T> pcollection, String pathName) { - // Ensure that this is a writable pcollection instance. 
- pcollection = pcollection.parallelDo("asText", IdentityFn.<T> getInstance(), WritableTypeFamily - .getInstance().as(pcollection.getPType())); - write(pcollection, At.textFile(pathName)); + pcollection.parallelDo("asText", new StringifyFn<T>(), Writables.strings()) + .write(To.textFile(pathName)); } + private static class StringifyFn<T> extends MapFn<T, String> { + @Override + public String map(T input) { + return input.toString(); + } + } + private void cleanup() { if (!outputTargets.isEmpty()) { LOG.warn("Not running cleanup while output targets remain");
