Repository: hadoop
Updated Branches:
  refs/heads/branch-2 2fcd3fe6b -> 25c1e54d3
MAPREDUCE-6174. Combine common stream code into parent class for InMemoryMapOutput and OnDiskMapOutput. (Eric Payne via gera) (cherry picked from commit d90c13e2da8867661bf19a802add70145ab9a462) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/25c1e54d Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/25c1e54d Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/25c1e54d Branch: refs/heads/branch-2 Commit: 25c1e54d3f26f144099b3b51a78a489712a3fe28 Parents: 2fcd3fe Author: Gera Shegalov <g...@apache.org> Authored: Wed Jun 3 16:26:45 2015 -0700 Committer: Gera Shegalov <g...@apache.org> Committed: Wed Jun 3 16:57:54 2015 -0700 ---------------------------------------------------------------------- hadoop-mapreduce-project/CHANGES.txt | 3 + .../task/reduce/IFileWrappedMapOutput.java | 72 ++++++++++++++++++++ .../task/reduce/InMemoryMapOutput.java | 26 ++----- .../mapreduce/task/reduce/MergeManagerImpl.java | 5 +- .../mapreduce/task/reduce/OnDiskMapOutput.java | 33 +++++---- .../mapreduce/task/reduce/TestFetcher.java | 27 ++++---- 6 files changed, 114 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/25c1e54d/hadoop-mapreduce-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 3f3ac05..68ed3e9 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -73,6 +73,9 @@ Release 2.8.0 - UNRELEASED MAPREDUCE-5248. Let NNBenchWithoutMR specify the replication factor for its test (Erik Paulson via jlowe) + MAPREDUCE-6174. Combine common stream code into parent class for + InMemoryMapOutput and OnDiskMapOutput. 
(Eric Payne via gera) + OPTIMIZATIONS BUG FIXES http://git-wip-us.apache.org/repos/asf/hadoop/blob/25c1e54d/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/IFileWrappedMapOutput.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/IFileWrappedMapOutput.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/IFileWrappedMapOutput.java new file mode 100644 index 0000000..119db15 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/IFileWrappedMapOutput.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.mapreduce.task.reduce; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.IFileInputStream; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.TaskAttemptID; + +/** + * Common code for allowing MapOutput classes to handle streams. + * + * @param <K> key type for map output + * @param <V> value type for map output + */ +public abstract class IFileWrappedMapOutput<K, V> extends MapOutput<K, V> { + private final Configuration conf; + private final MergeManagerImpl<K, V> merger; + + public IFileWrappedMapOutput( + Configuration c, MergeManagerImpl<K, V> m, TaskAttemptID mapId, + long size, boolean primaryMapOutput) { + super(mapId, size, primaryMapOutput); + conf = c; + merger = m; + } + + /** + * @return the merger + */ + protected MergeManagerImpl<K, V> getMerger() { + return merger; + } + + protected abstract void doShuffle( + MapHost host, IFileInputStream iFileInputStream, + long compressedLength, long decompressedLength, + ShuffleClientMetrics metrics, Reporter reporter) throws IOException; + + @Override + public void shuffle(MapHost host, InputStream input, + long compressedLength, long decompressedLength, + ShuffleClientMetrics metrics, + Reporter reporter) throws IOException { + IFileInputStream iFin = + new IFileInputStream(input, compressedLength, conf); + try { + this.doShuffle(host, iFin, compressedLength, + decompressedLength, metrics, reporter); + } finally { + iFin.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/25c1e54d/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryMapOutput.java ---------------------------------------------------------------------- diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryMapOutput.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryMapOutput.java index 24fb3bb..9b61ad5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryMapOutput.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryMapOutput.java @@ -42,10 +42,8 @@ import org.apache.hadoop.mapreduce.TaskAttemptID; @InterfaceAudience.Private @InterfaceStability.Unstable -class InMemoryMapOutput<K, V> extends MapOutput<K, V> { +class InMemoryMapOutput<K, V> extends IFileWrappedMapOutput<K, V> { private static final Log LOG = LogFactory.getLog(InMemoryMapOutput.class); - private Configuration conf; - private final MergeManagerImpl<K, V> merger; private final byte[] memory; private BoundedByteArrayOutputStream byteStream; // Decompression of map-outputs @@ -56,9 +54,7 @@ class InMemoryMapOutput<K, V> extends MapOutput<K, V> { MergeManagerImpl<K, V> merger, int size, CompressionCodec codec, boolean primaryMapOutput) { - super(mapId, (long)size, primaryMapOutput); - this.conf = conf; - this.merger = merger; + super(conf, merger, mapId, (long)size, primaryMapOutput); this.codec = codec; byteStream = new BoundedByteArrayOutputStream(size); memory = byteStream.getBuffer(); @@ -78,15 +74,12 @@ class InMemoryMapOutput<K, V> extends MapOutput<K, V> { } @Override - public void shuffle(MapHost host, InputStream input, + protected void doShuffle(MapHost host, IFileInputStream iFin, long compressedLength, long decompressedLength, ShuffleClientMetrics metrics, Reporter reporter) throws IOException { - IFileInputStream checksumIn = - new IFileInputStream(input, 
compressedLength, conf); + InputStream input = iFin; - input = checksumIn; - // Are map-outputs compressed? if (codec != null) { decompressor.reset(); @@ -111,13 +104,6 @@ class InMemoryMapOutput<K, V> extends MapOutput<K, V> { throw new IOException("Unexpected extra bytes from input stream for " + getMapId()); } - - } catch (IOException ioe) { - // Close the streams - IOUtils.cleanup(LOG, input); - - // Re-throw - throw ioe; } finally { CodecPool.returnDecompressor(decompressor); } @@ -125,12 +111,12 @@ class InMemoryMapOutput<K, V> extends MapOutput<K, V> { @Override public void commit() throws IOException { - merger.closeInMemoryFile(this); + getMerger().closeInMemoryFile(this); } @Override public void abort() { - merger.unreserve(memory.length); + getMerger().unreserve(memory.length); } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/25c1e54d/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java index f788707..c99a330 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java @@ -263,8 +263,9 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> { LOG.info(mapId + ": Shuffling to disk since " + requestedSize + " is greater than maxSingleShuffleLimit (" + maxSingleShuffleLimit + ")"); - 
return new OnDiskMapOutput<K,V>(mapId, reduceId, this, requestedSize, - jobConf, mapOutputFile, fetcher, true); + return new OnDiskMapOutput<K,V>(mapId, this, requestedSize, jobConf, + fetcher, true, FileSystem.getLocal(jobConf).getRaw(), + mapOutputFile.getInputFileForWrite(mapId.getTaskID(), requestedSize)); } // Stall shuffle if we are above the memory limit http://git-wip-us.apache.org/repos/asf/hadoop/blob/25c1e54d/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java index 8275fd0..f22169d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java @@ -18,13 +18,11 @@ package org.apache.hadoop.mapreduce.task.reduce; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -46,41 +44,46 @@ import com.google.common.annotations.VisibleForTesting; @InterfaceAudience.Private @InterfaceStability.Unstable -class OnDiskMapOutput<K, V> extends MapOutput<K, V> { +class OnDiskMapOutput<K, V> extends IFileWrappedMapOutput<K, V> { private static final 
Log LOG = LogFactory.getLog(OnDiskMapOutput.class); private final FileSystem fs; private final Path tmpOutputPath; private final Path outputPath; - private final MergeManagerImpl<K, V> merger; private final OutputStream disk; private long compressedSize; - private final Configuration conf; + @Deprecated public OnDiskMapOutput(TaskAttemptID mapId, TaskAttemptID reduceId, MergeManagerImpl<K,V> merger, long size, JobConf conf, MapOutputFile mapOutputFile, int fetcher, boolean primaryMapOutput) throws IOException { - this(mapId, reduceId, merger, size, conf, mapOutputFile, fetcher, + this(mapId, merger, size, conf, fetcher, primaryMapOutput, FileSystem.getLocal(conf).getRaw(), mapOutputFile.getInputFileForWrite(mapId.getTaskID(), size)); } - @VisibleForTesting + @Deprecated OnDiskMapOutput(TaskAttemptID mapId, TaskAttemptID reduceId, MergeManagerImpl<K,V> merger, long size, JobConf conf, MapOutputFile mapOutputFile, int fetcher, boolean primaryMapOutput, FileSystem fs, Path outputPath) throws IOException { - super(mapId, size, primaryMapOutput); + this(mapId, merger, size, conf, fetcher, primaryMapOutput, fs, outputPath); + } + + OnDiskMapOutput(TaskAttemptID mapId, + MergeManagerImpl<K, V> merger, long size, + JobConf conf, + int fetcher, boolean primaryMapOutput, + FileSystem fs, Path outputPath) throws IOException { + super(conf, merger, mapId, size, primaryMapOutput); this.fs = fs; - this.merger = merger; this.outputPath = outputPath; tmpOutputPath = getTempPath(outputPath, fetcher); disk = CryptoUtils.wrapIfNecessary(conf, fs.create(tmpOutputPath)); - this.conf = conf; } @VisibleForTesting @@ -89,18 +92,18 @@ class OnDiskMapOutput<K, V> extends MapOutput<K, V> { } @Override - public void shuffle(MapHost host, InputStream input, + protected void doShuffle(MapHost host, IFileInputStream input, long compressedLength, long decompressedLength, ShuffleClientMetrics metrics, Reporter reporter) throws IOException { - input = new IFileInputStream(input, compressedLength, 
conf); // Copy data to local-disk long bytesLeft = compressedLength; try { final int BYTES_TO_READ = 64 * 1024; byte[] buf = new byte[BYTES_TO_READ]; while (bytesLeft > 0) { - int n = ((IFileInputStream)input).readWithChecksum(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ)); + int n = input.readWithChecksum(buf, 0, + (int) Math.min(bytesLeft, BYTES_TO_READ)); if (n < 0) { throw new IOException("read past end of stream reading " + getMapId()); @@ -117,7 +120,7 @@ class OnDiskMapOutput<K, V> extends MapOutput<K, V> { disk.close(); } catch (IOException ioe) { // Close the streams - IOUtils.cleanup(LOG, input, disk); + IOUtils.cleanup(LOG, disk); // Re-throw throw ioe; @@ -139,7 +142,7 @@ class OnDiskMapOutput<K, V> extends MapOutput<K, V> { fs.rename(tmpOutputPath, outputPath); CompressAwarePath compressAwarePath = new CompressAwarePath(outputPath, getSize(), this.compressedSize); - merger.closeOnDiskFile(compressAwarePath); + getMerger().closeOnDiskFile(compressAwarePath); } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/25c1e54d/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java index a9cd33e..7888007 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java @@ -19,9 +19,7 @@ package org.apache.hadoop.mapreduce.task.reduce; 
import java.io.FilterInputStream; - import java.lang.Void; - import java.net.HttpURLConnection; import org.apache.hadoop.fs.ChecksumException; @@ -30,13 +28,12 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.MapOutputFile; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskID; - import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.rules.TestName; -import static org.junit.Assert.*; +import static org.junit.Assert.*; import static org.mockito.Matchers.*; import static org.mockito.Mockito.*; @@ -65,10 +62,11 @@ import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; import org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.apache.hadoop.util.Time; import org.junit.Test; - import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import com.nimbusds.jose.util.StringUtils; + /** * Test that the Fetcher does what we expect it to. */ @@ -453,9 +451,9 @@ public class TestFetcher { ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray()); when(connection.getInputStream()).thenReturn(in); // 8 < 10 therefore there appear to be extra bytes in the IFileInputStream - InMemoryMapOutput<Text,Text> mapOut = new InMemoryMapOutput<Text, Text>( + IFileWrappedMapOutput<Text,Text> mapOut = new InMemoryMapOutput<Text, Text>( job, map1ID, mm, 8, null, true ); - InMemoryMapOutput<Text,Text> mapOut2 = new InMemoryMapOutput<Text, Text>( + IFileWrappedMapOutput<Text,Text> mapOut2 = new InMemoryMapOutput<Text, Text>( job, map2ID, mm, 10, null, true ); when(mm.reserve(eq(map1ID), anyLong(), anyInt())).thenReturn(mapOut); @@ -478,9 +476,9 @@ public class TestFetcher { Path shuffledToDisk = OnDiskMapOutput.getTempPath(onDiskMapOutputPath, fetcher); fs = FileSystem.getLocal(job).getRaw(); - MapOutputFile mof = mock(MapOutputFile.class); - OnDiskMapOutput<Text,Text> odmo = new OnDiskMapOutput<Text,Text>(map1ID, - id, mm, 100L, job, 
mof, fetcher, true, fs, onDiskMapOutputPath); + IFileWrappedMapOutput<Text,Text> odmo = + new OnDiskMapOutput<Text,Text>(map1ID, mm, 100L, job, fetcher, true, + fs, onDiskMapOutputPath); String mapData = "MAPDATA12345678901234567890"; @@ -538,7 +536,7 @@ public class TestFetcher { @Test(timeout=10000) public void testInterruptInMemory() throws Exception { final int FETCHER = 2; - InMemoryMapOutput<Text,Text> immo = spy(new InMemoryMapOutput<Text,Text>( + IFileWrappedMapOutput<Text,Text> immo = spy(new InMemoryMapOutput<Text,Text>( job, id, mm, 100, null, true)); when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt())) .thenReturn(immo); @@ -584,10 +582,9 @@ public class TestFetcher { Path p = new Path("file:///tmp/foo"); Path pTmp = OnDiskMapOutput.getTempPath(p, FETCHER); FileSystem mFs = mock(FileSystem.class, RETURNS_DEEP_STUBS); - MapOutputFile mof = mock(MapOutputFile.class); - when(mof.getInputFileForWrite(any(TaskID.class), anyLong())).thenReturn(p); - OnDiskMapOutput<Text,Text> odmo = spy(new OnDiskMapOutput<Text,Text>(map1ID, - id, mm, 100L, job, mof, FETCHER, true, mFs, p)); + IFileWrappedMapOutput<Text,Text> odmo = + spy(new OnDiskMapOutput<Text,Text>(map1ID, mm, 100L, job, + FETCHER, true, mFs, p)); when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt())) .thenReturn(odmo); doNothing().when(mm).waitForResource();