Author: cutting
Date: Thu Apr  1 18:30:36 2010
New Revision: 930060

URL: http://svn.apache.org/viewvc?rev=930060&view=rev
Log:
AVRO-493. Add support for Hadoop MapReduce with Avro data files.

Added:
    hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/
    hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroInputFormat.java
    hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java
    hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeyComparator.java
    hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeySerialization.java
    hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroMapper.java
    hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroOutputFormat.java
    hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroRecordReader.java
    hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroReducer.java
    hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroWrapper.java
    hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/FsInput.java
    hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/package.html
    hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/
    hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCountGeneric.java
    hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCountSpecific.java
    hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/WordCountUtil.java
    hadoop/avro/trunk/share/test/schemas/WordCount.avsc
Modified:
    hadoop/avro/trunk/CHANGES.txt
    hadoop/avro/trunk/lang/java/build.xml
    hadoop/avro/trunk/lang/java/ivy.xml

Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=930060&r1=930059&r2=930060&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Thu Apr  1 18:30:36 2010
@@ -6,6 +6,8 @@ Avro 1.4.0 (unreleased)
 
   NEW FEATURES
 
+    AVRO-493. Add support for Hadoop MapReduce with Avro data files. (cutting)
+
   IMPROVEMENTS
 
   BUG FIXES

Modified: hadoop/avro/trunk/lang/java/build.xml
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/build.xml?rev=930060&r1=930059&r2=930060&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/build.xml (original)
+++ hadoop/avro/trunk/lang/java/build.xml Thu Apr  1 18:30:36 2010
@@ -66,6 +66,8 @@
            value="http://jackson.codehaus.org/0.9.3/javadoc/"/>
   <property name="javadoc.link.servlet"
            value="http://java.sun.com/products/servlet/2.3/javadoc/"/>
+  <property name="javadoc.link.hadoop"
+           value="http://hadoop.apache.org/common/docs/current/api/"/>
   <property name="javadoc.packages" value="org.${org}.${name}.*"/>
 
   <property name="javac.encoding" value="ISO-8859-1"/>
@@ -456,6 +458,7 @@
         <link href="${javadoc.link.java}"/>
         <link href="${javadoc.link.jackson}"/>
         <link href="${javadoc.link.servlet}"/>
+        <link href="${javadoc.link.hadoop}"/>
 
         <classpath >
           <path refid="java.classpath" />

Modified: hadoop/avro/trunk/lang/java/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/ivy.xml?rev=930060&r1=930059&r2=930060&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/ivy.xml (original)
+++ hadoop/avro/trunk/lang/java/ivy.xml Thu Apr  1 18:30:36 2010
@@ -61,6 +61,10 @@
         conf="build->default"/>
     <dependency org="net.sf.jopt-simple" name="jopt-simple" rev="3.2"
         conf="build->default;test->default;tools->default"/>
+    <dependency org="org.apache.hadoop" name="hadoop-core" rev="0.20.2"
+                conf="build->default" transitive="false"/>
+    <dependency org="commons-httpclient" name="commons-httpclient" rev="3.0.1"
+               conf="test->default"/>
   </dependencies>
 
 </ivy-module>

Added: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroInputFormat.java?rev=930060&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroInputFormat.java (added)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroInputFormat.java Thu Apr  1 18:30:36 2010
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RecordReader;
+
+/** An {@link org.apache.hadoop.mapred.InputFormat} for Avro data files. */
+public class AvroInputFormat<T>
+  extends FileInputFormat<AvroWrapper<T>, NullWritable> {
+
+  @Override
+  protected FileStatus[] listStatus(JobConf job) throws IOException {
+    List<FileStatus> result = new ArrayList<FileStatus>();
+    for (FileStatus file : super.listStatus(job))
+      if (file.getPath().getName().endsWith(AvroOutputFormat.EXT))
+        result.add(file);
+    return result.toArray(new FileStatus[0]);
+  }
+
+  @Override
+  public RecordReader<AvroWrapper<T>, NullWritable>
+    getRecordReader(InputSplit split, JobConf job, Reporter reporter)
+    throws IOException {
+    reporter.setStatus(split.toString());
+    return new AvroRecordReader<T>(job, (FileSplit)split);
+  }
+
+}
+

Added: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java?rev=930060&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java (added)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java Thu Apr  1 18:30:36 2010
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.util.Collection;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.avro.Schema;
+
+/** Setters to configure jobs for Avro data. */
+public class AvroJob {
+  private AvroJob() {}                            // no public ctor
+
+  static final String API_GENERIC = "generic";
+  static final String API_SPECIFIC = "specific";
+
+  static final String INPUT_API = "avro.input.api";
+  static final String INPUT_SCHEMA = "avro.input.schema";
+
+  static final String MAP_OUTPUT_SCHEMA = "avro.map.output.schema";
+  static final String MAP_OUTPUT_API = "avro.map.output.api";
+
+  static final String OUTPUT_SCHEMA = "avro.output.schema";
+  static final String OUTPUT_API = "avro.output.api";
+
+  /** Configure a job's map input to use Avro's generic API. */
+  public static void setInputGeneric(JobConf job, Schema s) {
+    job.set(INPUT_API, API_GENERIC);
+    configureAvroInput(job, s);
+  }
+
+  /** Configure a job's map input to use Avro's specific API. */
+  public static void setInputSpecific(JobConf job, Schema s) {
+    job.set(INPUT_API, API_SPECIFIC);
+    configureAvroInput(job, s);
+  }
+
+  private static void configureAvroInput(JobConf job, Schema s) {
+    job.set(INPUT_SCHEMA, s.toString());
+    job.setInputFormat(AvroInputFormat.class);
+  }
+
+  /** Configure a job's map output key schema using Avro's generic API. */
+  public static void setMapOutputGeneric(JobConf job, Schema s) {
+    job.set(MAP_OUTPUT_SCHEMA, s.toString());
+    job.set(MAP_OUTPUT_API, API_GENERIC);
+    configureAvroOutput(job);
+  }
+
+  /** Configure a job's map output key schema using Avro's specific API. */
+  public static void setMapOutputSpecific(JobConf job, Schema s) {
+    job.set(MAP_OUTPUT_SCHEMA, s.toString());
+    job.set(MAP_OUTPUT_API, API_SPECIFIC);
+    configureAvroOutput(job);
+  }
+
+  /** Configure a job's output key schema using Avro's generic API. */
+  public static void setOutputGeneric(JobConf job, Schema s) {
+    job.set(OUTPUT_SCHEMA, s.toString());
+    job.set(OUTPUT_API, API_GENERIC);
+    configureAvroOutput(job);
+  }
+
+  /** Configure a job's output key schema using Avro's specific API. */
+  public static void setOutputSpecific(JobConf job, Schema s) {
+    job.set(OUTPUT_SCHEMA, s.toString());
+    job.set(OUTPUT_API, API_SPECIFIC);
+    configureAvroOutput(job);
+  }
+
+  private static void configureAvroOutput(JobConf job) {
+    job.setOutputKeyClass(AvroWrapper.class);
+    job.setOutputKeyComparatorClass(AvroKeyComparator.class);
+    job.setMapOutputValueClass(NullWritable.class);
+    job.setOutputFormat(AvroOutputFormat.class);
+
+    // add AvroKeySerialization to io.serializations
+    Collection<String> serializations =
+      job.getStringCollection("io.serializations");
+    if (!serializations.contains(AvroKeySerialization.class.getName())) {
+      serializations.add(AvroKeySerialization.class.getName());
+      job.setStrings("io.serializations",
+                     serializations.toArray(new String[0]));
+    }
+  }
+
+}
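
For orientation, a minimal driver sketch using these setters might look like the following. This is an illustration only, not part of this commit; MyMapper and MyReducer stand in for hypothetical AvroMapper/AvroReducer subclasses.

    import org.apache.avro.Schema;
    import org.apache.avro.mapred.AvroJob;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;

    public class ExampleDriver {
      public static void main(String[] args) throws Exception {
        JobConf job = new JobConf();
        job.setJobName("avro-example");

        // Map input is a file of Avro strings read with the generic API;
        // for simplicity this sketch reuses the same schema for the output.
        Schema schema = Schema.create(Schema.Type.STRING);
        AvroJob.setInputGeneric(job, schema);
        AvroJob.setOutputGeneric(job, schema);

        // Hypothetical subclasses of AvroMapper and AvroReducer.
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        JobClient.runJob(job);
      }
    }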

Added: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeyComparator.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeyComparator.java?rev=930060&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeyComparator.java (added)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeyComparator.java Thu Apr  1 18:30:36 2010
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryData;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.specific.SpecificData;
+
+/** The {@link RawComparator} used by jobs configured with {@link AvroJob}. */
+public class AvroKeyComparator<T>
+  extends Configured implements RawComparator<AvroWrapper<T>> {
+
+  private Schema schema;
+  private GenericData model;
+
+  @Override
+  public void setConf(Configuration conf) {
+    super.setConf(conf);
+    if (conf != null) {
+      schema = Schema.parse(conf.get(AvroJob.MAP_OUTPUT_SCHEMA,
+                                     conf.get(AvroJob.OUTPUT_SCHEMA)));
+      String api = getConf().get(AvroJob.MAP_OUTPUT_API,
+                                 getConf().get(AvroJob.OUTPUT_API));
+      model = AvroJob.API_SPECIFIC.equals(api)
+        ? SpecificData.get()
+        : GenericData.get();
+    }
+  }
+
+  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+    int diff = BinaryData.compare(b1, s1, b2, s2, schema);
+    return diff == 0 ? -1 : diff;
+  }
+
+  public int compare(AvroWrapper<T> x, AvroWrapper<T> y) {
+    int diff = model.compare(x.datum(), y.datum(), schema);
+    return diff == 0 ? -1 : diff;
+  }
+
+}

Added: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeySerialization.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeySerialization.java?rev=930060&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeySerialization.java (added)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeySerialization.java Thu Apr  1 18:30:36 2010
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.io.serializer.Serialization;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.conf.Configured;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+
+/** The {@link Serialization} used by jobs configured with {@link AvroJob}. */
+public class AvroKeySerialization<T> extends Configured 
+  implements Serialization<AvroWrapper<T>> {
+
+  public boolean accept(Class<?> c) {
+    return AvroWrapper.class.isAssignableFrom(c);
+  }
+  
+  /** Returns the specified map output deserializer.  Defaults to the final
+   * output deserializer if no map output schema was specified. */
+  public Deserializer<AvroWrapper<T>> getDeserializer(Class<AvroWrapper<T>> c) {
+    //  We need not rely on mapred.task.is.map here to determine whether map
+    //  output or final output is desired, since the mapreduce framework never
+    //  creates a deserializer for final output, only for map output.
+    String json = getConf().get(AvroJob.MAP_OUTPUT_SCHEMA,
+                                getConf().get(AvroJob.OUTPUT_SCHEMA));
+    Schema schema = Schema.parse(json);
+
+    String api = getConf().get(AvroJob.MAP_OUTPUT_API,
+                               getConf().get(AvroJob.OUTPUT_API));
+    DatumReader<T> reader = AvroJob.API_SPECIFIC.equals(api)
+      ? new SpecificDatumReader<T>(schema)
+      : new GenericDatumReader<T>(schema);
+
+    return new AvroWrapperDeserializer(reader);
+  }
+  
+  private static final DecoderFactory FACTORY = new DecoderFactory();
+  static { FACTORY.configureDirectDecoder(true); }
+
+  private class AvroWrapperDeserializer
+    implements Deserializer<AvroWrapper<T>> {
+
+    private DatumReader<T> reader;
+    private BinaryDecoder decoder;
+    
+    public AvroWrapperDeserializer(DatumReader<T> reader) {
+      this.reader = reader;
+    }
+    
+    public void open(InputStream in) {
+      this.decoder = FACTORY.createBinaryDecoder(in, decoder);
+    }
+    
+    public AvroWrapper<T> deserialize(AvroWrapper<T> wrapper)
+      throws IOException {
+      T datum = reader.read(wrapper == null ? null : wrapper.datum(), decoder);
+      if (wrapper == null) {
+        wrapper = new AvroWrapper<T>(datum);
+      } else {
+        wrapper.datum(datum);
+      }
+      return wrapper;
+    }
+
+    public void close() throws IOException {
+      decoder.inputStream().close();
+    }
+    
+  }
+  
+  /** Returns the specified output serializer. */
+  public Serializer<AvroWrapper<T>> getSerializer(Class<AvroWrapper<T>> c) {
+    // Here we must rely on mapred.task.is.map to tell whether the map output
+    // or final output is needed.
+    boolean isMap = getConf().getBoolean("mapred.task.is.map", false);
+
+    String json = getConf().get(AvroJob.OUTPUT_SCHEMA);
+    if (isMap) 
+      json = getConf().get(AvroJob.MAP_OUTPUT_SCHEMA, json);
+    Schema schema = Schema.parse(json);
+
+    String api = getConf().get(AvroJob.OUTPUT_API);
+    if (isMap) 
+      api = getConf().get(AvroJob.MAP_OUTPUT_API, api);
+
+    DatumWriter<T> writer = AvroJob.API_SPECIFIC.equals(api)
+      ? new SpecificDatumWriter<T>(schema)
+      : new GenericDatumWriter<T>(schema);
+    return new AvroWrapperSerializer(writer);
+  }
+
+  private class AvroWrapperSerializer implements Serializer<AvroWrapper<T>> {
+
+    private DatumWriter<T> writer;
+    private OutputStream out;
+    private BinaryEncoder encoder;
+    
+    public AvroWrapperSerializer(DatumWriter<T> writer) {
+      this.writer = writer;
+    }
+
+    public void open(OutputStream out) {
+      this.out = out;
+      this.encoder = new BinaryEncoder(out);
+    }
+
+    public void serialize(AvroWrapper<T> wrapper) throws IOException {
+      writer.write(wrapper.datum(), encoder);
+    }
+
+    public void close() throws IOException {
+      out.close();
+    }
+
+  }
+
+}

Added: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroMapper.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroMapper.java?rev=930060&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroMapper.java (added)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroMapper.java Thu Apr  1 18:30:36 2010
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.Mapper;
+
+/** A {@link Mapper} for Avro data.
+ *
+ * <p>Applications should subclass this class and pass their subclass to {@link
+ * org.apache.hadoop.mapred.JobConf#setMapperClass(Class)}.  Subclasses must
+ * override {@link #map} and may call {@link #collect} to generate output.
+ */
+public abstract class AvroMapper<IN,OUT> extends MapReduceBase
+  implements Mapper<AvroWrapper<IN>, NullWritable,
+                    AvroWrapper<OUT>, NullWritable> {
+    
+  private OutputCollector<AvroWrapper<OUT>, NullWritable> out;
+  private Reporter reporter;
+
+  public void map(AvroWrapper<IN> wrapper, NullWritable value, 
+                  OutputCollector<AvroWrapper<OUT>, NullWritable> output, 
+                  Reporter reporter) throws IOException {
+    if (this.out == null) {
+      this.out = output;
+      this.reporter = reporter;
+    }
+    map(wrapper.datum());
+  }
+
+  /** Return the {@link Reporter} to permit status updates. */
+  public Reporter getReporter() { return reporter; }
+
+  /** Called with each map input datum. */
+  public abstract void map(IN datum) throws IOException;
+
+  private final AvroWrapper<OUT> outputWrapper = new AvroWrapper<OUT>(null);
+
+  /** Call with each map output datum. */
+  public void collect(OUT datum) throws IOException {
+    outputWrapper.datum(datum);
+    out.collect(outputWrapper, NullWritable.get());
+  }
+
+}
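
By way of illustration (not part of this commit), a subclass only needs to override map and call collect. For instance, a hypothetical mapper that upper-cases Avro string input:

    import java.io.IOException;

    import org.apache.avro.mapred.AvroMapper;
    import org.apache.avro.util.Utf8;

    // Hypothetical example mapper: reads Avro strings, emits them upper-cased.
    public class UpperCaseMapper extends AvroMapper<Utf8, Utf8> {
      @Override
      public void map(Utf8 datum) throws IOException {
        collect(new Utf8(datum.toString().toUpperCase()));
      }
    }

The word-count tests later in this commit give a fuller example of the same pattern.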

Added: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroOutputFormat.java?rev=930060&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroOutputFormat.java (added)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroOutputFormat.java Thu Apr  1 18:30:36 2010
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.util.Progressable;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.CodecFactory;
+
+/** An {@link org.apache.hadoop.mapred.OutputFormat} for Avro data files. */
+public class AvroOutputFormat <T>
+  extends FileOutputFormat<AvroWrapper<T>, NullWritable> {
+
+  final static String EXT = ".avro";
+
+  private static final String DEFLATE_LEVEL_KEY = "avro.mapred.deflate.level";
+  private static final int DEFAULT_DEFLATE_LEVEL = 1;
+
+  /** Enable output compression using the deflate codec and specify its level.*/
+  public static void setDeflateLevel(JobConf job, int level) {
+    FileOutputFormat.setCompressOutput(job, true);
+    job.setInt(DEFLATE_LEVEL_KEY, level);
+  }
+
+  public RecordWriter<AvroWrapper<T>, NullWritable>
+    getRecordWriter(FileSystem ignore, JobConf job,
+                    String name, Progressable prog)
+    throws IOException {
+
+    Schema schema = Schema.parse(job.get(AvroJob.OUTPUT_SCHEMA));
+
+    DatumWriter<T> datumWriter =
+      AvroJob.API_SPECIFIC.equals(job.get(AvroJob.OUTPUT_API))
+      ? new SpecificDatumWriter<T>()
+      : new GenericDatumWriter<T>();
+
+    final DataFileWriter<T> writer = new DataFileWriter<T>(datumWriter);
+
+    if (FileOutputFormat.getCompressOutput(job)) {
+      int level = job.getInt(DEFLATE_LEVEL_KEY, DEFAULT_DEFLATE_LEVEL);
+      writer.setCodec(CodecFactory.deflateCodec(level));
+    }
+
+    Path path = FileOutputFormat.getTaskOutputPath(job, name+EXT);
+    writer.create(schema, path.getFileSystem(job).create(path));
+
+    return new RecordWriter<AvroWrapper<T>, NullWritable>() {
+        public void write(AvroWrapper<T> wrapper, NullWritable ignore)
+          throws IOException {
+          writer.append(wrapper.datum());
+        }
+        public void close(Reporter reporter) throws IOException {
+          writer.close();
+        }
+      };
+  }
+
+}
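
For reference, enabling compressed output from a driver is a one-line call (fragment only; job is assumed to be an existing JobConf):

    // Compress job output with the deflate codec at level 6.  Per the code above,
    // this calls FileOutputFormat.setCompressOutput(job, true) and sets
    // "avro.mapred.deflate.level" to 6.
    AvroOutputFormat.setDeflateLevel(job, 6);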

Added: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroRecordReader.java?rev=930060&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroRecordReader.java (added)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroRecordReader.java Thu Apr  1 18:30:36 2010
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.RecordReader;
+
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.specific.SpecificDatumReader;
+
+/** A {@link RecordReader} for Avro data files. */
+public class AvroRecordReader<T>
+  implements RecordReader<AvroWrapper<T>, NullWritable> {
+
+  private FsInput in;
+  private DataFileReader<T> reader;
+  private long start;
+  private long end;
+
+  public AvroRecordReader(JobConf job, FileSplit split)
+    throws IOException {
+    this.in = new FsInput(split.getPath(), job);
+    DatumReader<T> datumReader =
+      AvroJob.API_SPECIFIC.equals(job.get(AvroJob.INPUT_API))
+      ? new SpecificDatumReader<T>()
+      : new GenericDatumReader<T>();
+
+    this.reader = new DataFileReader<T>(in, datumReader);
+
+    reader.sync(split.getStart());                    // sync to start
+    this.start = in.tell();
+    this.end = split.getStart() + split.getLength();
+  }
+
+  public AvroWrapper<T> createKey() {
+    return new AvroWrapper<T>(null);
+  }
+  
+  public NullWritable createValue() { return NullWritable.get(); }
+    
+  public boolean next(AvroWrapper<T> wrapper, NullWritable ignore)
+    throws IOException {
+    if (!reader.hasNext() || reader.pastSync(end))
+      return false;
+    wrapper.datum(reader.next(wrapper.datum()));
+    return true;
+  }
+  
+  public float getProgress() throws IOException {
+    if (end == start) {
+      return 0.0f;
+    } else {
+      return Math.min(1.0f, (in.tell() - start) / (float)(end - start));
+    }
+  }
+  
+  public long getPos() throws IOException {
+    return in.tell();
+  }
+
+  public void close() throws IOException { reader.close(); }
+  
+}
+

Added: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroReducer.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroReducer.java?rev=930060&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroReducer.java (added)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroReducer.java Thu Apr  1 18:30:36 2010
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.Reducer;
+
+/** A {@link Reducer} for Avro data.
+ *
+ * <p>Applications should subclass this class and pass their subclass to {@link
+ * org.apache.hadoop.mapred.JobConf#setReducerClass(Class)} and perhaps {@link
+ * org.apache.hadoop.mapred.JobConf#setCombinerClass(Class)}.  Subclasses must
+ * override {@link #reduce} and may call {@link #collect} to generate output.
+ *
+ * <p>Note that reducers here are not passed an iterator of all matching
+ * values.  Rather, the reducer is called with every value.  If values are to
+ * be combined then the reducer must maintain state accordingly.  The final
+ * value may be flushed by overriding {@link #close} to call {@link #collect}.
+ */
+public abstract class AvroReducer<IN,OUT> extends MapReduceBase
+  implements Reducer<AvroWrapper<IN>, NullWritable,
+                     AvroWrapper<OUT>, NullWritable> {
+    
+  private OutputCollector<AvroWrapper<OUT>, NullWritable> out;
+  private Reporter reporter;
+
+  private final AvroWrapper<OUT> outputWrapper = new AvroWrapper<OUT>(null);
+
+  public void reduce(AvroWrapper<IN> wrapper, Iterator<NullWritable> ignore,
+                     OutputCollector<AvroWrapper<OUT>,NullWritable> output, 
+                     Reporter reporter) throws IOException {
+    if (this.out == null) {
+      this.out = output;
+      this.reporter = reporter;
+    }
+    reduce(wrapper.datum());
+  }
+    
+  /** Return the {@link Reporter} to permit status updates. */
+  public Reporter getReporter() { return reporter; }
+
+  /** Called with each reduce input datum for this partition, in order. */
+  public abstract void reduce(IN datum) throws IOException;
+
+  /** Call with each final output datum. */
+  public void collect(OUT datum) throws IOException {
+    outputWrapper.datum(datum);
+    out.collect(outputWrapper, NullWritable.get());
+  }
+}
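
To make that per-value calling convention concrete, here is an illustrative reducer (hypothetical, not part of this commit) that emits each distinct string once, carrying state across calls and flushing from close:

    import java.io.IOException;

    import org.apache.avro.mapred.AvroReducer;
    import org.apache.avro.util.Utf8;

    // Hypothetical example: reduce() sees one datum per call, so the pending
    // datum is held in a field and the last one is flushed from close().
    public class DistinctReducer extends AvroReducer<Utf8, Utf8> {
      private Utf8 previous;

      @Override
      public void reduce(Utf8 datum) throws IOException {
        if (!datum.equals(previous)) {
          if (previous != null)
            collect(previous);                       // emit the completed group
          previous = new Utf8(datum.toString());     // copy; the runtime may reuse datum
        }
      }

      @Override
      public void close() throws IOException {
        if (previous != null)
          collect(previous);                         // flush the final group
      }
    }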

Added: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroWrapper.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroWrapper.java?rev=930060&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroWrapper.java (added)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroWrapper.java Thu Apr  1 18:30:36 2010
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+/** The wrapper of values for jobs configured with {@link AvroJob}. */
+public class AvroWrapper<T> {
+  private T datum;
+
+  /** Wrap a value datum. */
+  public AvroWrapper(T datum) { this.datum = datum; }
+
+  /** Return the wrapped datum. */
+  public T datum() { return datum; }
+
+  /** Set the wrapped datum. */
+  public void datum(T datum) { this.datum = datum; }
+  
+  public int hashCode() {
+    return (datum == null) ? 0 : datum.hashCode();
+  }
+
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    AvroWrapper that = (AvroWrapper)obj;
+    if (this.datum == null) {
+      if (that.datum != null)
+        return false;
+    } else if (!datum.equals(that.datum))
+      return false;
+    return true;
+  }
+    
+}

Added: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/FsInput.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/FsInput.java?rev=930060&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/FsInput.java (added)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/FsInput.java Thu Apr  1 18:30:36 2010
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataInputStream;
+
+import org.apache.avro.file.SeekableInput;
+
+/** Adapt an {@link FSDataInputStream} to {@link SeekableInput}. */
+public class FsInput implements Closeable, SeekableInput {
+  private final FSDataInputStream stream;
+  private final long len;
+
+  /** Construct given a path and a configuration. */
+  public FsInput(Path path, Configuration conf) throws IOException {
+    this.stream = path.getFileSystem(conf).open(path);
+    this.len = path.getFileSystem(conf).getFileStatus(path).getLen();
+  }
+
+  public long length() {
+    return len;
+  }
+
+  public int read(byte[] b, int off, int len) throws IOException {
+    return stream.read(b, off, len);
+  }
+
+  public void seek(long p) throws IOException {
+    stream.seek(p);
+  }
+
+  public long tell() throws IOException {
+    return stream.getPos();
+  }
+
+  public void close() throws IOException {
+    stream.close();
+  }
+}
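
Beyond its use in AvroRecordReader, FsInput can be handed directly to a DataFileReader to read an Avro data file from any Hadoop FileSystem. A small illustrative sketch (assumed paths, not part of this commit):

    import org.apache.avro.file.DataFileReader;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.mapred.FsInput;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    // Hypothetical sketch: print every datum in an Avro data file on HDFS or
    // the local file system, whichever the path resolves to.
    public class DumpAvroFile {
      public static void main(String[] args) throws Exception {
        FsInput in = new FsInput(new Path(args[0]), new Configuration());
        DataFileReader<Object> reader =
          new DataFileReader<Object>(in, new GenericDatumReader<Object>());
        try {
          for (Object datum : reader)
            System.out.println(datum);
        } finally {
          reader.close();
        }
      }
    }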

Added: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/package.html
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/package.html?rev=930060&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/package.html (added)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/mapred/package.html Thu Apr  1 18:30:36 2010
@@ -0,0 +1,51 @@
+<html>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<body>
+Tools to permit using Avro data
+with <a href="http://hadoop.apache.org/";>Hadoop</a> MapReduce jobs.
+
+<p>Avro data files do not contain key/value pairs as expected by
+  Hadoop's MapReduce API, but rather just a sequence of values.  Thus
+  we provide here a layer on top of Hadoop's MapReduce API which
+  eliminates the key/value distinction.</p>
+
+<p>To use this for jobs whose input and output are Avro data files:
+ <ul>
+   <li>Subclass {@link org.apache.avro.mapred.AvroMapper} and specify
+   this as your job's mapper.</li>
+   <li>Subclass {@link org.apache.avro.mapred.AvroReducer} and specify
+   this as your job's reducer and perhaps combiner.</li>
+   <li>Depending on whether your mapper uses Avro's specific or
+   generic API for inputs, call one of {@link
+   org.apache.avro.mapred.AvroJob#setInputSpecific} or {@link
+   org.apache.avro.mapred.AvroJob#setInputGeneric} with your input schema.</li>
+   <li>Depending on whether your job uses Avro's specific or generic
+   API for outputs, call one of {@link
+   org.apache.avro.mapred.AvroJob#setOutputSpecific} or {@link
+   org.apache.avro.mapred.AvroJob#setOutputGeneric} with your output
+   schema.</li>
+   <li>Specify input files with {@link
+   org.apache.hadoop.mapred.FileInputFormat#setInputPaths}</li>
+   <li>Specify an output directory with {@link
+   org.apache.hadoop.mapred.FileOutputFormat#setOutputPath}</li>
+   <li>Run your job with {@link org.apache.hadoop.mapred.JobClient#runJob}</li>
+ </ul>
+</p>
+</body>
+</html>

Added: hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCountGeneric.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCountGeneric.java?rev=930060&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCountGeneric.java (added)
+++ hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCountGeneric.java Thu Apr  1 18:30:36 2010
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import junit.framework.TestCase;
+
+import java.io.IOException;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+
+import org.apache.avro.Schema;
+import org.apache.avro.util.Utf8;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericData;
+
+public class TestWordCountGeneric extends TestCase {
+  
+  private static GenericRecord newWordCount(String word, int count) {
+    GenericRecord value = new GenericData.Record(WordCount.SCHEMA$);
+    value.put("word", new Utf8(word));
+    value.put("count", count);
+    return value;
+  }
+
+  public static class MapImpl extends AvroMapper<Utf8, GenericRecord> {
+    public void map(Utf8 text) throws IOException {
+      StringTokenizer tokens = new StringTokenizer(text.toString());
+      while (tokens.hasMoreTokens())
+        collect(newWordCount(tokens.nextToken(), 1));
+    }
+  }
+  
+  public static class ReduceImpl
+    extends AvroReducer<GenericRecord, GenericRecord> {
+
+    private GenericRecord previous;
+
+    public void reduce(GenericRecord current) throws IOException {
+      if (current.equals(previous)) {
+        previous.put("count", ((Integer)previous.get("count"))
+                     + (Integer)current.get("count"));
+      } else {
+        if (previous != null)
+          collect(previous);
+        previous = newWordCount(current.get("word").toString(),
+                                (Integer)current.get("count"));
+      }
+    }
+    
+    public void close() throws IOException {
+      if (previous != null)
+        collect(previous);
+    }
+
+  }
+
+  public void testJob() throws Exception {
+    WordCountUtil.writeLinesFile();
+
+    JobConf job = new JobConf();
+    job.setJobName("wordcount");
+ 
+    AvroJob.setInputGeneric(job, Schema.create(Schema.Type.STRING));
+    AvroJob.setOutputGeneric(job, WordCount.SCHEMA$);
+
+    job.setMapperClass(MapImpl.class);        
+    job.setCombinerClass(ReduceImpl.class);
+    job.setReducerClass(ReduceImpl.class);
+
+    String dir = System.getProperty("test.dir",".")+"/mapred";
+    FileInputFormat.setInputPaths(job, new Path(dir+"/in"));
+    FileOutputFormat.setOutputPath(job, new Path(dir+"/out"));
+    FileOutputFormat.setCompressOutput(job, true);
+
+    JobClient.runJob(job);
+
+    WordCountUtil.validateCountsFile();
+  }
+
+}

Added: hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCountSpecific.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCountSpecific.java?rev=930060&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCountSpecific.java (added)
+++ hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCountSpecific.java Thu Apr  1 18:30:36 2010
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import junit.framework.TestCase;
+
+import java.io.IOException;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+
+import org.apache.avro.Schema;
+import org.apache.avro.util.Utf8;
+
+public class TestWordCountSpecific extends TestCase {
+  
+  private static WordCount newWordCount(String word, int count) {
+    WordCount value = new WordCount();
+    value.word = new Utf8(word);
+    value.count = count;
+    return value;
+  }
+
+  public static class MapImpl extends AvroMapper<Utf8, WordCount> {
+    public void map(Utf8 text) throws IOException {
+      StringTokenizer tokens = new StringTokenizer(text.toString());
+      while (tokens.hasMoreTokens())
+        collect(newWordCount(tokens.nextToken(), 1));
+    }
+  }
+  
+  public static class ReduceImpl extends AvroReducer<WordCount, WordCount> {
+
+    private WordCount previous;
+
+    public void reduce(WordCount current) throws IOException {
+      if (current.equals(previous)) {
+        previous.count++;
+      } else {
+        if (previous != null)
+          collect(previous);
+        previous = newWordCount(current.word.toString(), current.count);
+      }
+    }
+    
+    public void close() throws IOException {
+      if (previous != null)
+        collect(previous);
+    }
+
+  }
+
+  public void testJob() throws Exception {
+    WordCountUtil.writeLinesFile();
+
+    JobConf job = new JobConf();
+    job.setJobName("wordcount");
+ 
+    AvroJob.setInputSpecific(job, Schema.create(Schema.Type.STRING));
+    AvroJob.setOutputSpecific(job, WordCount.SCHEMA$);
+
+    job.setMapperClass(MapImpl.class);        
+    job.setCombinerClass(ReduceImpl.class);
+    job.setReducerClass(ReduceImpl.class);
+
+    String dir = System.getProperty("test.dir",".")+"/mapred";
+    FileInputFormat.setInputPaths(job, new Path(dir+"/in"));
+    FileOutputFormat.setOutputPath(job, new Path(dir+"/out"));
+    FileOutputFormat.setCompressOutput(job, true);
+
+    JobClient.runJob(job);
+
+    WordCountUtil.validateCountsFile();
+
+  }
+
+}

Added: hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/WordCountUtil.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/WordCountUtil.java?rev=930060&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/WordCountUtil.java (added)
+++ hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/WordCountUtil.java Thu Apr  1 18:30:36 2010
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.io.File;
+import java.io.InputStream;
+import java.io.FileInputStream;
+import java.io.BufferedInputStream;
+import java.util.StringTokenizer;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.fs.FileUtil;
+
+import org.apache.avro.Schema;
+import org.apache.avro.util.Utf8;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.DataFileStream;
+
+class WordCountUtil {
+
+  private static final File DIR
+    = new File(System.getProperty("test.dir", ".") + "/mapred");
+  private static final File LINES_FILE
+    = new File(new File(DIR, "in"), "lines.avro");
+  private static final File COUNTS_FILE
+    = new File(new File(DIR, "out"), "part-00000.avro");
+
+  private static final String[] LINES = new String[] {
+    "the quick brown fox jumps over the lazy dog",
+    "the cow jumps over the moon",
+    "the rain in spain falls mainly on the plains"
+  };
+
+  private static final Map<String,Integer> COUNTS =
+    new TreeMap<String,Integer>();
+  static {
+    for (String line : LINES) {
+      StringTokenizer tokens = new StringTokenizer(line);
+      while (tokens.hasMoreTokens()) {
+        String word = tokens.nextToken();
+        int count = COUNTS.containsKey(word) ? COUNTS.get(word) : 0;
+        count++;
+        COUNTS.put(word, count);
+      }
+    }
+  }
+
+  public static void writeLinesFile() throws IOException {
+    FileUtil.fullyDelete(DIR);
+    DatumWriter<Utf8> writer = new GenericDatumWriter<Utf8>();
+    DataFileWriter<Utf8> out = new DataFileWriter<Utf8>(writer);
+    LINES_FILE.getParentFile().mkdirs();
+    out.create(Schema.create(Schema.Type.STRING), LINES_FILE);
+    for (String line : LINES)
+      out.append(new Utf8(line));
+    out.close();
+  }
+
+  public static void validateCountsFile() throws IOException {
+    DatumReader<WordCount> reader = new SpecificDatumReader<WordCount>();
+    InputStream in = new BufferedInputStream(new FileInputStream(COUNTS_FILE));
+    DataFileStream<WordCount> counts = new DataFileStream<WordCount>(in,reader);
+    int numWords = 0;
+    for (WordCount wc : counts) {
+      assertEquals(wc.word.toString(),
+                   (int)COUNTS.get(wc.word.toString()), wc.count);
+      numWords++;
+    }
+    in.close();
+    assertEquals(COUNTS.size(), numWords);
+  }
+
+}

Added: hadoop/avro/trunk/share/test/schemas/WordCount.avsc
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/share/test/schemas/WordCount.avsc?rev=930060&view=auto
==============================================================================
--- hadoop/avro/trunk/share/test/schemas/WordCount.avsc (added)
+++ hadoop/avro/trunk/share/test/schemas/WordCount.avsc Thu Apr  1 18:30:36 2010
@@ -0,0 +1,6 @@
+{"type":"record", "name":"org.apache.avro.mapred.WordCount",
+ "fields":[
+     {"name":"word", "type":"string"},
+     {"name":"count", "type":"int", "order":"ignore"}
+ ]
+}

