Author: jlowe
Date: Mon Jun 10 21:45:35 2013
New Revision: 1491614

URL: http://svn.apache.org/r1491614
Log:
svn merge -c 1491611 FIXES: MAPREDUCE-5308. Shuffling to memory can get out-of-sync when fetching multiple compressed map outputs. Contributed by Nathan Roberts

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryMapOutput.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1491614&r1=1491613&r2=1491614&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Mon Jun 10 21:45:35 2013 @@ -389,6 +389,9 @@ Release 2.1.0-beta - UNRELEASED MAPREDUCE-5301. Updated MR code to work with YARN-635 changes of renaming YarnRemoteException to YarnException. (Siddharth Seth via vinodkv) + MAPREDUCE-5308. Shuffling to memory can get out-of-sync when fetching + multiple compressed map outputs (Nathan Roberts via jlowe) + BREAKDOWN OF HADOOP-8562 SUBTASKS MAPREDUCE-4739. Some MapReduce tests fail to find winutils. @@ -1004,6 +1007,9 @@ Release 0.23.9 - UNRELEASED BUG FIXES + MAPREDUCE-5308. 
Shuffling to memory can get out-of-sync when fetching + multiple compressed map outputs (Nathan Roberts via jlowe) + Release 0.23.8 - 2013-06-05 INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryMapOutput.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryMapOutput.java?rev=1491614&r1=1491613&r2=1491614&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryMapOutput.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryMapOutput.java Mon Jun 10 21:45:35 2013 @@ -99,6 +99,19 @@ class InMemoryMapOutput<K, V> extends Ma reporter.progress(); LOG.info("Read " + memory.length + " bytes from map-output for " + getMapId()); + + /** + * We've gotten the amount of data we were expecting. Verify the + * decompressor has nothing more to offer. This action also forces the + * decompressor to read any trailing bytes that weren't critical + * for decompression, which is necessary to keep the stream + * in sync. 
+ */ + if (input.read() >= 0 ) { + throw new IOException("Unexpected extra bytes from input stream for " + + getMapId()); + } + } catch (IOException ioe) { // Close the streams IOUtils.cleanup(LOG, input); Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java?rev=1491614&r1=1491613&r2=1491614&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java Mon Jun 10 21:45:35 2013 @@ -37,6 +37,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.Counters; +import org.apache.hadoop.mapred.IFileOutputStream; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapreduce.TaskAttemptID; @@ -233,6 +234,80 @@ public class TestFetcher { verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map1ID)); verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map2ID)); } + @SuppressWarnings("unchecked") + @Test + public void testCopyFromHostExtraBytes() throws Exception { + LOG.info("testCopyFromHostWaitExtraBytes"); + JobConf job = new JobConf(); + TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1"); + ShuffleScheduler<Text, Text> ss = mock(ShuffleScheduler.class); + 
MergeManagerImpl<Text, Text> mm = mock(MergeManagerImpl.class); + InMemoryMapOutput<Text, Text> immo = mock(InMemoryMapOutput.class); + + Reporter r = mock(Reporter.class); + ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class); + ExceptionReporter except = mock(ExceptionReporter.class); + SecretKey key = JobTokenSecretManager.createSecretKey(new byte[]{0,0,0,0}); + HttpURLConnection connection = mock(HttpURLConnection.class); + + Counters.Counter allErrs = mock(Counters.Counter.class); + when(r.getCounter(anyString(), anyString())) + .thenReturn(allErrs); + + Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(job, id, ss, mm, + r, metrics, except, key, connection); + + + MapHost host = new MapHost("localhost", "http://localhost:8080/"); + + ArrayList<TaskAttemptID> maps = new ArrayList<TaskAttemptID>(1); + TaskAttemptID map1ID = TaskAttemptID.forName("attempt_0_1_m_1_1"); + maps.add(map1ID); + TaskAttemptID map2ID = TaskAttemptID.forName("attempt_0_1_m_2_1"); + maps.add(map2ID); + when(ss.getMapsForHost(host)).thenReturn(maps); + + String encHash = "vFE234EIFCiBgYs2tCXY/SjT8Kg="; + String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key); + + when(connection.getResponseCode()).thenReturn(200); + when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH)) + .thenReturn(replyHash); + ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 14, 10, 1); + + ByteArrayOutputStream bout = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bout); + IFileOutputStream ios = new IFileOutputStream(dos); + header.write(dos); + ios.write("MAPDATA123".getBytes()); + ios.finish(); + + ShuffleHeader header2 = new ShuffleHeader(map2ID.toString(), 14, 10, 1); + IFileOutputStream ios2 = new IFileOutputStream(dos); + header2.write(dos); + ios2.write("MAPDATA456".getBytes()); + ios2.finish(); + + ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray()); + 
when(connection.getInputStream()).thenReturn(in); + // 8 < 10 therefore there appear to be extra bytes in the IFileInputStream + InMemoryMapOutput<Text, Text> mapOut = new InMemoryMapOutput<Text, Text>(job, map1ID, mm, 8, null, true ); + InMemoryMapOutput<Text, Text> mapOut2 = new InMemoryMapOutput<Text, Text>(job, map2ID, mm, 10, null, true ); + + when(mm.reserve(eq(map1ID), anyLong(), anyInt())).thenReturn(mapOut); + when(mm.reserve(eq(map2ID), anyLong(), anyInt())).thenReturn(mapOut2); + + + underTest.copyFromHost(host); + + + verify(allErrs).increment(1); + verify(ss).copyFailed(map1ID, host, true, false); + verify(ss, never()).copyFailed(map2ID, host, true, false); + + verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map1ID)); + verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map2ID)); + } @SuppressWarnings("unchecked") @Test(timeout=10000) @@ -265,7 +340,6 @@ public class TestFetcher { TaskAttemptID map2ID = TaskAttemptID.forName("attempt_0_1_m_2_1"); maps.add(map2ID); when(ss.getMapsForHost(host)).thenReturn(maps); - String encHash = "vFE234EIFCiBgYs2tCXY/SjT8Kg="; String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key); @@ -292,4 +366,4 @@ public class TestFetcher { encHash); verify(ss, times(1)).copyFailed(map1ID, host, true, false); } -} \ No newline at end of file +}