Repository: hadoop
Updated Branches:
  refs/heads/branch-2.6 59b2a74ea -> 9b383f681


MAPREDUCE-6156. Fetcher - connect() doesn't handle connection refused 
correctly. Contributed by Junping Du
(cherry picked from commit 177e8090f5809beb3ebcb656cd0affbb3f487de8)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9b383f68
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9b383f68
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9b383f68

Branch: refs/heads/branch-2.6
Commit: 9b383f6813d2b455f1f2f3fb56373164a38acf5b
Parents: 59b2a74
Author: Jason Lowe <jl...@apache.org>
Authored: Thu Nov 13 15:42:25 2014 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Thu Nov 13 15:44:00 2014 +0000

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |  3 ++
 .../hadoop/mapreduce/task/reduce/Fetcher.java   | 39 +++++++++++++++-----
 2 files changed, 33 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/9b383f68/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index a4e88fa..40d5167 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -231,6 +231,9 @@ Release 2.6.0 - 2014-11-15
     MAPREDUCE-5958. Wrong reduce task progress if map output is compressed
     (Emilio Coppa and jlowe via kihwal)
 
+    MAPREDUCE-6156. Fetcher - connect() doesn't handle connection refused
+    correctly (Junping Du via jlowe)
+
 Release 2.5.2 - 2014-11-10
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9b383f68/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
index 796394f..3f40853 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
@@ -407,7 +407,7 @@ class Fetcher<K,V> extends Thread {
         }
         if ((Time.monotonicNow() - startTime) >= this.fetchRetryTimeout) {
           LOG.warn("Failed to connect to host: " + url + "after " 
-              + fetchRetryTimeout + "milliseconds.");
+              + fetchRetryTimeout + " milliseconds.");
           throw e;
         }
         try {
@@ -596,7 +596,7 @@ class Fetcher<K,V> extends Thread {
     } else {
       // timeout, prepare to be failed.
       LOG.warn("Timeout for copying MapOutput with retry on host " + host 
-          + "after " + fetchRetryTimeout + "milliseconds.");
+          + "after " + fetchRetryTimeout + " milliseconds.");
       
     }
   }
@@ -678,28 +678,49 @@ class Fetcher<K,V> extends Thread {
     } else if (connectionTimeout > 0) {
       unit = Math.min(UNIT_CONNECT_TIMEOUT, connectionTimeout);
     }
+    long startTime = Time.monotonicNow();
+    long lastTime = startTime;
+    int attempts = 0;
     // set the connect timeout to the unit-connect-timeout
     connection.setConnectTimeout(unit);
     while (true) {
       try {
+        attempts++;
         connection.connect();
         break;
       } catch (IOException ioe) {
-        // update the total remaining connect-timeout
-        connectionTimeout -= unit;
-
+        long currentTime = Time.monotonicNow();
+        long retryTime = currentTime - startTime;
+        long leftTime = connectionTimeout - retryTime;
+        long timeSinceLastIteration = currentTime - lastTime;
         // throw an exception if we have waited for timeout amount of time
         // note that the updated value if timeout is used here
-        if (connectionTimeout == 0) {
+        if (leftTime <= 0) {
+          int retryTimeInSeconds = (int) retryTime/1000;
+          LOG.error("Connection retry failed with " + attempts + 
+              " attempts in " + retryTimeInSeconds + " seconds");
           throw ioe;
         }
-
         // reset the connect timeout for the last try
-        if (connectionTimeout < unit) {
-          unit = connectionTimeout;
+        if (leftTime < unit) {
+          unit = (int)leftTime;
           // reset the connect time out for the final connect
           connection.setConnectTimeout(unit);
         }
+        
+        if (timeSinceLastIteration < unit) {
+          try {
+            // sleep the left time of unit
+            sleep(unit - timeSinceLastIteration);
+          } catch (InterruptedException e) {
+            LOG.warn("Sleep in connection retry get interrupted.");
+            if (stopped) {
+              return;
+            }
+          }
+        }
+        // update the total remaining connect-timeout
+        lastTime = Time.monotonicNow();
       }
     }
   }

Reply via email to