Author: kasha
Date: Thu Aug 21 17:37:34 2014
New Revision: 1619492
URL: http://svn.apache.org/r1619492
Log:
MAPREDUCE-5974. Allow specifying multiple MapOutputCollectors with fallback.
(Todd Lipcon via kasha)
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1619492&r1=1619491&r2=1619492&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Thu Aug 21 17:37:34 2014
@@ -187,6 +187,9 @@ Release 2.6.0 - UNRELEASED
MAPREDUCE-5906. Inconsistent configuration in property
"mapreduce.reduce.shuffle.input.buffer.percent" (Akira AJISAKA via aw)
+ MAPREDUCE-5974. Allow specifying multiple MapOutputCollectors with
+ fallback. (Todd Lipcon via kasha)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java?rev=1619492&r1=1619491&r2=1619492&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java Thu Aug 21 17:37:34 2014
@@ -381,16 +381,42 @@ public class MapTask extends Task {
private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
createSortingCollector(JobConf job, TaskReporter reporter)
throws IOException, ClassNotFoundException {
- MapOutputCollector<KEY, VALUE> collector
- = (MapOutputCollector<KEY, VALUE>)
- ReflectionUtils.newInstance(
- job.getClass(JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR,
- MapOutputBuffer.class, MapOutputCollector.class), job);
- LOG.info("Map output collector class = " + collector.getClass().getName());
MapOutputCollector.Context context =
- new MapOutputCollector.Context(this, job, reporter);
- collector.init(context);
- return collector;
+ new MapOutputCollector.Context(this, job, reporter);
+
+ // Try each configured collector in order; the first one whose init()
+ // succeeds is returned and any later entries are never instantiated.
+ Class<?>[] collectorClasses = job.getClasses(
+ JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class);
+ int remainingCollectors = collectorClasses.length;
+ // Kept so the final IOException carries a cause, not only a WARN log line.
+ Exception lastException = null;
+ for (Class<?> clazz : collectorClasses) {
+ try {
+ if (!MapOutputCollector.class.isAssignableFrom(clazz)) {
+ throw new IOException("Invalid output collector class: " + clazz.getName() +
+ " (does not implement MapOutputCollector)");
+ }
+ Class<? extends MapOutputCollector> subclazz =
+ clazz.asSubclass(MapOutputCollector.class);
+ LOG.debug("Trying map output collector class: " + subclazz.getName());
+ MapOutputCollector<KEY, VALUE> collector =
+ ReflectionUtils.newInstance(subclazz, job);
+ collector.init(context);
+ LOG.info("Map output collector class = " + collector.getClass().getName());
+ return collector;
+ } catch (Exception e) {
+ lastException = e;
+ String msg = "Unable to initialize MapOutputCollector " + clazz.getName();
+ if (--remainingCollectors > 0) {
+ msg += " (" + remainingCollectors + " more collector(s) to try)";
+ }
+ LOG.warn(msg, e);
+ }
+ }
+ // Reached only when no collector is configured or every one of them failed.
+ throw new IOException("Unable to initialize any output collector",
+ lastException);
}
@SuppressWarnings("unchecked")
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1619492&r1=1619491&r2=1619492&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Thu Aug 21 17:37:34 2014
@@ -408,7 +408,9 @@
<name>mapreduce.job.map.output.collector.class</name>
<value>org.apache.hadoop.mapred.MapTask$MapOutputBuffer</value>
<description>
- It defines the MapOutputCollector implementation to use.
+ The MapOutputCollector implementation(s) to use. This may be a comma-separated
+ list of class names, in which case the map task will try to initialize each
+ of the collectors in turn. The first to successfully initialize will be used.
</description>
</property>
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm?rev=1619492&r1=1619491&r2=1619492&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm Thu Aug 21 17:37:34 2014
@@ -71,11 +71,16 @@ Hadoop MapReduce Next Generation - Plugg
*--------------------------------------+---------------------+-----------------+
| <<<mapreduce.job.reduce.shuffle.consumer.plugin.class>>> | <<<org.apache.hadoop.mapreduce.task.reduce.Shuffle>>> | The <<<ShuffleConsumerPlugin>>> implementation to use |
*--------------------------------------+---------------------+-----------------+
-| <<<mapreduce.job.map.output.collector.class>>> | <<<org.apache.hadoop.mapred.MapTask$MapOutputBuffer>>> | The <<<MapOutputCollector>>> implementation to use |
+| <<<mapreduce.job.map.output.collector.class>>> | <<<org.apache.hadoop.mapred.MapTask$MapOutputBuffer>>> | The <<<MapOutputCollector>>> implementation(s) to use |
*--------------------------------------+---------------------+-----------------+
These properties can also be set in the <<<mapred-site.xml>>> to change the default values for all jobs.
+ The collector class configuration may specify a comma-separated list of collector implementations.
+ In this case, the map task will attempt to instantiate each in turn until one of the
+ implementations successfully initializes. This can be useful if a given collector
+ implementation is only compatible with certain types of keys or values, for example.
+
** NodeManager Configuration properties, <<<yarn-site.xml>>> in all nodes:
*--------------------------------------+---------------------+-----------------+
@@ -91,4 +96,3 @@ Hadoop MapReduce Next Generation - Plugg
<<<yarn.nodemanager.aux-services>>> property, for example
<<<mapred.shufflex>>>.
Then the property defining the corresponding class must be
<<<yarn.nodemanager.aux-services.mapreduce_shufflex.class>>>.
-
\ No newline at end of file