Author: tucu Date: Sat Dec 15 20:23:08 2012 New Revision: 1422345 URL: http://svn.apache.org/viewvc?rev=1422345&view=rev Log: MAPREDUCE-4807. Allow MapOutputBuffer to be pluggable. (masokan via tucu)
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexRecord.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapOutputCollector.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMerge.java Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1422345&r1=1422344&r2=1422345&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Sat Dec 15 20:23:08 2012 @@ -14,6 +14,8 @@ Trunk (Unreleased) MAPREDUCE-4049. Experimental api to allow for alternate shuffle plugins. (Avner BenHanoch via acmurthy) + MAPREDUCE-4807. Allow MapOutputBuffer to be pluggable. (masokan via tucu) + IMPROVEMENTS MAPREDUCE-3787. 
[Gridmix] Optimize job monitoring and STRESS mode for Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexRecord.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexRecord.java?rev=1422345&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexRecord.java (added) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexRecord.java Sat Dec 15 20:23:08 2012 @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.mapred; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +@InterfaceAudience.LimitedPrivate({"MapReduce"}) +@InterfaceStability.Unstable +public class IndexRecord { + public long startOffset; + public long rawLength; + public long partLength; + + public IndexRecord() { } + + public IndexRecord(long startOffset, long rawLength, long partLength) { + this.startOffset = startOffset; + this.rawLength = rawLength; + this.partLength = partLength; + } +} Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapOutputCollector.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapOutputCollector.java?rev=1422345&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapOutputCollector.java (added) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapOutputCollector.java Sat Dec 15 20:23:08 2012 @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import org.apache.hadoop.mapred.Task.TaskReporter; + +@InterfaceAudience.LimitedPrivate({"MapReduce"}) +@InterfaceStability.Unstable +public interface MapOutputCollector<K, V> { + public void init(Context context + ) throws IOException, ClassNotFoundException; + public void collect(K key, V value, int partition + ) throws IOException, InterruptedException; + public void close() throws IOException, InterruptedException; + + public void flush() throws IOException, InterruptedException, + ClassNotFoundException; + + @InterfaceAudience.LimitedPrivate({"MapReduce"}) + @InterfaceStability.Unstable + public static class Context { + private final MapTask mapTask; + private final JobConf jobConf; + private final TaskReporter reporter; + + public Context(MapTask mapTask, JobConf jobConf, TaskReporter reporter) { + this.mapTask = mapTask; + this.jobConf = jobConf; + this.reporter = reporter; + } + + public MapTask getMapTask() { + return mapTask; + } + + public JobConf getJobConf() { + return jobConf; + } + + public TaskReporter getReporter() { + return reporter; + } + } +} Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java?rev=1422345&r1=1422344&r2=1422345&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java Sat Dec 15 20:23:08 2012 @@ -56,6 +56,7 @@ import org.apache.hadoop.io.serializer.S import org.apache.hadoop.mapred.IFile.Writer; import org.apache.hadoop.mapred.Merger.Segment; import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator; +import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskCounter; @@ -342,6 +343,10 @@ public class MapTask extends Task { done(umbilical, reporter); } + public Progress getSortPhase() { + return sortPhase; + } + @SuppressWarnings("unchecked") private <T> T getSplitDetails(Path file, long offset) throws IOException { @@ -371,6 +376,22 @@ public class MapTask extends Task { } @SuppressWarnings("unchecked") + private <KEY, VALUE> MapOutputCollector<KEY, VALUE> + createSortingCollector(JobConf job, TaskReporter reporter) + throws IOException, ClassNotFoundException { + MapOutputCollector<KEY, VALUE> collector + = (MapOutputCollector<KEY, VALUE>) + ReflectionUtils.newInstance( + job.getClass(JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, + MapOutputBuffer.class, MapOutputCollector.class), job); + LOG.info("Map output collector class = " + collector.getClass().getName()); + MapOutputCollector.Context context = + new MapOutputCollector.Context(this, job, reporter); + collector.init(context); + 
return collector; + } + + @SuppressWarnings("unchecked") private <INKEY,INVALUE,OUTKEY,OUTVALUE> void runOldMapper(final JobConf job, final TaskSplitIndex splitIndex, @@ -392,11 +413,14 @@ public class MapTask extends Task { int numReduceTasks = conf.getNumReduceTasks(); LOG.info("numReduceTasks: " + numReduceTasks); - MapOutputCollector collector = null; + MapOutputCollector<OUTKEY, OUTVALUE> collector = null; if (numReduceTasks > 0) { - collector = new MapOutputBuffer(umbilical, job, reporter); + collector = createSortingCollector(job, reporter); } else { - collector = new DirectMapOutputCollector(umbilical, job, reporter); + collector = new DirectMapOutputCollector<OUTKEY, OUTVALUE>(); + MapOutputCollector.Context context = + new MapOutputCollector.Context(this, job, reporter); + collector.init(context); } MapRunnable<INKEY,INVALUE,OUTKEY,OUTVALUE> runner = ReflectionUtils.newInstance(job.getMapRunnerClass(), job); @@ -642,7 +666,7 @@ public class MapTask extends Task { TaskUmbilicalProtocol umbilical, TaskReporter reporter ) throws IOException, ClassNotFoundException { - collector = new MapOutputBuffer<K,V>(umbilical, job, reporter); + collector = createSortingCollector(job, reporter); partitions = jobContext.getNumReduceTasks(); if (partitions > 1) { partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>) @@ -738,17 +762,6 @@ public class MapTask extends Task { output.close(mapperContext); } - interface MapOutputCollector<K, V> { - - public void collect(K key, V value, int partition - ) throws IOException, InterruptedException; - public void close() throws IOException, InterruptedException; - - public void flush() throws IOException, InterruptedException, - ClassNotFoundException; - - } - class DirectMapOutputCollector<K, V> implements MapOutputCollector<K, V> { @@ -756,14 +769,18 @@ public class MapTask extends Task { private TaskReporter reporter = null; - private final Counters.Counter mapOutputRecordCounter; - private final Counters.Counter 
fileOutputByteCounter; - private final List<Statistics> fsStats; + private Counters.Counter mapOutputRecordCounter; + private Counters.Counter fileOutputByteCounter; + private List<Statistics> fsStats; + + public DirectMapOutputCollector() { + } @SuppressWarnings("unchecked") - public DirectMapOutputCollector(TaskUmbilicalProtocol umbilical, - JobConf job, TaskReporter reporter) throws IOException { - this.reporter = reporter; + public void init(MapOutputCollector.Context context + ) throws IOException, ClassNotFoundException { + this.reporter = context.getReporter(); + JobConf job = context.getJobConf(); String finalName = getOutputName(getPartition()); FileSystem fs = FileSystem.get(job); @@ -818,25 +835,27 @@ public class MapTask extends Task { } } - private class MapOutputBuffer<K extends Object, V extends Object> + @InterfaceAudience.LimitedPrivate({"MapReduce"}) + @InterfaceStability.Unstable + public static class MapOutputBuffer<K extends Object, V extends Object> implements MapOutputCollector<K, V>, IndexedSortable { - final int partitions; - final JobConf job; - final TaskReporter reporter; - final Class<K> keyClass; - final Class<V> valClass; - final RawComparator<K> comparator; - final SerializationFactory serializationFactory; - final Serializer<K> keySerializer; - final Serializer<V> valSerializer; - final CombinerRunner<K,V> combinerRunner; - final CombineOutputCollector<K, V> combineCollector; + private int partitions; + private JobConf job; + private TaskReporter reporter; + private Class<K> keyClass; + private Class<V> valClass; + private RawComparator<K> comparator; + private SerializationFactory serializationFactory; + private Serializer<K> keySerializer; + private Serializer<V> valSerializer; + private CombinerRunner<K,V> combinerRunner; + private CombineOutputCollector<K, V> combineCollector; // Compression for map-outputs - final CompressionCodec codec; + private CompressionCodec codec; // k/v accounting - final IntBuffer kvmeta; // metadata 
overlay on backing store + private IntBuffer kvmeta; // metadata overlay on backing store int kvstart; // marks origin of spill metadata int kvend; // marks end of spill metadata int kvindex; // marks end of fully serialized records @@ -860,15 +879,15 @@ public class MapTask extends Task { private static final int METASIZE = NMETA * 4; // size in bytes // spill accounting - final int maxRec; - final int softLimit; + private int maxRec; + private int softLimit; boolean spillInProgress;; int bufferRemaining; volatile Throwable sortSpillException = null; int numSpills = 0; - final int minSpillsForCombine; - final IndexedSorter sorter; + private int minSpillsForCombine; + private IndexedSorter sorter; final ReentrantLock spillLock = new ReentrantLock(); final Condition spillDone = spillLock.newCondition(); final Condition spillReady = spillLock.newCondition(); @@ -876,12 +895,12 @@ public class MapTask extends Task { volatile boolean spillThreadRunning = false; final SpillThread spillThread = new SpillThread(); - final FileSystem rfs; + private FileSystem rfs; // Counters - final Counters.Counter mapOutputByteCounter; - final Counters.Counter mapOutputRecordCounter; - final Counters.Counter fileOutputByteCounter; + private Counters.Counter mapOutputByteCounter; + private Counters.Counter mapOutputRecordCounter; + private Counters.Counter fileOutputByteCounter; final ArrayList<SpillRecord> indexCacheList = new ArrayList<SpillRecord>(); @@ -889,12 +908,23 @@ public class MapTask extends Task { private int indexCacheMemoryLimit; private static final int INDEX_CACHE_MEMORY_LIMIT_DEFAULT = 1024 * 1024; + private MapTask mapTask; + private MapOutputFile mapOutputFile; + private Progress sortPhase; + private Counters.Counter spilledRecordsCounter; + + public MapOutputBuffer() { + } + @SuppressWarnings("unchecked") - public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job, - TaskReporter reporter - ) throws IOException, ClassNotFoundException { - this.job = job; - 
this.reporter = reporter; + public void init(MapOutputCollector.Context context + ) throws IOException, ClassNotFoundException { + job = context.getJobConf(); + reporter = context.getReporter(); + mapTask = context.getMapTask(); + mapOutputFile = mapTask.getMapOutputFile(); + sortPhase = mapTask.getSortPhase(); + spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS); partitions = job.getNumReduceTasks(); rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw(); @@ -971,7 +1001,7 @@ public class MapTask extends Task { if (combinerRunner != null) { final Counters.Counter combineOutputCounter = reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS); - combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, conf); + combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, job); } else { combineCollector = null; } @@ -1122,6 +1152,10 @@ public class MapTask extends Task { } } + private TaskAttemptID getTaskID() { + return mapTask.getTaskID(); + } + /** * Set the point from which meta and serialization data expand. 
The meta * indices are aligned with the buffer, so metadata never spans the ends of @@ -1494,7 +1528,7 @@ public class MapTask extends Task { if (lspillException instanceof Error) { final String logMsg = "Task " + getTaskID() + " failed : " + StringUtils.stringifyException(lspillException); - reportFatalError(getTaskID(), lspillException, logMsg); + mapTask.reportFatalError(getTaskID(), lspillException, logMsg); } throw new IOException("Spill failed", lspillException); } Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java?rev=1422345&r1=1422344&r2=1422345&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java Sat Dec 15 20:23:08 2012 @@ -26,6 +26,8 @@ import java.util.zip.CheckedInputStream; import java.util.zip.CheckedOutputStream; import java.util.zip.Checksum; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -34,7 +36,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.PureJavaCrc32; -class SpillRecord { +@InterfaceAudience.LimitedPrivate({"MapReduce"}) +@InterfaceStability.Unstable +public class SpillRecord { /** Backing store */ private final 
ByteBuffer buf; @@ -143,17 +147,3 @@ class SpillRecord { } } - -class IndexRecord { - long startOffset; - long rawLength; - long partLength; - - public IndexRecord() { } - - public IndexRecord(long startOffset, long rawLength, long partLength) { - this.startOffset = startOffset; - this.rawLength = rawLength; - this.partLength = partLength; - } -} Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1422345&r1=1422344&r2=1422345&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Sat Dec 15 20:23:08 2012 @@ -30,6 +30,9 @@ public interface MRJobConfig { public static final String MAP_CLASS_ATTR = "mapreduce.job.map.class"; + public static final String MAP_OUTPUT_COLLECTOR_CLASS_ATTR + = "mapreduce.job.map.output.collector.class"; + public static final String COMBINE_CLASS_ATTR = "mapreduce.job.combine.class"; public static final String REDUCE_CLASS_ATTR = "mapreduce.job.reduce.class"; Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1422345&r1=1422344&r2=1422345&view=diff 
============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Sat Dec 15 20:23:08 2012 @@ -938,4 +938,12 @@ <value>jhs/_HOST@REALM.TLD</value> </property> +<property> + <name>mapreduce.job.map.output.collector.class</name> + <value>org.apache.hadoop.mapred.MapTask$MapOutputBuffer</value> + <description> + It defines the MapOutputCollector implementation to use. + </description> +</property> + </configuration> Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMerge.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMerge.java?rev=1422345&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMerge.java (added) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMerge.java Sat Dec 15 20:23:08 2012 @@ -0,0 +1,405 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; + +import org.apache.hadoop.conf.Configuration; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; + +import org.apache.hadoop.hdfs.MiniDFSCluster; + +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableUtils; + +import org.apache.hadoop.io.serializer.SerializationFactory; +import org.apache.hadoop.io.serializer.Serializer; + +import org.apache.hadoop.mapred.Task.TaskReporter; + +import junit.framework.TestCase; + +@SuppressWarnings(value={"unchecked", "deprecation"}) +/** + * This test tests the support for a merge operation in Hadoop. The input files + * are already sorted on the key. This test implements an external + * MapOutputCollector implementation that just copies the records to different + * partitions while maintaining the sort order in each partition. The Hadoop + * framework's merge on the reduce side will merge the partitions created to + * generate the final output which is sorted on the key. 
+ */ +public class TestMerge extends TestCase { + private static final int NUM_HADOOP_DATA_NODES = 2; + // Number of input files is same as the number of mappers. + private static final int NUM_MAPPERS = 10; + // Number of reducers. + private static final int NUM_REDUCERS = 4; + // Number of lines per input file. + private static final int NUM_LINES = 1000; + // Where MR job's input will reside. + private static final Path INPUT_DIR = new Path("/testplugin/input"); + // Where output goes. + private static final Path OUTPUT = new Path("/testplugin/output"); + + public void testMerge() throws Exception { + MiniDFSCluster dfsCluster = null; + MiniMRClientCluster mrCluster = null; + FileSystem fileSystem = null; + try { + Configuration conf = new Configuration(); + // Start the mini-MR and mini-DFS clusters + dfsCluster = new MiniDFSCluster(conf, NUM_HADOOP_DATA_NODES, true, null); + fileSystem = dfsCluster.getFileSystem(); + mrCluster = MiniMRClientClusterFactory.create(this.getClass(), + NUM_HADOOP_DATA_NODES, conf); + // Generate input. + createInput(fileSystem); + // Run the test. + runMergeTest(new JobConf(mrCluster.getConfig()), fileSystem); + } finally { + if (dfsCluster != null) { + dfsCluster.shutdown(); + } + if (mrCluster != null) { + mrCluster.stop(); + } + } + } + + private void createInput(FileSystem fs) throws Exception { + fs.delete(INPUT_DIR, true); + for (int i = 0; i < NUM_MAPPERS; i++) { + OutputStream os = fs.create(new Path(INPUT_DIR, "input_" + i + ".txt")); + Writer writer = new OutputStreamWriter(os); + for (int j = 0; j < NUM_LINES; j++) { + // Create sorted key, value pairs. + int k = j + 1; + String formattedNumber = String.format("%09d", k); + writer.write(formattedNumber + " " + formattedNumber + "\n"); + } + writer.close(); + } + } + + private void runMergeTest(JobConf job, FileSystem fileSystem) + throws Exception { + // Delete any existing output. 
+ fileSystem.delete(OUTPUT, true); + job.setJobName("MergeTest"); + JobClient client = new JobClient(job); + RunningJob submittedJob = null; + FileInputFormat.setInputPaths(job, INPUT_DIR); + FileOutputFormat.setOutputPath(job, OUTPUT); + job.set("mapreduce.output.textoutputformat.separator", " "); + job.setInputFormat(TextInputFormat.class); + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(Text.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(Text.class); + job.setMapperClass(MyMapper.class); + job.setPartitionerClass(MyPartitioner.class); + job.setOutputFormat(TextOutputFormat.class); + job.setNumReduceTasks(NUM_REDUCERS); + job.set(JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, + MapOutputCopier.class.getName()); + try { + submittedJob = client.submitJob(job); + try { + if (! client.monitorAndPrintJob(job, submittedJob)) { + throw new IOException("Job failed!"); + } + } catch(InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } catch(IOException ioe) { + System.err.println("Job failed with: " + ioe); + } finally { + verifyOutput(submittedJob, fileSystem); + } + } + + private void verifyOutput(RunningJob submittedJob, FileSystem fileSystem) + throws Exception { + FSDataInputStream dis = null; + long numValidRecords = 0; + long numInvalidRecords = 0; + long numMappersLaunched = NUM_MAPPERS; + String prevKeyValue = "000000000"; + Path[] fileList = + FileUtil.stat2Paths(fileSystem.listStatus(OUTPUT, + new Utils.OutputFileUtils.OutputFilesFilter())); + for (Path outFile : fileList) { + try { + dis = fileSystem.open(outFile); + String record; + while((record = dis.readLine()) != null) { + // Split the line into key and value. + int blankPos = record.indexOf(" "); + String keyString = record.substring(0, blankPos); + String valueString = record.substring(blankPos+1); + // Check for sorted output and correctness of record. 
+ if (keyString.compareTo(prevKeyValue) >= 0 + && keyString.equals(valueString)) { + prevKeyValue = keyString; + numValidRecords++; + } else { + numInvalidRecords++; + } + } + } finally { + if (dis != null) { + dis.close(); + dis = null; + } + } + } + // Make sure we got all input records in the output in sorted order. + assertEquals((long)(NUM_MAPPERS*NUM_LINES), numValidRecords); + // Make sure there is no extraneous invalid record. + assertEquals(0, numInvalidRecords); + } + + /** + * A mapper implementation that assumes that key text contains valid integers + * in displayable form. + */ + public static class MyMapper extends MapReduceBase + implements Mapper<LongWritable, Text, Text, Text> { + private Text keyText; + private Text valueText; + + public MyMapper() { + keyText = new Text(); + valueText = new Text(); + } + + @Override + public void map(LongWritable key, Text value, + OutputCollector<Text, Text> output, + Reporter reporter) throws IOException { + String record = value.toString(); + int blankPos = record.indexOf(" "); + keyText.set(record.substring(0, blankPos)); + valueText.set(record.substring(blankPos+1)); + output.collect(keyText, valueText); + } + + public void close() throws IOException { + } + } + + /** + * Partitioner implementation to make sure that output is in total sorted + * order. We basically route key ranges to different reducers such that + * key values monotonically increase with the partition number. For example, + * in this test, the keys are numbers from 1 to 1000 in the form "000000001" + * to "000001000" in each input file. The keys "000000001" to "000000250" are + * routed to partition 0, "000000251" to "000000500" are routed to partition 1 + * and so on since we have 4 reducers. 
+ */ + static class MyPartitioner implements Partitioner<Text, Text> { + public MyPartitioner() { + } + + public void configure(JobConf job) { + } + + public int getPartition(Text key, Text value, int numPartitions) { + int keyValue = 0; + try { + keyValue = Integer.parseInt(key.toString()); + } catch(NumberFormatException nfe) { + keyValue = 0; + } + int partitionNumber = (numPartitions*(Math.max(0, keyValue-1)))/NUM_LINES; + return partitionNumber; + } + } + + /** + * Implementation of map output copier(that avoids sorting) on the map side. + * It maintains keys in the input order within each partition created for + * reducers. + */ + static class MapOutputCopier<K, V> + implements MapOutputCollector<K, V> { + private static final int BUF_SIZE = 128*1024; + private MapTask mapTask; + private JobConf jobConf; + private TaskReporter reporter; + private int numberOfPartitions; + private Class<K> keyClass; + private Class<V> valueClass; + private KeyValueWriter<K, V> recordWriters[]; + private ByteArrayOutputStream outStreams[]; + + public MapOutputCopier() { + } + + @SuppressWarnings("unchecked") + public void init(MapOutputCollector.Context context) + throws IOException, ClassNotFoundException { + this.mapTask = context.getMapTask(); + this.jobConf = context.getJobConf(); + this.reporter = context.getReporter(); + numberOfPartitions = jobConf.getNumReduceTasks(); + keyClass = (Class<K>)jobConf.getMapOutputKeyClass(); + valueClass = (Class<V>)jobConf.getMapOutputValueClass(); + recordWriters = new KeyValueWriter[numberOfPartitions]; + outStreams = new ByteArrayOutputStream[numberOfPartitions]; + + // Create output streams for partitions. 
+ for (int i = 0; i < numberOfPartitions; i++) { + outStreams[i] = new ByteArrayOutputStream(); + recordWriters[i] = new KeyValueWriter<K, V>(jobConf, outStreams[i], + keyClass, valueClass); + } + } + + public synchronized void collect(K key, V value, int partitionNumber + ) throws IOException, InterruptedException { + if (partitionNumber >= 0 && partitionNumber < numberOfPartitions) { + recordWriters[partitionNumber].write(key, value); + } else { + throw new IOException("Invalid partition number: " + partitionNumber); + } + reporter.progress(); + } + + public void close() throws IOException, InterruptedException { + long totalSize = 0; + for (int i = 0; i < numberOfPartitions; i++) { + recordWriters[i].close(); + outStreams[i].close(); + totalSize += outStreams[i].size(); + } + MapOutputFile mapOutputFile = mapTask.getMapOutputFile(); + Path finalOutput = mapOutputFile.getOutputFileForWrite(totalSize); + Path indexPath = mapOutputFile.getOutputIndexFileForWrite( + numberOfPartitions*mapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH); + // Copy partitions to final map output. + copyPartitions(finalOutput, indexPath); + } + + public void flush() throws IOException, InterruptedException, + ClassNotFoundException { + } + + private void copyPartitions(Path mapOutputPath, Path indexPath) + throws IOException { + FileSystem localFs = FileSystem.getLocal(jobConf); + FileSystem rfs = ((LocalFileSystem)localFs).getRaw(); + FSDataOutputStream rawOutput = rfs.create(mapOutputPath, true, BUF_SIZE); + SpillRecord spillRecord = new SpillRecord(numberOfPartitions); + IndexRecord indexRecord = new IndexRecord(); + for (int i = 0; i < numberOfPartitions; i++) { + indexRecord.startOffset = rawOutput.getPos(); + byte buffer[] = outStreams[i].toByteArray(); + IFileOutputStream checksumOutput = new IFileOutputStream(rawOutput); + checksumOutput.write(buffer); + // Write checksum. 
+ checksumOutput.finish(); + // Write index record + indexRecord.rawLength = (long)buffer.length; + indexRecord.partLength = rawOutput.getPos() - indexRecord.startOffset; + spillRecord.putIndex(indexRecord, i); + reporter.progress(); + } + rawOutput.close(); + spillRecord.writeToFile(indexPath, jobConf); + } + } + + static class KeyValueWriter<K, V> { + private Class<K> keyClass; + private Class<V> valueClass; + private DataOutputBuffer dataBuffer; + private Serializer<K> keySerializer; + private Serializer<V> valueSerializer; + private DataOutputStream outputStream; + + public KeyValueWriter(Configuration conf, OutputStream output, + Class<K> kyClass, Class<V> valClass + ) throws IOException { + keyClass = kyClass; + valueClass = valClass; + dataBuffer = new DataOutputBuffer(); + SerializationFactory serializationFactory + = new SerializationFactory(conf); + keySerializer + = (Serializer<K>)serializationFactory.getSerializer(keyClass); + keySerializer.open(dataBuffer); + valueSerializer + = (Serializer<V>)serializationFactory.getSerializer(valueClass); + valueSerializer.open(dataBuffer); + outputStream = new DataOutputStream(output); + } + + public void write(K key, V value) throws IOException { + if (key.getClass() != keyClass) { + throw new IOException("wrong key class: "+ key.getClass() + +" is not "+ keyClass); + } + if (value.getClass() != valueClass) { + throw new IOException("wrong value class: "+ value.getClass() + +" is not "+ valueClass); + } + // Append the 'key' + keySerializer.serialize(key); + int keyLength = dataBuffer.getLength(); + if (keyLength < 0) { + throw new IOException("Negative key-length not allowed: " + keyLength + + " for " + key); + } + // Append the 'value' + valueSerializer.serialize(value); + int valueLength = dataBuffer.getLength() - keyLength; + if (valueLength < 0) { + throw new IOException("Negative value-length not allowed: " + + valueLength + " for " + value); + } + // Write the record out + 
WritableUtils.writeVInt(outputStream, keyLength); + WritableUtils.writeVInt(outputStream, valueLength); + outputStream.write(dataBuffer.getData(), 0, dataBuffer.getLength()); + // Reset + dataBuffer.reset(); + } + + public void close() throws IOException { + keySerializer.close(); + valueSerializer.close(); + WritableUtils.writeVInt(outputStream, IFile.EOF_MARKER); + WritableUtils.writeVInt(outputStream, IFile.EOF_MARKER); + outputStream.close(); + } + } +}