Author: tucu
Date: Sat Dec 15 20:23:08 2012
New Revision: 1422345

URL: http://svn.apache.org/viewvc?rev=1422345&view=rev
Log:
MAPREDUCE-4807. Allow MapOutputBuffer to be pluggable. (masokan via tucu)

Added:
    
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexRecord.java
    
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapOutputCollector.java
    
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMerge.java
Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
    
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java
    
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
    
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1422345&r1=1422344&r2=1422345&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Sat Dec 15 
20:23:08 2012
@@ -14,6 +14,8 @@ Trunk (Unreleased)
     MAPREDUCE-4049. Experimental api to allow for alternate shuffle plugins.
     (Avner BenHanoch via acmurthy) 
 
+    MAPREDUCE-4807. Allow MapOutputBuffer to be pluggable. (masokan via tucu)
+
   IMPROVEMENTS
 
     MAPREDUCE-3787. [Gridmix] Optimize job monitoring and STRESS mode for

Added: 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexRecord.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexRecord.java?rev=1422345&view=auto
==============================================================================
--- 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexRecord.java
 (added)
+++ 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexRecord.java
 Sat Dec 15 20:23:08 2012
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+@InterfaceAudience.LimitedPrivate({"MapReduce"})
+@InterfaceStability.Unstable
+public class IndexRecord {
+  public long startOffset;
+  public long rawLength;
+  public long partLength;
+
+  public IndexRecord() { }
+
+  public IndexRecord(long startOffset, long rawLength, long partLength) {
+    this.startOffset = startOffset;
+    this.rawLength = rawLength;
+    this.partLength = partLength;
+  }
+}

Added: 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapOutputCollector.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapOutputCollector.java?rev=1422345&view=auto
==============================================================================
--- 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapOutputCollector.java
 (added)
+++ 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapOutputCollector.java
 Sat Dec 15 20:23:08 2012
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import org.apache.hadoop.mapred.Task.TaskReporter;
+
+@InterfaceAudience.LimitedPrivate({"MapReduce"})
+@InterfaceStability.Unstable
+public interface MapOutputCollector<K, V> {
+  public void init(Context context
+                  ) throws IOException, ClassNotFoundException;
+  public void collect(K key, V value, int partition
+                     ) throws IOException, InterruptedException;
+  public void close() throws IOException, InterruptedException;
+    
+  public void flush() throws IOException, InterruptedException, 
+                             ClassNotFoundException;
+
+  @InterfaceAudience.LimitedPrivate({"MapReduce"})
+  @InterfaceStability.Unstable
+  public static class Context {
+    private final MapTask mapTask;
+    private final JobConf jobConf;
+    private final TaskReporter reporter;
+
+    public Context(MapTask mapTask, JobConf jobConf, TaskReporter reporter) {
+      this.mapTask = mapTask;
+      this.jobConf = jobConf;
+      this.reporter = reporter;
+    }
+
+    public MapTask getMapTask() {
+      return mapTask;
+    }
+
+    public JobConf getJobConf() {
+      return jobConf;
+    }
+
+    public TaskReporter getReporter() {
+      return reporter;
+    }
+  }
+}

Modified: 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java?rev=1422345&r1=1422344&r2=1422345&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
 (original)
+++ 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
 Sat Dec 15 20:23:08 2012
@@ -56,6 +56,7 @@ import org.apache.hadoop.io.serializer.S
 import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.mapred.Merger.Segment;
 import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskCounter;
@@ -342,6 +343,10 @@ public class MapTask extends Task {
     done(umbilical, reporter);
   }
 
+  public Progress getSortPhase() {
+    return sortPhase;
+  }
+
  @SuppressWarnings("unchecked")
  private <T> T getSplitDetails(Path file, long offset) 
   throws IOException {
@@ -371,6 +376,22 @@ public class MapTask extends Task {
  }
   
   @SuppressWarnings("unchecked")
+  private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
+          createSortingCollector(JobConf job, TaskReporter reporter)
+    throws IOException, ClassNotFoundException {
+    MapOutputCollector<KEY, VALUE> collector
+      = (MapOutputCollector<KEY, VALUE>)
+       ReflectionUtils.newInstance(
+                        
job.getClass(JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR,
+                        MapOutputBuffer.class, MapOutputCollector.class), job);
+    LOG.info("Map output collector class = " + collector.getClass().getName());
+    MapOutputCollector.Context context =
+                           new MapOutputCollector.Context(this, job, reporter);
+    collector.init(context);
+    return collector;
+  }
+
+  @SuppressWarnings("unchecked")
   private <INKEY,INVALUE,OUTKEY,OUTVALUE>
   void runOldMapper(final JobConf job,
                     final TaskSplitIndex splitIndex,
@@ -392,11 +413,14 @@ public class MapTask extends Task {
 
     int numReduceTasks = conf.getNumReduceTasks();
     LOG.info("numReduceTasks: " + numReduceTasks);
-    MapOutputCollector collector = null;
+    MapOutputCollector<OUTKEY, OUTVALUE> collector = null;
     if (numReduceTasks > 0) {
-      collector = new MapOutputBuffer(umbilical, job, reporter);
+      collector = createSortingCollector(job, reporter);
     } else { 
-      collector = new DirectMapOutputCollector(umbilical, job, reporter);
+      collector = new DirectMapOutputCollector<OUTKEY, OUTVALUE>();
+       MapOutputCollector.Context context =
+                           new MapOutputCollector.Context(this, job, reporter);
+      collector.init(context);
     }
     MapRunnable<INKEY,INVALUE,OUTKEY,OUTVALUE> runner =
       ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
@@ -642,7 +666,7 @@ public class MapTask extends Task {
                        TaskUmbilicalProtocol umbilical,
                        TaskReporter reporter
                        ) throws IOException, ClassNotFoundException {
-      collector = new MapOutputBuffer<K,V>(umbilical, job, reporter);
+      collector = createSortingCollector(job, reporter);
       partitions = jobContext.getNumReduceTasks();
       if (partitions > 1) {
         partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
@@ -738,17 +762,6 @@ public class MapTask extends Task {
     output.close(mapperContext);
   }
 
-  interface MapOutputCollector<K, V> {
-
-    public void collect(K key, V value, int partition
-                        ) throws IOException, InterruptedException;
-    public void close() throws IOException, InterruptedException;
-    
-    public void flush() throws IOException, InterruptedException, 
-                               ClassNotFoundException;
-        
-  }
-
   class DirectMapOutputCollector<K, V>
     implements MapOutputCollector<K, V> {
  
@@ -756,14 +769,18 @@ public class MapTask extends Task {
 
     private TaskReporter reporter = null;
 
-    private final Counters.Counter mapOutputRecordCounter;
-    private final Counters.Counter fileOutputByteCounter;
-    private final List<Statistics> fsStats;
+    private Counters.Counter mapOutputRecordCounter;
+    private Counters.Counter fileOutputByteCounter;
+    private List<Statistics> fsStats;
+
+    public DirectMapOutputCollector() {
+    }
 
     @SuppressWarnings("unchecked")
-    public DirectMapOutputCollector(TaskUmbilicalProtocol umbilical,
-        JobConf job, TaskReporter reporter) throws IOException {
-      this.reporter = reporter;
+    public void init(MapOutputCollector.Context context
+                    ) throws IOException, ClassNotFoundException {
+      this.reporter = context.getReporter();
+      JobConf job = context.getJobConf();
       String finalName = getOutputName(getPartition());
       FileSystem fs = FileSystem.get(job);
 
@@ -818,25 +835,27 @@ public class MapTask extends Task {
     }
   }
 
-  private class MapOutputBuffer<K extends Object, V extends Object>
+  @InterfaceAudience.LimitedPrivate({"MapReduce"})
+  @InterfaceStability.Unstable
+  public static class MapOutputBuffer<K extends Object, V extends Object>
       implements MapOutputCollector<K, V>, IndexedSortable {
-    final int partitions;
-    final JobConf job;
-    final TaskReporter reporter;
-    final Class<K> keyClass;
-    final Class<V> valClass;
-    final RawComparator<K> comparator;
-    final SerializationFactory serializationFactory;
-    final Serializer<K> keySerializer;
-    final Serializer<V> valSerializer;
-    final CombinerRunner<K,V> combinerRunner;
-    final CombineOutputCollector<K, V> combineCollector;
+    private int partitions;
+    private JobConf job;
+    private TaskReporter reporter;
+    private Class<K> keyClass;
+    private Class<V> valClass;
+    private RawComparator<K> comparator;
+    private SerializationFactory serializationFactory;
+    private Serializer<K> keySerializer;
+    private Serializer<V> valSerializer;
+    private CombinerRunner<K,V> combinerRunner;
+    private CombineOutputCollector<K, V> combineCollector;
 
     // Compression for map-outputs
-    final CompressionCodec codec;
+    private CompressionCodec codec;
 
     // k/v accounting
-    final IntBuffer kvmeta; // metadata overlay on backing store
+    private IntBuffer kvmeta; // metadata overlay on backing store
     int kvstart;            // marks origin of spill metadata
     int kvend;              // marks end of spill metadata
     int kvindex;            // marks end of fully serialized records
@@ -860,15 +879,15 @@ public class MapTask extends Task {
     private static final int METASIZE = NMETA * 4; // size in bytes
 
     // spill accounting
-    final int maxRec;
-    final int softLimit;
+    private int maxRec;
+    private int softLimit;
     boolean spillInProgress;;
     int bufferRemaining;
     volatile Throwable sortSpillException = null;
 
     int numSpills = 0;
-    final int minSpillsForCombine;
-    final IndexedSorter sorter;
+    private int minSpillsForCombine;
+    private IndexedSorter sorter;
     final ReentrantLock spillLock = new ReentrantLock();
     final Condition spillDone = spillLock.newCondition();
     final Condition spillReady = spillLock.newCondition();
@@ -876,12 +895,12 @@ public class MapTask extends Task {
     volatile boolean spillThreadRunning = false;
     final SpillThread spillThread = new SpillThread();
 
-    final FileSystem rfs;
+    private FileSystem rfs;
 
     // Counters
-    final Counters.Counter mapOutputByteCounter;
-    final Counters.Counter mapOutputRecordCounter;
-    final Counters.Counter fileOutputByteCounter;
+    private Counters.Counter mapOutputByteCounter;
+    private Counters.Counter mapOutputRecordCounter;
+    private Counters.Counter fileOutputByteCounter;
 
     final ArrayList<SpillRecord> indexCacheList =
       new ArrayList<SpillRecord>();
@@ -889,12 +908,23 @@ public class MapTask extends Task {
     private int indexCacheMemoryLimit;
     private static final int INDEX_CACHE_MEMORY_LIMIT_DEFAULT = 1024 * 1024;
 
+    private MapTask mapTask;
+    private MapOutputFile mapOutputFile;
+    private Progress sortPhase;
+    private Counters.Counter spilledRecordsCounter;
+
+    public MapOutputBuffer() {
+    }
+
     @SuppressWarnings("unchecked")
-    public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
-                           TaskReporter reporter
-                           ) throws IOException, ClassNotFoundException {
-      this.job = job;
-      this.reporter = reporter;
+    public void init(MapOutputCollector.Context context
+                    ) throws IOException, ClassNotFoundException {
+      job = context.getJobConf();
+      reporter = context.getReporter();
+      mapTask = context.getMapTask();
+      mapOutputFile = mapTask.getMapOutputFile();
+      sortPhase = mapTask.getSortPhase();
+      spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);
       partitions = job.getNumReduceTasks();
       rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();
 
@@ -971,7 +1001,7 @@ public class MapTask extends Task {
       if (combinerRunner != null) {
         final Counters.Counter combineOutputCounter =
           reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
-        combineCollector= new 
CombineOutputCollector<K,V>(combineOutputCounter, reporter, conf);
+        combineCollector= new 
CombineOutputCollector<K,V>(combineOutputCounter, reporter, job);
       } else {
         combineCollector = null;
       }
@@ -1122,6 +1152,10 @@ public class MapTask extends Task {
       }
     }
 
+    private TaskAttemptID getTaskID() {
+      return mapTask.getTaskID();
+    }
+
     /**
      * Set the point from which meta and serialization data expand. The meta
      * indices are aligned with the buffer, so metadata never spans the ends of
@@ -1494,7 +1528,7 @@ public class MapTask extends Task {
         if (lspillException instanceof Error) {
           final String logMsg = "Task " + getTaskID() + " failed : " +
             StringUtils.stringifyException(lspillException);
-          reportFatalError(getTaskID(), lspillException, logMsg);
+          mapTask.reportFatalError(getTaskID(), lspillException, logMsg);
         }
         throw new IOException("Spill failed", lspillException);
       }

Modified: 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java?rev=1422345&r1=1422344&r2=1422345&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java
 (original)
+++ 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java
 Sat Dec 15 20:23:08 2012
@@ -26,6 +26,8 @@ import java.util.zip.CheckedInputStream;
 import java.util.zip.CheckedOutputStream;
 import java.util.zip.Checksum;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -34,7 +36,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.PureJavaCrc32;
 
-class SpillRecord {
+@InterfaceAudience.LimitedPrivate({"MapReduce"})
+@InterfaceStability.Unstable
+public class SpillRecord {
 
   /** Backing store */
   private final ByteBuffer buf;
@@ -143,17 +147,3 @@ class SpillRecord {
   }
 
 }
-
-class IndexRecord {
-  long startOffset;
-  long rawLength;
-  long partLength;
-
-  public IndexRecord() { }
-
-  public IndexRecord(long startOffset, long rawLength, long partLength) {
-    this.startOffset = startOffset;
-    this.rawLength = rawLength;
-    this.partLength = partLength;
-  }
-}

Modified: 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1422345&r1=1422344&r2=1422345&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
 (original)
+++ 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
 Sat Dec 15 20:23:08 2012
@@ -30,6 +30,9 @@ public interface MRJobConfig {
 
   public static final String MAP_CLASS_ATTR = "mapreduce.job.map.class";
 
+  public static final String MAP_OUTPUT_COLLECTOR_CLASS_ATTR
+                                  = "mapreduce.job.map.output.collector.class";
+
   public static final String COMBINE_CLASS_ATTR = 
"mapreduce.job.combine.class";
 
   public static final String REDUCE_CLASS_ATTR = "mapreduce.job.reduce.class";

Modified: 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1422345&r1=1422344&r2=1422345&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
 (original)
+++ 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
 Sat Dec 15 20:23:08 2012
@@ -938,4 +938,12 @@
   <value>jhs/_h...@realm.tld</value>
 </property>
 
+<property>
+  <name>mapreduce.job.map.output.collector.class</name>
+  <value>org.apache.hadoop.mapred.MapTask$MapOutputBuffer</value>
+  <description>
+    It defines the MapOutputCollector implementation to use.
+  </description>
+</property>
+
 </configuration>

Added: 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMerge.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMerge.java?rev=1422345&view=auto
==============================================================================
--- 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMerge.java
 (added)
+++ 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMerge.java
 Sat Dec 15 20:23:08 2012
@@ -0,0 +1,405 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+
+import org.apache.hadoop.mapred.Task.TaskReporter;
+
+import junit.framework.TestCase;
+
+@SuppressWarnings(value={"unchecked", "deprecation"})
+/**
+ * This test tests the support for a merge operation in Hadoop.  The input 
files
+ * are already sorted on the key.  This test implements an external
+ * MapOutputCollector implementation that just copies the records to different
+ * partitions while maintaining the sort order in each partition.  The Hadoop
+ * framework's merge on the reduce side will merge the partitions created to
+ * generate the final output which is sorted on the key.
+ */
+public class TestMerge extends TestCase {
+  private static final int NUM_HADOOP_DATA_NODES = 2;
+  // Number of input files is same as the number of mappers.
+  private static final int NUM_MAPPERS = 10;
+  // Number of reducers.
+  private static final int NUM_REDUCERS = 4;
+  // Number of lines per input file.
+  private static final int NUM_LINES = 1000;
+  // Where MR job's input will reside.
+  private static final Path INPUT_DIR = new Path("/testplugin/input");
+  // Where output goes.
+  private static final Path OUTPUT = new Path("/testplugin/output");
+
+  public void testMerge() throws Exception {
+    MiniDFSCluster dfsCluster = null;
+    MiniMRClientCluster mrCluster = null;
+    FileSystem fileSystem = null;
+    try {
+      Configuration conf = new Configuration();
+      // Start the mini-MR and mini-DFS clusters
+      dfsCluster = new MiniDFSCluster(conf, NUM_HADOOP_DATA_NODES, true, null);
+      fileSystem = dfsCluster.getFileSystem();
+      mrCluster = MiniMRClientClusterFactory.create(this.getClass(),
+                                                 NUM_HADOOP_DATA_NODES, conf);
+      // Generate input.
+      createInput(fileSystem);
+      // Run the test.
+      runMergeTest(new JobConf(mrCluster.getConfig()), fileSystem);
+    } finally {
+      if (dfsCluster != null) {
+        dfsCluster.shutdown();
+      }
+      if (mrCluster != null) {
+        mrCluster.stop();
+      }
+    }
+  }
+
+  private void createInput(FileSystem fs) throws Exception {
+    fs.delete(INPUT_DIR, true);
+    for (int i = 0; i < NUM_MAPPERS; i++) {
+      OutputStream os = fs.create(new Path(INPUT_DIR, "input_" + i + ".txt"));
+      Writer writer = new OutputStreamWriter(os);
+      for (int j = 0; j < NUM_LINES; j++) {
+        // Create sorted key, value pairs.
+        int k = j + 1;
+        String formattedNumber = String.format("%09d", k);
+        writer.write(formattedNumber + " " + formattedNumber + "\n");
+      }
+      writer.close();
+    }
+  }
+
+  private void runMergeTest(JobConf job, FileSystem fileSystem)
+    throws Exception {
+    // Delete any existing output.
+    fileSystem.delete(OUTPUT, true);
+    job.setJobName("MergeTest");
+    JobClient client = new JobClient(job);
+    RunningJob submittedJob = null;
+    FileInputFormat.setInputPaths(job, INPUT_DIR);
+    FileOutputFormat.setOutputPath(job, OUTPUT);
+    job.set("mapreduce.output.textoutputformat.separator", " ");
+    job.setInputFormat(TextInputFormat.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(Text.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
+    job.setMapperClass(MyMapper.class);
+    job.setPartitionerClass(MyPartitioner.class);
+    job.setOutputFormat(TextOutputFormat.class);
+    job.setNumReduceTasks(NUM_REDUCERS);
+    job.set(JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR,
+            MapOutputCopier.class.getName());
+    try {
+      submittedJob = client.submitJob(job);
+      try {
+        if (! client.monitorAndPrintJob(job, submittedJob)) {
+          throw new IOException("Job failed!");
+        }
+      } catch(InterruptedException ie) {
+        Thread.currentThread().interrupt();
+      }
+    } catch(IOException ioe) {
+      System.err.println("Job failed with: " + ioe);
+    } finally {
+      verifyOutput(submittedJob, fileSystem);
+    }
+  }
+
+  private void verifyOutput(RunningJob submittedJob, FileSystem fileSystem)
+    throws Exception {
+    FSDataInputStream dis = null;
+    long numValidRecords = 0;
+    long numInvalidRecords = 0;
+    long numMappersLaunched = NUM_MAPPERS;
+    String prevKeyValue = "000000000";
+    Path[] fileList = 
+      FileUtil.stat2Paths(fileSystem.listStatus(OUTPUT,
+          new Utils.OutputFileUtils.OutputFilesFilter()));
+    for (Path outFile : fileList) {
+      try {
+        dis = fileSystem.open(outFile);
+        String record;
+        while((record = dis.readLine()) != null) {
+          // Split the line into key and value.
+          int blankPos = record.indexOf(" ");
+          String keyString = record.substring(0, blankPos);
+          String valueString = record.substring(blankPos+1);
+          // Check for sorted output and correctness of record.
+          if (keyString.compareTo(prevKeyValue) >= 0
+              && keyString.equals(valueString)) {
+            prevKeyValue = keyString;
+            numValidRecords++;
+          } else {
+            numInvalidRecords++;
+          }
+        }
+      } finally {
+        if (dis != null) {
+          dis.close();
+          dis = null;
+        }
+      }
+    }
+    // Make sure we got all input records in the output in sorted order.
+    assertEquals((long)(NUM_MAPPERS*NUM_LINES), numValidRecords);
+    // Make sure there is no extraneous invalid record.
+    assertEquals(0, numInvalidRecords);
+  }
+
+  /**
+   * A mapper implementation that assumes that key text contains valid integers
+   * in displayable form.
+   */
+  public static class MyMapper extends MapReduceBase
+    implements Mapper<LongWritable, Text, Text, Text> {
+      private Text keyText;
+      private Text valueText;
+
+      public MyMapper() {
+        keyText = new Text();
+        valueText = new Text();
+      }
+
+      @Override
+      public void map(LongWritable key, Text value,
+                      OutputCollector<Text, Text> output,
+                      Reporter reporter) throws IOException {
+        String record = value.toString();
+        int blankPos = record.indexOf(" ");
+        keyText.set(record.substring(0, blankPos));
+        valueText.set(record.substring(blankPos+1));
+        output.collect(keyText, valueText);
+      }
+    
+      public void close() throws IOException {
+      }
+    }
+
+  /**
+   * Partitioner implementation to make sure that output is in total sorted
+   * order.  We basically route key ranges to different reducers such that
+   * key values monotonically increase with the partition number.  For example,
+   * in this test, the keys are numbers from 1 to 1000 in the form "000000001"
+   * to "000001000" in each input file.  The keys "000000001" to "000000250" 
are
+   * routed to partition 0, "000000251" to "000000500" are routed to partition 
1
+   * and so on since we have 4 reducers.
+   */
+  static class MyPartitioner implements Partitioner<Text, Text> {
+    public MyPartitioner() {
+    }
+
+    public void configure(JobConf job) {
+    }
+
+    public int getPartition(Text key, Text value, int numPartitions) {
+      int keyValue = 0;
+      try {
+        keyValue = Integer.parseInt(key.toString());
+      } catch(NumberFormatException nfe) {
+        keyValue = 0;
+      }
+      int partitionNumber = (numPartitions*(Math.max(0, 
keyValue-1)))/NUM_LINES;
+      return partitionNumber;
+    }
+  }
+
+  /**
+   * Implementation of map output copier(that avoids sorting) on the map side.
+   * It maintains keys in the input order within each partition created for
+   * reducers.
+   */
+  static class MapOutputCopier<K, V>
+    implements MapOutputCollector<K, V> {
+    private static final int BUF_SIZE = 128*1024;
+    private MapTask mapTask;
+    private JobConf jobConf;
+    private TaskReporter reporter;
+    private int numberOfPartitions;
+    private Class<K> keyClass;
+    private Class<V> valueClass;
+    private KeyValueWriter<K, V> recordWriters[];
+    private ByteArrayOutputStream outStreams[];
+
+    public MapOutputCopier() {
+    }
+
+    @SuppressWarnings("unchecked")
+    public void init(MapOutputCollector.Context context)
+      throws IOException, ClassNotFoundException {
+      this.mapTask = context.getMapTask();
+      this.jobConf = context.getJobConf();
+      this.reporter = context.getReporter();
+      numberOfPartitions = jobConf.getNumReduceTasks();
+      keyClass = (Class<K>)jobConf.getMapOutputKeyClass();
+      valueClass = (Class<V>)jobConf.getMapOutputValueClass();
+      recordWriters = new KeyValueWriter[numberOfPartitions];
+      outStreams = new ByteArrayOutputStream[numberOfPartitions];
+
+      // Create output streams for partitions.
+      for (int i = 0; i < numberOfPartitions; i++) {
+        outStreams[i] = new ByteArrayOutputStream();
+        recordWriters[i] = new KeyValueWriter<K, V>(jobConf, outStreams[i],
+                                                    keyClass, valueClass);
+      }
+    }
+
+    public synchronized void collect(K key, V value, int partitionNumber
+                                    ) throws IOException, InterruptedException 
{
+      if (partitionNumber >= 0 && partitionNumber < numberOfPartitions) {
+        recordWriters[partitionNumber].write(key, value);
+      } else {
+        throw new IOException("Invalid partition number: " + partitionNumber);
+      }
+      reporter.progress();
+    }
+
+    public void close() throws IOException, InterruptedException {
+      long totalSize = 0;
+      for (int i = 0; i < numberOfPartitions; i++) {
+        recordWriters[i].close();
+        outStreams[i].close();
+        totalSize += outStreams[i].size();
+      }
+      MapOutputFile mapOutputFile = mapTask.getMapOutputFile();
+      Path finalOutput = mapOutputFile.getOutputFileForWrite(totalSize);
+      Path indexPath = mapOutputFile.getOutputIndexFileForWrite(
+                     
numberOfPartitions*mapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH);
+      // Copy partitions to final map output.
+      copyPartitions(finalOutput, indexPath);
+    }
+
+    public void flush() throws IOException, InterruptedException, 
+                               ClassNotFoundException {
+    }
+
+    private void copyPartitions(Path mapOutputPath, Path indexPath)
+      throws IOException {
+      FileSystem localFs = FileSystem.getLocal(jobConf);
+      FileSystem rfs = ((LocalFileSystem)localFs).getRaw();
+      FSDataOutputStream rawOutput = rfs.create(mapOutputPath, true, BUF_SIZE);
+      SpillRecord spillRecord = new SpillRecord(numberOfPartitions);
+      IndexRecord indexRecord = new IndexRecord();
+      for (int i = 0; i < numberOfPartitions; i++) {
+        indexRecord.startOffset = rawOutput.getPos();
+        byte buffer[] = outStreams[i].toByteArray();
+        IFileOutputStream checksumOutput = new IFileOutputStream(rawOutput);
+        checksumOutput.write(buffer);
+        // Write checksum.
+        checksumOutput.finish();
+        // Write index record
+        indexRecord.rawLength = (long)buffer.length;
+        indexRecord.partLength = rawOutput.getPos() - indexRecord.startOffset;
+        spillRecord.putIndex(indexRecord, i);
+        reporter.progress();
+      }
+      rawOutput.close();
+      spillRecord.writeToFile(indexPath, jobConf);
+    }
+  }
+
+  static class KeyValueWriter<K, V> {
+    private Class<K> keyClass;
+    private Class<V> valueClass;
+    private DataOutputBuffer dataBuffer;
+    private Serializer<K> keySerializer;
+    private Serializer<V> valueSerializer;
+    private DataOutputStream outputStream;
+
+    public KeyValueWriter(Configuration conf, OutputStream output,
+                          Class<K> kyClass, Class<V> valClass
+                         ) throws IOException {
+      keyClass = kyClass;
+      valueClass = valClass;
+      dataBuffer = new DataOutputBuffer();
+      SerializationFactory serializationFactory
+                                             = new SerializationFactory(conf);
+      keySerializer
+                  = 
(Serializer<K>)serializationFactory.getSerializer(keyClass);
+      keySerializer.open(dataBuffer);
+      valueSerializer
+                = 
(Serializer<V>)serializationFactory.getSerializer(valueClass);
+      valueSerializer.open(dataBuffer);
+      outputStream = new DataOutputStream(output);
+    }
+
+    public void write(K key, V value) throws IOException {
+      if (key.getClass() != keyClass) {
+        throw new IOException("wrong key class: "+ key.getClass()
+                              +" is not "+ keyClass);
+      }
+      if (value.getClass() != valueClass) {
+        throw new IOException("wrong value class: "+ value.getClass()
+                              +" is not "+ valueClass);
+      }
+      // Append the 'key'
+      keySerializer.serialize(key);
+      int keyLength = dataBuffer.getLength();
+      if (keyLength < 0) {
+        throw new IOException("Negative key-length not allowed: " + keyLength 
+ 
+                              " for " + key);
+      }
+      // Append the 'value'
+      valueSerializer.serialize(value);
+      int valueLength = dataBuffer.getLength() - keyLength;
+      if (valueLength < 0) {
+        throw new IOException("Negative value-length not allowed: " + 
+                              valueLength + " for " + value);
+      }
+      // Write the record out
+      WritableUtils.writeVInt(outputStream, keyLength);
+      WritableUtils.writeVInt(outputStream, valueLength);
+      outputStream.write(dataBuffer.getData(), 0, dataBuffer.getLength());
+      // Reset
+      dataBuffer.reset();
+     }
+
+    public void close() throws IOException {
+      keySerializer.close();
+      valueSerializer.close();
+      WritableUtils.writeVInt(outputStream, IFile.EOF_MARKER);
+      WritableUtils.writeVInt(outputStream, IFile.EOF_MARKER);
+      outputStream.close();
+    }
+  }
+}


Reply via email to