Updated Branches: refs/heads/master 5cf8142ae -> 9a1c42760
CRUNCH-144: Convert Targets to SourceTargets wherever possible to mark a PCollection as materialized, and update the rules on converting a text file target to SourceTargets.

Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/9a1c4276
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/9a1c4276
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/9a1c4276

Branch: refs/heads/master
Commit: 9a1c42760b5c01a5d1dbdf6ee6bdc4f9fb8ca086
Parents: 5cf8142
Author: Josh Wills <[email protected]>
Authored: Wed Jan 16 07:42:10 2013 -0800
Committer: Josh Wills <[email protected]>
Committed: Fri Jan 25 15:47:17 2013 -0800

----------------------------------------------------------------------
 .../java/org/apache/crunch/impl/mr/MRPipeline.java | 14 ++++++--
 .../org/apache/crunch/io/text/TextFileTarget.java | 29 +++++++++++++++
 2 files changed, 40 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/9a1c4276/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
index d9545f8..c71ef23 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
@@ -162,9 +162,17 @@ public class MRPipeline implements Pipeline {
       } else {
         boolean materialized = false;
         for (Target t : outputTargets.get(c)) {
-          if (!materialized && t instanceof Source) {
-            c.materializeAt((SourceTarget) t);
-            materialized = true;
+          if (!materialized) {
+            if (t instanceof SourceTarget) {
+              c.materializeAt((SourceTarget) t);
+              materialized = true;
+            } else {
+              SourceTarget st = t.asSourceTarget(c.getPType());
+              if (st != null) {
+                c.materializeAt(st);
+                materialized = true;
+              }
+            }
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/9a1c4276/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java b/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
index ec7d521..0c3e6a4 100644
--- a/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
+++ b/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
@@ -17,6 +17,7 @@
  */
 package org.apache.crunch.io.text;
 
+import org.apache.avro.Schema;
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.io.FileNamingScheme;
 import org.apache.crunch.io.SequentialFileNamingScheme;
@@ -25,8 +26,12 @@ import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.avro.AvroTextOutputFormat;
+import org.apache.crunch.types.avro.AvroType;
 import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.writable.WritableType;
+import org.apache.crunch.types.writable.WritableTypeFamily;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
@@ -72,9 +77,33 @@ public class TextFileTarget extends FileTargetImpl {
 
   @Override
   public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
+    if (!isTextCompatible(ptype)) {
+      return null;
+    }
     if (ptype instanceof PTableType) {
       return new TextFileTableSourceTarget(path, (PTableType) ptype);
     }
     return new TextFileSourceTarget<T>(path, ptype);
   }
+
+  private <T> boolean isTextCompatible(PType<T> ptype) {
+    if (AvroTypeFamily.getInstance().equals(ptype.getFamily())) {
+      AvroType<T> at = (AvroType<T>) ptype;
+      if (at.getSchema().equals(Schema.create(Schema.Type.STRING))) {
+        return true;
+      }
+    } else if (WritableTypeFamily.getInstance().equals(ptype.getFamily())) {
+      if (ptype instanceof PTableType) {
+        PTableType ptt = (PTableType) ptype;
+        return isText(ptt.getKeyType()) && isText(ptt.getValueType());
+      } else {
+        return isText(ptype);
+      }
+    }
+    return false;
+  }
+
+  private <T> boolean isText(PType<T> wtype) {
+    return Text.class.equals(((WritableType) wtype).getSerializationClass());
+  }
 }
