Author: jlowe Date: Tue Mar 12 22:51:28 2013 New Revision: 1455740 URL: http://svn.apache.org/r1455740 Log: MAPREDUCE-5060. Fetch failures that time out only count against the first map task. Contributed by Robert Joseph Evans
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1455740&r1=1455739&r2=1455740&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Tue Mar 12 22:51:28 2013 @@ -807,6 +807,9 @@ Release 0.23.7 - UNRELEASED MAPREDUCE-5023. History Server Web Services missing Job Counters (Ravi Prakash via tgraves) + MAPREDUCE-5060. Fetch failures that time out only count against the first + map task (Robert Joseph Evans via jlowe) + Release 0.23.6 - UNRELEASED INCOMPATIBLE CHANGES Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=1455740&r1=1455739&r2=1455740&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Tue Mar 12 22:51:28 2013 @@ -221,7 +221,6 @@ class Fetcher<K,V> extends Thread { // Construct the url and connect DataInputStream input; - boolean connectSucceeded = false; try { URL url = getMapOutputURL(host, maps); @@ -237,7 +236,6 @@ class Fetcher<K,V> extends Thread { // set the read timeout connection.setReadTimeout(readTimeout); connect(connection, connectionTimeout); - connectSucceeded = true; input = new DataInputStream(connection.getInputStream()); // Validate response code @@ -265,18 +263,10 @@ class Fetcher<K,V> extends Thread { // If connect did not succeed, just mark all the maps as failed, // indirectly penalizing the host - if (!connectSucceeded) { - for(TaskAttemptID left: remaining) { - scheduler.copyFailed(left, host, connectSucceeded, connectExcpt); - } - } else { - // If we got a read error at this stage, it implies there was a problem - // with the first map, typically lost map. So, penalize only that map - // and add the rest - TaskAttemptID firstMap = maps.get(0); - scheduler.copyFailed(firstMap, host, connectSucceeded, connectExcpt); + for(TaskAttemptID left: remaining) { + scheduler.copyFailed(left, host, false, connectExcpt); } - + // Add back all the remaining maps, WITHOUT marking them as failed for(TaskAttemptID left: remaining) { scheduler.putBackKnownMapOutput(host, left); Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java?rev=1455740&r1=1455739&r2=1455740&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java Tue Mar 12 22:51:28 2013 @@ -26,6 +26,7 @@ import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; import java.net.HttpURLConnection; +import java.net.SocketTimeoutException; import java.net.URL; import java.util.ArrayList; @@ -71,6 +72,54 @@ public class TestFetcher { } @SuppressWarnings("unchecked") + @Test(timeout=30000) + public void testCopyFromHostConnectionTimeout() throws Exception { + LOG.info("testCopyFromHostConnectionTimeout"); + JobConf job = new JobConf(); + TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1"); + ShuffleScheduler<Text, Text> ss = mock(ShuffleScheduler.class); + MergeManagerImpl<Text, Text> mm = mock(MergeManagerImpl.class); + Reporter r = mock(Reporter.class); + ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class); + ExceptionReporter except = mock(ExceptionReporter.class); + SecretKey key = JobTokenSecretManager.createSecretKey(new byte[]{0,0,0,0}); + HttpURLConnection connection = mock(HttpURLConnection.class); + when(connection.getInputStream()).thenThrow( + new SocketTimeoutException("This is a fake timeout :)")); + + Counters.Counter allErrs = mock(Counters.Counter.class); + when(r.getCounter(anyString(), anyString())) + .thenReturn(allErrs); + + Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(job, id, ss, mm, + r, metrics, except, key, connection); + + MapHost host = new MapHost("localhost", "http://localhost:8080/"); + + ArrayList<TaskAttemptID> maps = new ArrayList<TaskAttemptID>(1); + TaskAttemptID map1ID = TaskAttemptID.forName("attempt_0_1_m_1_1"); + maps.add(map1ID); + TaskAttemptID map2ID = TaskAttemptID.forName("attempt_0_1_m_2_1"); + maps.add(map2ID); + when(ss.getMapsForHost(host)).thenReturn(maps); + + String encHash = "vFE234EIFCiBgYs2tCXY/SjT8Kg="; + + underTest.copyFromHost(host); + + verify(connection) + .addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, + encHash); + + verify(allErrs).increment(1); + verify(ss).copyFailed(map1ID, host, false, false); + verify(ss).copyFailed(map2ID, host, false, false); + + verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map1ID)); + verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map2ID)); + } + + @SuppressWarnings("unchecked") @Test public void testCopyFromHostBogusHeader() throws Exception { LOG.info("testCopyFromHostBogusHeader");