MAPREDUCE-5649. Reduce cannot use more than 2G memory for the final merge. Contributed by Gera Shegalov (cherry picked from commit 7dc3c1203d1ab14c09d0aaf0869a5bcdfafb0a5a)
(cherry picked from commit 87c2d915f1cc799cb4020c945c04d3ecb82ee963) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4ea42b84 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4ea42b84 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4ea42b84 Branch: refs/heads/branch-2.7 Commit: 4ea42b84b7b3adaa0be5e857cc5f996d5d8a98bf Parents: afa8189 Author: Jason Lowe <jl...@apache.org> Authored: Mon May 4 19:02:39 2015 +0000 Committer: Vinod Kumar Vavilapalli (I am also known as @tshooter.) <vino...@apache.org> Committed: Thu Sep 10 11:00:00 2015 -0700 ---------------------------------------------------------------------- hadoop-mapreduce-project/CHANGES.txt | 3 ++ .../mapreduce/task/reduce/MergeManagerImpl.java | 47 +++++++++++--------- .../mapreduce/task/reduce/TestMergeManager.java | 29 ++++++++++++ 3 files changed, 57 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ea42b84/hadoop-mapreduce-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index efa0f91..63b6129 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -27,6 +27,9 @@ Release 2.7.2 - UNRELEASED MAPREDUCE-6474. ShuffleHandler can possibly exhaust nodemanager file descriptors (Kuhu Shukla via jlowe) + MAPREDUCE-5649. 
Reduce cannot use more than 2G memory for the final merge + (Gera Shegalov via jlowe) + Release 2.7.1 - 2015-07-06 INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ea42b84/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java index a4b1aa8..3699ddd 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java @@ -93,8 +93,10 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> { Set<CompressAwarePath> onDiskMapOutputs = new TreeSet<CompressAwarePath>(); private final OnDiskMerger onDiskMerger; - - private final long memoryLimit; + + @VisibleForTesting + final long memoryLimit; + private long usedMemory; private long commitMemory; private final long maxSingleShuffleLimit; @@ -167,11 +169,10 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> { } // Allow unit tests to fix Runtime memory - this.memoryLimit = - (long)(jobConf.getLong(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES, - Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE)) - * maxInMemCopyUse); - + this.memoryLimit = (long)(jobConf.getLong( + MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES, + Runtime.getRuntime().maxMemory()) * maxInMemCopyUse); + this.ioSortFactor = jobConf.getInt(MRJobConfig.IO_SORT_FACTOR, 100); 
final float singleShuffleMemoryLimitPercent = @@ -201,7 +202,7 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> { if (this.maxSingleShuffleLimit >= this.mergeThreshold) { throw new RuntimeException("Invalid configuration: " - + "maxSingleShuffleLimit should be less than mergeThreshold" + + "maxSingleShuffleLimit should be less than mergeThreshold " + "maxSingleShuffleLimit: " + this.maxSingleShuffleLimit + "mergeThreshold: " + this.mergeThreshold); } @@ -667,24 +668,26 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> { } } - private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs, - List<InMemoryMapOutput<K,V>> inMemoryMapOutputs, - List<CompressAwarePath> onDiskMapOutputs - ) throws IOException { - LOG.info("finalMerge called with " + - inMemoryMapOutputs.size() + " in-memory map-outputs and " + - onDiskMapOutputs.size() + " on-disk map-outputs"); - + @VisibleForTesting + final long getMaxInMemReduceLimit() { final float maxRedPer = - job.getFloat(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, 0f); + jobConf.getFloat(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, 0f); if (maxRedPer > 1.0 || maxRedPer < 0.0) { - throw new IOException(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT + - maxRedPer); + throw new RuntimeException(maxRedPer + ": " + + MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT + + " must be a float between 0 and 1.0"); } - int maxInMemReduce = (int)Math.min( - Runtime.getRuntime().maxMemory() * maxRedPer, Integer.MAX_VALUE); - + return (long)(memoryLimit * maxRedPer); + } + private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs, + List<InMemoryMapOutput<K,V>> inMemoryMapOutputs, + List<CompressAwarePath> onDiskMapOutputs + ) throws IOException { + LOG.info("finalMerge called with " + + inMemoryMapOutputs.size() + " in-memory map-outputs and " + + onDiskMapOutputs.size() + " on-disk map-outputs"); + final long maxInMemReduce = getMaxInMemReduceLimit(); // merge config params Class<K> keyClass = 
(Class<K>)job.getMapOutputKeyClass(); Class<V> valueClass = (Class<V>)job.getMapOutputValueClass(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ea42b84/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java index 8d6bab9..ef860af 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java @@ -260,4 +260,33 @@ public class TestMergeManager { } } + + @Test + public void testLargeMemoryLimits() throws Exception { + final JobConf conf = new JobConf(); + // Xmx in production + conf.setLong(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES, + 8L * 1024 * 1024 * 1024); + + // M1 = Xmx fraction for map outputs + conf.setFloat(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, 1.0f); + + // M2 = max M1 fraction for a single map output + conf.setFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT, 0.95f); + + // M3 = M1 fraction at which in memory merge is triggered + conf.setFloat(MRJobConfig.SHUFFLE_MERGE_PERCENT, 1.0f); + + // M4 = M1 fraction of map outputs remaining in memory for a reduce + conf.setFloat(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, 1.0f); + + final MergeManagerImpl<Text, Text> mgr = new MergeManagerImpl<Text, Text>( + null, conf, mock(LocalFileSystem.class), null, null, null, null, null, + null, null, null, null, null, new
MROutputFiles()); + assertTrue("Large shuffle area unusable: " + mgr.memoryLimit, + mgr.memoryLimit > Integer.MAX_VALUE); + final long maxInMemReduce = mgr.getMaxInMemReduceLimit(); + assertTrue("Large in-memory reduce area unusable: " + maxInMemReduce, + maxInMemReduce > Integer.MAX_VALUE); + } }