Author: szetszwo
Date: Thu Oct 25 19:09:34 2012
New Revision: 1402278
URL: http://svn.apache.org/viewvc?rev=1402278&view=rev
Log: Merge r1401869 through r1402273 from trunk.
Added: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestEventFetcher.java - copied unchanged from r1402273, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestEventFetcher.java Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/ (props changed) hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt (contents, props changed) hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/conf/ (props changed) hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (props changed) hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/c++/ (props changed) hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/ (props changed) hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/block_forensics/ (props changed) hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/build-contrib.xml (props changed) hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/build.xml (props changed) hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/data_join/ (props changed) hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/eclipse-plugin/ (props changed) hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/ (props changed) 
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/vaidya/ (props changed) hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/examples/ (props changed) hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/java/ (props changed) hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/test/mapred/ (props changed) hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/fs/ (props changed) hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/hdfs/ (props changed) hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/ipc/ (props changed) hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/webapps/job/ (props changed) Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1401869-1402273 Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt?rev=1402278&r1=1402277&r2=1402278&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt Thu Oct 25 19:09:34 2012 @@ -610,6 +610,9 @@ Release 0.23.5 - UNRELEASED MAPREDUCE-4741. WARN and ERROR messages logged during normal AM shutdown. (Vinod Kumar Vavilapalli via jlowe) + MAPREDUCE-4730. Fix Reducer's EventFetcher to scale the map-completion + requests slowly to avoid HADOOP-8942. 
(Jason Lowe via vinodkv) + Release 0.23.4 - UNRELEASED INCOMPATIBLE CHANGES Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1401869-1402273 Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/conf/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1401869-1402273 Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java?rev=1402278&r1=1402277&r2=1402278&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java Thu Oct 25 19:09:34 2012 @@ -27,10 +27,8 @@ import org.apache.hadoop.mapred.TaskComp import org.apache.hadoop.mapred.TaskUmbilicalProtocol; import org.apache.hadoop.mapreduce.TaskAttemptID; -@SuppressWarnings("deprecation") class EventFetcher<K,V> extends Thread { private static final long SLEEP_TIME = 1000; - private static final int MAX_EVENTS_TO_FETCH = 10000; private static final int MAX_RETRIES = 10; private static final int RETRY_PERIOD = 5000; private static final Log LOG = LogFactory.getLog(EventFetcher.class); @@ 
-38,7 +36,8 @@ class EventFetcher<K,V> extends Thread { private final TaskAttemptID reduce; private final TaskUmbilicalProtocol umbilical; private final ShuffleScheduler<K,V> scheduler; - private int fromEventId = 0; + private int fromEventIdx = 0; + private int maxEventsToFetch; private ExceptionReporter exceptionReporter = null; private int maxMapRuntime = 0; @@ -48,13 +47,15 @@ class EventFetcher<K,V> extends Thread { public EventFetcher(TaskAttemptID reduce, TaskUmbilicalProtocol umbilical, ShuffleScheduler<K,V> scheduler, - ExceptionReporter reporter) { + ExceptionReporter reporter, + int maxEventsToFetch) { setName("EventFetcher for fetching Map Completion Events"); setDaemon(true); this.reduce = reduce; this.umbilical = umbilical; this.scheduler = scheduler; exceptionReporter = reporter; + this.maxEventsToFetch = maxEventsToFetch; } @Override @@ -112,46 +113,47 @@ class EventFetcher<K,V> extends Thread { * from a given event ID. * @throws IOException */ - private int getMapCompletionEvents() throws IOException { + protected int getMapCompletionEvents() throws IOException { int numNewMaps = 0; - - MapTaskCompletionEventsUpdate update = - umbilical.getMapCompletionEvents((org.apache.hadoop.mapred.JobID) - reduce.getJobID(), - fromEventId, - MAX_EVENTS_TO_FETCH, - (org.apache.hadoop.mapred.TaskAttemptID) - reduce); - TaskCompletionEvent events[] = update.getMapTaskCompletionEvents(); - LOG.debug("Got " + events.length + " map completion events from " + - fromEventId); - - // Check if the reset is required. - // Since there is no ordering of the task completion events at the - // reducer, the only option to sync with the new jobtracker is to reset - // the events index - if (update.shouldReset()) { - fromEventId = 0; - scheduler.resetKnownMaps(); - } - - // Update the last seen event ID - fromEventId += events.length; - - // Process the TaskCompletionEvents: - // 1. Save the SUCCEEDED maps in knownOutputs to fetch the outputs. - // 2. 
Save the OBSOLETE/FAILED/KILLED maps in obsoleteOutputs to stop - // fetching from those maps. - // 3. Remove TIPFAILED maps from neededOutputs since we don't need their - // outputs at all. - for (TaskCompletionEvent event : events) { - switch (event.getTaskStatus()) { + TaskCompletionEvent events[] = null; + + do { + MapTaskCompletionEventsUpdate update = + umbilical.getMapCompletionEvents( + (org.apache.hadoop.mapred.JobID)reduce.getJobID(), + fromEventIdx, + maxEventsToFetch, + (org.apache.hadoop.mapred.TaskAttemptID)reduce); + events = update.getMapTaskCompletionEvents(); + LOG.debug("Got " + events.length + " map completion events from " + + fromEventIdx); + + // Check if the reset is required. + // Since there is no ordering of the task completion events at the + // reducer, the only option to sync with the new jobtracker is to reset + // the events index + if (update.shouldReset()) { + fromEventIdx = 0; + scheduler.resetKnownMaps(); + } + + // Update the last seen event ID + fromEventIdx += events.length; + + // Process the TaskCompletionEvents: + // 1. Save the SUCCEEDED maps in knownOutputs to fetch the outputs. + // 2. Save the OBSOLETE/FAILED/KILLED maps in obsoleteOutputs to stop + // fetching from those maps. + // 3. Remove TIPFAILED maps from neededOutputs since we don't need their + // outputs at all. 
+ for (TaskCompletionEvent event : events) { + switch (event.getTaskStatus()) { case SUCCEEDED: URI u = getBaseURI(event.getTaskTrackerHttp()); scheduler.addKnownMapOutput(u.getHost() + ":" + u.getPort(), - u.toString(), - event.getTaskAttemptId()); + u.toString(), + event.getTaskAttemptId()); numNewMaps ++; int duration = event.getTaskRunTime(); if (duration > maxMapRuntime) { @@ -164,15 +166,17 @@ class EventFetcher<K,V> extends Thread { case OBSOLETE: scheduler.obsoleteMapOutput(event.getTaskAttemptId()); LOG.info("Ignoring obsolete output of " + event.getTaskStatus() + - " map-task: '" + event.getTaskAttemptId() + "'"); + " map-task: '" + event.getTaskAttemptId() + "'"); break; case TIPFAILED: scheduler.tipFailed(event.getTaskAttemptId().getTaskID()); LOG.info("Ignoring output of failed map TIP: '" + - event.getTaskAttemptId() + "'"); + event.getTaskAttemptId() + "'"); break; + } } - } + } while (events.length == maxEventsToFetch); + return numNewMaps; } Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java?rev=1402278&r1=1402277&r2=1402278&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java Thu Oct 25 19:09:34 2012 @@ -40,9 +40,12 @@ import org.apache.hadoop.util.Progress; @InterfaceAudience.Private 
@InterfaceStability.Unstable -@SuppressWarnings({"deprecation", "unchecked", "rawtypes"}) +@SuppressWarnings({"unchecked", "rawtypes"}) public class Shuffle<K, V> implements ExceptionReporter { private static final int PROGRESS_FREQUENCY = 2000; + private static final int MAX_EVENTS_TO_FETCH = 10000; + private static final int MIN_EVENTS_TO_FETCH = 100; + private static final int MAX_RPC_OUTSTANDING_EVENTS = 3000000; private final TaskAttemptID reduceId; private final JobConf jobConf; @@ -99,9 +102,17 @@ public class Shuffle<K, V> implements Ex } public RawKeyValueIterator run() throws IOException, InterruptedException { + // Scale the maximum events we fetch per RPC call to mitigate OOM issues + // on the ApplicationMaster when a thundering herd of reducers fetch events + // TODO: This should not be necessary after HADOOP-8942 + int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH, + MAX_RPC_OUTSTANDING_EVENTS / jobConf.getNumReduceTasks()); + int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer); + // Start the map-completion events fetcher thread final EventFetcher<K,V> eventFetcher = - new EventFetcher<K,V>(reduceId, umbilical, scheduler, this); + new EventFetcher<K,V>(reduceId, umbilical, scheduler, this, + maxEventsToFetch); eventFetcher.start(); // Start the map-output fetcher threads Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1401869-1402273 Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/c++/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/src/c++:r1401869-1402273 Propchange: 
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/src/contrib:r1401869-1402273 Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/block_forensics/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/block_forensics:r1401869-1402273 Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/build-contrib.xml ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/build-contrib.xml:r1401869-1402273 Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/build.xml ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/build.xml:r1401869-1402273 Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/data_join/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/data_join:r1401869-1402273 Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/eclipse-plugin/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/eclipse-plugin:r1401869-1402273 Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/index:r1401869-1402273 Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/vaidya/ 
------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/vaidya:r1401869-1402273 Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/examples/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/src/examples:r1401869-1402273 Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/java/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/src/java:r1401869-1402273 Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/test/mapred/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred:r1401869-1402273 Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/fs/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/fs:r1401869-1402273 Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/hdfs/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/hdfs:r1401869-1402273 Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/ipc/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/ipc:r1401869-1402273 Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/webapps/job/ ------------------------------------------------------------------------------ Merged 
/hadoop/common/trunk/hadoop-mapreduce-project/src/webapps/job:r1401869-1402273