Author: zjshen
Date: Mon Aug 18 17:59:32 2014
New Revision: 1618679

URL: http://svn.apache.org/r1618679
Log:
MAPREDUCE-6024. Shortened the time when Fetcher is stuck in retrying before 
concluding the failure by configuration. Contributed by Yunjiong Zhao.
svn merge --ignore-ancestry -c 1618677 ../../trunk/

Modified:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
    
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
    
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
    
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1618679&r1=1618678&r2=1618679&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt 
(original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Mon 
Aug 18 17:59:32 2014
@@ -81,6 +81,9 @@ Release 2.6.0 - UNRELEASED
     MAPREDUCE-6032. Made MR jobs write job history files on the default FS when
     the current context's FS is different. (Benjamin Zhitomirsky via zjshen)
 
+    MAPREDUCE-6024. Shortened the time when Fetcher is stuck in retrying before
+    concluding the failure by configuration. (Yunjiong Zhao via zjshen)
+
 Release 2.5.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: 
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1618679&r1=1618678&r2=1618679&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
 (original)
+++ 
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
 Mon Aug 18 17:59:32 2014
@@ -148,10 +148,10 @@ public class JobImpl implements org.apac
   private static final Log LOG = LogFactory.getLog(JobImpl.class);
 
   //The maximum fraction of fetch failures allowed for a map
-  private static final double MAX_ALLOWED_FETCH_FAILURES_FRACTION = 0.5;
-
-  // Maximum no. of fetch-failure notifications after which map task is failed
-  private static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3;
+  private float maxAllowedFetchFailuresFraction;
+  
+  //Maximum no. of fetch-failure notifications after which map task is failed
+  private int maxFetchFailuresNotifications;
 
   public static final String JOB_KILLED_DIAG =
       "Job received Kill while in RUNNING state.";
@@ -704,6 +704,13 @@ public class JobImpl implements org.apac
     if(forcedDiagnostic != null) {
       this.diagnostics.add(forcedDiagnostic);
     }
+    
+    this.maxAllowedFetchFailuresFraction = conf.getFloat(
+        MRJobConfig.MAX_ALLOWED_FETCH_FAILURES_FRACTION,
+        MRJobConfig.DEFAULT_MAX_ALLOWED_FETCH_FAILURES_FRACTION);
+    this.maxFetchFailuresNotifications = conf.getInt(
+        MRJobConfig.MAX_FETCH_FAILURES_NOTIFICATIONS,
+        MRJobConfig.DEFAULT_MAX_FETCH_FAILURES_NOTIFICATIONS);
   }
 
   protected StateMachine<JobStateInternal, JobEventType, JobEvent> 
getStateMachine() {
@@ -1900,9 +1907,8 @@ public class JobImpl implements org.apac
         float failureRate = shufflingReduceTasks == 0 ? 1.0f : 
           (float) fetchFailures / shufflingReduceTasks;
         // declare faulty if fetch-failures >= max-allowed-failures
-        boolean isMapFaulty =
-            (failureRate >= MAX_ALLOWED_FETCH_FAILURES_FRACTION);
-        if (fetchFailures >= MAX_FETCH_FAILURES_NOTIFICATIONS && isMapFaulty) {
+        if (fetchFailures >= job.getMaxFetchFailuresNotifications()
+            && failureRate >= job.getMaxAllowedFetchFailuresFraction()) {
           LOG.info("Too many fetch-failures for output of task attempt: " + 
               mapId + " ... raising fetch failure to map");
           job.eventHandler.handle(new TaskAttemptEvent(mapId, 
@@ -2185,4 +2191,12 @@ public class JobImpl implements org.apac
     jobConf.addResource(fc.open(confPath), confPath.toString());
     return jobConf;
   }
+
+  public float getMaxAllowedFetchFailuresFraction() {
+    return maxAllowedFetchFailuresFraction;
+  }
+
+  public int getMaxFetchFailuresNotifications() {
+    return maxFetchFailuresNotifications;
+  }
 }

Modified: 
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1618679&r1=1618678&r2=1618679&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
 (original)
+++ 
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
 Mon Aug 18 17:59:32 2014
@@ -288,11 +288,19 @@ public interface MRJobConfig {
   public static final String SHUFFLE_READ_TIMEOUT = 
"mapreduce.reduce.shuffle.read.timeout";
 
   public static final String SHUFFLE_FETCH_FAILURES = 
"mapreduce.reduce.shuffle.maxfetchfailures";
+  public static final String MAX_ALLOWED_FETCH_FAILURES_FRACTION = 
"mapreduce.reduce.shuffle.max-fetch-failures-fraction";
+  public static final float DEFAULT_MAX_ALLOWED_FETCH_FAILURES_FRACTION = 0.5f;
+  
+  public static final String MAX_FETCH_FAILURES_NOTIFICATIONS = 
"mapreduce.reduce.shuffle.max-fetch-failures-notifications";
+  public static final int DEFAULT_MAX_FETCH_FAILURES_NOTIFICATIONS = 3;
 
   public static final String SHUFFLE_NOTIFY_READERROR = 
"mapreduce.reduce.shuffle.notify.readerror";
   
   public static final String MAX_SHUFFLE_FETCH_RETRY_DELAY = 
"mapreduce.reduce.shuffle.retry-delay.max.ms";
   public static final long DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY = 60000;
+  
+  public static final String MAX_SHUFFLE_FETCH_HOST_FAILURES = 
"mapreduce.reduce.shuffle.max-host-failures";
+  public static final int DEFAULT_MAX_SHUFFLE_FETCH_HOST_FAILURES = 5;
 
   public static final String REDUCE_SKIP_INCR_PROC_COUNT = 
"mapreduce.reduce.skip.proc-count.auto-incr";
 

Modified: 
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=1618679&r1=1618678&r2=1618679&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
 (original)
+++ 
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
 Mon Aug 18 17:59:32 2014
@@ -319,6 +319,7 @@ class Fetcher<K,V> extends Thread {
 
       // If connect did not succeed, just mark all the maps as failed,
       // indirectly penalizing the host
+      scheduler.hostFailed(host.getHostName());
       for(TaskAttemptID left: remaining) {
         scheduler.copyFailed(left, host, false, connectExcpt);
       }
@@ -343,6 +344,7 @@ class Fetcher<K,V> extends Thread {
       
       if(failedTasks != null && failedTasks.length > 0) {
         LOG.warn("copyMapOutput failed for tasks 
"+Arrays.toString(failedTasks));
+        scheduler.hostFailed(host.getHostName());
         for(TaskAttemptID left: failedTasks) {
           scheduler.copyFailed(left, host, true, false);
         }

Modified: 
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java?rev=1618679&r1=1618678&r2=1618679&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java
 (original)
+++ 
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java
 Mon Aug 18 17:59:32 2014
@@ -18,7 +18,6 @@
 package org.apache.hadoop.mapreduce.task.reduce;
 
 import java.io.IOException;
-
 import java.net.InetAddress;
 import java.net.URI;
 import java.net.UnknownHostException;
@@ -101,6 +100,7 @@ public class ShuffleSchedulerImpl<K,V> i
 
   private final boolean reportReadErrorImmediately;
   private long maxDelay = MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY;
+  private int maxHostFailures;
 
   public ShuffleSchedulerImpl(JobConf job, TaskStatus status,
                           TaskAttemptID reduceId,
@@ -132,6 +132,9 @@ public class ShuffleSchedulerImpl<K,V> i
 
     this.maxDelay = job.getLong(MRJobConfig.MAX_SHUFFLE_FETCH_RETRY_DELAY,
         MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY);
+    this.maxHostFailures = job.getInt(
+        MRJobConfig.MAX_SHUFFLE_FETCH_HOST_FAILURES,
+        MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_HOST_FAILURES);
   }
 
   @Override
@@ -213,9 +216,18 @@ public class ShuffleSchedulerImpl<K,V> i
     progress.setStatus("copy(" + mapsDone + " of " + totalMaps + " at "
         + mbpsFormat.format(transferRate) + " MB/s)");
   }
+  
+  public synchronized void hostFailed(String hostname) {
+    if (hostFailures.containsKey(hostname)) {
+      IntWritable x = hostFailures.get(hostname);
+      x.set(x.get() + 1);
+    } else {
+      hostFailures.put(hostname, new IntWritable(1));
+    }
+  }
 
   public synchronized void copyFailed(TaskAttemptID mapId, MapHost host,
-                                      boolean readError, boolean connectExcpt) 
{
+      boolean readError, boolean connectExcpt) {
     host.penalize();
     int failures = 1;
     if (failureCounts.containsKey(mapId)) {
@@ -226,12 +238,9 @@ public class ShuffleSchedulerImpl<K,V> i
       failureCounts.put(mapId, new IntWritable(1));
     }
     String hostname = host.getHostName();
-    if (hostFailures.containsKey(hostname)) {
-      IntWritable x = hostFailures.get(hostname);
-      x.set(x.get() + 1);
-    } else {
-      hostFailures.put(hostname, new IntWritable(1));
-    }
+    //report failure if already retried maxHostFailures times
+    boolean hostFail = hostFailures.get(hostname).get() > getMaxHostFailures() 
? true : false;
+    
     if (failures >= abortFailureLimit) {
       try {
         throw new IOException(failures + " failures downloading " + mapId);
@@ -240,7 +249,7 @@ public class ShuffleSchedulerImpl<K,V> i
       }
     }
 
-    checkAndInformJobTracker(failures, mapId, readError, connectExcpt);
+    checkAndInformJobTracker(failures, mapId, readError, connectExcpt, 
hostFail);
 
     checkReducerHealth();
 
@@ -270,9 +279,9 @@ public class ShuffleSchedulerImpl<K,V> i
   // after every 'maxFetchFailuresBeforeReporting' failures
   private void checkAndInformJobTracker(
       int failures, TaskAttemptID mapId, boolean readError,
-      boolean connectExcpt) {
+      boolean connectExcpt, boolean hostFailed) {
     if (connectExcpt || (reportReadErrorImmediately && readError)
-        || ((failures % maxFetchFailuresBeforeReporting) == 0)) {
+        || ((failures % maxFetchFailuresBeforeReporting) == 0) || hostFailed) {
       LOG.info("Reporting fetch failure for " + mapId + " to jobtracker.");
       status.addFetchFailedMap((org.apache.hadoop.mapred.TaskAttemptID) mapId);
     }
@@ -507,4 +516,7 @@ public class ShuffleSchedulerImpl<K,V> i
     referee.join();
   }
 
+  public int getMaxHostFailures() {
+    return maxHostFailures;
+  }
 }


Reply via email to