Updated Branches: refs/heads/master 8c725ac79 -> 0eb69c43b
CRUNCH-101: Improve design and documentation. Move tool.CrunchTool to util, leaving the tool package empty. Move util.PTypes and util.Protos to the types package. Remove the unused util.Collects class. Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/0eb69c43 Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/0eb69c43 Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/0eb69c43 Branch: refs/heads/master Commit: 0eb69c43b525502e6cff3ec79fe68b31e36b37d3 Parents: 8c725ac Author: Matthias Friedrich <[email protected]> Authored: Sun Oct 21 11:10:31 2012 +0200 Committer: Matthias Friedrich <[email protected]> Committed: Mon Oct 22 19:57:42 2012 +0200 ---------------------------------------------------------------------- .../src/it/java/org/apache/crunch/EnumPairIT.java | 2 +- .../src/it/java/org/apache/crunch/PageRankIT.java | 2 +- .../java/org/apache/crunch/tool/CrunchTool.java | 103 ------- .../main/java/org/apache/crunch/types/PTypes.java | 231 ++++++++++++++ .../main/java/org/apache/crunch/types/Protos.java | 173 +++++++++++ .../java/org/apache/crunch/types/avro/Avros.java | 2 +- .../java/org/apache/crunch/types/package-info.java | 22 ++ .../apache/crunch/types/writable/Writables.java | 2 +- .../main/java/org/apache/crunch/util/Collects.java | 49 --- .../java/org/apache/crunch/util/CrunchTool.java | 103 +++++++ .../main/java/org/apache/crunch/util/PTypes.java | 233 --------------- .../main/java/org/apache/crunch/util/Protos.java | 173 ----------- .../java/org/apache/crunch/util/package-info.java | 22 ++ pom.xml | 2 +- 14 files changed, 556 insertions(+), 563 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0eb69c43/crunch/src/it/java/org/apache/crunch/EnumPairIT.java ---------------------------------------------------------------------- diff --git 
a/crunch/src/it/java/org/apache/crunch/EnumPairIT.java b/crunch/src/it/java/org/apache/crunch/EnumPairIT.java index aa4f0c4..1d0974e 100644 --- a/crunch/src/it/java/org/apache/crunch/EnumPairIT.java +++ b/crunch/src/it/java/org/apache/crunch/EnumPairIT.java @@ -25,8 +25,8 @@ import java.io.Serializable; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.test.TemporaryPath; import org.apache.crunch.test.TemporaryPaths; +import org.apache.crunch.types.PTypes; import org.apache.crunch.types.writable.Writables; -import org.apache.crunch.util.PTypes; import org.junit.Rule; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0eb69c43/crunch/src/it/java/org/apache/crunch/PageRankIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/PageRankIT.java b/crunch/src/it/java/org/apache/crunch/PageRankIT.java index e45bbc7..5a555b3 100644 --- a/crunch/src/it/java/org/apache/crunch/PageRankIT.java +++ b/crunch/src/it/java/org/apache/crunch/PageRankIT.java @@ -30,10 +30,10 @@ import org.apache.crunch.test.TemporaryPath; import org.apache.crunch.test.TemporaryPaths; import org.apache.crunch.types.PType; import org.apache.crunch.types.PTypeFamily; +import org.apache.crunch.types.PTypes; import org.apache.crunch.types.avro.AvroTypeFamily; import org.apache.crunch.types.avro.Avros; import org.apache.crunch.types.writable.WritableTypeFamily; -import org.apache.crunch.util.PTypes; import org.junit.Rule; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0eb69c43/crunch/src/main/java/org/apache/crunch/tool/CrunchTool.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/tool/CrunchTool.java b/crunch/src/main/java/org/apache/crunch/tool/CrunchTool.java deleted file mode 100644 index 6b8a174..0000000 --- 
a/crunch/src/main/java/org/apache/crunch/tool/CrunchTool.java +++ /dev/null @@ -1,103 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.tool; - -import java.io.IOException; - -import org.apache.crunch.PCollection; -import org.apache.crunch.PTable; -import org.apache.crunch.Pipeline; -import org.apache.crunch.Source; -import org.apache.crunch.TableSource; -import org.apache.crunch.Target; -import org.apache.crunch.impl.mem.MemPipeline; -import org.apache.crunch.impl.mr.MRPipeline; -import org.apache.crunch.io.At; -import org.apache.crunch.io.From; -import org.apache.crunch.io.To; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.util.Tool; - -/** - * An extension of the {@code Tool} interface that creates a {@code Pipeline} - * instance and provides methods for working with the Pipeline from inside of - * the Tool's run method. 
- * - */ -public abstract class CrunchTool extends Configured implements Tool { - - protected static final From from = new From(); - protected static final To to = new To(); - protected static final At at = new At(); - - private Pipeline pipeline; - - public CrunchTool() throws IOException { - this(false); - } - - public CrunchTool(boolean inMemory) { - this.pipeline = inMemory ? MemPipeline.getInstance() : new MRPipeline(getClass()); - } - - @Override - public void setConf(Configuration conf) { - super.setConf(conf); - if (conf != null && pipeline != null) { - pipeline.setConfiguration(conf); - } - } - - @Override - public Configuration getConf() { - return pipeline.getConfiguration(); - } - - public void enableDebug() { - pipeline.enableDebug(); - } - - public <T> PCollection<T> read(Source<T> source) { - return pipeline.read(source); - } - - public <K, V> PTable<K, V> read(TableSource<K, V> tableSource) { - return pipeline.read(tableSource); - } - - public PCollection<String> readTextFile(String pathName) { - return pipeline.readTextFile(pathName); - } - - public void write(PCollection<?> pcollection, Target target) { - pipeline.write(pcollection, target); - } - - public void writeTextFile(PCollection<?> pcollection, String pathName) { - pipeline.writeTextFile(pcollection, pathName); - } - - public void run() { - pipeline.run(); - } - - public void done() { - pipeline.done(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0eb69c43/crunch/src/main/java/org/apache/crunch/types/PTypes.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/PTypes.java b/crunch/src/main/java/org/apache/crunch/types/PTypes.java new file mode 100644 index 0000000..ea9450e --- /dev/null +++ b/crunch/src/main/java/org/apache/crunch/types/PTypes.java @@ -0,0 +1,231 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.types; + +import java.math.BigInteger; +import java.nio.ByteBuffer; + +import org.apache.crunch.MapFn; +import org.apache.crunch.impl.mr.run.CrunchRuntimeException; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.thrift.TBase; +import org.apache.thrift.TDeserializer; +import org.apache.thrift.TException; +import org.apache.thrift.TSerializer; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.codehaus.jackson.map.ObjectMapper; + +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; + +/** + * Utility functions for creating common types of derived PTypes, e.g., for JSON + * data, protocol buffers, and Thrift records. 
+ * + */ +public class PTypes { + + public static PType<BigInteger> bigInt(PTypeFamily typeFamily) { + return typeFamily.derived(BigInteger.class, BYTE_TO_BIGINT, BIGINT_TO_BYTE, typeFamily.bytes()); + } + + public static <T> PType<T> jsonString(Class<T> clazz, PTypeFamily typeFamily) { + return typeFamily + .derived(clazz, new JacksonInputMapFn<T>(clazz), new JacksonOutputMapFn<T>(), typeFamily.strings()); + } + + public static <T extends Message> PType<T> protos(Class<T> clazz, PTypeFamily typeFamily) { + return typeFamily.derived(clazz, new ProtoInputMapFn<T>(clazz), new ProtoOutputMapFn<T>(), typeFamily.bytes()); + } + + public static <T extends TBase> PType<T> thrifts(Class<T> clazz, PTypeFamily typeFamily) { + return typeFamily.derived(clazz, new ThriftInputMapFn<T>(clazz), new ThriftOutputMapFn<T>(), typeFamily.bytes()); + } + + public static final <T extends Enum> PType<T> enums(final Class<T> type, PTypeFamily typeFamily) { + return typeFamily.derived(type, new EnumInputMapper<T>(type), new EnumOutputMapper<T>(), typeFamily.strings()); + } + + public static MapFn<ByteBuffer, BigInteger> BYTE_TO_BIGINT = new MapFn<ByteBuffer, BigInteger>() { + public BigInteger map(ByteBuffer input) { + return input == null ? null : new BigInteger(input.array()); + } + }; + + public static MapFn<BigInteger, ByteBuffer> BIGINT_TO_BYTE = new MapFn<BigInteger, ByteBuffer>() { + public ByteBuffer map(BigInteger input) { + return input == null ? 
null : ByteBuffer.wrap(input.toByteArray()); + } + }; + + public static class JacksonInputMapFn<T> extends MapFn<String, T> { + + private final Class<T> clazz; + private transient ObjectMapper mapper; + + public JacksonInputMapFn(Class<T> clazz) { + this.clazz = clazz; + } + + @Override + public void initialize() { + this.mapper = new ObjectMapper(); + } + + @Override + public T map(String input) { + try { + return mapper.readValue(input, clazz); + } catch (Exception e) { + throw new CrunchRuntimeException(e); + } + } + } + + public static class JacksonOutputMapFn<T> extends MapFn<T, String> { + + private transient ObjectMapper mapper; + + @Override + public void initialize() { + this.mapper = new ObjectMapper(); + } + + @Override + public String map(T input) { + try { + return mapper.writeValueAsString(input); + } catch (Exception e) { + throw new CrunchRuntimeException(e); + } + } + } + + public static class ProtoInputMapFn<T extends Message> extends MapFn<ByteBuffer, T> { + + private final Class<T> clazz; + private transient T instance; + + public ProtoInputMapFn(Class<T> clazz) { + this.clazz = clazz; + } + + @Override + public void initialize() { + this.instance = Protos.getDefaultInstance(clazz); + } + + @Override + public T map(ByteBuffer bb) { + try { + return (T) instance.newBuilderForType().mergeFrom(bb.array(), bb.position(), bb.limit()).build(); + } catch (InvalidProtocolBufferException e) { + throw new CrunchRuntimeException(e); + } + } + } + + public static class ProtoOutputMapFn<T extends Message> extends MapFn<T, ByteBuffer> { + + public ProtoOutputMapFn() { + } + + @Override + public ByteBuffer map(T proto) { + return ByteBuffer.wrap(proto.toByteArray()); + } + } + + public static class ThriftInputMapFn<T extends TBase> extends MapFn<ByteBuffer, T> { + + private final Class<T> clazz; + private transient T instance; + private transient TDeserializer deserializer; + private transient byte[] bytes; + + public ThriftInputMapFn(Class<T> clazz) { + 
this.clazz = clazz; + } + + @Override + public void initialize() { + this.instance = ReflectionUtils.newInstance(clazz, getConfiguration()); + this.deserializer = new TDeserializer(new TBinaryProtocol.Factory()); + this.bytes = new byte[0]; + } + + @Override + public T map(ByteBuffer bb) { + T next = (T) instance.deepCopy(); + int len = bb.limit() - bb.position(); + if (len != bytes.length) { + bytes = new byte[len]; + } + System.arraycopy(bb.array(), bb.position(), bytes, 0, len); + try { + deserializer.deserialize(next, bytes); + } catch (TException e) { + throw new CrunchRuntimeException(e); + } + return next; + } + } + + public static class ThriftOutputMapFn<T extends TBase> extends MapFn<T, ByteBuffer> { + + private transient TSerializer serializer; + + public ThriftOutputMapFn() { + } + + @Override + public void initialize() { + this.serializer = new TSerializer(new TBinaryProtocol.Factory()); + } + + @Override + public ByteBuffer map(T t) { + try { + return ByteBuffer.wrap(serializer.serialize(t)); + } catch (TException e) { + throw new CrunchRuntimeException(e); + } + } + } + + public static class EnumInputMapper<T extends Enum> extends MapFn<String, T> { + private final Class<T> type; + + public EnumInputMapper(Class<T> type) { + this.type = type; + } + + @Override + public T map(String input) { + return (T) Enum.valueOf(type, input); + } + }; + + public static class EnumOutputMapper<T extends Enum> extends MapFn<T, String> { + + @Override + public String map(T input) { + return input.name(); + } + }; +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0eb69c43/crunch/src/main/java/org/apache/crunch/types/Protos.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/Protos.java b/crunch/src/main/java/org/apache/crunch/types/Protos.java new file mode 100644 index 0000000..1672209 --- /dev/null +++ b/crunch/src/main/java/org/apache/crunch/types/Protos.java @@ -0,0 
+1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.types; + +import java.util.Iterator; +import java.util.List; + +import org.apache.crunch.DoFn; +import org.apache.crunch.Emitter; +import org.apache.crunch.MapFn; +import org.apache.crunch.impl.mr.run.CrunchRuntimeException; +import org.apache.hadoop.util.ReflectionUtils; + +import com.google.common.base.Splitter; +import com.google.protobuf.Descriptors.FieldDescriptor; +import com.google.protobuf.Message; +import com.google.protobuf.Message.Builder; + +/** + * Utility functions for working with protocol buffers in Crunch. + */ +public class Protos { + + /** + * Utility function for creating a default PB Message from a Class object that + * works with both protoc 2.3.0 and 2.4.x. 
+ * @param clazz The class of the protocol buffer to create + * @return An instance of a protocol buffer + */ + public static <M extends Message> M getDefaultInstance(Class<M> clazz) { + if (clazz.getConstructors().length > 0) { + // Protobuf 2.3.0 + return ReflectionUtils.newInstance(clazz, null); + } else { + // Protobuf 2.4.x + try { + Message.Builder mb = (Message.Builder) clazz.getDeclaredMethod("newBuilder").invoke(null); + return (M) mb.getDefaultInstanceForType(); + } catch (Exception e) { + throw new CrunchRuntimeException(e); + } + } + } + + public static <M extends Message, K> MapFn<M, K> extractKey(String fieldName) { + return new ExtractKeyFn<M, K>(fieldName); + } + + public static <M extends Message> DoFn<String, M> lineParser(String sep, Class<M> msgClass) { + return new TextToProtoFn<M>(sep, msgClass); + } + + public static class ExtractKeyFn<M extends Message, K> extends MapFn<M, K> { + + private final String fieldName; + + private transient FieldDescriptor fd; + + public ExtractKeyFn(String fieldName) { + this.fieldName = fieldName; + } + + @Override + public K map(M input) { + if (input == null) { + throw new IllegalArgumentException("Null inputs not supported by Protos.ExtractKeyFn"); + } else if (fd == null) { + fd = input.getDescriptorForType().findFieldByName(fieldName); + if (fd == null) { + throw new IllegalStateException("Could not find field: " + fieldName + " in message: " + input); + } + } + return (K) input.getField(fd); + } + + } + + public static class TextToProtoFn<M extends Message> extends DoFn<String, M> { + + private final String sep; + private final Class<M> msgClass; + + private transient M msgInstance; + private transient List<FieldDescriptor> fields; + private transient Splitter splitter; + + enum ParseErrors { + TOTAL, + NUMBER_FORMAT + }; + + public TextToProtoFn(String sep, Class<M> msgClass) { + this.sep = sep; + this.msgClass = msgClass; + } + + @Override + public void initialize() { + this.msgInstance = 
getDefaultInstance(msgClass); + this.fields = msgInstance.getDescriptorForType().getFields(); + this.splitter = Splitter.on(sep); + } + + @Override + public void process(String input, Emitter<M> emitter) { + if (input != null && !input.isEmpty()) { + Builder b = msgInstance.newBuilderForType(); + Iterator<String> iter = splitter.split(input).iterator(); + boolean parseError = false; + for (FieldDescriptor fd : fields) { + if (iter.hasNext()) { + String value = iter.next(); + if (value != null && !value.isEmpty()) { + Object parsedValue = null; + try { + switch (fd.getJavaType()) { + case STRING: + parsedValue = value; + break; + case INT: + parsedValue = Integer.valueOf(value); + break; + case LONG: + parsedValue = Long.valueOf(value); + break; + case FLOAT: + parsedValue = Float.valueOf(value); + break; + case DOUBLE: + parsedValue = Double.valueOf(value); + break; + case BOOLEAN: + parsedValue = Boolean.valueOf(value); + break; + case ENUM: + parsedValue = fd.getEnumType().findValueByName(value); + break; + } + b.setField(fd, parsedValue); + } catch (NumberFormatException nfe) { + increment(ParseErrors.NUMBER_FORMAT); + parseError = true; + break; + } + } + } + } + + if (parseError) { + increment(ParseErrors.TOTAL); + } else { + emitter.emit((M) b.build()); + } + } + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0eb69c43/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java index 4a83db5..655ee55 100644 --- a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java +++ b/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java @@ -50,10 +50,10 @@ import org.apache.crunch.types.DeepCopier; import org.apache.crunch.types.MapDeepCopier; import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PType; 
+import org.apache.crunch.types.PTypes; import org.apache.crunch.types.TupleDeepCopier; import org.apache.crunch.types.TupleFactory; import org.apache.crunch.types.writable.WritableDeepCopier; -import org.apache.crunch.util.PTypes; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.ReflectionUtils; http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0eb69c43/crunch/src/main/java/org/apache/crunch/types/package-info.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/package-info.java b/crunch/src/main/java/org/apache/crunch/types/package-info.java new file mode 100644 index 0000000..b420b03 --- /dev/null +++ b/crunch/src/main/java/org/apache/crunch/types/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Common functionality for business object serialization. 
+ */ +package org.apache.crunch.types; http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0eb69c43/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java b/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java index 5e305b8..67f0621 100644 --- a/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java +++ b/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java @@ -34,9 +34,9 @@ import org.apache.crunch.types.CollectionDeepCopier; import org.apache.crunch.types.DeepCopier; import org.apache.crunch.types.MapDeepCopier; import org.apache.crunch.types.PType; +import org.apache.crunch.types.PTypes; import org.apache.crunch.types.TupleDeepCopier; import org.apache.crunch.types.TupleFactory; -import org.apache.crunch.util.PTypes; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.BooleanWritable; import org.apache.hadoop.io.BytesWritable; http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0eb69c43/crunch/src/main/java/org/apache/crunch/util/Collects.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/util/Collects.java b/crunch/src/main/java/org/apache/crunch/util/Collects.java deleted file mode 100644 index c8c9311..0000000 --- a/crunch/src/main/java/org/apache/crunch/util/Collects.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.util; - -import java.util.Collection; -import java.util.Iterator; - -import com.google.common.collect.Lists; - -/** - * Utility functions for returning Collection objects backed by different types - * of implementations. - */ -public class Collects { - - public static <T> Collection<T> newArrayList() { - return Lists.newArrayList(); - } - - public static <T> Collection<T> newArrayList(T... elements) { - return Lists.newArrayList(elements); - } - - public static <T> Collection<T> newArrayList(Iterable<? extends T> elements) { - return Lists.newArrayList(elements); - } - - public static <T> Collection<T> newArrayList(Iterator<? extends T> elements) { - return Lists.newArrayList(elements); - } - - private Collects() { - } -} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0eb69c43/crunch/src/main/java/org/apache/crunch/util/CrunchTool.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/util/CrunchTool.java b/crunch/src/main/java/org/apache/crunch/util/CrunchTool.java new file mode 100644 index 0000000..a2075da --- /dev/null +++ b/crunch/src/main/java/org/apache/crunch/util/CrunchTool.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.util; + +import java.io.IOException; + +import org.apache.crunch.PCollection; +import org.apache.crunch.PTable; +import org.apache.crunch.Pipeline; +import org.apache.crunch.Source; +import org.apache.crunch.TableSource; +import org.apache.crunch.Target; +import org.apache.crunch.impl.mem.MemPipeline; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.io.At; +import org.apache.crunch.io.From; +import org.apache.crunch.io.To; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.util.Tool; + +/** + * An extension of the {@code Tool} interface that creates a {@code Pipeline} + * instance and provides methods for working with the Pipeline from inside of + * the Tool's run method. + * + */ +public abstract class CrunchTool extends Configured implements Tool { + + protected static final From from = new From(); + protected static final To to = new To(); + protected static final At at = new At(); + + private Pipeline pipeline; + + public CrunchTool() throws IOException { + this(false); + } + + public CrunchTool(boolean inMemory) { + this.pipeline = inMemory ? 
MemPipeline.getInstance() : new MRPipeline(getClass()); + } + + @Override + public void setConf(Configuration conf) { + super.setConf(conf); + if (conf != null && pipeline != null) { + pipeline.setConfiguration(conf); + } + } + + @Override + public Configuration getConf() { + return pipeline.getConfiguration(); + } + + public void enableDebug() { + pipeline.enableDebug(); + } + + public <T> PCollection<T> read(Source<T> source) { + return pipeline.read(source); + } + + public <K, V> PTable<K, V> read(TableSource<K, V> tableSource) { + return pipeline.read(tableSource); + } + + public PCollection<String> readTextFile(String pathName) { + return pipeline.readTextFile(pathName); + } + + public void write(PCollection<?> pcollection, Target target) { + pipeline.write(pcollection, target); + } + + public void writeTextFile(PCollection<?> pcollection, String pathName) { + pipeline.writeTextFile(pcollection, pathName); + } + + public void run() { + pipeline.run(); + } + + public void done() { + pipeline.done(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0eb69c43/crunch/src/main/java/org/apache/crunch/util/PTypes.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/util/PTypes.java b/crunch/src/main/java/org/apache/crunch/util/PTypes.java deleted file mode 100644 index 23c0d08..0000000 --- a/crunch/src/main/java/org/apache/crunch/util/PTypes.java +++ /dev/null @@ -1,233 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.util; - -import java.math.BigInteger; -import java.nio.ByteBuffer; - -import org.apache.crunch.MapFn; -import org.apache.crunch.impl.mr.run.CrunchRuntimeException; -import org.apache.crunch.types.PType; -import org.apache.crunch.types.PTypeFamily; -import org.apache.hadoop.util.ReflectionUtils; -import org.apache.thrift.TBase; -import org.apache.thrift.TDeserializer; -import org.apache.thrift.TException; -import org.apache.thrift.TSerializer; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.codehaus.jackson.map.ObjectMapper; - -import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.Message; - -/** - * Utility functions for creating common types of derived PTypes, e.g., for JSON - * data, protocol buffers, and Thrift records. 
- * - */ -public class PTypes { - - public static PType<BigInteger> bigInt(PTypeFamily typeFamily) { - return typeFamily.derived(BigInteger.class, BYTE_TO_BIGINT, BIGINT_TO_BYTE, typeFamily.bytes()); - } - - public static <T> PType<T> jsonString(Class<T> clazz, PTypeFamily typeFamily) { - return typeFamily - .derived(clazz, new JacksonInputMapFn<T>(clazz), new JacksonOutputMapFn<T>(), typeFamily.strings()); - } - - public static <T extends Message> PType<T> protos(Class<T> clazz, PTypeFamily typeFamily) { - return typeFamily.derived(clazz, new ProtoInputMapFn<T>(clazz), new ProtoOutputMapFn<T>(), typeFamily.bytes()); - } - - public static <T extends TBase> PType<T> thrifts(Class<T> clazz, PTypeFamily typeFamily) { - return typeFamily.derived(clazz, new ThriftInputMapFn<T>(clazz), new ThriftOutputMapFn<T>(), typeFamily.bytes()); - } - - public static final <T extends Enum> PType<T> enums(final Class<T> type, PTypeFamily typeFamily) { - return typeFamily.derived(type, new EnumInputMapper<T>(type), new EnumOutputMapper<T>(), typeFamily.strings()); - } - - public static MapFn<ByteBuffer, BigInteger> BYTE_TO_BIGINT = new MapFn<ByteBuffer, BigInteger>() { - public BigInteger map(ByteBuffer input) { - return input == null ? null : new BigInteger(input.array()); - } - }; - - public static MapFn<BigInteger, ByteBuffer> BIGINT_TO_BYTE = new MapFn<BigInteger, ByteBuffer>() { - public ByteBuffer map(BigInteger input) { - return input == null ? 
null : ByteBuffer.wrap(input.toByteArray()); - } - }; - - public static class JacksonInputMapFn<T> extends MapFn<String, T> { - - private final Class<T> clazz; - private transient ObjectMapper mapper; - - public JacksonInputMapFn(Class<T> clazz) { - this.clazz = clazz; - } - - @Override - public void initialize() { - this.mapper = new ObjectMapper(); - } - - @Override - public T map(String input) { - try { - return mapper.readValue(input, clazz); - } catch (Exception e) { - throw new CrunchRuntimeException(e); - } - } - } - - public static class JacksonOutputMapFn<T> extends MapFn<T, String> { - - private transient ObjectMapper mapper; - - @Override - public void initialize() { - this.mapper = new ObjectMapper(); - } - - @Override - public String map(T input) { - try { - return mapper.writeValueAsString(input); - } catch (Exception e) { - throw new CrunchRuntimeException(e); - } - } - } - - public static class ProtoInputMapFn<T extends Message> extends MapFn<ByteBuffer, T> { - - private final Class<T> clazz; - private transient T instance; - - public ProtoInputMapFn(Class<T> clazz) { - this.clazz = clazz; - } - - @Override - public void initialize() { - this.instance = Protos.getDefaultInstance(clazz); - } - - @Override - public T map(ByteBuffer bb) { - try { - return (T) instance.newBuilderForType().mergeFrom(bb.array(), bb.position(), bb.limit()).build(); - } catch (InvalidProtocolBufferException e) { - throw new CrunchRuntimeException(e); - } - } - } - - public static class ProtoOutputMapFn<T extends Message> extends MapFn<T, ByteBuffer> { - - public ProtoOutputMapFn() { - } - - @Override - public ByteBuffer map(T proto) { - return ByteBuffer.wrap(proto.toByteArray()); - } - } - - public static class ThriftInputMapFn<T extends TBase> extends MapFn<ByteBuffer, T> { - - private final Class<T> clazz; - private transient T instance; - private transient TDeserializer deserializer; - private transient byte[] bytes; - - public ThriftInputMapFn(Class<T> clazz) { - 
this.clazz = clazz; - } - - @Override - public void initialize() { - this.instance = ReflectionUtils.newInstance(clazz, getConfiguration()); - this.deserializer = new TDeserializer(new TBinaryProtocol.Factory()); - this.bytes = new byte[0]; - } - - @Override - public T map(ByteBuffer bb) { - T next = (T) instance.deepCopy(); - int len = bb.limit() - bb.position(); - if (len != bytes.length) { - bytes = new byte[len]; - } - System.arraycopy(bb.array(), bb.position(), bytes, 0, len); - try { - deserializer.deserialize(next, bytes); - } catch (TException e) { - throw new CrunchRuntimeException(e); - } - return next; - } - } - - public static class ThriftOutputMapFn<T extends TBase> extends MapFn<T, ByteBuffer> { - - private transient TSerializer serializer; - - public ThriftOutputMapFn() { - } - - @Override - public void initialize() { - this.serializer = new TSerializer(new TBinaryProtocol.Factory()); - } - - @Override - public ByteBuffer map(T t) { - try { - return ByteBuffer.wrap(serializer.serialize(t)); - } catch (TException e) { - throw new CrunchRuntimeException(e); - } - } - } - - public static class EnumInputMapper<T extends Enum> extends MapFn<String, T> { - private final Class<T> type; - - public EnumInputMapper(Class<T> type) { - this.type = type; - } - - @Override - public T map(String input) { - return (T) Enum.valueOf(type, input); - } - }; - - public static class EnumOutputMapper<T extends Enum> extends MapFn<T, String> { - - @Override - public String map(T input) { - return input.name(); - } - }; -} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0eb69c43/crunch/src/main/java/org/apache/crunch/util/Protos.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/util/Protos.java b/crunch/src/main/java/org/apache/crunch/util/Protos.java deleted file mode 100644 index fc61ffa..0000000 --- a/crunch/src/main/java/org/apache/crunch/util/Protos.java +++ /dev/null @@ -1,173 
+0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.util; - -import java.util.Iterator; -import java.util.List; - -import org.apache.crunch.DoFn; -import org.apache.crunch.Emitter; -import org.apache.crunch.MapFn; -import org.apache.crunch.impl.mr.run.CrunchRuntimeException; -import org.apache.hadoop.util.ReflectionUtils; - -import com.google.common.base.Splitter; -import com.google.protobuf.Descriptors.FieldDescriptor; -import com.google.protobuf.Message; -import com.google.protobuf.Message.Builder; - -/** - * Utility functions for working with protocol buffers in Crunch. - */ -public class Protos { - - /** - * Utility function for creating a default PB Messgae from a Class object that - * works with both protoc 2.3.0 and 2.4.x. 
- * @param clazz The class of the protocol buffer to create - * @return An instance of a protocol buffer - */ - public static <M extends Message> M getDefaultInstance(Class<M> clazz) { - if (clazz.getConstructors().length > 0) { - // Protobuf 2.3.0 - return ReflectionUtils.newInstance(clazz, null); - } else { - // Protobuf 2.4.x - try { - Message.Builder mb = (Message.Builder) clazz.getDeclaredMethod("newBuilder").invoke(null); - return (M) mb.getDefaultInstanceForType(); - } catch (Exception e) { - throw new CrunchRuntimeException(e); - } - } - } - - public static <M extends Message, K> MapFn<M, K> extractKey(String fieldName) { - return new ExtractKeyFn<M, K>(fieldName); - } - - public static <M extends Message> DoFn<String, M> lineParser(String sep, Class<M> msgClass) { - return new TextToProtoFn<M>(sep, msgClass); - } - - public static class ExtractKeyFn<M extends Message, K> extends MapFn<M, K> { - - private final String fieldName; - - private transient FieldDescriptor fd; - - public ExtractKeyFn(String fieldName) { - this.fieldName = fieldName; - } - - @Override - public K map(M input) { - if (input == null) { - throw new IllegalArgumentException("Null inputs not supported by Protos.ExtractKeyFn"); - } else if (fd == null) { - fd = input.getDescriptorForType().findFieldByName(fieldName); - if (fd == null) { - throw new IllegalStateException("Could not find field: " + fieldName + " in message: " + input); - } - } - return (K) input.getField(fd); - } - - } - - public static class TextToProtoFn<M extends Message> extends DoFn<String, M> { - - private final String sep; - private final Class<M> msgClass; - - private transient M msgInstance; - private transient List<FieldDescriptor> fields; - private transient Splitter splitter; - - enum ParseErrors { - TOTAL, - NUMBER_FORMAT - }; - - public TextToProtoFn(String sep, Class<M> msgClass) { - this.sep = sep; - this.msgClass = msgClass; - } - - @Override - public void initialize() { - this.msgInstance = 
getDefaultInstance(msgClass); - this.fields = msgInstance.getDescriptorForType().getFields(); - this.splitter = Splitter.on(sep); - } - - @Override - public void process(String input, Emitter<M> emitter) { - if (input != null && !input.isEmpty()) { - Builder b = msgInstance.newBuilderForType(); - Iterator<String> iter = splitter.split(input).iterator(); - boolean parseError = false; - for (FieldDescriptor fd : fields) { - if (iter.hasNext()) { - String value = iter.next(); - if (value != null && !value.isEmpty()) { - Object parsedValue = null; - try { - switch (fd.getJavaType()) { - case STRING: - parsedValue = value; - break; - case INT: - parsedValue = Integer.valueOf(value); - break; - case LONG: - parsedValue = Long.valueOf(value); - break; - case FLOAT: - parsedValue = Float.valueOf(value); - break; - case DOUBLE: - parsedValue = Double.valueOf(value); - break; - case BOOLEAN: - parsedValue = Boolean.valueOf(value); - break; - case ENUM: - parsedValue = fd.getEnumType().findValueByName(value); - break; - } - b.setField(fd, parsedValue); - } catch (NumberFormatException nfe) { - increment(ParseErrors.NUMBER_FORMAT); - parseError = true; - break; - } - } - } - } - - if (parseError) { - increment(ParseErrors.TOTAL); - } else { - emitter.emit((M) b.build()); - } - } - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0eb69c43/crunch/src/main/java/org/apache/crunch/util/package-info.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/util/package-info.java b/crunch/src/main/java/org/apache/crunch/util/package-info.java new file mode 100644 index 0000000..94d79a1 --- /dev/null +++ b/crunch/src/main/java/org/apache/crunch/util/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * An assorted set of utilities. + */ +package org.apache.crunch.util; http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0eb69c43/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 06c5218..882ada1 100644 --- a/pom.xml +++ b/pom.xml @@ -564,7 +564,7 @@ under the License. <groups> <group> <title>Core</title> - <packages>${pkg}:${pkg}.fn:${pkg}.impl.*:${pkg}.io:${pkg}.types.*:${pkg}.test</packages> + <packages>${pkg}:${pkg}.fn:${pkg}.impl.*:${pkg}.io:${pkg}.types:${pkg}.types.*:${pkg}.test:${pkg}.util</packages> </group> <group> <title>Extension Library</title>
