Repository: hadoop
Updated Branches:
  refs/heads/branch-2.6 59b2a74ea -> 9b383f681
MAPREDUCE-6156. Fetcher - connect() doesn't handle connection refused correctly. Contributed by Junping Du (cherry picked from commit 177e8090f5809beb3ebcb656cd0affbb3f487de8) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9b383f68 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9b383f68 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9b383f68 Branch: refs/heads/branch-2.6 Commit: 9b383f6813d2b455f1f2f3fb56373164a38acf5b Parents: 59b2a74 Author: Jason Lowe <jl...@apache.org> Authored: Thu Nov 13 15:42:25 2014 +0000 Committer: Jason Lowe <jl...@apache.org> Committed: Thu Nov 13 15:44:00 2014 +0000 ---------------------------------------------------------------------- hadoop-mapreduce-project/CHANGES.txt | 3 ++ .../hadoop/mapreduce/task/reduce/Fetcher.java | 39 +++++++++++++++----- 2 files changed, 33 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/9b383f68/hadoop-mapreduce-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index a4e88fa..40d5167 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -231,6 +231,9 @@ Release 2.6.0 - 2014-11-15 MAPREDUCE-5958. Wrong reduce task progress if map output is compressed (Emilio Coppa and jlowe via kihwal) + MAPREDUCE-6156. 
Fetcher - connect() doesn't handle connection refused + correctly (Junping Du via jlowe) + Release 2.5.2 - 2014-11-10 INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/9b383f68/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java index 796394f..3f40853 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java @@ -407,7 +407,7 @@ class Fetcher<K,V> extends Thread { } if ((Time.monotonicNow() - startTime) >= this.fetchRetryTimeout) { LOG.warn("Failed to connect to host: " + url + "after " - + fetchRetryTimeout + "milliseconds."); + + fetchRetryTimeout + " milliseconds."); throw e; } try { @@ -596,7 +596,7 @@ class Fetcher<K,V> extends Thread { } else { // timeout, prepare to be failed. 
LOG.warn("Timeout for copying MapOutput with retry on host " + host - + "after " + fetchRetryTimeout + "milliseconds."); + + "after " + fetchRetryTimeout + " milliseconds."); } } @@ -678,28 +678,49 @@ class Fetcher<K,V> extends Thread { } else if (connectionTimeout > 0) { unit = Math.min(UNIT_CONNECT_TIMEOUT, connectionTimeout); } + long startTime = Time.monotonicNow(); + long lastTime = startTime; + int attempts = 0; // set the connect timeout to the unit-connect-timeout connection.setConnectTimeout(unit); while (true) { try { + attempts++; connection.connect(); break; } catch (IOException ioe) { - // update the total remaining connect-timeout - connectionTimeout -= unit; - + long currentTime = Time.monotonicNow(); + long retryTime = currentTime - startTime; + long leftTime = connectionTimeout - retryTime; + long timeSinceLastIteration = currentTime - lastTime; // throw an exception if we have waited for timeout amount of time // note that the updated value if timeout is used here - if (connectionTimeout == 0) { + if (leftTime <= 0) { + int retryTimeInSeconds = (int) retryTime/1000; + LOG.error("Connection retry failed with " + attempts + + " attempts in " + retryTimeInSeconds + " seconds"); throw ioe; } - // reset the connect timeout for the last try - if (connectionTimeout < unit) { - unit = connectionTimeout; + if (leftTime < unit) { + unit = (int)leftTime; // reset the connect time out for the final connect connection.setConnectTimeout(unit); } + + if (timeSinceLastIteration < unit) { + try { + // sleep the left time of unit + sleep(unit - timeSinceLastIteration); + } catch (InterruptedException e) { + LOG.warn("Sleep in connection retry get interrupted."); + if (stopped) { + return; + } + } + } + // update the total remaining connect-timeout + lastTime = Time.monotonicNow(); } } }