Author: kasha
Date: Thu Aug 21 17:37:34 2014
New Revision: 1619492

URL: http://svn.apache.org/r1619492
Log:
MAPREDUCE-5974. Allow specifying multiple MapOutputCollectors with fallback. 
(Todd Lipcon via kasha)

Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
    
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
    
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1619492&r1=1619491&r2=1619492&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Thu Aug 21 
17:37:34 2014
@@ -187,6 +187,9 @@ Release 2.6.0 - UNRELEASED
     MAPREDUCE-5906. Inconsistent configuration in property
       "mapreduce.reduce.shuffle.input.buffer.percent" (Akira AJISAKA via aw)
 
+    MAPREDUCE-5974. Allow specifying multiple MapOutputCollectors with 
+    fallback. (Todd Lipcon via kasha)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java?rev=1619492&r1=1619491&r2=1619492&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
 (original)
+++ 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
 Thu Aug 21 17:37:34 2014
@@ -381,16 +381,35 @@ public class MapTask extends Task {
   private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
           createSortingCollector(JobConf job, TaskReporter reporter)
     throws IOException, ClassNotFoundException {
-    MapOutputCollector<KEY, VALUE> collector
-      = (MapOutputCollector<KEY, VALUE>)
-       ReflectionUtils.newInstance(
-                        
job.getClass(JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR,
-                        MapOutputBuffer.class, MapOutputCollector.class), job);
-    LOG.info("Map output collector class = " + collector.getClass().getName());
     MapOutputCollector.Context context =
-                           new MapOutputCollector.Context(this, job, reporter);
-    collector.init(context);
-    return collector;
+      new MapOutputCollector.Context(this, job, reporter);
+
+    Class<?>[] collectorClasses = job.getClasses(
+      JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class);
+    int remainingCollectors = collectorClasses.length;
+    for (Class clazz : collectorClasses) {
+      try {
+        if (!MapOutputCollector.class.isAssignableFrom(clazz)) {
+          throw new IOException("Invalid output collector class: " + 
clazz.getName() +
+            " (does not implement MapOutputCollector)");
+        }
+        Class<? extends MapOutputCollector> subclazz =
+          clazz.asSubclass(MapOutputCollector.class);
+        LOG.debug("Trying map output collector class: " + subclazz.getName());
+        MapOutputCollector<KEY, VALUE> collector =
+          ReflectionUtils.newInstance(subclazz, job);
+        collector.init(context);
+        LOG.info("Map output collector class = " + 
collector.getClass().getName());
+        return collector;
+      } catch (Exception e) {
+        String msg = "Unable to initialize MapOutputCollector " + 
clazz.getName();
+        if (--remainingCollectors > 0) {
+          msg += " (" + remainingCollectors + " more collector(s) to try)";
+        }
+        LOG.warn(msg, e);
+      }
+    }
+    throw new IOException("Unable to initialize any output collector");
   }
 
   @SuppressWarnings("unchecked")

Modified: 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1619492&r1=1619491&r2=1619492&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
 (original)
+++ 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
 Thu Aug 21 17:37:34 2014
@@ -408,7 +408,9 @@
   <name>mapreduce.job.map.output.collector.class</name>
   <value>org.apache.hadoop.mapred.MapTask$MapOutputBuffer</value>
   <description>
-    It defines the MapOutputCollector implementation to use.
+    The MapOutputCollector implementation(s) to use. This may be a 
comma-separated
+    list of class names, in which case the map task will try to initialize each
+    of the collectors in turn. The first to successfully initialize will be 
used.
   </description>
 </property>
  

Modified: 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm?rev=1619492&r1=1619491&r2=1619492&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm
 (original)
+++ 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm
 Thu Aug 21 17:37:34 2014
@@ -71,11 +71,16 @@ Hadoop MapReduce Next Generation - Plugg
 
*--------------------------------------+---------------------+-----------------+
 | <<<mapreduce.job.reduce.shuffle.consumer.plugin.class>>> | 
<<<org.apache.hadoop.mapreduce.task.reduce.Shuffle>>>         | The 
<<<ShuffleConsumerPlugin>>> implementation to use |
 
*--------------------------------------+---------------------+-----------------+
-| <<<mapreduce.job.map.output.collector.class>>>   | 
<<<org.apache.hadoop.mapred.MapTask$MapOutputBuffer>>> | The 
<<<MapOutputCollector>>> implementation to use |
+| <<<mapreduce.job.map.output.collector.class>>>   | 
<<<org.apache.hadoop.mapred.MapTask$MapOutputBuffer>>> | The 
<<<MapOutputCollector>>> implementation(s) to use |
 
*--------------------------------------+---------------------+-----------------+
 
   These properties can also be set in the <<<mapred-site.xml>>> to change the 
default values for all jobs.
 
+  The collector class configuration may specify a comma-separated list of 
collector implementations.
+  In this case, the map task will attempt to instantiate each in turn until 
one of the
+  implementations successfully initializes. This can be useful if a given 
collector
+  implementation is only compatible with certain types of keys or values, for 
example.
+
 ** NodeManager Configuration properties, <<<yarn-site.xml>>> in all nodes:
 
 
*--------------------------------------+---------------------+-----------------+
@@ -91,4 +96,3 @@ Hadoop MapReduce Next Generation - Plugg
   <<<yarn.nodemanager.aux-services>>> property, for example 
<<<mapred.shufflex>>>.
   Then the property defining the corresponding class must be
   <<<yarn.nodemanager.aux-services.mapreduce_shufflex.class>>>.
-  
\ No newline at end of file


Reply via email to