Repository: hadoop
Updated Branches:
  refs/heads/trunk 5639bf02d -> bc1bd7e5c

MAPREDUCE-6334. Fetcher#copyMapOutput is leaking usedMemory upon IOException during InMemoryMapOutput shuffle handler. Contributed by Eric Payne


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bc1bd7e5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bc1bd7e5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bc1bd7e5

Branch: refs/heads/trunk
Commit: bc1bd7e5c4047b374420683d36a8c30eda6d75b6
Parents: 5639bf0
Author: Jason Lowe <jl...@apache.org>
Authored: Tue Apr 28 20:17:52 2015 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Tue Apr 28 20:19:05 2015 +0000

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |  3 ++
 .../hadoop/mapreduce/task/reduce/Fetcher.java   |  6 ++--
 .../mapreduce/task/reduce/TestFetcher.java      | 34 ++++++++++++++++++++
 3 files changed, 41 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc1bd7e5/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index d27a022..2090007 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -376,6 +376,9 @@ Release 2.7.1 - UNRELEASED
     MAPREDUCE-6252. JobHistoryServer should not fail when encountering a
     missing directory. (Craig Welch via devaraj)
 
+    MAPREDUCE-6334. Fetcher#copyMapOutput is leaking usedMemory upon
+    IOException during InMemoryMapOutput shuffle handler (Eric Payne via jlowe)
+
 Release 2.7.0 - 2015-04-20
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc1bd7e5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
index d867e4b..4b80dc9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
@@ -553,7 +553,10 @@ class Fetcher<K,V> extends Thread {
       metrics.successFetch();
       return null;
     } catch (IOException ioe) {
-
+      if (mapOutput != null) {
+        mapOutput.abort();
+      }
+
       if (canRetry) {
         checkTimeoutOrRetry(host, ioe);
       }
@@ -574,7 +577,6 @@ class Fetcher<K,V> extends Thread {
                " from " + host.getHostName(), ioe);
 
       // Inform the shuffle-scheduler
-      mapOutput.abort();
       metrics.failedFetch();
       return new TaskAttemptID[] {mapId};
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc1bd7e5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
index 723df17..a9cd33e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
@@ -628,6 +628,40 @@ public class TestFetcher {
     verify(odmo).abort();
   }
 
+  @SuppressWarnings("unchecked")
+  @Test(timeout=10000)
+  public void testCopyFromHostWithRetryUnreserve() throws Exception {
+    InMemoryMapOutput<Text, Text> immo = mock(InMemoryMapOutput.class);
+    Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(jobWithRetry,
+        id, ss, mm, r, metrics, except, key, connection);
+
+    String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);
+
+    when(connection.getResponseCode()).thenReturn(200);
+    when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH))
+        .thenReturn(replyHash);
+    ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1);
+    ByteArrayOutputStream bout = new ByteArrayOutputStream();
+    header.write(new DataOutputStream(bout));
+    ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray());
+    when(connection.getInputStream()).thenReturn(in);
+    when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
+        .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+    when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))
+        .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+
+    // Verify that unreserve occurs if an exception happens after shuffle
+    // buffer is reserved.
+    when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
+        .thenReturn(immo);
+    doThrow(new IOException("forced error")).when(immo).shuffle(
+        any(MapHost.class), any(InputStream.class), anyLong(),
+        anyLong(), any(ShuffleClientMetrics.class), any(Reporter.class));
+
+    underTest.copyFromHost(host);
+    verify(immo).abort();
+  }
+
   public static class FakeFetcher<K,V> extends Fetcher<K,V> {
 
     // If connection need to be reopen.