Author: kasha Date: Thu Aug 21 17:37:34 2014 New Revision: 1619492 URL: http://svn.apache.org/r1619492 Log: MAPREDUCE-5974. Allow specifying multiple MapOutputCollectors with fallback. (Todd Lipcon via kasha)
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1619492&r1=1619491&r2=1619492&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Thu Aug 21 17:37:34 2014 @@ -187,6 +187,9 @@ Release 2.6.0 - UNRELEASED MAPREDUCE-5906. Inconsistent configuration in property "mapreduce.reduce.shuffle.input.buffer.percent" (Akira AJISAKA via aw) + MAPREDUCE-5974. Allow specifying multiple MapOutputCollectors with + fallback. (Todd Lipcon via kasha) + OPTIMIZATIONS BUG FIXES Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java?rev=1619492&r1=1619491&r2=1619492&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java Thu Aug 21 17:37:34 2014 @@ -381,16 +381,35 @@ public class MapTask extends Task { private <KEY, VALUE> MapOutputCollector<KEY, VALUE> createSortingCollector(JobConf job, TaskReporter reporter) throws IOException, ClassNotFoundException { - MapOutputCollector<KEY, VALUE> collector - = (MapOutputCollector<KEY, VALUE>) - ReflectionUtils.newInstance( - job.getClass(JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, - MapOutputBuffer.class, MapOutputCollector.class), job); - LOG.info("Map output collector class = " + collector.getClass().getName()); MapOutputCollector.Context context = - new MapOutputCollector.Context(this, job, reporter); - collector.init(context); - return collector; + new MapOutputCollector.Context(this, job, reporter); + + Class<?>[] collectorClasses = job.getClasses( + JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class); + int remainingCollectors = collectorClasses.length; + for (Class clazz : collectorClasses) { + try { + if (!MapOutputCollector.class.isAssignableFrom(clazz)) { + throw new IOException("Invalid output collector class: " + clazz.getName() + + " (does not implement MapOutputCollector)"); + } + Class<? extends MapOutputCollector> subclazz = + clazz.asSubclass(MapOutputCollector.class); + LOG.debug("Trying map output collector class: " + subclazz.getName()); + MapOutputCollector<KEY, VALUE> collector = + ReflectionUtils.newInstance(subclazz, job); + collector.init(context); + LOG.info("Map output collector class = " + collector.getClass().getName()); + return collector; + } catch (Exception e) { + String msg = "Unable to initialize MapOutputCollector " + clazz.getName(); + if (--remainingCollectors > 0) { + msg += " (" + remainingCollectors + " more collector(s) to try)"; + } + LOG.warn(msg, e); + } + } + throw new IOException("Unable to initialize any output collector"); } @SuppressWarnings("unchecked") Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1619492&r1=1619491&r2=1619492&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Thu Aug 21 17:37:34 2014 @@ -408,7 +408,9 @@ <name>mapreduce.job.map.output.collector.class</name> <value>org.apache.hadoop.mapred.MapTask$MapOutputBuffer</value> <description> - It defines the MapOutputCollector implementation to use. + The MapOutputCollector implementation(s) to use. This may be a comma-separated + list of class names, in which case the map task will try to initialize each + of the collectors in turn. The first to successfully initialize will be used. </description> </property> Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm?rev=1619492&r1=1619491&r2=1619492&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm Thu Aug 21 17:37:34 2014 @@ -71,11 +71,16 @@ Hadoop MapReduce Next Generation - Plugg *--------------------------------------+---------------------+-----------------+ | <<<mapreduce.job.reduce.shuffle.consumer.plugin.class>>> | <<<org.apache.hadoop.mapreduce.task.reduce.Shuffle>>> | The <<<ShuffleConsumerPlugin>>> implementation to use | *--------------------------------------+---------------------+-----------------+ -| <<<mapreduce.job.map.output.collector.class>>> | <<<org.apache.hadoop.mapred.MapTask$MapOutputBuffer>>> | The <<<MapOutputCollector>>> implementation to use | +| <<<mapreduce.job.map.output.collector.class>>> | <<<org.apache.hadoop.mapred.MapTask$MapOutputBuffer>>> | The <<<MapOutputCollector>>> implementation(s) to use | *--------------------------------------+---------------------+-----------------+ These properties can also be set in the <<<mapred-site.xml>>> to change the default values for all jobs. + The collector class configuration may specify a comma-separated list of collector implementations. + In this case, the map task will attempt to instantiate each in turn until one of the + implementations successfully initializes. This can be useful if a given collector + implementation is only compatible with certain types of keys or values, for example. + ** NodeManager Configuration properties, <<<yarn-site.xml>>> in all nodes: *--------------------------------------+---------------------+-----------------+ @@ -91,4 +96,3 @@ Hadoop MapReduce Next Generation - Plugg <<<yarn.nodemanager.aux-services>>> property, for example <<<mapred.shufflex>>>. Then the property defining the corresponding class must be <<<yarn.nodemanager.aux-services.mapreduce_shufflex.class>>>. - \ No newline at end of file