MAPREDUCE-6633. AM should retry map attempts if the reduce task encounters compression-related errors. Contributed by Rushabh Shah
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1fec06e0 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1fec06e0 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1fec06e0 Branch: refs/heads/HDFS-7240 Commit: 1fec06e037d2b22dafc64f33d4f1231bef4ceba8 Parents: b9e3eff Author: Eric Payne <epa...@apache.org> Authored: Sat Apr 9 16:51:57 2016 +0000 Committer: Eric Payne <epa...@apache.org> Committed: Sat Apr 9 16:51:57 2016 +0000 ---------------------------------------------------------------------- .../hadoop/mapreduce/task/reduce/Fetcher.java | 2 +- .../mapreduce/task/reduce/TestFetcher.java | 37 ++++++++++++++++++++ 2 files changed, 38 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/1fec06e0/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java index fb0ac0a..d8dd7b5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java @@ -537,7 +537,7 @@ class Fetcher<K,V> extends Thread { + " len: " + compressedLength + " to " + mapOutput.getDescription()); mapOutput.shuffle(host, is, compressedLength, decompressedLength, metrics, reporter); - } catch (java.lang.InternalError e) { + 
} catch (java.lang.InternalError | Exception e) { LOG.warn("Failed to shuffle for fetcher#"+id, e); throw new IOException(e); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1fec06e0/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java index 7888007..998b3de 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java @@ -346,6 +346,43 @@ public class TestFetcher { @SuppressWarnings("unchecked") @Test(timeout=10000) + public void testCopyFromHostOnAnyException() throws Exception { + InMemoryMapOutput<Text, Text> immo = mock(InMemoryMapOutput.class); + + Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(job, id, ss, mm, + r, metrics, except, key, connection); + + String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key); + + when(connection.getResponseCode()).thenReturn(200); + when(connection.getHeaderField( + SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH)).thenReturn(replyHash); + ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1); + ByteArrayOutputStream bout = new ByteArrayOutputStream(); + header.write(new DataOutputStream(bout)); + ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray()); + when(connection.getInputStream()).thenReturn(in); + 
when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME)) + .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); + when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION)) + .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); + when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt())) + .thenReturn(immo); + + doThrow(new ArrayIndexOutOfBoundsException()).when(immo) + .shuffle(any(MapHost.class), any(InputStream.class), anyLong(), + anyLong(), any(ShuffleClientMetrics.class), any(Reporter.class)); + + underTest.copyFromHost(host); + + verify(connection) + .addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, + encHash); + verify(ss, times(1)).copyFailed(map1ID, host, true, false); + } + + @SuppressWarnings("unchecked") + @Test(timeout=10000) public void testCopyFromHostWithRetry() throws Exception { InMemoryMapOutput<Text, Text> immo = mock(InMemoryMapOutput.class); ss = mock(ShuffleSchedulerImpl.class);