Author: suresh Date: Sun Dec 16 23:27:26 2012 New Revision: 1422716 URL: http://svn.apache.org/viewvc?rev=1422716&view=rev Log: Merge trunk into branch-trunk-win
Added: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexRecord.java - copied unchanged from r1422713, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexRecord.java hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapOutputCollector.java - copied unchanged from r1422713, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapOutputCollector.java hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMerge.java - copied unchanged from r1422713, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMerge.java Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/ (props changed) hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt (contents, props changed) hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/conf/ (props changed) hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ExceptionReporter.java hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapHost.java hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (contents, props changed) Propchange: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1422281-1422713 Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt?rev=1422716&r1=1422715&r2=1422716&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt Sun Dec 16 23:27:26 2012 @@ -14,6 +14,8 @@ Trunk (Unreleased) MAPREDUCE-4049. Experimental api to allow for alternate shuffle plugins. (Avner BenHanoch via acmurthy) + MAPREDUCE-4807. Allow MapOutputBuffer to be pluggable. (masokan via tucu) + IMPROVEMENTS MAPREDUCE-3787. [Gridmix] Optimize job monitoring and STRESS mode for @@ -71,6 +73,9 @@ Trunk (Unreleased) MAPREDUCE-4735. Make arguments in TestDFSIO case insensitive. (Brandon Li via suresh) + MAPREDUCE-4809. Change visibility of classes for pluggable sort changes. + (masokan via tucu) + BUG FIXES MAPREDUCE-4356. [Rumen] Provide access to the method Propchange: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1422281-1422713 Propchange: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/conf/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1422281-1422713 Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java?rev=1422716&r1=1422715&r2=1422716&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java Sun Dec 16 23:27:26 2012 @@ -34,6 +34,8 @@ import java.util.concurrent.locks.Reentr import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -54,6 +56,7 @@ import org.apache.hadoop.io.serializer.S import org.apache.hadoop.mapred.IFile.Writer; import org.apache.hadoop.mapred.Merger.Segment; import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator; +import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskCounter; @@ -71,7 +74,9 @@ import org.apache.hadoop.util.StringInte import org.apache.hadoop.util.StringUtils; /** A Map task. */ -class MapTask extends Task { +@InterfaceAudience.LimitedPrivate({"MapReduce"}) +@InterfaceStability.Unstable +public class MapTask extends Task { /** * The size of each record in the index file for the map-outputs. */ @@ -338,6 +343,10 @@ class MapTask extends Task { done(umbilical, reporter); } + public Progress getSortPhase() { + return sortPhase; + } + @SuppressWarnings("unchecked") private <T> T getSplitDetails(Path file, long offset) throws IOException { @@ -367,6 +376,22 @@ class MapTask extends Task { } @SuppressWarnings("unchecked") + private <KEY, VALUE> MapOutputCollector<KEY, VALUE> + createSortingCollector(JobConf job, TaskReporter reporter) + throws IOException, ClassNotFoundException { + MapOutputCollector<KEY, VALUE> collector + = (MapOutputCollector<KEY, VALUE>) + ReflectionUtils.newInstance( + job.getClass(JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, + MapOutputBuffer.class, MapOutputCollector.class), job); + LOG.info("Map output collector class = " + collector.getClass().getName()); + MapOutputCollector.Context context = + new MapOutputCollector.Context(this, job, reporter); + collector.init(context); + return collector; + } + + @SuppressWarnings("unchecked") private <INKEY,INVALUE,OUTKEY,OUTVALUE> void runOldMapper(final JobConf job, final TaskSplitIndex splitIndex, @@ -388,11 +413,14 @@ class MapTask extends Task { int numReduceTasks = conf.getNumReduceTasks(); LOG.info("numReduceTasks: " + numReduceTasks); - MapOutputCollector collector = null; + MapOutputCollector<OUTKEY, OUTVALUE> collector = null; if (numReduceTasks > 0) { - collector = new MapOutputBuffer(umbilical, job, reporter); + collector = createSortingCollector(job, reporter); } else { - collector = new DirectMapOutputCollector(umbilical, job, reporter); + collector = new DirectMapOutputCollector<OUTKEY, OUTVALUE>(); + MapOutputCollector.Context context = + new MapOutputCollector.Context(this, job, reporter); + collector.init(context); } MapRunnable<INKEY,INVALUE,OUTKEY,OUTVALUE> runner = ReflectionUtils.newInstance(job.getMapRunnerClass(), job); @@ -638,7 +666,7 @@ class MapTask extends Task { TaskUmbilicalProtocol umbilical, TaskReporter reporter ) throws IOException, ClassNotFoundException { - collector = new MapOutputBuffer<K,V>(umbilical, job, reporter); + collector = createSortingCollector(job, reporter); partitions = jobContext.getNumReduceTasks(); if (partitions > 1) { partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>) @@ -734,17 +762,6 @@ class MapTask extends Task { output.close(mapperContext); } - interface MapOutputCollector<K, V> { - - public void collect(K key, V value, int partition - ) throws IOException, InterruptedException; - public void close() throws IOException, InterruptedException; - - public void flush() throws IOException, InterruptedException, - ClassNotFoundException; - - } - class DirectMapOutputCollector<K, V> implements MapOutputCollector<K, V> { @@ -752,14 +769,18 @@ class MapTask extends Task { private TaskReporter reporter = null; - private final Counters.Counter mapOutputRecordCounter; - private final Counters.Counter fileOutputByteCounter; - private final List<Statistics> fsStats; + private Counters.Counter mapOutputRecordCounter; + private Counters.Counter fileOutputByteCounter; + private List<Statistics> fsStats; + + public DirectMapOutputCollector() { + } @SuppressWarnings("unchecked") - public DirectMapOutputCollector(TaskUmbilicalProtocol umbilical, - JobConf job, TaskReporter reporter) throws IOException { - this.reporter = reporter; + public void init(MapOutputCollector.Context context + ) throws IOException, ClassNotFoundException { + this.reporter = context.getReporter(); + JobConf job = context.getJobConf(); String finalName = getOutputName(getPartition()); FileSystem fs = FileSystem.get(job); @@ -814,25 +835,27 @@ class MapTask extends Task { } } - private class MapOutputBuffer<K extends Object, V extends Object> + @InterfaceAudience.LimitedPrivate({"MapReduce"}) + @InterfaceStability.Unstable + public static class MapOutputBuffer<K extends Object, V extends Object> implements MapOutputCollector<K, V>, IndexedSortable { - final int partitions; - final JobConf job; - final TaskReporter reporter; - final Class<K> keyClass; - final Class<V> valClass; - final RawComparator<K> comparator; - final SerializationFactory serializationFactory; - final Serializer<K> keySerializer; - final Serializer<V> valSerializer; - final CombinerRunner<K,V> combinerRunner; - final CombineOutputCollector<K, V> combineCollector; + private int partitions; + private JobConf job; + private TaskReporter reporter; + private Class<K> keyClass; + private Class<V> valClass; + private RawComparator<K> comparator; + private SerializationFactory serializationFactory; + private Serializer<K> keySerializer; + private Serializer<V> valSerializer; + private CombinerRunner<K,V> combinerRunner; + private CombineOutputCollector<K, V> combineCollector; // Compression for map-outputs - final CompressionCodec codec; + private CompressionCodec codec; // k/v accounting - final IntBuffer kvmeta; // metadata overlay on backing store + private IntBuffer kvmeta; // metadata overlay on backing store int kvstart; // marks origin of spill metadata int kvend; // marks end of spill metadata int kvindex; // marks end of fully serialized records @@ -856,15 +879,15 @@ class MapTask extends Task { private static final int METASIZE = NMETA * 4; // size in bytes // spill accounting - final int maxRec; - final int softLimit; + private int maxRec; + private int softLimit; boolean spillInProgress;; int bufferRemaining; volatile Throwable sortSpillException = null; int numSpills = 0; - final int minSpillsForCombine; - final IndexedSorter sorter; + private int minSpillsForCombine; + private IndexedSorter sorter; final ReentrantLock spillLock = new ReentrantLock(); final Condition spillDone = spillLock.newCondition(); final Condition spillReady = spillLock.newCondition(); @@ -872,12 +895,12 @@ class MapTask extends Task { volatile boolean spillThreadRunning = false; final SpillThread spillThread = new SpillThread(); - final FileSystem rfs; + private FileSystem rfs; // Counters - final Counters.Counter mapOutputByteCounter; - final Counters.Counter mapOutputRecordCounter; - final Counters.Counter fileOutputByteCounter; + private Counters.Counter mapOutputByteCounter; + private Counters.Counter mapOutputRecordCounter; + private Counters.Counter fileOutputByteCounter; final ArrayList<SpillRecord> indexCacheList = new ArrayList<SpillRecord>(); @@ -885,12 +908,23 @@ class MapTask extends Task { private int indexCacheMemoryLimit; private static final int INDEX_CACHE_MEMORY_LIMIT_DEFAULT = 1024 * 1024; + private MapTask mapTask; + private MapOutputFile mapOutputFile; + private Progress sortPhase; + private Counters.Counter spilledRecordsCounter; + + public MapOutputBuffer() { + } + @SuppressWarnings("unchecked") - public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job, - TaskReporter reporter - ) throws IOException, ClassNotFoundException { - this.job = job; - this.reporter = reporter; + public void init(MapOutputCollector.Context context + ) throws IOException, ClassNotFoundException { + job = context.getJobConf(); + reporter = context.getReporter(); + mapTask = context.getMapTask(); + mapOutputFile = mapTask.getMapOutputFile(); + sortPhase = mapTask.getSortPhase(); + spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS); partitions = job.getNumReduceTasks(); rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw(); @@ -967,7 +1001,7 @@ class MapTask extends Task { if (combinerRunner != null) { final Counters.Counter combineOutputCounter = reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS); - combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, conf); + combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, job); } else { combineCollector = null; } @@ -1118,6 +1152,10 @@ class MapTask extends Task { } } + private TaskAttemptID getTaskID() { + return mapTask.getTaskID(); + } + /** * Set the point from which meta and serialization data expand. The meta * indices are aligned with the buffer, so metadata never spans the ends of @@ -1490,7 +1528,7 @@ class MapTask extends Task { if (lspillException instanceof Error) { final String logMsg = "Task " + getTaskID() + " failed : " + StringUtils.stringifyException(lspillException); - reportFatalError(getTaskID(), lspillException, logMsg); + mapTask.reportFatalError(getTaskID(), lspillException, logMsg); } throw new IOException("Spill failed", lspillException); } Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java?rev=1422716&r1=1422715&r2=1422716&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java Sun Dec 16 23:27:26 2012 @@ -26,6 +26,8 @@ import java.util.zip.CheckedInputStream; import java.util.zip.CheckedOutputStream; import java.util.zip.Checksum; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -34,7 +36,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.PureJavaCrc32; -class SpillRecord { +@InterfaceAudience.LimitedPrivate({"MapReduce"}) +@InterfaceStability.Unstable +public class SpillRecord { /** Backing store */ private final ByteBuffer buf; @@ -143,17 +147,3 @@ class SpillRecord { } } - -class IndexRecord { - long startOffset; - long rawLength; - long partLength; - - public IndexRecord() { } - - public IndexRecord(long startOffset, long rawLength, long partLength) { - this.startOffset = startOffset; - this.rawLength = rawLength; - this.partLength = partLength; - } -} Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java?rev=1422716&r1=1422715&r2=1422716&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java Sun Dec 16 23:27:26 2012 @@ -584,9 +584,9 @@ abstract public class Task implements Wr return status; } - @InterfaceAudience.Private + @InterfaceAudience.LimitedPrivate({"MapReduce"}) @InterfaceStability.Unstable - protected class TaskReporter + public class TaskReporter extends org.apache.hadoop.mapreduce.StatusReporter implements Runnable, Reporter { private TaskUmbilicalProtocol umbilical; @@ -1466,9 +1466,9 @@ abstract public class Task implements Wr return reducerContext; } - @InterfaceAudience.Private + @InterfaceAudience.LimitedPrivate({"MapReduce"}) @InterfaceStability.Unstable - protected static abstract class CombinerRunner<K,V> { + public static abstract class CombinerRunner<K,V> { protected final Counters.Counter inputCounter; protected final JobConf job; protected final TaskReporter reporter; @@ -1486,13 +1486,13 @@ abstract public class Task implements Wr * @param iterator the key/value pairs to use as input * @param collector the output collector */ - abstract void combine(RawKeyValueIterator iterator, + public abstract void combine(RawKeyValueIterator iterator, OutputCollector<K,V> collector ) throws IOException, InterruptedException, ClassNotFoundException; @SuppressWarnings("unchecked") - static <K,V> + public static <K,V> CombinerRunner<K,V> create(JobConf job, TaskAttemptID taskId, Counters.Counter inputCounter, @@ -1542,7 +1542,7 @@ abstract public class Task implements Wr } @SuppressWarnings("unchecked") - protected void combine(RawKeyValueIterator kvIter, + public void combine(RawKeyValueIterator kvIter, OutputCollector<K,V> combineCollector ) throws IOException { Reducer<K,V,K,V> combiner = @@ -1611,7 +1611,7 @@ abstract public class Task implements Wr @SuppressWarnings("unchecked") @Override - void combine(RawKeyValueIterator iterator, + public void combine(RawKeyValueIterator iterator, OutputCollector<K,V> collector ) throws IOException, InterruptedException, ClassNotFoundException { Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1422716&r1=1422715&r2=1422716&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Sun Dec 16 23:27:26 2012 @@ -30,6 +30,9 @@ public interface MRJobConfig { public static final String MAP_CLASS_ATTR = "mapreduce.job.map.class"; + public static final String MAP_OUTPUT_COLLECTOR_CLASS_ATTR + = "mapreduce.job.map.output.collector.class"; + public static final String COMBINE_CLASS_ATTR = "mapreduce.job.combine.class"; public static final String REDUCE_CLASS_ATTR = "mapreduce.job.reduce.class"; Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ExceptionReporter.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ExceptionReporter.java?rev=1422716&r1=1422715&r2=1422716&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ExceptionReporter.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ExceptionReporter.java Sun Dec 16 23:27:26 2012 @@ -17,9 +17,14 @@ */ package org.apache.hadoop.mapreduce.task.reduce; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + /** * An interface for reporting exceptions to other threads */ -interface ExceptionReporter { +@InterfaceAudience.LimitedPrivate({"MapReduce"}) +@InterfaceStability.Unstable +public interface ExceptionReporter { void reportException(Throwable t); } Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapHost.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapHost.java?rev=1422716&r1=1422715&r2=1422716&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapHost.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapHost.java Sun Dec 16 23:27:26 2012 @@ -20,9 +20,14 @@ package org.apache.hadoop.mapreduce.task import java.util.ArrayList; import java.util.List; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + import org.apache.hadoop.mapreduce.TaskAttemptID; -class MapHost { +@InterfaceAudience.LimitedPrivate({"MapReduce"}) +@InterfaceStability.Unstable +public class MapHost { public static enum State { IDLE, // No map outputs available Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java?rev=1422716&r1=1422715&r2=1422716&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java Sun Dec 16 23:27:26 2012 @@ -24,6 +24,8 @@ import java.util.concurrent.atomic.Atomi import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalDirAllocator; @@ -33,7 +35,9 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapOutputFile; import org.apache.hadoop.mapreduce.TaskAttemptID; -class MapOutput<K,V> { +@InterfaceAudience.LimitedPrivate({"MapReduce"}) +@InterfaceStability.Unstable +public class MapOutput<K,V> { private static final Log LOG = LogFactory.getLog(MapOutput.class); private static AtomicInteger ID = new AtomicInteger(0); @@ -62,7 +66,7 @@ class MapOutput<K,V> { private final boolean primaryMapOutput; - MapOutput(TaskAttemptID mapId, MergeManager<K,V> merger, long size, + public MapOutput(TaskAttemptID mapId, MergeManager<K,V> merger, long size, JobConf conf, LocalDirAllocator localDirAllocator, int fetcher, boolean primaryMapOutput, MapOutputFile mapOutputFile) throws IOException { @@ -87,7 +91,7 @@ class MapOutput<K,V> { this.primaryMapOutput = primaryMapOutput; } - MapOutput(TaskAttemptID mapId, MergeManager<K,V> merger, int size, + public MapOutput(TaskAttemptID mapId, MergeManager<K,V> merger, int size, boolean primaryMapOutput) { this.id = ID.incrementAndGet(); this.mapId = mapId; Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java?rev=1422716&r1=1422715&r2=1422716&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java Sun Dec 16 23:27:26 2012 @@ -59,7 +59,7 @@ import org.apache.hadoop.util.Progress; import org.apache.hadoop.util.ReflectionUtils; @SuppressWarnings(value={"unchecked", "deprecation"}) -@InterfaceAudience.Private +@InterfaceAudience.LimitedPrivate({"MapReduce"}) @InterfaceStability.Unstable public class MergeManager<K, V> { Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java?rev=1422716&r1=1422715&r2=1422716&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java Sun Dec 16 23:27:26 2012 @@ -39,7 +39,7 @@ import org.apache.hadoop.mapreduce.MRJob import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.util.Progress; -@InterfaceAudience.LimitedPrivate("mapreduce") +@InterfaceAudience.LimitedPrivate({"MapReduce"}) @InterfaceStability.Unstable @SuppressWarnings({"unchecked", "rawtypes"}) public class Shuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionReporter { Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java?rev=1422716&r1=1422715&r2=1422716&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java Sun Dec 16 23:27:26 2012 @@ -17,6 +17,9 @@ */ package org.apache.hadoop.mapreduce.task.reduce; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskAttemptID; @@ -25,7 +28,9 @@ import org.apache.hadoop.metrics.Metrics import org.apache.hadoop.metrics.MetricsUtil; import org.apache.hadoop.metrics.Updater; -class ShuffleClientMetrics implements Updater { +@InterfaceAudience.LimitedPrivate({"MapReduce"}) +@InterfaceStability.Unstable +public class ShuffleClientMetrics implements Updater { private MetricsRecord shuffleMetrics = null; private int numFailedFetches = 0; Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1422716&r1=1422715&r2=1422716&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Sun Dec 16 23:27:26 2012 @@ -938,4 +938,12 @@ <value>jhs/_h...@realm.tld</value> </property> +<property> + <name>mapreduce.job.map.output.collector.class</name> + <value>org.apache.hadoop.mapred.MapTask$MapOutputBuffer</value> + <description> + It defines the MapOutputCollector implementation to use. + </description> +</property> + </configuration> Propchange: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1422281-1422713