Updated Branches:
  refs/heads/master 8c725ac79 -> 0eb69c43b

CRUNCH-101: Improve design and documentation.

Move tool.CrunchTool to util, leaving the tool package empty.
Move util.PTypes and util.Protos to the types package.
Remove the unused util.Collects class.


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/0eb69c43
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/0eb69c43
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/0eb69c43

Branch: refs/heads/master
Commit: 0eb69c43b525502e6cff3ec79fe68b31e36b37d3
Parents: 8c725ac
Author: Matthias Friedrich <[email protected]>
Authored: Sun Oct 21 11:10:31 2012 +0200
Committer: Matthias Friedrich <[email protected]>
Committed: Mon Oct 22 19:57:42 2012 +0200

----------------------------------------------------------------------
 .../src/it/java/org/apache/crunch/EnumPairIT.java  |    2 +-
 .../src/it/java/org/apache/crunch/PageRankIT.java  |    2 +-
 .../java/org/apache/crunch/tool/CrunchTool.java    |  103 -------
 .../main/java/org/apache/crunch/types/PTypes.java  |  231 ++++++++++++++
 .../main/java/org/apache/crunch/types/Protos.java  |  173 +++++++++++
 .../java/org/apache/crunch/types/avro/Avros.java   |    2 +-
 .../java/org/apache/crunch/types/package-info.java |   22 ++
 .../apache/crunch/types/writable/Writables.java    |    2 +-
 .../main/java/org/apache/crunch/util/Collects.java |   49 ---
 .../java/org/apache/crunch/util/CrunchTool.java    |  103 +++++++
 .../main/java/org/apache/crunch/util/PTypes.java   |  233 ---------------
 .../main/java/org/apache/crunch/util/Protos.java   |  173 -----------
 .../java/org/apache/crunch/util/package-info.java  |   22 ++
 pom.xml                                            |    2 +-
 14 files changed, 556 insertions(+), 563 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0eb69c43/crunch/src/it/java/org/apache/crunch/EnumPairIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/EnumPairIT.java 
b/crunch/src/it/java/org/apache/crunch/EnumPairIT.java
index aa4f0c4..1d0974e 100644
--- a/crunch/src/it/java/org/apache/crunch/EnumPairIT.java
+++ b/crunch/src/it/java/org/apache/crunch/EnumPairIT.java
@@ -25,8 +25,8 @@ import java.io.Serializable;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.PTypes;
 import org.apache.crunch.types.writable.Writables;
-import org.apache.crunch.util.PTypes;
 import org.junit.Rule;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0eb69c43/crunch/src/it/java/org/apache/crunch/PageRankIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/PageRankIT.java 
b/crunch/src/it/java/org/apache/crunch/PageRankIT.java
index e45bbc7..5a555b3 100644
--- a/crunch/src/it/java/org/apache/crunch/PageRankIT.java
+++ b/crunch/src/it/java/org/apache/crunch/PageRankIT.java
@@ -30,10 +30,10 @@ import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.test.TemporaryPaths;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.PTypes;
 import org.apache.crunch.types.avro.AvroTypeFamily;
 import org.apache.crunch.types.avro.Avros;
 import org.apache.crunch.types.writable.WritableTypeFamily;
-import org.apache.crunch.util.PTypes;
 import org.junit.Rule;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0eb69c43/crunch/src/main/java/org/apache/crunch/tool/CrunchTool.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/tool/CrunchTool.java 
b/crunch/src/main/java/org/apache/crunch/tool/CrunchTool.java
deleted file mode 100644
index 6b8a174..0000000
--- a/crunch/src/main/java/org/apache/crunch/tool/CrunchTool.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.tool;
-
-import java.io.IOException;
-
-import org.apache.crunch.PCollection;
-import org.apache.crunch.PTable;
-import org.apache.crunch.Pipeline;
-import org.apache.crunch.Source;
-import org.apache.crunch.TableSource;
-import org.apache.crunch.Target;
-import org.apache.crunch.impl.mem.MemPipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.At;
-import org.apache.crunch.io.From;
-import org.apache.crunch.io.To;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.util.Tool;
-
-/**
- * An extension of the {@code Tool} interface that creates a {@code Pipeline}
- * instance and provides methods for working with the Pipeline from inside of
- * the Tool's run method.
- * 
- */
-public abstract class CrunchTool extends Configured implements Tool {
-
-  protected static final From from = new From();
-  protected static final To to = new To();
-  protected static final At at = new At();
-
-  private Pipeline pipeline;
-
-  public CrunchTool() throws IOException {
-    this(false);
-  }
-
-  public CrunchTool(boolean inMemory) {
-    this.pipeline = inMemory ? MemPipeline.getInstance() : new 
MRPipeline(getClass());
-  }
-
-  @Override
-  public void setConf(Configuration conf) {
-    super.setConf(conf);
-    if (conf != null && pipeline != null) {
-      pipeline.setConfiguration(conf);
-    }
-  }
-
-  @Override
-  public Configuration getConf() {
-    return pipeline.getConfiguration();
-  }
-
-  public void enableDebug() {
-    pipeline.enableDebug();
-  }
-
-  public <T> PCollection<T> read(Source<T> source) {
-    return pipeline.read(source);
-  }
-
-  public <K, V> PTable<K, V> read(TableSource<K, V> tableSource) {
-    return pipeline.read(tableSource);
-  }
-
-  public PCollection<String> readTextFile(String pathName) {
-    return pipeline.readTextFile(pathName);
-  }
-
-  public void write(PCollection<?> pcollection, Target target) {
-    pipeline.write(pcollection, target);
-  }
-
-  public void writeTextFile(PCollection<?> pcollection, String pathName) {
-    pipeline.writeTextFile(pcollection, pathName);
-  }
-
-  public void run() {
-    pipeline.run();
-  }
-
-  public void done() {
-    pipeline.done();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0eb69c43/crunch/src/main/java/org/apache/crunch/types/PTypes.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/PTypes.java 
b/crunch/src/main/java/org/apache/crunch/types/PTypes.java
new file mode 100644
index 0000000..ea9450e
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/types/PTypes.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+
+/**
+ * Utility functions for creating common types of derived PTypes, e.g., for 
JSON
+ * data, protocol buffers, and Thrift records.
+ * 
+ */
+public class PTypes {
+
+  public static PType<BigInteger> bigInt(PTypeFamily typeFamily) {
+    return typeFamily.derived(BigInteger.class, BYTE_TO_BIGINT, 
BIGINT_TO_BYTE, typeFamily.bytes());
+  }
+
+  public static <T> PType<T> jsonString(Class<T> clazz, PTypeFamily 
typeFamily) {
+    return typeFamily
+        .derived(clazz, new JacksonInputMapFn<T>(clazz), new 
JacksonOutputMapFn<T>(), typeFamily.strings());
+  }
+
+  public static <T extends Message> PType<T> protos(Class<T> clazz, 
PTypeFamily typeFamily) {
+    return typeFamily.derived(clazz, new ProtoInputMapFn<T>(clazz), new 
ProtoOutputMapFn<T>(), typeFamily.bytes());
+  }
+
+  public static <T extends TBase> PType<T> thrifts(Class<T> clazz, PTypeFamily 
typeFamily) {
+    return typeFamily.derived(clazz, new ThriftInputMapFn<T>(clazz), new 
ThriftOutputMapFn<T>(), typeFamily.bytes());
+  }
+
+  public static final <T extends Enum> PType<T> enums(final Class<T> type, 
PTypeFamily typeFamily) {
+    return typeFamily.derived(type, new EnumInputMapper<T>(type), new 
EnumOutputMapper<T>(), typeFamily.strings());
+  }
+
+  public static MapFn<ByteBuffer, BigInteger> BYTE_TO_BIGINT = new 
MapFn<ByteBuffer, BigInteger>() {
+    public BigInteger map(ByteBuffer input) {
+      return input == null ? null : new BigInteger(input.array());
+    }
+  };
+
+  public static MapFn<BigInteger, ByteBuffer> BIGINT_TO_BYTE = new 
MapFn<BigInteger, ByteBuffer>() {
+    public ByteBuffer map(BigInteger input) {
+      return input == null ? null : ByteBuffer.wrap(input.toByteArray());
+    }
+  };
+
+  public static class JacksonInputMapFn<T> extends MapFn<String, T> {
+
+    private final Class<T> clazz;
+    private transient ObjectMapper mapper;
+
+    public JacksonInputMapFn(Class<T> clazz) {
+      this.clazz = clazz;
+    }
+
+    @Override
+    public void initialize() {
+      this.mapper = new ObjectMapper();
+    }
+
+    @Override
+    public T map(String input) {
+      try {
+        return mapper.readValue(input, clazz);
+      } catch (Exception e) {
+        throw new CrunchRuntimeException(e);
+      }
+    }
+  }
+
+  public static class JacksonOutputMapFn<T> extends MapFn<T, String> {
+
+    private transient ObjectMapper mapper;
+
+    @Override
+    public void initialize() {
+      this.mapper = new ObjectMapper();
+    }
+
+    @Override
+    public String map(T input) {
+      try {
+        return mapper.writeValueAsString(input);
+      } catch (Exception e) {
+        throw new CrunchRuntimeException(e);
+      }
+    }
+  }
+
+  public static class ProtoInputMapFn<T extends Message> extends 
MapFn<ByteBuffer, T> {
+
+    private final Class<T> clazz;
+    private transient T instance;
+
+    public ProtoInputMapFn(Class<T> clazz) {
+      this.clazz = clazz;
+    }
+
+    @Override
+    public void initialize() {
+      this.instance = Protos.getDefaultInstance(clazz);
+    }
+
+    @Override
+    public T map(ByteBuffer bb) {
+      try {
+        return (T) instance.newBuilderForType().mergeFrom(bb.array(), 
bb.position(), bb.limit()).build();
+      } catch (InvalidProtocolBufferException e) {
+        throw new CrunchRuntimeException(e);
+      }
+    }
+  }
+
+  public static class ProtoOutputMapFn<T extends Message> extends MapFn<T, 
ByteBuffer> {
+
+    public ProtoOutputMapFn() {
+    }
+
+    @Override
+    public ByteBuffer map(T proto) {
+      return ByteBuffer.wrap(proto.toByteArray());
+    }
+  }
+
+  public static class ThriftInputMapFn<T extends TBase> extends 
MapFn<ByteBuffer, T> {
+
+    private final Class<T> clazz;
+    private transient T instance;
+    private transient TDeserializer deserializer;
+    private transient byte[] bytes;
+
+    public ThriftInputMapFn(Class<T> clazz) {
+      this.clazz = clazz;
+    }
+
+    @Override
+    public void initialize() {
+      this.instance = ReflectionUtils.newInstance(clazz, getConfiguration());
+      this.deserializer = new TDeserializer(new TBinaryProtocol.Factory());
+      this.bytes = new byte[0];
+    }
+
+    @Override
+    public T map(ByteBuffer bb) {
+      T next = (T) instance.deepCopy();
+      int len = bb.limit() - bb.position();
+      if (len != bytes.length) {
+        bytes = new byte[len];
+      }
+      System.arraycopy(bb.array(), bb.position(), bytes, 0, len);
+      try {
+        deserializer.deserialize(next, bytes);
+      } catch (TException e) {
+        throw new CrunchRuntimeException(e);
+      }
+      return next;
+    }
+  }
+
+  public static class ThriftOutputMapFn<T extends TBase> extends MapFn<T, 
ByteBuffer> {
+
+    private transient TSerializer serializer;
+
+    public ThriftOutputMapFn() {
+    }
+
+    @Override
+    public void initialize() {
+      this.serializer = new TSerializer(new TBinaryProtocol.Factory());
+    }
+
+    @Override
+    public ByteBuffer map(T t) {
+      try {
+        return ByteBuffer.wrap(serializer.serialize(t));
+      } catch (TException e) {
+        throw new CrunchRuntimeException(e);
+      }
+    }
+  }
+
+  public static class EnumInputMapper<T extends Enum> extends MapFn<String, T> 
{
+    private final Class<T> type;
+
+    public EnumInputMapper(Class<T> type) {
+      this.type = type;
+    }
+
+    @Override
+    public T map(String input) {
+      return (T) Enum.valueOf(type, input);
+    }
+  };
+
+  public static class EnumOutputMapper<T extends Enum> extends MapFn<T, 
String> {
+
+    @Override
+    public String map(T input) {
+      return input.name();
+    }
+  };
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0eb69c43/crunch/src/main/java/org/apache/crunch/types/Protos.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/Protos.java 
b/crunch/src/main/java/org/apache/crunch/types/Protos.java
new file mode 100644
index 0000000..1672209
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/types/Protos.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import com.google.common.base.Splitter;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.Message.Builder;
+
+/**
+ * Utility functions for working with protocol buffers in Crunch.
+ */
+public class Protos {
+
+  /**
+   * Utility function for creating a default PB Message from a Class object 
that
+   * works with both protoc 2.3.0 and 2.4.x.
+   * @param clazz The class of the protocol buffer to create
+   * @return An instance of a protocol buffer
+   */
+  public static <M extends Message> M getDefaultInstance(Class<M> clazz) {
+    if (clazz.getConstructors().length > 0) {
+      // Protobuf 2.3.0
+      return ReflectionUtils.newInstance(clazz, null);
+    } else {
+      // Protobuf 2.4.x
+      try {
+        Message.Builder mb = (Message.Builder) 
clazz.getDeclaredMethod("newBuilder").invoke(null);
+        return (M) mb.getDefaultInstanceForType();
+      } catch (Exception e) {
+        throw new CrunchRuntimeException(e);
+      }  
+    }
+  }
+  
+  public static <M extends Message, K> MapFn<M, K> extractKey(String 
fieldName) {
+    return new ExtractKeyFn<M, K>(fieldName);
+  }
+
+  public static <M extends Message> DoFn<String, M> lineParser(String sep, 
Class<M> msgClass) {
+    return new TextToProtoFn<M>(sep, msgClass);
+  }
+
+  public static class ExtractKeyFn<M extends Message, K> extends MapFn<M, K> {
+
+    private final String fieldName;
+
+    private transient FieldDescriptor fd;
+
+    public ExtractKeyFn(String fieldName) {
+      this.fieldName = fieldName;
+    }
+
+    @Override
+    public K map(M input) {
+      if (input == null) {
+        throw new IllegalArgumentException("Null inputs not supported by 
Protos.ExtractKeyFn");
+      } else if (fd == null) {
+        fd = input.getDescriptorForType().findFieldByName(fieldName);
+        if (fd == null) {
+          throw new IllegalStateException("Could not find field: " + fieldName 
+ " in message: " + input);
+        }
+      }
+      return (K) input.getField(fd);
+    }
+
+  }
+
+  public static class TextToProtoFn<M extends Message> extends DoFn<String, M> 
{
+
+    private final String sep;
+    private final Class<M> msgClass;
+
+    private transient M msgInstance;
+    private transient List<FieldDescriptor> fields;
+    private transient Splitter splitter;
+
+    enum ParseErrors {
+      TOTAL,
+      NUMBER_FORMAT
+    };
+
+    public TextToProtoFn(String sep, Class<M> msgClass) {
+      this.sep = sep;
+      this.msgClass = msgClass;
+    }
+
+    @Override
+    public void initialize() {
+      this.msgInstance = getDefaultInstance(msgClass);
+      this.fields = msgInstance.getDescriptorForType().getFields();
+      this.splitter = Splitter.on(sep);
+    }
+
+    @Override
+    public void process(String input, Emitter<M> emitter) {
+      if (input != null && !input.isEmpty()) {
+        Builder b = msgInstance.newBuilderForType();
+        Iterator<String> iter = splitter.split(input).iterator();
+        boolean parseError = false;
+        for (FieldDescriptor fd : fields) {
+          if (iter.hasNext()) {
+            String value = iter.next();
+            if (value != null && !value.isEmpty()) {
+              Object parsedValue = null;
+              try {
+                switch (fd.getJavaType()) {
+                case STRING:
+                  parsedValue = value;
+                  break;
+                case INT:
+                  parsedValue = Integer.valueOf(value);
+                  break;
+                case LONG:
+                  parsedValue = Long.valueOf(value);
+                  break;
+                case FLOAT:
+                  parsedValue = Float.valueOf(value);
+                  break;
+                case DOUBLE:
+                  parsedValue = Double.valueOf(value);
+                  break;
+                case BOOLEAN:
+                  parsedValue = Boolean.valueOf(value);
+                  break;
+                case ENUM:
+                  parsedValue = fd.getEnumType().findValueByName(value);
+                  break;
+                }
+                b.setField(fd, parsedValue);
+              } catch (NumberFormatException nfe) {
+                increment(ParseErrors.NUMBER_FORMAT);
+                parseError = true;
+                break;
+              }
+            }
+          }
+        }
+
+        if (parseError) {
+          increment(ParseErrors.TOTAL);
+        } else {
+          emitter.emit((M) b.build());
+        }
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0eb69c43/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java 
b/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
index 4a83db5..655ee55 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
@@ -50,10 +50,10 @@ import org.apache.crunch.types.DeepCopier;
 import org.apache.crunch.types.MapDeepCopier;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypes;
 import org.apache.crunch.types.TupleDeepCopier;
 import org.apache.crunch.types.TupleFactory;
 import org.apache.crunch.types.writable.WritableDeepCopier;
-import org.apache.crunch.util.PTypes;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0eb69c43/crunch/src/main/java/org/apache/crunch/types/package-info.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/package-info.java 
b/crunch/src/main/java/org/apache/crunch/types/package-info.java
new file mode 100644
index 0000000..b420b03
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/types/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Common functionality for business object serialization.
+ */
+package org.apache.crunch.types;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0eb69c43/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java
----------------------------------------------------------------------
diff --git 
a/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java 
b/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java
index 5e305b8..67f0621 100644
--- a/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java
+++ b/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java
@@ -34,9 +34,9 @@ import org.apache.crunch.types.CollectionDeepCopier;
 import org.apache.crunch.types.DeepCopier;
 import org.apache.crunch.types.MapDeepCopier;
 import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypes;
 import org.apache.crunch.types.TupleDeepCopier;
 import org.apache.crunch.types.TupleFactory;
-import org.apache.crunch.util.PTypes;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.BytesWritable;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0eb69c43/crunch/src/main/java/org/apache/crunch/util/Collects.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/util/Collects.java 
b/crunch/src/main/java/org/apache/crunch/util/Collects.java
deleted file mode 100644
index c8c9311..0000000
--- a/crunch/src/main/java/org/apache/crunch/util/Collects.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.util;
-
-import java.util.Collection;
-import java.util.Iterator;
-
-import com.google.common.collect.Lists;
-
-/**
- * Utility functions for returning Collection objects backed by different types
- * of implementations.
- */
-public class Collects {
-
-  public static <T> Collection<T> newArrayList() {
-    return Lists.newArrayList();
-  }
-
-  public static <T> Collection<T> newArrayList(T... elements) {
-    return Lists.newArrayList(elements);
-  }
-
-  public static <T> Collection<T> newArrayList(Iterable<? extends T> elements) 
{
-    return Lists.newArrayList(elements);
-  }
-
-  public static <T> Collection<T> newArrayList(Iterator<? extends T> elements) 
{
-    return Lists.newArrayList(elements);
-  }
-
-  private Collects() {
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0eb69c43/crunch/src/main/java/org/apache/crunch/util/CrunchTool.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/util/CrunchTool.java 
b/crunch/src/main/java/org/apache/crunch/util/CrunchTool.java
new file mode 100644
index 0000000..a2075da
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/util/CrunchTool.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.util;
+
+import java.io.IOException;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.Source;
+import org.apache.crunch.TableSource;
+import org.apache.crunch.Target;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.At;
+import org.apache.crunch.io.From;
+import org.apache.crunch.io.To;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.util.Tool;
+
+/**
+ * An extension of the {@code Tool} interface that creates a {@code Pipeline}
+ * instance and provides methods for working with the Pipeline from inside of
+ * the Tool's run method.
+ * 
+ */
+public abstract class CrunchTool extends Configured implements Tool {
+
+  protected static final From from = new From();
+  protected static final To to = new To();
+  protected static final At at = new At();
+
+  private Pipeline pipeline;
+
+  public CrunchTool() throws IOException {
+    this(false);
+  }
+
+  public CrunchTool(boolean inMemory) {
+    this.pipeline = inMemory ? MemPipeline.getInstance() : new 
MRPipeline(getClass());
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    super.setConf(conf);
+    if (conf != null && pipeline != null) {
+      pipeline.setConfiguration(conf);
+    }
+  }
+
+  @Override
+  public Configuration getConf() {
+    return pipeline.getConfiguration();
+  }
+
+  public void enableDebug() {
+    pipeline.enableDebug();
+  }
+
+  public <T> PCollection<T> read(Source<T> source) {
+    return pipeline.read(source);
+  }
+
+  public <K, V> PTable<K, V> read(TableSource<K, V> tableSource) {
+    return pipeline.read(tableSource);
+  }
+
+  public PCollection<String> readTextFile(String pathName) {
+    return pipeline.readTextFile(pathName);
+  }
+
+  public void write(PCollection<?> pcollection, Target target) {
+    pipeline.write(pcollection, target);
+  }
+
+  public void writeTextFile(PCollection<?> pcollection, String pathName) {
+    pipeline.writeTextFile(pcollection, pathName);
+  }
+
+  public void run() {
+    pipeline.run();
+  }
+
+  public void done() {
+    pipeline.done();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0eb69c43/crunch/src/main/java/org/apache/crunch/util/PTypes.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/util/PTypes.java 
b/crunch/src/main/java/org/apache/crunch/util/PTypes.java
deleted file mode 100644
index 23c0d08..0000000
--- a/crunch/src/main/java/org/apache/crunch/util/PTypes.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.util;
-
-import java.math.BigInteger;
-import java.nio.ByteBuffer;
-
-import org.apache.crunch.MapFn;
-import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.thrift.TBase;
-import org.apache.thrift.TDeserializer;
-import org.apache.thrift.TException;
-import org.apache.thrift.TSerializer;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.codehaus.jackson.map.ObjectMapper;
-
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.Message;
-
-/**
- * Utility functions for creating common types of derived PTypes, e.g., for JSON
- * data, protocol buffers, and Thrift records.
- * 
- */
-public class PTypes {
-
-  public static PType<BigInteger> bigInt(PTypeFamily typeFamily) {
-    return typeFamily.derived(BigInteger.class, BYTE_TO_BIGINT, BIGINT_TO_BYTE, typeFamily.bytes());
-  }
-
-  public static <T> PType<T> jsonString(Class<T> clazz, PTypeFamily typeFamily) {
-    return typeFamily
-        .derived(clazz, new JacksonInputMapFn<T>(clazz), new JacksonOutputMapFn<T>(), typeFamily.strings());
-  }
-
-  public static <T extends Message> PType<T> protos(Class<T> clazz, PTypeFamily typeFamily) {
-    return typeFamily.derived(clazz, new ProtoInputMapFn<T>(clazz), new ProtoOutputMapFn<T>(), typeFamily.bytes());
-  }
-
-  public static <T extends TBase> PType<T> thrifts(Class<T> clazz, PTypeFamily typeFamily) {
-    return typeFamily.derived(clazz, new ThriftInputMapFn<T>(clazz), new ThriftOutputMapFn<T>(), typeFamily.bytes());
-  }
-
-  public static final <T extends Enum> PType<T> enums(final Class<T> type, PTypeFamily typeFamily) {
-    return typeFamily.derived(type, new EnumInputMapper<T>(type), new EnumOutputMapper<T>(), typeFamily.strings());
-  }
-
-  public static MapFn<ByteBuffer, BigInteger> BYTE_TO_BIGINT = new MapFn<ByteBuffer, BigInteger>() {
-    public BigInteger map(ByteBuffer input) {
-      return input == null ? null : new BigInteger(input.array());
-    }
-  };
-
-  public static MapFn<BigInteger, ByteBuffer> BIGINT_TO_BYTE = new MapFn<BigInteger, ByteBuffer>() {
-    public ByteBuffer map(BigInteger input) {
-      return input == null ? null : ByteBuffer.wrap(input.toByteArray());
-    }
-  };
-
-  public static class JacksonInputMapFn<T> extends MapFn<String, T> {
-
-    private final Class<T> clazz;
-    private transient ObjectMapper mapper;
-
-    public JacksonInputMapFn(Class<T> clazz) {
-      this.clazz = clazz;
-    }
-
-    @Override
-    public void initialize() {
-      this.mapper = new ObjectMapper();
-    }
-
-    @Override
-    public T map(String input) {
-      try {
-        return mapper.readValue(input, clazz);
-      } catch (Exception e) {
-        throw new CrunchRuntimeException(e);
-      }
-    }
-  }
-
-  public static class JacksonOutputMapFn<T> extends MapFn<T, String> {
-
-    private transient ObjectMapper mapper;
-
-    @Override
-    public void initialize() {
-      this.mapper = new ObjectMapper();
-    }
-
-    @Override
-    public String map(T input) {
-      try {
-        return mapper.writeValueAsString(input);
-      } catch (Exception e) {
-        throw new CrunchRuntimeException(e);
-      }
-    }
-  }
-
-  public static class ProtoInputMapFn<T extends Message> extends MapFn<ByteBuffer, T> {
-
-    private final Class<T> clazz;
-    private transient T instance;
-
-    public ProtoInputMapFn(Class<T> clazz) {
-      this.clazz = clazz;
-    }
-
-    @Override
-    public void initialize() {
-      this.instance = Protos.getDefaultInstance(clazz);
-    }
-
-    @Override
-    public T map(ByteBuffer bb) {
-      try {
-        return (T) instance.newBuilderForType().mergeFrom(bb.array(), bb.position(), bb.limit()).build();
-      } catch (InvalidProtocolBufferException e) {
-        throw new CrunchRuntimeException(e);
-      }
-    }
-  }
-
-  public static class ProtoOutputMapFn<T extends Message> extends MapFn<T, ByteBuffer> {
-
-    public ProtoOutputMapFn() {
-    }
-
-    @Override
-    public ByteBuffer map(T proto) {
-      return ByteBuffer.wrap(proto.toByteArray());
-    }
-  }
-
-  public static class ThriftInputMapFn<T extends TBase> extends MapFn<ByteBuffer, T> {
-
-    private final Class<T> clazz;
-    private transient T instance;
-    private transient TDeserializer deserializer;
-    private transient byte[] bytes;
-
-    public ThriftInputMapFn(Class<T> clazz) {
-      this.clazz = clazz;
-    }
-
-    @Override
-    public void initialize() {
-      this.instance = ReflectionUtils.newInstance(clazz, getConfiguration());
-      this.deserializer = new TDeserializer(new TBinaryProtocol.Factory());
-      this.bytes = new byte[0];
-    }
-
-    @Override
-    public T map(ByteBuffer bb) {
-      T next = (T) instance.deepCopy();
-      int len = bb.limit() - bb.position();
-      if (len != bytes.length) {
-        bytes = new byte[len];
-      }
-      System.arraycopy(bb.array(), bb.position(), bytes, 0, len);
-      try {
-        deserializer.deserialize(next, bytes);
-      } catch (TException e) {
-        throw new CrunchRuntimeException(e);
-      }
-      return next;
-    }
-  }
-
-  public static class ThriftOutputMapFn<T extends TBase> extends MapFn<T, ByteBuffer> {
-
-    private transient TSerializer serializer;
-
-    public ThriftOutputMapFn() {
-    }
-
-    @Override
-    public void initialize() {
-      this.serializer = new TSerializer(new TBinaryProtocol.Factory());
-    }
-
-    @Override
-    public ByteBuffer map(T t) {
-      try {
-        return ByteBuffer.wrap(serializer.serialize(t));
-      } catch (TException e) {
-        throw new CrunchRuntimeException(e);
-      }
-    }
-  }
-
-  public static class EnumInputMapper<T extends Enum> extends MapFn<String, T> {
-    private final Class<T> type;
-
-    public EnumInputMapper(Class<T> type) {
-      this.type = type;
-    }
-
-    @Override
-    public T map(String input) {
-      return (T) Enum.valueOf(type, input);
-    }
-  };
-
-  public static class EnumOutputMapper<T extends Enum> extends MapFn<T, String> {
-
-    @Override
-    public String map(T input) {
-      return input.name();
-    }
-  };
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0eb69c43/crunch/src/main/java/org/apache/crunch/util/Protos.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/util/Protos.java b/crunch/src/main/java/org/apache/crunch/util/Protos.java
deleted file mode 100644
index fc61ffa..0000000
--- a/crunch/src/main/java/org/apache/crunch/util/Protos.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.util;
-
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.crunch.DoFn;
-import org.apache.crunch.Emitter;
-import org.apache.crunch.MapFn;
-import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import com.google.common.base.Splitter;
-import com.google.protobuf.Descriptors.FieldDescriptor;
-import com.google.protobuf.Message;
-import com.google.protobuf.Message.Builder;
-
-/**
- * Utility functions for working with protocol buffers in Crunch.
- */
-public class Protos {
-
-  /**
-   * Utility function for creating a default PB Messgae from a Class object that
-   * works with both protoc 2.3.0 and 2.4.x.
-   * @param clazz The class of the protocol buffer to create
-   * @return An instance of a protocol buffer
-   */
-  public static <M extends Message> M getDefaultInstance(Class<M> clazz) {
-    if (clazz.getConstructors().length > 0) {
-      // Protobuf 2.3.0
-      return ReflectionUtils.newInstance(clazz, null);
-    } else {
-      // Protobuf 2.4.x
-      try {
-        Message.Builder mb = (Message.Builder) clazz.getDeclaredMethod("newBuilder").invoke(null);
-        return (M) mb.getDefaultInstanceForType();
-      } catch (Exception e) {
-        throw new CrunchRuntimeException(e);
-      }  
-    }
-  }
-  
-  public static <M extends Message, K> MapFn<M, K> extractKey(String fieldName) {
-    return new ExtractKeyFn<M, K>(fieldName);
-  }
-
-  public static <M extends Message> DoFn<String, M> lineParser(String sep, Class<M> msgClass) {
-    return new TextToProtoFn<M>(sep, msgClass);
-  }
-
-  public static class ExtractKeyFn<M extends Message, K> extends MapFn<M, K> {
-
-    private final String fieldName;
-
-    private transient FieldDescriptor fd;
-
-    public ExtractKeyFn(String fieldName) {
-      this.fieldName = fieldName;
-    }
-
-    @Override
-    public K map(M input) {
-      if (input == null) {
-        throw new IllegalArgumentException("Null inputs not supported by Protos.ExtractKeyFn");
-      } else if (fd == null) {
-        fd = input.getDescriptorForType().findFieldByName(fieldName);
-        if (fd == null) {
-          throw new IllegalStateException("Could not find field: " + fieldName + " in message: " + input);
-        }
-      }
-      return (K) input.getField(fd);
-    }
-
-  }
-
-  public static class TextToProtoFn<M extends Message> extends DoFn<String, M> {
-
-    private final String sep;
-    private final Class<M> msgClass;
-
-    private transient M msgInstance;
-    private transient List<FieldDescriptor> fields;
-    private transient Splitter splitter;
-
-    enum ParseErrors {
-      TOTAL,
-      NUMBER_FORMAT
-    };
-
-    public TextToProtoFn(String sep, Class<M> msgClass) {
-      this.sep = sep;
-      this.msgClass = msgClass;
-    }
-
-    @Override
-    public void initialize() {
-      this.msgInstance = getDefaultInstance(msgClass);
-      this.fields = msgInstance.getDescriptorForType().getFields();
-      this.splitter = Splitter.on(sep);
-    }
-
-    @Override
-    public void process(String input, Emitter<M> emitter) {
-      if (input != null && !input.isEmpty()) {
-        Builder b = msgInstance.newBuilderForType();
-        Iterator<String> iter = splitter.split(input).iterator();
-        boolean parseError = false;
-        for (FieldDescriptor fd : fields) {
-          if (iter.hasNext()) {
-            String value = iter.next();
-            if (value != null && !value.isEmpty()) {
-              Object parsedValue = null;
-              try {
-                switch (fd.getJavaType()) {
-                case STRING:
-                  parsedValue = value;
-                  break;
-                case INT:
-                  parsedValue = Integer.valueOf(value);
-                  break;
-                case LONG:
-                  parsedValue = Long.valueOf(value);
-                  break;
-                case FLOAT:
-                  parsedValue = Float.valueOf(value);
-                  break;
-                case DOUBLE:
-                  parsedValue = Double.valueOf(value);
-                  break;
-                case BOOLEAN:
-                  parsedValue = Boolean.valueOf(value);
-                  break;
-                case ENUM:
-                  parsedValue = fd.getEnumType().findValueByName(value);
-                  break;
-                }
-                b.setField(fd, parsedValue);
-              } catch (NumberFormatException nfe) {
-                increment(ParseErrors.NUMBER_FORMAT);
-                parseError = true;
-                break;
-              }
-            }
-          }
-        }
-
-        if (parseError) {
-          increment(ParseErrors.TOTAL);
-        } else {
-          emitter.emit((M) b.build());
-        }
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0eb69c43/crunch/src/main/java/org/apache/crunch/util/package-info.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/util/package-info.java b/crunch/src/main/java/org/apache/crunch/util/package-info.java
new file mode 100644
index 0000000..94d79a1
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/util/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * An assorted set of utilities.
+ */
+package org.apache.crunch.util;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0eb69c43/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 06c5218..882ada1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -564,7 +564,7 @@ under the License.
           <groups>
             <group>
               <title>Core</title>
-              <packages>${pkg}:${pkg}.fn:${pkg}.impl.*:${pkg}.io:${pkg}.types.*:${pkg}.test</packages>
+              <packages>${pkg}:${pkg}.fn:${pkg}.impl.*:${pkg}.io:${pkg}.types:${pkg}.types.*:${pkg}.test:${pkg}.util</packages>
             </group>
             <group>
               <title>Extension Library</title>

Reply via email to